いまさらRubyでMapReduce

以下の記事を見つけていまさら書いてみた.
http://romeda.org/blog/2007/04/mapreduce-in-36-lines-of-ruby.html
Hadoopとかいろいろあるけどさ.

実行環境

正確かな?
OS: VMWare上のCentOS
Ruby: 1.9.1

% uname -a
Linux localhost.localdomain 2.6.18-128.1.6.el5 #1 SMP Wed Apr 1 09:19:18 EDT 2009 i686 i686 i386 GNU/Linux
% ruby -v
ruby 1.9.1p0 (2009-01-30 revision 21907) [i686-linux]

必要なgem?

RingyDingy

% sudo gem install RingyDingy

他にもいるのかもしれないけど.

実行例

サーバ側
% ring_server &

% ruby mapreduce_runner.rb &

% ruby mapreduce_runner.rb &

とりあえず2つ起動させた

クライアント側
% irb -r mapreduce_enumerator.rb
irb(main):001:0> (0..10).to_a.mapreduce(->(x){x*2}, 0, ->(sum,x){sum+=x})
=> 110

引数として(map用Proc,reduce初期値,reduce用Proc)を渡す.
blockを複数渡すにはどうすればいいの?

コード

mapreduce_runner.rb
require 'rubygems'
require 'ringy_dingy'

ringy_dingy = RingyDingy.new nil
ring_server = ringy_dingy.ring_server

loop do
  identifier, pid, block, element, idx = ring_server.take([nil, nil, nil, nil, nil])
  begin
    result = block.call(element)
  rescue Object => err
    result = err
  end
  puts "Got #{result.inspect} from #{element} for pid:#{pid}."
  ring_server.write([identifier, pid, result, idx])
end
mapreduce_enumerator.rb
require 'rubygems'
require 'ringy_dingy'

module Enumerable
  def dmap(&block)
    self.each_with_index do |element,idx|
      ring_server.write([:dmap, Process.pid, block, element, idx])
    end

    results = []
    self.size.times do
      result, idx = ring_server.take([:dmap, Process.pid, nil, nil]).last(2)
      results[idx] = result
    end

    results
  end

  def mapreduce(map_proc, initial_value, reduce_proc)
    self.each_with_index do |element,idx|
      ring_server.write([:mapreduce, Process.pid, map_proc, element, idx])
    end

    results = initial_value
    self.size.times do
      result, idx = ring_server.take([:mapreduce, Process.pid, nil, nil]).last(2)
      results = reduce_proc.call(results, result)
    end
    return results
  end

  def ring_server
    return @ring_server if @ring_server

    ringy_dingy = RingyDingy.new nil
    @ring_server = ringy_dingy.ring_server
  end
end

戯言

実行するmapreduce毎にユニークな識別子を指定しないと結果が混ざりそうです.