Ruby中的线程安全枚举器

TLDR:Ruby中是否有一个线程安全的Enumerator类版本?


我想做什么:

我在Ruby On Rails应用程序中有一个方法,我希望同时运行。 该方法应该创建一个包含来自站点的报告的zip文件,其中zip中的每个文件都是PDF。 从html到PDF的转换有点慢,因此需要multithreading。

我的期望如何:

我想使用5个线程,所以我想我会在线程之间有一个共享的枚举器。 每个线程都会从Enumerator中弹出一个值,并运行do stuff。 以下是我认为它会起作用的方式:

t = Zip::OutputStream::write_buffer do |z| mutex = Mutex.new gen = Enumerator.new{ |g| Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).find_each do |report| g.yield report end } 5.times.map { Thread.new do begin loop do mutex.synchronize do @report = gen.next end title = @report.title + "_" + @report.id.to_s title += ".pdf" unless title.end_with?(".pdf") pdf = PDFKit.new(render_to_string(:template => partial_url, locals: {array: [@report]}, :layout => false)).to_pdf mutex.synchronize do z.put_next_entry(title) z.write(pdf) end end rescue StopIteration # do nothing end end }.each {|thread| thread.join } end 

我尝试时发生了什么:

当我运行上面的代码时,我收到以下错误:

 FiberError at /generate_report fiber called across threads 

经过一些搜索,我发现这篇文章 ,建议我使用队列而不是枚举器,因为队列是线程安全的,而枚举器不是。 虽然这对非Rails应用程序来说可能是合理的,但这对我来说是不切实际的。

为什么我不能只使用队列:

关于Rails 4 ActiveRecord的好处是它在迭代之前不会加载查询。 并且,如果你使用像find_each这样的方法迭代它,它会以1000的批量进行,所以你不必一次将所有表存储在ram中。 我正在使用的查询结果: Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]})很大。 很大。 我需要能够动态加载它,而不是像下面那样:

 gen = Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).map(&queue.method(:push)) 

这会将整个查询加载到ram中。

最后问题是:

是否有一种线程安全的方法:

 gen = Enumerator.new{ |g| Report.all.includes(...).find_each do |report| g.yield report end } 

这样我就可以跨多个线程从gen弹出数据,而不必将我的整个Report (以及所有包含的)表加载到ram中?

如果在填充队列之前启动工作线程,它们将在填充时开始使用队列,并且因为根据经验 – 网络比CPU慢,所以每个批次应该(大部分)在时间消耗下一批到货:

 queue = Queue.new t1 = Thread.new do while !queue.empty? p queue.pop(true) sleep(0.1) end end t2 = Thread.new do while !queue.empty? p queue.pop(true) sleep(0.1) end end (0..1000).map(&queue.method(:push)) t1.join t2.join 

如果certificate它太慢,你可以选择使用SizedQueue ,如果队列达到足够大的大小,它将阻止push

 queue = SizedQueue.new(100) t1 = Thread.new do while !queue.empty? p "#{queue.pop(true)} - #{queue.size}" sleep(0.1) end end t2 = Thread.new do while !queue.empty? p queue.pop(true) sleep(0.1) end end (0..300).map(&queue.method(:push)) t1.join t2.join 
    Interesting Posts