いまさらRubyでMapReduce
以下の記事を見つけていまさら書いてみた.
http://romeda.org/blog/2007/04/mapreduce-in-36-lines-of-ruby.html
Hadoopとかいろいろあるけどさ.
実行環境
正確かな?
OS: VMWare上のCentOS
Ruby: 1.9.1
% uname -a Linux localhost.localdomain 2.6.18-128.1.6.el5 #1 SMP Wed Apr 1 09:19:18 EDT 2009 i686 i686 i386 GNU/Linux % ruby -v ruby 1.9.1p0 (2009-01-30 revision 21907) [i686-linux]
必要なgem?
RingyDingy
% sudo gem install RingyDingy
他にもいるのかもしれないけど.
実行例
サーバ側
% ring_server & % ruby mapreduce_runner.rb & % ruby mapreduce_runner.rb &
とりあえず2つ起動させた
クライアント側
% irb -r mapreduce_enumerator.rb irb(main):001:0> (0..10).to_a.mapreduce(->(x){x*2}, 0, ->(sum,x){sum+=x}) => 110
引数として(map用Proc,reduce初期値,reduce用Proc)を渡す.
blockを複数渡すにはどうすればいいの?
コード
mapreduce_runner.rb
require 'rubygems' require 'ringy_dingy' ringy_dingy = RingyDingy.new nil ring_server = ringy_dingy.ring_server loop do identifier, pid, block, element, idx = ring_server.take([nil, nil, nil, nil, nil]) begin result = block.call(element) rescue Object => err result = err end puts "Got #{result.inspect} from #{element} for pid:#{pid}." ring_server.write([identifier, pid, result, idx]) end
mapreduce_enumerator.rb
require 'rubygems' require 'ringy_dingy' module Enumerable def dmap(&block) self.each_with_index do |element,idx| ring_server.write([:dmap, Process.pid, block, element, idx]) end results = [] self.size.times do result, idx = ring_server.take([:dmap, Process.pid, nil, nil]).last(2) results[idx] = result end results end def mapreduce(map_proc, initial_value, reduce_proc) self.each_with_index do |element,idx| ring_server.write([:mapreduce, Process.pid, map_proc, element, idx]) end results = initial_value self.size.times do result, idx = ring_server.take([:mapreduce, Process.pid, nil, nil]).last(2) results = reduce_proc.call(results, result) end return results end def ring_server return @ring_server if @ring_server ringy_dingy = RingyDingy.new nil @ring_server = ringy_dingy.ring_server end end
戯言
実行するmapreduce毎にユニークな識別子を指定しないと結果が混ざりそうです.