ruby块内的赛璐珞异步不起作用

尝试在我的工作示例中实现Celluloid 异步似乎表现出奇怪的行为。

这里我的代码看起来

class Indefinite include Celluloid def run! loop do [1].each do |i| async.on_background end end end def on_background puts "Running in background" end end Indefinite.new.run! 

但是当我运行上面的代码时,我从未看到过“ 在后台运行

但是,如果我睡觉 ,代码似乎工作。

 class Indefinite include Celluloid def run! loop do [1].each do |i| async.on_background end sleep 0.5 end end def on_background puts "Running in background" end end Indefinite.new.run! 

任何的想法? 为什么在上述两个场景中存在这样的差异。

谢谢。

你的主循环主宰了actor /应用程序的线程。

您的所有程序正在执行的是生成后台进程,但从不运行它们。 你需要在循环中完全sleep以允许后台线程得到关注。

拥有无条件循环产生无限的后台进程通常不是一个好主意,就像你在这里一样。 应该有一个延迟或条件语句放在那里……否则你只有一个无限循环产生永远不会被调用的东西。

想想这样:如果你把put puts "looping"放在你的循环中,而你没有看到Running in the background ……你会看到一遍又一遍地looping


方法#1:使用everyevery块。

解决这个问题的最好方法是不要在loop使用sleep ,而是使用after或者every块,如下所示:

 every(0.1) { on_background } 

或者最重要的是,如果要在再次运行之前确保进程完全运行,请使用after

 def run_method @running ||= false unless @running @running = true on_background @running = false end after(0.1) { run_method } end 

使用loop不是一个好主意,除非有某种流程控制完成,或阻塞过程,如@server.accept …否则它只会拉出100%的CPU核心没有充分的理由。

顺便说一下,你也可以使用now_and_every以及now_and_after ……这会立即运行块,然后在你需要的时间后再次运行它。

使用every在这个要点中显示:


在我看来,理想的情况是:

这是一个粗略但可立即使用的示例:


 require 'celluloid/current' class Indefinite include Celluloid INTERVAL = 0.5 ONE_AT_A_TIME = true def self.run! puts "000a Instantiating." indefinite = new indefinite.run puts "000b Running forever:" sleep end def initialize puts "001a Initializing." @mutex = Mutex.new if ONE_AT_A_TIME @running = false puts "001b Interval: #{INTERVAL}" end def run puts "002a Running." unless ONE_AT_A_TIME && @running if ONE_AT_A_TIME @mutex.synchronize { puts "002b Inside lock." @running = true on_background @running = false } else puts "002b Without lock." on_background end end puts "002c Setting new timer." after(INTERVAL) { run } end def on_background if ONE_AT_A_TIME puts "003 Running background processor in foreground." else puts "003 Running in background" end end end Indefinite.run! puts "004 End of application." 

如果ONE_AT_A_TIMEtrue ,这将是它的输出:

 000a Instantiating. 001a Initializing. 001b Interval: 0.5 002a Running. 002b Inside lock. 003 Running background processor in foreground. 002c Setting new timer. 000b Running forever: 002a Running. 002b Inside lock. 003 Running background processor in foreground. 002c Setting new timer. 002a Running. 002b Inside lock. 003 Running background processor in foreground. 002c Setting new timer. 002a Running. 002b Inside lock. 003 Running background processor in foreground. 002c Setting new timer. 002a Running. 002b Inside lock. 003 Running background processor in foreground. 002c Setting new timer. 002a Running. 002b Inside lock. 003 Running background processor in foreground. 002c Setting new timer. 002a Running. 002b Inside lock. 003 Running background processor in foreground. 002c Setting new timer. 

如果ONE_AT_A_TIMEfalse ,这将是它的输出:

 000a Instantiating. 001a Initializing. 001b Interval: 0.5 002a Running. 002b Without lock. 003 Running in background 002c Setting new timer. 000b Running forever: 002a Running. 002b Without lock. 003 Running in background 002c Setting new timer. 002a Running. 002b Without lock. 003 Running in background 002c Setting new timer. 002a Running. 002b Without lock. 003 Running in background 002c Setting new timer. 002a Running. 002b Without lock. 003 Running in background 002c Setting new timer. 002a Running. 002b Without lock. 003 Running in background 002c Setting new timer. 002a Running. 002b Without lock. 003 Running in background 002c Setting new timer. 

