Threadsafe Upsert

By implementing a simple CSV import use case, this talk discusses the thread safety of upsert on MySQL.

Myles Megyesi

November 14, 2014

Transcript

  1. CSV Import

    Name  Email    Last Updated
    John  j@j.com  2014-11-14 12:42:24
    Mary  m@m.com  2014-11-13 12:42:24
    Sam   s@s.com  2014-11-12 12:42:24
  2. Requirements

    • If the person does not exist, create them.
    • If the person’s email already exists, update their name and last updated timestamp, and do not create a new row.
  3. Setup

    before :each do
      with_db_connection do |connection|
        connection.create_table(:people) do
          primary_key :id
          String :vendor_name
          String :vendor_email, unique: true
          Time :vendor_updated_at
        end
      end
    end
  4.

    it 'creates a person if they do not exist' do
      with_db_connection do |connection|
        result = upsert(connection, 'John', 'j@j.com', yesterday)
        expect(result).to eq(status: :success)
        people = all_people(connection)
        expect(people.size).to eq(1)
        row = people.first
        expect(row[:vendor_name]).to eq('John')
        expect(row[:vendor_email]).to eq('j@j.com')
        expect(row[:vendor_updated_at]).to eq(yesterday)
      end
    end
  5.

    % bundle exec rspec
    F

    Failures:

      1) Upsert::Mysql creates a person if they do not exist
         Failure/Error: result = upsert(connection, 'John', 'j@j.com', yesterday)
         NoMethodError:
           undefined method `upsert' for Upsert::Mysql:Class

    Finished in 0.02278 seconds (files took 0.29508 seconds to load)
    1 example, 1 failure
  6.

    module Upsert
      class Mysql
        def self.upsert(connection, name, email, updated_at)
          connection.from(:people).insert({
            vendor_name: name,
            vendor_email: email,
            vendor_updated_at: updated_at
          })
          {status: :success}
        end
      end
    end
  7.

    % bundle exec rspec
    .

    Finished in 0.04644 seconds (files took 0.29773 seconds to load)
    1 example, 0 failures
  8.

    it "updates the person's name and updated_at if the email already exists" do
      with_db_connection do |connection|
        create_result = upsert(connection, 'John', 'j@j.com', yesterday)
        update_result = upsert(connection, 'Johnathon', 'j@j.com', now)
        expect(update_result).to eq(status: :success)
        people = all_people(connection)
        expect(people.size).to eq(1)
        row = people.first
        expect(row[:vendor_name]).to eq('Johnathon')
        expect(row[:vendor_email]).to eq('j@j.com')
        expect(row[:vendor_updated_at]).to eq(now)
      end
    end
  9.

    % bundle exec rspec spec/upsert/mysql_spec.rb
    Compiled extensions not installed, pure Ruby Atomic will be used.
    .F

    Failures:

      1) Upsert::Mysql updates the person's name and updated_at if the email already exists
         Failure/Error: update_result = upsert(connection, 'Johnathon', 'j@j.com', now)
         Sequel::UniqueConstraintViolation:
           Mysql2::Error: Duplicate entry 'j@j.com' for key 'vendor_email'

    Finished in 0.05595 seconds (files took 0.28592 seconds to load)
    2 examples, 1 failure

    Failed examples:

    rspec ./spec/upsert/shared_examples.rb:44 # Upsert::Mysql updates the person's name and updated_at if the email already exists
  10. (Same code as slide 6.)
  11.

    module Upsert
      class Mysql
        def self.upsert(connection, name, email, updated_at)
          row = connection.from(:people).
            where(vendor_email: email).
            first
          if row
            connection.from(:people).
              where(vendor_email: email).
              update({
                vendor_name: name,
                vendor_updated_at: updated_at
              })
            {status: :success}
          else
            connection.from(:people).insert({
              vendor_name: name,
              vendor_email: email,
              vendor_updated_at: updated_at
            })
            {status: :success}
          end
        end
      end
    end
  12.

    % bundle exec rspec
    ...

    Finished in 0.06233 seconds (files took 0.27974 seconds to load)
    2 examples, 0 failures
  13.

    it 'does not override newer data with old data' do
      with_db_connection do |connection|
        create_result = upsert(connection, 'John', 'j@j.com', now)
        update_result = upsert(connection, 'Johnathon', 'j@j.com', yesterday)
        expect(update_result).to eq({
          status: :failure,
          reason: :stale_data
        })
        people = all_people(connection)
        expect(people.size).to eq(1)
        row = people.first
        expect(row[:vendor_name]).to eq('John')
        expect(row[:vendor_email]).to eq('j@j.com')
        expect(row[:vendor_updated_at]).to eq(now)
      end
    end
  14.

    % bundle exec rspec
    ..F

    Failures:

      1) Upsert::Mysql does not override newer data with old data
         Failure/Error: expect(update_result).to eq({

           expected: {:status=>:failure, :reason=>:stale_data}
                got: {:status=>:success}

           (compared using ==)

    Finished in 0.07561 seconds (files took 0.28662 seconds to load)
    3 examples, 1 failure
  15. (Same code as slide 11.)
  16.

    module Upsert
      class Mysql
        def self.upsert(connection, name, email, updated_at)
          row = connection.from(:people).
            where(vendor_email: email).
            first
          if row
            if updated_at > row[:vendor_updated_at]
              connection.from(:people).
                where(vendor_email: email).
                update({
                  vendor_name: name,
                  vendor_updated_at: updated_at
                })
              {status: :success}
            else
              {status: :failure, reason: :stale_data}
            end
          else
            connection.from(:people).insert({
              vendor_name: name,
              vendor_email: email,
              vendor_updated_at: updated_at
            })
            {status: :success}
          end
        end
      end
    end
  17.

    % bundle exec rspec
    ...

    Finished in 0.06233 seconds (files took 0.27974 seconds to load)
    3 examples, 0 failures
  18. (Same code as slide 16.)
  19. A race condition occurs when two or more threads can access shared data and they try to change it at the same time.

    (Some guy on StackOverflow)
  20.

    it 'handles many writers trying to update' do
      writers = 100
      updated_at_times = generate_n_times_between(yesterday, now, writers)
      in_parallel_options = {
        times: writers,
        concurrency: writers / 5,
        args: updated_at_times.shuffle
      }
      results = in_parallel(in_parallel_options) do |connection, updated_at|
        upsert(connection, 'John', 'j@j.com', updated_at)
      end
      expect(results).to have(writers).items
      successful_writes, failed_writes = results.partition do |result|
        result[:status] == :success
      end
      expect(successful_writes).to have_at_least(1).item
      with_db_connection do |connection|
        people = all_people(connection)
        expect(people.size).to eq(1)
        row = people.first
        expect(row[:vendor_name]).to eq('John')
        expect(row[:vendor_email]).to eq('j@j.com')
        expect(row[:vendor_updated_at]).to eq(updated_at_times.max)
      end
    end
  21.

    % bundle exec rspec
    ....

    Finished in 0.12343 seconds (files took 0.28886 seconds to load)
    4 examples, 0 failures
  22.

    % bundle exec rspec
    ...F

    Failures:

      1) Upsert::Mysql handles many writers trying to update
         Failure/Error: expect(row[:vendor_updated_at]).to eq(updated_at_times.max)

           expected: 2014-11-14 12:42:24.000000000 +0000
                got: 2014-11-14 10:32:48.000000000 +0000

    Finished in 12.62 seconds (files took 1.49 seconds to load)
    4 examples, 1 failure
  23. (Same code as slide 16.)
  24.

    module Upsert
      class Mysql
        def self.upsert(connection, name, email, updated_at)
          row = connection.from(:people).
            where(vendor_email: email).
            first
          if row
            # The staleness check is now part of the UPDATE's WHERE clause,
            # so MySQL compares and writes in a single statement.
            affected_rows = connection.from(:people).
              where(vendor_email: email).
              where(Sequel.expr(updated_at) > :vendor_updated_at).
              update({
                vendor_name: name,
                vendor_updated_at: updated_at
              })
            if affected_rows == 0
              {status: :failure, reason: :stale_data}
            else
              {status: :success}
            end
          else
            connection.from(:people).insert({
              vendor_name: name,
              vendor_email: email,
              vendor_updated_at: updated_at
            })
            {status: :success}
          end
        end
      end
    end
  25.

    % bundle exec rspec
    ....

    Finished in 0.12343 seconds (files took 0.28886 seconds to load)
    4 examples, 0 failures
  26.

    it 'handles many writers trying to insert the same piece of data' do
      writers = 20
      results = in_parallel(times: writers) do |connection|
        upsert(connection, 'John', 'j@j.com', now)
      end
      expect(results.size).to eq(writers)
      successful_writes, failed_writes = results.partition do |result|
        result[:status] == :success
      end
      expect(successful_writes.size).to eq(1)
      expect(failed_writes.size).to eq(writers - 1)
      expect(failed_writes.map{|r| r[:reason]}).to all(eq(:stale_data))
      with_db_connection do |connection|
        people = all_people(connection)
        expect(people.size).to eq(1)
        row = people.first
        expect(row[:vendor_name]).to eq('John')
        expect(row[:vendor_email]).to eq('j@j.com')
        expect(row[:vendor_updated_at]).to eq(now)
      end
    end
  27.

    % bundle exec rspec
    ....F

    Failures:

      1) Upsert::Mysql handles many writers trying to insert the same piece of data
         Failure/Error: expect(results.size).to eq(writers)

           expected: 100
                got: 85

    Finished in 2.02 seconds (files took 1.64 seconds to load)
    5 examples, 1 failure
  28. (Same spec as slide 26.)
  29.

    results = in_parallel(times: writers, concurrency: writers / 5) do |connection|
      begin
        upsert(connection, 'John', 'j@j.com', now)
      rescue => e
        puts "#{Thread.current.object_id} #{e.message}"
        raise
      end
    end
  30.

    % bundle exec rspec
    ....
    2724 Sequel::UniqueConstraintViolation: Duplicate entry 'j@j.com' for key 'vendor_email'
    2728 Sequel::UniqueConstraintViolation: Duplicate entry 'j@j.com' for key 'vendor_email'
    2732 Sequel::UniqueConstraintViolation: Duplicate entry 'j@j.com' for key 'vendor_email'
    2730 Sequel::UniqueConstraintViolation: Duplicate entry 'j@j.com' for key 'vendor_email'
    F

    Failures:

      1) Upsert::Mysql handles many writers trying to insert the same piece of data
         Failure/Error: expect(results.size).to eq(writers)

           expected: 100
                got: 96

    Finished in 2.31 seconds (files took 1.35 seconds to load)
    5 examples, 1 failures
  31. (Same code as slide 24.)
  32.

    module Upsert
      class Mysql
        def self.upsert(connection, name, email, updated_at)
          table = connection.from(:people)
          begin
            # Try the INSERT first; the unique index on vendor_email rejects
            # a second row for the same email.
            table.insert({
              vendor_name: name,
              vendor_email: email,
              vendor_updated_at: updated_at
            })
            {status: :success}
          rescue Sequel::UniqueConstraintViolation => e
            # The person already exists: update only if this data is newer.
            affected_rows = table.
              where(Sequel.expr(updated_at) > :vendor_updated_at).
              where(vendor_email: email).
              update({
                vendor_name: name,
                vendor_updated_at: updated_at,
              })
            if affected_rows == 0
              {status: :failure, reason: :stale_data}
            else
              {status: :success}
            end
          end
        end
      end
    end
  33.

    % bundle exec rspec
    .....

    Finished in 2.15 seconds (files took 1.42 seconds to load)
    5 examples, 0 failures
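Slides 20, 26 and 29 call two spec helpers, `in_parallel` and `generate_n_times_between`, whose implementations the talk does not show. The following is a hypothetical, stdlib-only sketch of what they might look like. Unlike the talk's version, this `in_parallel` does not open a database connection per thread and yields only the argument.

```ruby
require 'thread'

# Hypothetical sketch: runs the block `times` times on at most `concurrency`
# threads and returns the block results in argument order.
def in_parallel(times:, concurrency: times, args: Array.new(times), &block)
  queue = Queue.new
  args.first(times).each_with_index { |arg, i| queue << [i, arg] }
  results = Array.new(times)
  workers = Array.new(concurrency) do
    Thread.new do
      loop do
        begin
          i, arg = queue.pop(true) # non-blocking pop; raises ThreadError when empty
        rescue ThreadError
          break
        end
        results[i] = block.call(arg)
      end
    end
  end
  workers.each(&:join) # join re-raises an exception from a worker thread
  results
end

# Hypothetical sketch: n evenly spaced Times from `from` to `to` inclusive,
# rounded to whole seconds (assuming a DATETIME column without fractional seconds).
def generate_n_times_between(from, to, n)
  return [from] if n == 1
  span = to - from # Float seconds
  Array.new(n) { |i| from + (span * i / (n - 1)).round }
end

times = generate_n_times_between(Time.at(0), Time.at(99), 100)
squares = in_parallel(times: 10, concurrency: 2, args: (1..10).to_a) { |x| x * x }
```

Because this sketch takes keyword arguments, slide 20's `in_parallel(in_parallel_options)` would need to become `in_parallel(**in_parallel_options)` on Ruby 3. It also re-raises a worker's exception when joining, whereas the short `results.size` on slides 27 and 30 suggests the talk's helper dropped the results of writers that raised.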
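The lost update behind slide 22 can be reproduced deterministically without MySQL. This sketch assumes the table is a Hash keyed by email with Integer timestamps, and uses Fibers to pause each writer between its SELECT and its UPDATE, forcing the interleaving that the 100 threads of slide 20 only hit by chance.

```ruby
# Check-then-act, as in slide 16: SELECT, compare in Ruby, then UPDATE.
# Assumes the row already exists (only the update path is modeled).
def check_then_act_upsert(table, email, name, updated_at)
  row = table[email]                # SELECT ... WHERE vendor_email = email
  Fiber.yield                       # the gap where another writer can run
  if updated_at > row[:updated_at]  # compares against a possibly stale read
    table[email] = { name: name, updated_at: updated_at } # UPDATE
    { status: :success }
  else
    { status: :failure, reason: :stale_data }
  end
end

table = { 'j@j.com' => { name: 'John', updated_at: 100 } }
writer_a = Fiber.new { check_then_act_upsert(table, 'j@j.com', 'John', 150) }
writer_b = Fiber.new { check_then_act_upsert(table, 'j@j.com', 'John', 200) }

writer_a.resume            # A reads updated_at = 100 and pauses
writer_b.resume            # B reads updated_at = 100 and pauses
result_b = writer_b.resume # B: 200 > 100, writes 200
result_a = writer_a.resume # A: 150 > 100 (stale read), overwrites 200 with 150

puts table['j@j.com'][:updated_at] # prints 150, not the newest value 200
```

Both writers report success, yet the newer timestamp is lost, which matches the `expected`/`got` mismatch on slide 22.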
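Slide 24's fix works because the comparison moves into the UPDATE itself, turning it into a compare-and-set. The sketch below models that with a hypothetical `FakePeopleTable` whose `Mutex` stands in for the assumption that MySQL applies a single UPDATE statement to the row atomically. `update_if_newer` returns an affected-row count, like `update` does in slide 24.

```ruby
# Hypothetical in-memory stand-in for the people table.
class FakePeopleTable
  def initialize
    @rows = {}
    @lock = Mutex.new # models per-statement atomicity
  end

  def insert(email, name, updated_at)
    @lock.synchronize { @rows[email] = { name: name, updated_at: updated_at } }
  end

  def find(email)
    @lock.synchronize { @rows[email] }
  end

  # Models: UPDATE people SET vendor_name = ?, vendor_updated_at = ?
  #         WHERE vendor_email = ? AND ? > vendor_updated_at
  # The comparison and the write happen in one atomic step.
  def update_if_newer(email, name, updated_at)
    @lock.synchronize do
      row = @rows[email]
      next 0 unless row && updated_at > row[:updated_at]
      @rows[email] = { name: name, updated_at: updated_at }
      1
    end
  end
end

table = FakePeopleTable.new
table.insert('j@j.com', 'John', 0)

# 100 writers with shuffled timestamps 1..100 race to update the same row.
threads = (1..100).to_a.shuffle.map do |t|
  Thread.new { table.update_if_newer('j@j.com', 'John', t) }
end
affected = threads.map(&:value)

puts table.find('j@j.com')[:updated_at] # always 100, the newest timestamp
```

Whatever order the threads run in, the write carrying the newest timestamp always finds an older value and succeeds, and no older write can land after it.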
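Slide 24 still SELECTs before it INSERTs, which leaves a gap where two writers can both see no row; that is where the `UniqueConstraintViolation` errors on slide 30 come from. This sketch forces that interleaving with Fibers, assuming a Hash-backed table and a hypothetical `DuplicateEntry` error standing in for `Sequel::UniqueConstraintViolation`.

```ruby
# Hypothetical stand-in for MySQL's duplicate-key error.
class DuplicateEntry < StandardError; end

# Models an INSERT into a table with a unique index on email.
def insert_person(table, email, name, updated_at)
  raise DuplicateEntry, "Duplicate entry '#{email}'" if table.key?(email)
  table[email] = { name: name, updated_at: updated_at }
end

# The insert branch of slide 24: SELECT first, INSERT if no row was found.
def select_then_insert(table, email, name, updated_at)
  row = table[email] # SELECT ... WHERE vendor_email = email
  Fiber.yield        # gap between the SELECT and the INSERT
  raise 'update path not modeled in this sketch' if row
  insert_person(table, email, name, updated_at)
  { status: :success }
end

table = {}
writer_a = Fiber.new { select_then_insert(table, 'j@j.com', 'John', 100) }
writer_b = Fiber.new { select_then_insert(table, 'j@j.com', 'John', 100) }

writer_a.resume            # A sees no row and pauses
writer_b.resume            # B also sees no row and pauses
result_a = writer_a.resume # A inserts successfully

error = begin
  writer_b.resume # B's INSERT hits the unique index
  nil
rescue DuplicateEntry => e
  e
end
puts error.message # prints: Duplicate entry 'j@j.com'
```

The unique index keeps the data correct (one row), but the losing writer gets an exception instead of a `:stale_data` result.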
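Slide 32 inverts the order: INSERT first and let the unique index decide, then fall back to the conditional UPDATE. A stdlib-only model of that control flow, using a hypothetical `FakePeopleTable` and `DuplicateEntry` (assumptions, not Sequel's API), shows the outcome slide 26 expects: with 20 writers and one timestamp, exactly one succeeds and the rest report `:stale_data`.

```ruby
# Hypothetical stand-in for Sequel::UniqueConstraintViolation.
class DuplicateEntry < StandardError; end

# Hypothetical in-memory table; the Mutex models per-statement atomicity.
class FakePeopleTable
  def initialize
    @rows = {}
    @lock = Mutex.new
  end

  # INSERT with a unique index on email: raises if the email already exists.
  def insert(email, name, updated_at)
    @lock.synchronize do
      raise DuplicateEntry, "Duplicate entry '#{email}'" if @rows.key?(email)
      @rows[email] = { name: name, updated_at: updated_at }
    end
  end

  # UPDATE ... WHERE vendor_email = ? AND ? > vendor_updated_at; returns affected rows.
  def update_if_newer(email, name, updated_at)
    @lock.synchronize do
      row = @rows[email]
      next 0 unless row && updated_at > row[:updated_at]
      @rows[email] = { name: name, updated_at: updated_at }
      1
    end
  end

  def find(email)
    @lock.synchronize { @rows[email] }
  end
end

# Same control flow as slide 32: try the INSERT, fall back to a conditional UPDATE.
def upsert(table, email, name, updated_at)
  table.insert(email, name, updated_at)
  { status: :success }
rescue DuplicateEntry
  if table.update_if_newer(email, name, updated_at) == 0
    { status: :failure, reason: :stale_data }
  else
    { status: :success }
  end
end

table = FakePeopleTable.new
writers = 20
threads = Array.new(writers) { Thread.new { upsert(table, 'j@j.com', 'John', 100) } }
results = threads.map(&:value)

successes = results.count { |r| r[:status] == :success }
puts successes # 1: one INSERT wins; the other 19 report :stale_data
```

The trade-off is that every update to an existing person now starts with an INSERT that fails.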