ThreadPool中的死锁

我找不到适合Ruby的ThreadPool实现,所以我写了我的(部分基于这里的代码: http : //web.archive.org/web/20081204101031/http : //snippets.dzone.com : 80/ posts / show / 3276 ,但更改为等待/信号和ThreadPool关闭的其他实现。但是经过一段时间的运行(有100个线程并处理大约1300个任务),它在第25行死机 – 它等待一个新的工作那里。任何想法,为什么会发生?

require 'thread' begin require 'fastthread' rescue LoadError $stderr.puts "Using the ruby-core thread implementation" end class ThreadPool class Worker def initialize(callback) @mutex = Mutex.new @cv = ConditionVariable.new @callback = callback @mutex.synchronize {@running = true} @thread = Thread.new do while @mutex.synchronize {@running} block = get_block if block block.call reset_block # Signal the ThreadPool that this worker is ready for another job @callback.signal else # Wait for a new job @mutex.synchronize {@cv.wait(@mutex)} # = @max_size worker = Worker.new(self) @workers << worker worker end end 

好的,所以实现的主要问题是:如何确保没有信号丢失并避免死锁?

根据我的经验,使用条件变量和互斥量很难实现,但使用信号量很容易。 事实上,ruby实现了一个名为Queue(或SizedQueue)的对象,可以解决问题。 这是我建议的实现:

 require 'thread' begin require 'fasttread' rescue LoadError $stderr.puts "Using the ruby-core thread implementation" end class ThreadPool class Worker def initialize(thread_queue) @mutex = Mutex.new @cv = ConditionVariable.new @queue = thread_queue @running = true @thread = Thread.new do @mutex.synchronize do while @running @cv.wait(@mutex) block = get_block if block @mutex.unlock block.call @mutex.lock reset_block end @queue << self end end end end def name @thread.inspect end def get_block @block end def set_block(block) @mutex.synchronize do raise RuntimeError, "Thread already busy." if @block @block = block # Signal the thread in this class, that there's a job to be done @cv.signal end end def reset_block @block = nil end def busy? @mutex.synchronize { !@block.nil? } end def stop @mutex.synchronize do @running = false @cv.signal end @thread.join end end attr_accessor :max_size def initialize(max_size = 10) @max_size = max_size @queue = Queue.new @workers = [] end def size @workers.size end def busy? @queue.size < @workers.size end def shutdown @workers.each { |w| w.stop } @workers = [] end alias :join :shutdown def process(block=nil,&blk) block = blk if block_given? worker = get_worker worker.set_block(block) end private def get_worker if !@queue.empty? or @workers.size == @max_size return @queue.pop else worker = Worker.new(@queue) @workers << worker worker end end end 

这是一个简单的测试代码:

 tp = ThreadPool.new 500 (1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } } tp.shutdown 

您可以尝试使用work_queue gem,它旨在协调生产者和工作线程池之间的工作。

我在这里略有偏见,但我建议用一些过程语言和模型检查它。 例如,可自由使用的工具是mCRL2工具集(使用基于ACP的语言),Mobility Workbench(pi-calculus)和Spin(PROMELA)。

否则,我建议删除对问题不重要的每一段代码,并找出发生死锁的最小情况。 我怀疑它是100个线程和1300个任务对于解决僵局至关重要。 使用较小的shell,您可以添加一些调试打印,提供足够的信息来解决问题。

好吧,问题似乎出现在你的ThreadPool#signal方法中。 可能发生的事情是:

1 – 你所有的工人都很忙,你试图处理一份新工作

2 – 第90行得到一个零工人

3 – 工作人员被释放并发出信号,但由于ThreadPool没有等待它,信号也会丢失

4 – 你落在第95行,即使有一个自由工人也在等待。

这里的错误是,即使没有人在听,你也可以向自由工作者发出信号。 这个ThreadPool#signal方法应该是:

 def signal @mutex.synchronize { @cv.signal } end 

