
Clojure at Attendify

Experience report (constantly updating)


Oleksii Kachaiev

October 26, 2017

Transcript

  1. Who We Are
     • 3k+ mobile apps for events
     • Content management system
     • Social network for attendees
     • Leads retrieval mobile application
     • Marketing automation platform
     • Event registration system (coming soon)
  2. In Figures
     • 6.5k events with 492k speakers
     • 3.2M mobile devices
     • 1.5M+ users shared 1.3M posts with 10.7M likes
     • Mobile API with 19K req/min at peak
     • 118 repos and 25+ services (micro!)
     • 4B+ messages in Kafka (weekly)
     • 230M+ documents in ElasticSearch
  3. Elaborate?
     • Application servers
     • Data pipeline
     • ETL jobs
     • Internal tools
     • UI
     • Scheduling, talking to databases, data manipulation, … you name it
  4. Clojure at Attendify
     • Main & default language
     • 4+ years in production
     • 12 engineers (and hiring)
     • CLOC: 97K+ Clojure, 68K+ ClojureScript
     • It’s still a technology, not magic
  5. Elaborate?
     • Clojure = ideas + access to Java libraries and the JVM
       - immutability, epochal time model, atoms, transformation chains, EDN, uniform domain modeling, declarative approach, runtime polymorphism with custom dispatch, etc. (a small illustration follows this slide)
     • It’s easier to use Clojure ideas in Clojure
       - no, seriously
       - “haskell in python” << “python” & “haskell in python” << “haskell”
       - btw, “java in clojure” >> “java”
       - discipline & convention don’t work, the language core does
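     A tiny illustration of a couple of the ideas above (an atom as the epochal time model over immutable values, plus a transformation chain); the names are made up for the example, not taken from Attendify’s code.

      (def sessions (atom {}))   ;; one identity, a succession of immutable values

      (defn add-session [sessions-map session-id profile-id]
        (assoc sessions-map session-id {:profile-id profile-id
                                        :created-at (System/currentTimeMillis)}))

      ;; swap! applies a pure function to the current value (epochal time model)
      (swap! sessions add-session "abc-123" 42)

      ;; transformation chain over plain data
      (->> (vals @sessions)
           (filter #(= 42 (:profile-id %)))
           (map :created-at)
           sort)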
  6. Agenda
     • Libraries & ecosystem
     • Servers & microservices
     • Data
     • Error handling
     • Project setup & dev tools
  7. Libraries & ecosystem
     • A lot of stuff
       - Clojars, Clojure Toolbox, ClojureWerkz
     • You still have Java
       - There are a lot of wrappers, but when it’s necessary… (see the interop sketch below)
       - You still have the tooling: GC logs, OOM analyzers, profilers and more
     • You still work with Maven
       - lein, wagon, checkouts
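     For illustration, dropping down to a Java API directly when no wrapper fits; java.time here is only an example class, not a specific Attendify dependency.

      (import '(java.time Instant Duration))

      (defn expired?
        "True when created-at-ms plus ttl-ms is already in the past."
        [created-at-ms ttl-ms]
        (.isBefore (Instant/ofEpochMilli (+ created-at-ms ttl-ms))
                   (Instant/now)))

      (expired? (System/currentTimeMillis) (.toMillis (Duration/ofMinutes 30)))
      ;; => false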
  8. Servers & Microservices
     • JSON-RPC and GraphQL, not REST
       - Might be EDN or MsgPack instead of JSON
     • aleph: a smart wrapper for Netty
       - With ring & compojure (a minimal wiring sketch follows this slide)
     • manifold, not core.async
       - Futures & chaining, executors, scheduling
     • Own library to define/use RPC
       - There are a few open-source libraries, like slacker and castra, tho’
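     A minimal sketch of how aleph, ring/compojure and manifold fit together as described above; handle-rpc, the route and the port are hypothetical, not the internal setup.

      (require '[aleph.http :as http]
               '[compojure.core :refer [routes POST]]
               '[manifold.deferred :as d])

      (defn handle-rpc [body] {:result :ok})   ;; placeholder for the real RPC dispatch

      ;; A ring handler may return a manifold deferred; aleph resolves it on top of Netty.
      (defn rpc-handler [req]
        (-> (d/future (handle-rpc (:body req)))
            (d/chain (fn [result] {:status 200 :body (pr-str result)}))
            (d/timeout! 5000 {:status 504 :body "timeout"})))

      (def app (routes (POST "/rpc" [] rpc-handler)))

      (http/start-server app {:port 8080})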
  9. Servers & Microservices

      (defservice "audience.fetchTimeline"
        :allowed-auth #{:builder}
        :validate-params {:profileId s/Str
                          (s/optional-key :includeTypes) [s/Str]
                          (s/optional-key :pageSize) s/Int}
        :response-format Timeline
        (fn [{:keys [timeline-handler]} {:keys [profileId]}]
          (timeline/timeline-entries …)))
  10. Servers & Microservices

      (defservice "audience.fetchTimeline"                  ;; defservice: macro; "audience.fetchTimeline": method name
        :allowed-auth #{:builder}
        :validate-params {:profileId s/Str                   ;; call params schema, enforced
                          (s/optional-key :includeTypes) [s/Str]
                          (s/optional-key :pageSize) s/Int}
        :response-format Timeline
        (fn [{:keys [timeline-handler]} {:keys [profileId]}] ;; first argument: component(s)
          (timeline/timeline-entries …)))
  11. Servers & Microservices
      • defmulti rpc/execute
      • defservice compiles down to a defmethod
      • A ring route calls rpc/execute (a rough sketch of this wiring follows below)
      • ring handlers take care of
        - Logging
        - Error handling
        - Serialization format (JSON by default)
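     The real defservice macro is internal; what follows is only a guess at the wiring the bullets describe (a multimethod keyed by method name plus a single ring route), with illustrative names and a stubbed body.

      (require '[compojure.core :refer [POST]]
               '[schema.core :as s])

      ;; One multimethod for all RPC calls, dispatched on the method name.
      (defmulti execute (fn [_components request] (:method request)))

      ;; defservice presumably expands into something like this defmethod.
      (defmethod execute "audience.fetchTimeline"
        [{:keys [timeline-handler]} {:keys [params]}]
        (s/validate {:profileId s/Str} (select-keys params [:profileId]))
        {:entries []})   ;; stand-in for (timeline/timeline-entries …)

      ;; A single ring route hands every request to the multimethod; in reality the
      ;; surrounding ring handlers deal with logging, errors and serialization.
      (defn rpc-route [components]
        (POST "/rpc" req
          {:status 200
           :body   (pr-str (execute components (:body req)))}))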
  12. Servers & Microservices

      (defservice "audience.fetchTimeline"
        :allowed-auth #{:builder}
        :validate-params {:profileId s/Str
                          (s/optional-key :includeTypes) [s/Str]
                          (s/optional-key :pageSize) s/Int}
        :response-format Timeline
        :dispatch-on rpc/execute                             ;; the “magic”
        (fn [{:keys [timeline-handler]} {:keys [profileId]}]
          (timeline/timeline-entries …)))
  13. Data
      • korma & hikari to work with PostgreSQL
        - Not really happy
      • Java API for Kafka producers/consumers
        - Own wrapper, but there are a few open-sourced ones, like kinsky & clj-kafka
      • carmine to work with Redis
        - Used to use the built-in tasks/jobs queue, not anymore (a minimal carmine call is sketched below)
      • Event sourcing with our own library
        - There are a few open-source libraries, like rill and cqrs-server, tho’
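     For illustration, the kind of carmine call mentioned above; the key names and connection details are made up.

      (require '[taoensso.carmine :as car])

      (def redis-conn {:pool {} :spec {:uri "redis://localhost:6379"}})

      ;; carmine exposes Redis commands as plain functions inside wcar
      (car/wcar redis-conn
        (car/setex "profile:42:timeline" 300 (pr-str {:entries []}))
        (car/get "profile:42:timeline"))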
  14. Data: Korma (In Theory)

      (defentity applications
        (table "application")
        (pk :id)
        (transform #(update % :name capitalize)))

      (defn select-app [{:keys [conn]} app-id]
        (-> (select applications
              (db conn)
              (fields :name :icon)
              (where {:id app-id}))
            first))
  15. Data: Korma (In Practice)
      • Connection pooling
      • Sophisticated queries
      • Serialization handling (array, hstore, jsonb)
      • Transactions, retries, rollbacks
      • java.sql.SQLException
      • Table partitioning
  16. Data: Korma (In Practice)

      (defn fetch-profile-with-sources* [db apikey id]
        (->> (models/execute-query
               db
               [(format "SELECT ps.*,
                                ss.integration_id as source_integration_id,
                                ss.remote_id as source_remote_id,
                                ss.payload as source_payload,
                                ss.is_deactivated as source_is_deactivated
                         FROM %s ps
                         LEFT OUTER JOIN %s ss on (ps.id = ss.profile_id)
                         WHERE ps.apikey = ? AND ps.id = ?"
                        (tables/route tables/profile-tbl apikey)
                        (tables/route tables/profile-source-tbl apikey))
                [apikey id]]
               {:raw-results? true})
             (functor/fmap
               (fn [profiles]
                 (->> profiles
                      utils/aggregate-profiles
                      (map transform-profile)
                      first
                      (#(dissoc % :password)))))))
  17. Data: Korma (In Practice)

      (defn fetch-profile-with-sources* [db apikey id]
        (->> (models/execute-query                           ;; db passed in as a component
               db
               ;; raw SQL query, not the Korma DSL, and not entities either
               [(format "SELECT ps.*,
                                ss.integration_id as source_integration_id,
                                ss.remote_id as source_remote_id,
                                ss.payload as source_payload,
                                ss.is_deactivated as source_is_deactivated
                         FROM %s ps
                         LEFT OUTER JOIN %s ss on (ps.id = ss.profile_id)
                         WHERE ps.apikey = ? AND ps.id = ?"
                        ;; tables/route picks the partition for this apikey
                        (tables/route tables/profile-tbl apikey)
                        (tables/route tables/profile-source-tbl apikey))
                [apikey id]]
               {:raw-results? true})
             ;; manual transform of the raw rows
             (functor/fmap
               (fn [profiles]
                 (->> profiles
                      utils/aggregate-profiles
                      (map transform-profile)
                      first
                      (#(dissoc % :password)))))))
  18. Data: Kafka Consumer
      • Internal library
      • Manage threads, subscribe/unsubscribe
      • Handle serialization
      • Exceptions, errors
      • Commit offsets
  19. Data: Kafka Consumer

      (component/using
        (kafka/new-whisper
          {:name "pulse-audience-latest"
           :topic "attendify.s.a.v1"
           :on-message (partial pulse-audience/notify!
                                pulse-audience-expiration-threshold
                                pulse-audience-cache-ttl-ms)
           :threads 32
           :kafka-config (assoc kafka-config :offset-reset "latest")})
        [:redis-connector :db])
  20. Data: Kafka Consumer

      (component/using
        (kafka/new-whisper                                   ;; internal library, handles sub/unsub of consumers
          {:name "pulse-audience-latest"
           :topic "attendify.s.a.v1"
           :on-message (partial pulse-audience/notify!       ;; called on each message
                                pulse-audience-expiration-threshold
                                pulse-audience-cache-ttl-ms)
           :threads 32
           :kafka-config (assoc kafka-config :offset-reset "latest")})
        [:redis-connector :db])                              ;; passed as a first arg
  21. Error Handling
      • Most of your production code
        - No, seriously
      • No good story here
        - Clojure gives us only exceptions and nil
      • Own library for either
        - cats seems to be the standard here, but no one cares about monads, right?
      • Typing your data, not your code
        - schema instead of spec (a small sketch follows this slide)
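     A small sketch of “typing your data, not your code” with schema, plus a hand-rolled either as a stand-in for the internal library; every name here is illustrative.

      (require '[schema.core :as s])

      ;; stand-in either: the internal library (or cats) provides the real one
      (defn right [v] {:ok? true  :value v})
      (defn left  [e] {:ok? false :error e})
      (defn bind  [m f] (if (:ok? m) (f (:value m)) m))

      ;; schema types the data at the boundary, not the functions
      (def Profile
        {:id    s/Int
         :email s/Str
         (s/optional-key :name) s/Str})

      (defn check-profile-nil [profile]
        (if (nil? profile)
          (left {:error :profile-not-found})
          (right (s/validate Profile profile))))

      (-> (check-profile-nil {:id 1 :email "a@b.co"})
          (bind (fn [profile] (right (dissoc profile :email)))))
      ;; => {:ok? true, :value {:id 1}}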
  22. Error Handling (In Practice)

      (->> (pg-profile/fetch-profile-by-email* db' apikey email)
           (either/bind #(check-profile-nil kp apikey token email %))
           (either/bind #(check-profile-deleted kp apikey token email %))
           (either/bind #(check-profile-claimed-or-registered kp apikey token email %))
           (either/bind #(check-valid-password kp apikey token email % password))
           (either/bind #(deactivate-old-sessions db' apikey % token))
           (either/bind
             (fn [profile]
               (let [session-id (or session-id (generate-session-id))
                     now        (Timestamp/from (Instant/now))]
                 (->> (insert-session db' …)
                      (either/fmap (fn [session] {…})))))))
  23. Error Handling (In Practice)

      (->> (pg-profile/fetch-profile-by-email* db' apikey email)
           ;; error handling: each either/bind short-circuits the chain on failure
           (either/bind #(check-profile-nil kp apikey token email %))
           (either/bind #(check-profile-deleted kp apikey token email %))
           (either/bind #(check-profile-claimed-or-registered kp apikey token email %))
           (either/bind #(check-valid-password kp apikey token email % password))
           (either/bind #(deactivate-old-sessions db' apikey % token))
           (either/bind
             (fn [profile]
               (let [session-id (or session-id (generate-session-id))
                     now        (Timestamp/from (Instant/now))]
                 (->> (insert-session db' …)
                      (either/fmap (fn [session] {…})))))))
  24. Error Handling (In Practice)

      (lete [profiles (fetch-access-keys-by-events db apikey restricted-event-ids)
             labels (labels/fetch-labels {:db db} apikey {:type "access-key"})
             events-access-keys (->> labels
                                     (filter #(some (set (:events %)) restricted-event-ids))
                                     (map :id)
                                     set)   ;; a set, so it can act as the membership test below
             profile-ids-blacklist (->> profiles
                                        (remove #(some events-access-keys
                                                       (vec (.getArray (:access_keys %)))))
                                        (map :id))]
        (perform-fetching-by-events db apikey event-ids profile-ids-blacklist))
  25. Error Handling (In Practice)

      ;; lete: macro; the first two bindings evaluate to deferred[either[a’]]
      (lete [profiles (fetch-access-keys-by-events db apikey restricted-event-ids)   ;; deferred[either[a’]]
             labels (labels/fetch-labels {:db db} apikey {:type "access-key"})       ;; deferred[either[a’]]
             events-access-keys (->> labels
                                     (filter #(some (set (:events %)) restricted-event-ids))
                                     (map :id)
                                     set)
             profile-ids-blacklist (->> profiles
                                        (remove #(some events-access-keys
                                                       (vec (.getArray (:access_keys %)))))
                                        (map :id))]
        (perform-fetching-by-events db apikey event-ids profile-ids-blacklist))
  26. Project Setup
      • Configuration is always tricky
        - env library, .lein-env-dev files
      • components
        - Manage the (stateful) world around you
        - Show code dependencies
      • dev/user.clj
        - With (go) and (reset) (a minimal sketch follows this slide)
      • nREPL
        - Try ultra (a plugin for lein)
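     A minimal dev/user.clj along those lines, based on component and tools.namespace; the app namespace and new-system constructor are hypothetical.

      (ns user
        (:require [com.stuartsierra.component :as component]
                  [clojure.tools.namespace.repl :refer [refresh]]
                  [myapp.core :as app]))          ;; hypothetical: builds the system map

      (defonce system (atom nil))

      (defn go []
        (reset! system (component/start (app/new-system)))
        :started)

      (defn stop []
        (when @system
          (component/stop @system)
          (reset! system nil))
        :stopped)

      (defn reset []
        (stop)
        (refresh :after 'user/go))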
  27. Project Setup: component

      (defrecord CSVExporter []
        component/Lifecycle
        (start [component]
          (if (:executor component)
            component
            (assoc component :executor (e/fixed-thread-executor threads-count))))
        (stop [component]
          (when-let [executor (:executor component)]
            (.shutdownNow executor))
          (assoc component :executor nil)))
  28. Project Setup: component

      (defrecord CSVExporter []
        component/Lifecycle
        (start [component]                         ;; prepare
          (if (:executor component)
            component
            (assoc component :executor (e/fixed-thread-executor threads-count))))
        (stop [component]                          ;; cleanup
          (when-let [executor (:executor component)]
            (.shutdownNow executor))
          (assoc component :executor nil)))
  29. Project Setup: component

      (component/system-map
        …
        :csv-exporter (component/using
                        (export/new-csv-exporter
                          {:immediate-export-limit csv-immediate-export-limit
                           :immediate-export-timeout csv-immediate-export-timeout
                           :threads-count csv-export-threads-count})
                        [:s3-client :email-notifier :profiles-search-handler
                         :elastic-client :settings-event-store])
        …)

      (e/export csv-exporter account-id other-params)
  30. Project Setup: component

      (component/system-map
        …
        :csv-exporter (component/using
                        (export/new-csv-exporter               ;; component
                          {:immediate-export-limit csv-immediate-export-limit     ;; config
                           :immediate-export-timeout csv-immediate-export-timeout
                           :threads-count csv-export-threads-count})
                        [:s3-client :email-notifier :profiles-search-handler      ;; deps
                         :elastic-client :settings-event-store])
        …)

      (e/export csv-exporter account-id other-params)          ;; convention: component goes first
  31. Project Setup: REPL

      $ lein repl
      nREPL server started on port 63248 on host 127.0.0.1 - nrepl://127.0.0.1:63248
      user=> (go)
      :started
      user=> (keys @system)
      (:mailchimp-client :app-component :kafka-producer …)
      user=> (:s3-client @system)
      #augustine.s3.S3Client{:url …}
      user=> (reset)
      :reloading (…)
      :resumed
      user=>
  32. Project Setup: REPL

      $ lein repl
      nREPL server started on port 63248 on host 127.0.0.1 - nrepl://127.0.0.1:63248
      user=> (go)
      :started
      user=> (keys @system)
      (:mailchimp-client :app-component :kafka-producer …)
      user=> (:s3-client @system)
      #augustine.s3.S3Client{:url …}
      user=> (reset)
      :reloading (…)
      :resumed
      user=>

      try this once
  33. Q&A