如何管理ruby线程以便完成所有工作?

我有一个可以分成独立单元的计算,我现在处理它的方式是创建一个固定数量的线程,然后在每个线程中分发要完成的工作块。 所以在伪代码中这就是它的样子

# main thread work_units.take(10).each {|work_unit| spawn_thread_for work_unit} def spawn_thread_for(work) Thread.new do do_some work more_work = work_units.pop spawn_thread_for more_work unless more_work.nil? end end 

基本上,一旦创建了初始线程数,每个线程都会完成一些工作,然后继续从工作堆中完成工作,直到没有剩下任何东西。 当我在irb中运行时,一切正常,但是当我使用解释器执行脚本时,事情并没有那么好用。 我不确定如何使主线程等到所有工作完成。 有没有一种很好的方法可以做到这一点,或者我一直坚持执行sleep 10 until work_units.empty? 在主线程中

如果修改spawn_thread_for以保存对创建的Thread的引用,则可以在线程上调用Thread#join以等待完成:

 x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" } a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" } x.join # Let the threads finish before a.join # main thread exits... 

生产:

 abxyzc 

(从ri Thread.new文档中ri Thread.new 。有关更多详细信息,请参阅ri Thread.join文档。)

因此,如果修改spawn_thread_for以保存线程引用,则可以将它们全部加入:

(未经测试,但应该给出味道)

 # main thread work_units = Queue.new # and fill the queue... threads = [] 10.downto(1) do threads << Thread.new do loop do w = work_units.pop Thread::exit() if w.nil? do_some_work(w) end end end # main thread continues while work threads devour work threads.each(&:join) 

在ruby 1.9(和2.0)中,您可以使用stdlib中的ThreadsWait来实现此目的:

 require 'thread' require 'thwait' threads = [] threads << Thread.new { } threads << Thread.new { } ThreadsWait.all_waits(*threads) 

看起来你正在复制Parallel Each( Peach )库提供的内容。

 Thread.list.each{ |t| t.join unless t == Thread.current } 

您可以使用Thread#join

加入(p1 = v1)public

调用线程将暂停执行并运行thr。 直到thr退出或直到极限秒已经过去才返回。 如果时间限制到期,则返回nil,否则返回thr。

您还可以使用Enumerable#each_slice批量迭代工作单元

 work_units.each_slice(10) do |batch| # handle each work unit in a thread threads = batch.map do |work_unit| spawn_thread_for work_unit end # wait until current batch work units finish before handling the next batch threads.each(&:join) end 
    Interesting Posts