而Worker对象中的问题是相同的。 可能发生的事情是:

1 – 工人刚刚完成了一份工作

2 – 检查(第17行)是否有作业等待:没有

3 – 线程池发送一个新作业并发出信号…但信号丢失

4 – 工作人员等待信号,即使它被标记为忙碌

你应该把你的initialize方法放在:

 def initialize(callback) @mutex = Mutex.new @cv = ConditionVariable.new @callback = callback @mutex.synchronize {@running = true} @thread = Thread.new do @mutex.synchronize do while @running block = get_block if block @mutex.unlock block.call @mutex.lock reset_block # Signal the ThreadPool that this worker is ready for another job @callback.signal else # Wait for a new job @cv.wait(@mutex) end end end end end 

接下来,不应再同步Worker#get_block和Worker#reset_block方法。 这样,在块测试和等待信号之间就不能为工作者分配块。

多年来,最佳评论者的代码帮助了很多。 这里更新了ruby 2.x并通过线程识别进行了改进。 这有什么改进? 当每个线程都有一个ID时,你可以用一个存储任意信息的数组组成ThreadPool。 一些想法:

  • 没有数组:典型的ThreadPool用法。 即使使用GIL,它也使得线程编码变得容易编码,对于高容量网络爬行等高延迟应用程序非常有用,
  • ThreadPool和Array的大小与CPU数量相同:易于fork进程以使用所有CPU,
  • ThreadPool和Array的大小与资源数量相同:例如,每个数组元素代表一个实例池中的一个处理器,因此如果您有10个实例,每个实例有4个CPU,则TP可以管理40个子进程的工作。

使用后两者,而不是考虑线程正在进行工作,而不是考虑ThreadPool管理正在进行工作的子进程。 管理任务是轻量级的,当与子进程结合时,谁关心GIL。

使用这个类,您可以在大约一百行代码中编写基于集群的MapReduce! 这段代码非常简短,虽然它可能有点令人费解。 希望能帮助到你。

 # Usage: # # Thread.abort_on_exception = true # help localize errors while debugging # pool = ThreadPool.new(thread_pool_size) # 50.times {|i| # pool.process { ... } # or # pool.process {|id| ... } # worker identifies itself as id # } # pool.shutdown() class ThreadPool require 'thread' class ThreadPoolWorker attr_accessor :id def initialize(thread_queue, id) @id = id # worker id is exposed thru tp.process {|id| ... } @mutex = Mutex.new @cv = ConditionVariable.new @idle_queue = thread_queue @running = true @block = nil @thread = Thread.new { @mutex.synchronize { while @running @cv.wait(@mutex) # block until there is work to do if @block @mutex.unlock begin @block.call(@id) ensure @mutex.lock end @block = nil end @idle_queue << self end } } end def set_block(block) @mutex.synchronize { raise RuntimeError, "Thread is busy." if @block @block = block @cv.signal # notify thread in this class, there is work to be done } end def busy? @mutex.synchronize { ! @block.nil? } end def stop @mutex.synchronize { @running = false @cv.signal } @thread.join end def name @thread.inspect end end attr_accessor :max_size, :queue def initialize(max_size = 10) @process_mutex = Mutex.new @max_size = max_size @queue = Queue.new # of idle workers @workers = [] # array to hold workers # construct workers @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) } # queue up workers (workers in queue are idle and available to # work). queue blocks if no workers are available. @max_size.times {|i| @queue << @workers[i] } sleep 1 # important to give threads a chance to initialize end def size @workers.size end def idle @queue.size end # are any threads idle def busy? # @queue.size < @workers.size @queue.size == 0 && @workers.size == @max_size end # block until all threads finish def shutdown @workers.each {|w| w.stop } @workers = [] end alias :join :shutdown def process(block = nil, &blk) @process_mutex.synchronize { block = blk if block_given? worker = @queue.pop # assign to next worker; block until one is ready worker.set_block(block) # give code block to worker and tell it to start } end end