线程和队列

我有兴趣知道实现基于线程的队列的最佳方法是什么。

例如:

我有10个动作,我想用4个线程执行。 我想创建一个队列,其中所有10个动作都是线性放置的,并且用4个线程开始前4个动作,一旦一个线程完成执行,下一个将启动等 – 所以一次,线程的数量是4或小于4。

标准库中的thread中有一个Queue类。 使用它你可以做这样的事情:

 require 'thread' queue = Queue.new threads = [] # add work to the queue queue << work_unit 4.times do threads << Thread.new do # loop until there are no more things to do until queue.empty? # pop with the non-blocking flag set, this raises # an exception if the queue is empty, in which case # work_unit will be set to nil work_unit = queue.pop(true) rescue nil if work_unit # do work end end # when there is no more work, the thread will stop end end # wait until all threads have completed processing threads.each { |t| t.join } 

我用非阻塞标志弹出的原因是until queue.empty?之间until queue.empty? 并且pop另一个线程可能已经弹出了队列,所以除非设置了非阻塞标志,否则我们可能永远陷入该行。

如果你正在使用MRI,默认的Ruby解释器,请记住线程不是绝对并发的。 如果您的工作受CPU限制,您也可以运行单线程。 如果你有一些阻止IO的操作你可能会得到一些并行性,但是YMMV。 或者,您可以使用允许完全并发的解释器,例如jRuby或Rubinius。

有一些gem可以为你实现这种模式; 平行,桃子和我的被称为threach (或jruby下的jruby_threach )。 它是#each的直接替代品,但允许您指定要运行的线程数,使用下面的SizedQueue来防止螺旋式失控。

所以…

 (1..10).threach(4) {|i| do_my_work(i) } 

不要推我自己的东西; 有很多很好的实现可以让事情变得更容易。

如果您正在使用JRuby, jruby_threach是一个更好的实现 – Java只提供了更丰富的线程原始数据和数据结构。

可执行的描述性示例:

 require 'thread' p tasks = [ {:file => 'task1'}, {:file => 'task2'}, {:file => 'task3'}, {:file => 'task4'}, {:file => 'task5'} ] tasks_queue = Queue.new tasks.each {|task| tasks_queue << task} # run workers workers_count = 3 workers = [] workers_count.times do |n| workers << Thread.new(n+1) do |my_n| while (task = tasks_queue.shift(true) rescue nil) do delay = rand(0) sleep delay task[:result] = "done by worker ##{my_n} (in #{delay})" p task end end end # wait for all threads workers.each(&:join) # output results puts "all done" p tasks 

您可以使用线程池。 对于这类问题,这是一种相当常见的模式。
http://en.wikipedia.org/wiki/Thread_pool_pattern

Github似乎有一些你可以尝试的实现:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool

赛璐珞有一个工作池示例 ,可以做到这一点。

我使用名为work_queue的gem。 它真的很实用。

例:

 require 'work_queue' wq = WorkQueue.new 4, 10 (1..10).each do |number| wq.enqueue_b("Thread#{number}") do |thread_name| puts "Hello from the #{thread_name}" end end wq.join