您需要更“正常”而不是“线程化”才能正确发布任务并保留范围和状态,而不是在线程/角色之间发出命令……这就是every块和after块提供的内容。 除此之外,无论哪种方式,这都是很好的做法,即使你没有Global Interpreter Lock来处理,因为在你的例子中,你似乎并没有处理阻塞过程。 如果你有一个阻塞过程,那么无论如何都要有一个无限循环。 但是,由于你只是在处理之前最终会产生无数个后台任务,你需要像开始时的问题那样使用sleep ,或者完全使用不同的策略,并使用之后的every在处理任何类型sockets上的数据时, Celluloid本身如何鼓励您操作。


方法#2:使用递归方法调用。

这恰好出现在Google Group中。 下面的示例代码实际上允许执行其他任务,即使它是无限循环。

这种方法不太理想,因为它可能会产生更多的开销,产生一系列纤维。

 def work # ... async.work end 

问题#2: ThreadFiber行为。

第二个问题是为什么以下方法可行: loop { Thread.new { puts "Hello" } }

这产生了无数个进程线程,这些线程由RVM直接管理。 即使你正在使用的RVM有一个Global Interpreter Lock …这只意味着没有使用green threads ,这是由操作系统本身提供的……而是由进程本身处理。 进程的CPU调度程序毫不犹豫地运行每个Thread本身。 在示例的情况下, Thread运行非常快,然后死亡。

async任务相比,使用Fiber 。 所以在默认情况下发生了什么:

  1. 流程开始。
  2. 演员实例化。
  3. 方法调用调用循环。
  4. Loop调用async方法。
  5. async方法将任务添加到邮箱。
  6. 不调用邮箱,循环继续。
  7. 另一个async任务添加到邮箱。
  8. 这无限继续。

上面是因为循环方法本身是一个Fiber调用,它永远不会被挂起(除非调用sleep !)因此添加到邮箱的附加任务永远不会调用新的FiberFiber行为与Thread不同。 这是讨论差异的一篇很好的参考资料:


问题#3: CelluloidCelluloid::ZMQ行为。

第三个问题是为什么include Celluloid行为与Celluloid::ZMQ不同…

这是因为Celluloid::ZMQ使用基于反应堆的事件邮箱,而Celluloid使用基于条件变量的邮箱。

阅读有关流水线和执行模式的更多信息:

这就是两个例子之间的区别。 如果您对这些邮箱的行为有其他疑问,请随时在Google群组上发帖…您面临的主要动态是GILFiber vs. Thread vs. Reactor行为相互作用的独特性质。

您可以在此处阅读有关reactor-pattern的更多信息:

并在此处查看Celluloid::ZMQ使用的特定反应器:

因此,在事件邮箱场景中发生的事情是,当命中sleep时,这是阻塞调用,这会导致反应器移动到邮箱中的下一个任务。

但是,这也是你的情况所特有的, Celluloid::ZMQ使用的特定反应器正在使用一个永恒的C库……特别是0MQ库。 该反应堆在您的应用程序外部,其行为与Celluloid::IOCelluloid本身不同,这也是行为发生的原因与您预期的不同。

多核支持替代方案

如果维护状态和范围对您来说并不重要,如果使用不限于一个操作系统线程的jRubyRubinius ,而使用具有Global Interpreter Lock MRI ,则可以实例化多个actor并在actor之间发出async调用同时。

但我的拙见是,使用一个非常高频率的计时器会更好,例如在我的例子中为0.0010.1 ,这对于所有意图和目的而言似乎是瞬间的,但也允许演员有足够的时间来切换光纤并在邮箱中运行其他任务。

让我们做一个实验,通过稍微修改你的例子(我们修改它,因为这样我们得到相同的“怪异”行为,同时使事情变得清晰):

 class Indefinite include Celluloid def run! (1..100).each do |i| async.on_background i end puts "100 requests sent from #{Actor.current.object_id}" end def on_background(num) (1..100000000).each {} puts "message #{num} on #{Actor.current.object_id}" end end Indefinite.new.run! sleep # => # 100 requests sent from 2084 # message 1 on 2084 # message 2 on 2084 # message 3 on 2084 # ... 

您可以在任何Ruby解释器上运行它,使用CelluloidCelluloid::ZMQ ,结果总是相同的。 还要注意, Actor.current.object_id输出在两种方法中是相同的,给我们提供线索,我们在实验中处理单个actor。

