Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Hacking Sidekiq for Fun (and Profit) - RubyConf...

Hacking Sidekiq for Fun (and Profit) - RubyConf Australia 2014

Get an introduction to the history of background processing in Ruby, What Sidekiq is and how you can use sidekiq to do some excellent things. Presented at RubyConf Australia 2014 in Sydney on the 21st of February, 2014.

Darcy Laycock

February 21, 2014
Tweet

More Decks by Darcy Laycock

Other Decks in Programming

Transcript

  1. class HelloPersonWorker! ! include Sidekiq::Worker! sidekiq_options queue: "onboarding"! ! def

    perform(name)! Rails.logger.info "Saying hello to #{name}"! end! ! end!
  2. {! "class": "HelloPersonWorker",! "args": ["Darcy"],! "retry": true,! "queue": "onboarding",! "jid":

    "2dbfbc3f28d26be12107db84",! "enqueued_at": 1392701981.091299! }
  3. class BasicFetch! def initialize(options)! @strictly_ordered_queues = !!options[:strict]! @queues = options[:queues].map

    { |q| "queue:#{q}" }! @unique_queues = @queues.uniq! end! ! def retrieve_work! work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }! UnitOfWork.new(*work) if work! end! ! # ... more goes here ...! end!
  4. UnitOfWork = Struct.new(:queue, :message) do! def acknowledge! # nothing to

    do! end! ! def queue_name! queue.gsub(/.*queue:/, '')! end! ! def requeue! Sidekiq.redis do |conn|! conn.rpush("queue:#{queue_name}", message)! end! end! end!
  5. module Sidekiq! module Middleware! module Server! class Logging! ! def

    call(worker, item, queue)! Sidekiq::Logging.with_context("#{worker.class.to_s} JID-#{item['jid']}") do! begin! start = Time.now! logger.info { "start" }! yield! logger.info { "done: #{elapsed(start)} sec" }! rescue Exception! logger.info { "fail: #{elapsed(start)} sec" }! raise! end! end! end! ! def elapsed(start)! (Time.now - start).to_f.round(3)! end! ! def logger! Sidekiq.logger! end! end! end! end! end!
  6. def call(worker, item, queue)! Sidekiq::Logging.with_context("#{worker.class.to_s} JID-#{item['jid']}") do! begin! start =

    Time.now! logger.info { "start" }! yield! logger.info { "done: #{elapsed(start)} sec" }! rescue Exception! logger.info { "fail: #{elapsed(start)} sec" }! raise! end! end! end!
  7. module Sidekiq! module Middleware! module Client! class UniqueJobs! def call(worker_class,

    item, queue)! worker_class = worker_class.constantize if worker_class.is_a?(String)! enabled = Sidekiq::Middleware::Helpers.unique_enabled?(worker_class, item)! ! if enabled! expiration = Sidekiq::Middleware::Helpers.unique_exiration(worker_class)! job_id = item['jid']! unique = false! ! # Scheduled! if item.has_key?('at')! # Use expiration period as specified in configuration,! # but relative to job schedule time! expiration += (item['at'].to_i - Time.now.to_i)! end! ! unique_key = Sidekiq::Middleware::Helpers.unique_digest(worker_class, item)! ! Sidekiq.redis do |conn|! conn.watch(unique_key)! ! locked_job_id = conn.get(unique_key)! if locked_job_id && locked_job_id != job_id! conn.unwatch! else! unique = conn.multi do! conn.setex(unique_key, expiration, job_id)! end! end! end! ! yield if unique! else! yield! end! end! end! end! end! end!
  8. unique_key = Sidekiq::Middleware::Helpers.unique_digest(worker_class, item)! ! Sidekiq.redis do |conn|! conn.watch(unique_key)! !

    locked_job_id = conn.get(unique_key)! if locked_job_id && locked_job_id != job_id! conn.unwatch! else! unique = conn.multi do! conn.setex(unique_key, expiration, job_id)! end! end! end! ! yield if unique!
  9. 1. Generate a key from the job structure 2. Check

    if locked 3. Set lock (with expiry) if not locked
  10. module Sidekiq! module Middleware! module Server! class UniqueJobs! def call(worker_instance,

    item, queue)! worker_class = worker_instance.class! enabled = Sidekiq::Middleware::Helpers.unique_enabled?(worker_class, item)! ! if enabled! begin! yield! ensure! unless Sidekiq::Middleware::Helpers.unique_manual?(worker_class)! clear(worker_class, item)! end! end! else! yield! end! end! ! def clear(worker_class, item)! Sidekiq.redis do |conn|! conn.del Sidekiq::Middleware::Helpers.unique_digest(worker_class, item)! end! end! end! end! end! end!
  11. “I always want at least 5 worker servers, but I

    can happily jump to 100 if need be”
  12. Sidekiq CloudWatch! (Metrics) Publish Sidekiq Queue Sizes AutoScaling Scale Up

    / Down based on # of waiting jobs EC2 Instances Launch Instances w/ Queue or Groups specified via User Data
  13. Sidekiq CloudWatch! (Metrics) Publish Sidekiq Queue Sizes AutoScaling Scale Up

    / Down based on # of waiting jobs EC2 Instances Launch Instances w/ Queue or Groups specified via User Data Read Queues / Group from the config, start processing jobs
  14. ---! queues:! <% if ENV['SIDEKIQ_GROUP'] == 'onboarding' %>! - [hello,

    10]! - [world, 5]! <% else %>! - [hello, 2]! - [world, 2]! - [default, 1]! <% end %>!