reduce_func = payload.split(/\;\s+reduce/, 2) reduce_func = "reduce#{reduce_func}" socket.send( Reduce.new(reduce_func, call_maps(map_func)).call.to_s ) end def map(socket, payload) socket.send( Map.new(payload, @data).call.to_s ) end # run in parallel, then join results def call_maps(map_func) results = [] nodes = @ring.nodes -‐ [@name] nodes.map {|node| Thread.new do res = remote_call(node, "map #{map_func}") results += eval(res) end }.each{|w| w.join} results += Map.new(map_func, @data).call end end Tuesday, May 28, 13