因此,只要涉及此实验,ruby和Celluloid实现之间没有太大区别。

让我们首先解释为什么这段代码以这种方式运行?

不难理解为什么会这样。 赛璐珞正在接收传入的请求并将其保存在适当的actor的任务队列中。 注意,我们原来打电话来run! 在队列的顶部。

赛璐珞然后一次一个地处理这些任务。 如果恰好有阻塞调用或sleep调用,根据文档 ,将调用下一个任务,而不是等待当前任务完成。

请注意,在我们的实验中,没有阻止调用。 这意味着, run! 方法将从开始到结束执行,并且只有在完成之后,才会以完美的顺序调用每个on_background调用。

它应该是如何工作的。

如果在代码中添加sleep调用,它将通知Celluloid,它应该开始处理队列中的下一个任务。 因此,你在第二个例子中的行为。

现在让我们继续讨论如何设计系统,以便它不依赖于sleep呼叫,这至少是奇怪的。

实际上在Celluloid-ZMQ项目页面上有一个很好的例子。 注意这个循环:

 def run loop { async.handle_message @socket.read } end 

它首先做的是@socket.read 。 请注意,这是一个阻止操作。 因此,Celluloid将处理队列中的下一条消息(如果有的话)。 @socket.read响应后,将生成一个新任务。 但是,在再次调用@socket.read之前不会执行此任务,从而阻止执行,并通知Celluloid处理队列中的下一个项目。

您可能会看到与您的示例有所不同。 你没有阻止任何东西,因此没有给Celluloid一个机会处理队列。

我们怎样才能获得Celluloid::ZMQ示例中给出的行为?

第一个(在我看来,更好)解决方案是实际阻塞调用,如@socket.read

如果您的代码中没有阻塞调用,并且您仍需要在后台处理事物,那么您应该考虑Celluloid提供的其他机制。

赛璐珞有几种选择。 可以使用条件 , 期货 , 通知或仅在低级别调用wait / signal ,如下例所示:

 class Indefinite include Celluloid def run! loop do async.on_background result = wait(:background) #=> 33 end end def on_background puts "background" # notifies waiters, that they can continue signal(:background, 33) end end Indefinite.new.run! sleep # ... # background # background # background # ... 

使用与Celluloid::ZMQ sleep(0)

我还注意到你在评论中提到的working.rb文件。 它包含以下循环:

 loop { [1].each { |i| async.handle_message 'hello' } ; sleep(0) } 

看起来它正在做正确的工作。 实际上,在jRuby下运行它显示,它正在泄漏内存。 为了使其更加明显,请尝试在handle_message主体中添加一个sleep调用:

 def handle_message(message) sleep 0.5 puts "got message: #{message}" end 

高内存使用率可能与以下事实有关:队列填充速度非常快,无法在给定时间内处理。 如果handle_message更大,那么现在就会出现问题。

sleep解决方案

我对sleep解决方案持怀疑态度。 它们可能需要大量内存甚至产生内存泄漏。 并且不清楚你应该将什么作为参数传递给sleep方法以及为什么。

线程如何与Celluloid一起使用

Celluloid没有为每个异步任务创建新线程。 它有一个线程池,可以在其中运行每个任务,同步和异步任务。 关键是图书馆看到了run! 作为同步任务运行,并在与异步任务相同的上下文中执行。

默认情况下,Celluloid 在一个线程中运行所有内容,使用队列系统为以后安排异步任务。 它仅在需要时才创建新线程。

除此之外,赛璐珞覆盖了sleepfunction。 这意味着每次在扩展Celluloid类的类中调用sleep时,库都会检查其池中是否存在非hibernate线程。 在您的情况下,第一次调用sleep 0.5 ,它将创建一个新的Thread,以便在第一个线程处于sleep 0.5执行队列中的异步任务。

因此,在您的第一个示例中,只有一个Celluloid线程正在运行,执行循环。 在第二个示例中,两个Celluloid线程正在运行,第一个执行循环并在每次迭代时hibernate,另一个执行后台任务。

例如,您可以更改第一个示例以执行有限数量的迭代:

 def run! (0..100).each do [1].each do |i| async.on_background end end puts "Done!" end 

使用此run! function,你会看到Done! 在所有Running in background之前打印,意味着Celluloid完成了run!的执行run! 在同一个线程中启动异步任务之前的函数。