Concurrency Model for MySQL data processing

My lightning talk slides from RubyConf Taiwan 2012

Ryudo Awaru

December 08, 2012

Transcript

  1. Legacy environment
    • A MySQL database holding 2.3 GB of data: Big5 text stored with ISO-8859-1 (latin1) encoding.
    • The biggest table in the DB is 1.5 GB.
    Wednesday, January 9, 2013
  2. Work Flow
    1. Run mysqldump with the --default-character-set=latin1 parameter to generate a SQL file.
    2. Transcode the SQL file with a tool like iconv or bsdconv.
    3. Edit the transcoded SQL file to avoid the "slash" problem.
    4. Restore the SQL file to the new DB.
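The slides do not spell out the "slash" problem. One likely reading, stated here as an assumption, is the well-known Big5 issue where the second byte of some characters is 0x5C, the ASCII backslash, so byte-oriented tools treat it as an escape character. A minimal sketch that checks this for a given character (the helper name is hypothetical):

```ruby
# Hypothetical helper: true when the Big5 encoding of a single character
# ends in byte 0x5C, which is also the ASCII backslash.
def trail_byte_is_backslash?(char)
  char.encode('Big5').bytes.last == 0x5C
end

# Print the Big5 bytes of a few sample characters in hex.
%w[功 中].each do |char|
  hex = char.encode('Big5').bytes.map { |b| format('%02X', b) }.join(' ')
  puts "#{char}: #{hex} backslash trail byte: #{trail_byte_is_backslash?(char)}"
end
```

Characters flagged this way are the ones that would need manual attention when the dump is edited as plain bytes.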
  3. Cause
    • The SQL file is too big for most text editors.
    • Much of the text is mis-encoded.
  4. The new work flow
    • Connect to the DB.
    • Transcode.
    • Output DB rows as SQL INSERT statements.
    • Write the SQL file.
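The transcode and output steps can be sketched without a database. This is only an illustration: `sql_escape` is a hypothetical stand-in for `Mysql2::Client.escape`, plain `Big5` replaces the `Big5-UAO` encoding used in the slides, and the sample row is made up.

```ruby
# Hypothetical stand-in for Mysql2::Client.escape so the sketch runs offline.
# It only escapes backslashes and single quotes.
def sql_escape(text)
  text.gsub(/[\\']/) { |ch| "\\#{ch}" }
end

# Reinterpret latin1-tagged bytes as Big5, transcode to UTF-8, then quote for SQL.
def to_sql_value(raw, source_encoding = 'Big5')
  utf8 = raw.dup.force_encoding(source_encoding)
            .encode('UTF-8', invalid: :replace, undef: :replace, replace: '??')
  "'#{sql_escape(utf8)}'"
end

# Build one INSERT statement from an array of raw column values.
def insert_statement(table, row)
  "INSERT INTO `#{table}` VALUES (#{row.map { |v| to_sql_value(v) }.join(',')});"
end

# Made-up row: Big5 bytes tagged as latin1, the way a latin1 connection would return them.
row = [
  '中文'.encode('Big5').force_encoding('ISO-8859-1'),
  "it's".dup.force_encoding('ISO-8859-1')
]
puts insert_statement('cdb_posts', row)
```

Non-string columns such as integers or NULL would need their own handling, which this sketch leaves out.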
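  5. require 'mysql2'

    # Assumes DBNAME (database name) and OUT (an open IO object) are already defined.
    CORES_COUNT = 4
    # nil.to_i is 0, and 0 is truthy in Ruby, so apply the default before converting.
    LIMIT = (ARGV[0] || 10000).to_i

    # One query per core, each covering its own window of LIMIT rows.
    sqls = CORES_COUNT.times.map do |x|
      sprintf("SELECT * FROM cdb_posts ORDER BY pid LIMIT %d OFFSET %d;", LIMIT, (x * LIMIT))
    end

    class String
      # Reinterpret the raw bytes as Big5-UAO, transcode to UTF-8, then escape and quote for SQL.
      def to_my_val
        "'#{Mysql2::Client.escape self.force_encoding('Big5-UAO').encode('UTF-8', :invalid => :replace, :undef => :replace, :replace => '??')}'"
      end
    end

    procs = sqls.map do |sql|
      Proc.new do |out|
        Mysql2::Client.new(database: DBNAME, reconnect: true, encoding: 'latin1').query(sql).each(as: :array) do |row|
          out.print "INSERT INTO `cdb_posts` VALUES (#{row.map(&:to_my_val).join(',')});\n"
        end
      end
    end

    # Run the chunks one after another in a single process.
    procs.each{|p| p.call(OUT)}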
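The query windows used on this slide can be generated in isolation. The sketch below assumes two hypothetical helpers, `parse_limit` and `chunk_queries`. The four windows cover only the first `4 * LIMIT` rows, which suits a benchmark sample rather than a full export.

```ruby
# Hypothetical helper: read the row limit from an argument, falling back to a
# default when the argument is missing or not a positive number.
def parse_limit(arg, default = 10_000)
  value = arg.to_i # nil.to_i and "abc".to_i are both 0
  value > 0 ? value : default
end

# Hypothetical helper: build one SELECT per chunk, each offset by a full window.
def chunk_queries(limit, chunks, table = 'cdb_posts', key = 'pid')
  chunks.times.map do |i|
    format('SELECT * FROM %s ORDER BY %s LIMIT %d OFFSET %d;', table, key, limit, i * limit)
  end
end

limit = parse_limit(ARGV[0])
chunk_queries(limit, 4).each { |sql| puts sql }
```

Ordering by the primary key keeps the windows stable between queries, although large OFFSET values still make MySQL scan past the skipped rows.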
  6. But the file is so big that transcoding takes too long to wait for!
  7. So I had to find a concurrency model to make it faster.
  8. H&W Platform
    • 4 cores: Core 2 Quad [email protected]
    • 8 GB RAM
    • 1 × SSD
    • Mac OS X 10.8
    • MRI 1.9.3p194
  9. require 'mysql2'

    DBNAME = 'wwwfsc'
    CORES_COUNT = 4
    ForceEncoding = 'Big5-UAO'
    # nil.to_i is 0, and 0 is truthy in Ruby, so apply the default before converting.
    LIMIT = (ARGV[0] || 10000).to_i
    OUT = '/dev/null'

    # One query per core, each covering its own window of LIMIT rows.
    sqls = CORES_COUNT.times.map do |x|
      sprintf("SELECT * FROM cdb_posts ORDER BY pid LIMIT %d OFFSET %d;", LIMIT, (x * LIMIT))
    end

    class String
      # Reinterpret the raw bytes as ForceEncoding, transcode to UTF-8, then escape and quote for SQL.
      def to_my_val
        "'#{Mysql2::Client.escape self.force_encoding(ForceEncoding).encode('UTF-8', :invalid => :replace, :undef => :replace, :replace => '??')}'"
      end
    end

    # Each proc opens its own handle on the output path and its own DB connection.
    procs = sqls.map do |sql|
      Proc.new do |out|
        open(out,'w') do |io|
          Mysql2::Client.new(database: DBNAME, reconnect: true, encoding: 'latin1').query(sql).each(as: :array) do |row|
            io.print "INSERT INTO `cdb_posts` VALUES (#{row.map(&:to_my_val).join(',')});\n"
          end
        end
      end
    end
  10. Circumstance
    • Thread
      ‣ CPU utilization stays between 105 and 125 percent.
    • Fork
      ‣ CPU utilization fluctuates frequently from process to process.
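MRI runs Ruby code under a global interpreter lock, so only one thread executes Ruby code at a time. That is consistent with the thread figure staying just above 100 percent, while forked processes can each use a core. The sketch below compares the two on a CPU-bound stand-in for the transcoding step. `transcode_batch`, `JOBS`, and `ROWS` are hypothetical, and `fork` is only available on Unix-like systems.

```ruby
require 'benchmark'

JOBS = 4
ROWS = 20_000

# Hypothetical CPU-bound stand-in for per-row transcoding: Big5 to UTF-8, many times.
def transcode_batch(count)
  big5 = '中文測試'.encode('Big5')
  count.times.map { big5.dup.force_encoding('Big5').encode('UTF-8') }.size
end

# Run every batch in its own thread and collect the row counts.
def run_threads
  JOBS.times.map { Thread.new { transcode_batch(ROWS) } }.map(&:value)
end

# Run every batch in its own child process; each child reports its count over a pipe.
def run_forks
  readers = JOBS.times.map do
    reader, writer = IO.pipe
    fork do
      reader.close
      writer.puts transcode_batch(ROWS)
      writer.close
    end
    writer.close
    reader
  end
  results = readers.map { |r| r.read.to_i.tap { r.close } }
  Process.waitall
  results
end

Benchmark.bm(8) do |x|
  x.report('Thread') { run_threads }
  x.report('Fork') { run_forks }
end
```

Watching a process monitor while this runs is one way to see the difference in CPU utilization between the two reports.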
  11. require 'benchmark'
    require 'mysql2'

    DBNAME = 'wwwfsc'
    CORES_COUNT = 4
    # nil.to_i is 0, and 0 is truthy in Ruby, so apply the default before converting.
    limit = (ARGV[0] || 10000).to_i

    sqls = CORES_COUNT.times.map do |x|
      sprintf("SELECT * FROM cdb_posts ORDER BY pid LIMIT %d OFFSET %d;", limit, (x * limit))
    end

    # Each proc only connects and runs its query; nothing is transcoded or written.
    procs = CORES_COUNT.times.map do |x|
      Proc.new do
        client = Mysql2::Client.new(database: DBNAME, reconnect: true)
        result = client.query(sqls[x])
      end
    end

    # Compare reading with threads, forked processes, and a plain sequential loop.
    Benchmark.bmbm(15) do |x|
      x.report("Thread"){procs.map{|p| Thread.new{p.call} }.each(&:join)}
      x.report("Fork"){procs.each{|p| fork{p.call} }; Process.waitall}
      x.report("Normal"){procs.each(&:call)}
    end
  12. # Read and transcode every chunk up front, so the benchmark measures only the writes.
    client = Mysql2::Client.new(database: DBNAME, reconnect: true, encoding: 'latin1')
    sql_raws = sqls.map do |sql|
      arr = []
      client.query(sql).each(as: :array) do |row|
        arr << "(#{row.map(&:to_my_val).join(',')})"
      end
      arr
    end

    # Every proc writes its chunk to the same destination, /dev/null.
    procs = sql_raws.map do |arr|
      proc do
        io = open('/dev/null','w')
        io.write "INSERT INTO `cdb_posts` VALUES "
        io.write arr.join(',')
        io.write "\n"
        io.close
      end
    end

    Benchmark.bm(15) do |x|
      x.report("Thread"){procs.map{|p| Thread.new{p.call} }.each(&:join)}
      x.report("Fork"){procs.each{|p| fork{p.call} }; Process.waitall}
      x.report("Normal"){procs.each{|p| p.call}}
    end
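  13. require 'tempfile'
    require 'securerandom'

    # Same write benchmark, but each proc writes to its own temporary file
    # instead of open('/dev/null','w').
    procs = sql_raws.map do |arr|
      proc do
        io = Tempfile.new(SecureRandom.uuid)
        puts io.path
        io.write "INSERT INTO `cdb_posts` VALUES "
        io.write arr.join(',')
        io.write "\n"
        io.close
      end
    end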
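Writing each chunk to its own file can be tried without a database. The sketch below is an assumption-laden illustration: `BATCHES` holds made-up VALUES tuples, the helper names are hypothetical, and the final concatenation step is not part of the slides.

```ruby
require 'tempfile'
require 'stringio'

# Made-up pre-built VALUES tuples, one array per worker.
BATCHES = [["('a')", "('b')"], ["('c')"], ["('d')", "('e')"]]

# Each thread writes one multi-row INSERT to its own temporary file.
# Returns the Tempfile objects in batch order.
def write_batches_to_tempfiles(batches)
  batches.map do |tuples|
    Thread.new do
      file = Tempfile.new('cdb_posts')
      file.write "INSERT INTO `cdb_posts` VALUES "
      file.write tuples.join(',')
      file.write ";\n"
      file.close # flushes; the file stays on disk until it is unlinked
      file
    end
  end.map(&:value)
end

# Append every temporary file to a single output, then delete it.
def concatenate(files, out)
  files.each do |file|
    out.write File.read(file.path)
    file.unlink
  end
end

out = StringIO.new
concatenate(write_batches_to_tempfiles(BATCHES), out)
puts out.string
```

Because no two threads share a file handle, none of them waits on another's writes.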
  14. # Read, transcode, and write inside each proc, with one temporary file per chunk.
    procs = sqls.map do |sql|
      Proc.new do
        io = Tempfile.new(SecureRandom.uuid)
        Mysql2::Client.new(database: DBNAME, reconnect: true, encoding: 'latin1').query(sql).each(as: :array) do |row|
          io.write "INSERT INTO `cdb_posts` VALUES (#{row.map(&:to_my_val).join(',')});\n"
        end
        io.close
      end
    end
  15. Conclusion

                              | Thread    | fork      | normal
    MySQL2-read               | Fast      | Fast      | x
    Transcoding & iteration   | Slow      | Very fast | x
    Write to the same I/O     | Very slow | Slow      | Fast
    Write to different I/O    | Fast      | Slow      | Fast
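One way to combine these results, offered as an interpretation rather than something the slides prescribe, is to fork for the transcoding and let the parent process do all the writing sequentially. The sketch below uses made-up sample data and hypothetical helper names, skips SQL escaping for brevity, and needs a Unix-like system for `fork`.

```ruby
require 'stringio'

# Made-up sample data: Big5 bytes tagged as latin1, grouped into two chunks.
CHUNKS = [%w[中文 測試], %w[資料 處理]].map do |chunk|
  chunk.map { |s| s.encode('Big5').force_encoding('ISO-8859-1') }
end

# Reinterpret the raw bytes as Big5 and transcode them to UTF-8.
def transcode(raw)
  raw.dup.force_encoding('Big5')
     .encode('UTF-8', invalid: :replace, undef: :replace, replace: '??')
end

# Transcode each chunk in its own child process and send the lines back over a pipe.
def transcode_in_children(chunks)
  readers = chunks.map do |chunk|
    reader, writer = IO.pipe
    fork do
      reader.close
      chunk.each { |raw| writer.puts transcode(raw) }
      writer.close
    end
    writer.close
    reader
  end
  lines = readers.flat_map do |r|
    r.read.force_encoding('UTF-8').lines.map(&:chomp).tap { r.close }
  end
  Process.waitall
  lines
end

# The parent alone writes, one statement per value (no SQL escaping in this sketch).
def write_all(io, lines)
  lines.each { |line| io.print "INSERT INTO `cdb_posts` VALUES ('#{line}');\n" }
end

write_all($stdout, transcode_in_children(CHUNKS))
```

Real rows would still need escaping, for example with `Mysql2::Client.escape`, before being placed inside the quotes.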