在Heroku上重启后,长时间运行的delayed_job作业保持锁定状态

当重新启动Heroku工作程序时(无论是在命令上还是作为部署的结果),Heroku都会将SIGTERM发送到工作进程。 在delayed_job的情况下, 捕获 SIGTERM 信号 ,然后在当前作业(如果有)停止后工作程序停止执行。

如果工人需要很长时间才能完成,那么Heroku将发送SIGKILL 。 在delayed_job的情况下,这会在数据库中delayed_job一个锁定的作业,该作业不会被另一个工作人员接收。

我想确保工作最终完成(除非出现错误)。 鉴于此,采用这种方法的最佳方法是什么?

我看到两个选择。 但是我想得到其他的意见:

  1. 修改delayed_job以在收到SIGTERM时停止处理当前作业(并释放锁定)。
  2. 找出(程序化的)方法来检测孤立的锁定作业,然后解锁它们。

有什么想法吗?

TLDR:

把它放在你的工作方法的顶部:

 begin term_now = false old_term_handler = trap 'TERM' do term_now = true old_term_handler.call end 

确保每十秒至少调用一次:

  if term_now puts 'told to terminate' return true end 

在你的方法结束时,把这个:

 ensure trap 'TERM', old_term_handler end 

说明:

我遇到了同样的问题并且发现了这篇Heroku文章 。

该作业包含一个外循环,所以我按照文章并添加了一个trap('TERM')exit 。 但是, delayed_job选择了failed with SystemExit并将任务标记为失败。

由于SIGTERM现在被我们的trap困住, 因此不会调用worker的处理程序 ,而是立即重新启动作业,然后在几秒钟之后获得SIGKILL 。 回到原点。

我尝试了一些exit

  • return true表示作业成功(并将其从队列中删除),但如果队列中有另一个作业正在等待,则会遇到同样的问题。

  • exit! 将成功退出作业和工作人员, 它不允许工作人员从队列中删除该作业,因此您仍然有“孤立锁定作业”问题。

我的最终解决方案是在我的答案顶部给出的解决方案,它由三部分组成:

  1. 在我们开始潜在的长作业之前,我们通过执行trap (如Heroku文章中所述)为'TERM'添加一个新的中断处理程序,我们使用它来设置term_now = true

    但是我们还必须抓住old_term_handler ,它将延迟的作业工作者代码设置(由trap返回) 记得call它。

  2. 我们仍然必须确保我们将控制权返回给Delayed:Job:Worker有足够的时间进行清理和关闭,因此我们应该每隔十秒检查term_now (至少),如果为true ,则return

    您可以return truereturn false具体取决于您是否希望将作业视为成功。

  3. 最后,记住删除处理程序并在完成后安装Delayed:Job:Worker一个是至关重要的 。 如果你没有这样做,你将保留对我们添加的那个的悬空引用,如果你在其上添加另一个(例如,当工人再次启动这个工作时),可能会导致内存泄漏。

在SIGTERM上彻底中止工作

现在,一个更好的解决方案内置于delayed_job中。 使用此设置通过在初始化程序中添加此项来在TERM信号上引发exception:

 Delayed::Worker.raise_signal_exceptions = :term 

使用该设置,作业将在heroku发出针对非合作进程的最终KILL信号之前正确清理并退出:

您可能需要在SIGTERM信号上引发exception,Delayed :: Worker.raise_signal_exceptions =:term将导致worker引发SignalException,导致正在运行的作业中止并被解锁,这使得该作业可供其他工作人员使用。 此选项的默认值为false。

raise_signal_exceptions可能值为:

  • false – 不会引发exception(默认)
  • :term – 仅在TERM信号上引发exception,但INT将等待当前作业完成。
  • true – 将在TERM和INT上引发exception

从3.0.5版开始提供。

请参阅: https : //github.com/collectiveidea/delayed_job/commit/90579c3047099b6a58595d4025ab0f4b7f0aa67a

这就是max_run_time的用途:从作业被锁定之后经过max_run_time之后,其他进程将能够获取锁。

请参阅Google群组中的此讨论

新网站,所以不能评论戴夫的post,并需要添加一个新的答案。

我与戴夫的方法有关的问题是我的任务很长(几分钟到8小时),根本不重复。 我不能每隔10秒“确保打电话”。 此外,我已经尝试了Dave的答案,无论我返回什么内容,总是从队列中删除作业 – 无论是真还是假。 我不清楚如何将工作保留在队列中。

看到这个拉动请求 。 我认为这对我有用。 请随时评论并支持拉取请求。

我目前正在尝试陷阱,然后拯救出口信号……到目前为止没有运气。

我最终不得不在几个地方这样做,所以我创建了一个模块,我坚持使用lib /,然后从我的延迟作业的执行块中运行ExitOnTermSignal.execute {long_running_task}。

 # Exits whatever is currently running when a SIGTERM is received. Needed since # Delayed::Job traps TERM, so it does not clean up a job properly if the # process receives a SIGTERM then SIGKILL, as happens on Heroku. module ExitOnTermSignal def self.execute(&block) original_term_handler = Signal.trap 'TERM' do original_term_handler.call # Easiest way to kill job immediately and having DJ mark it as failed: exit end begin yield ensure Signal.trap 'TERM', original_term_handler end end end 

我使用状态机来跟踪作业的进度,并使进程具有幂等性,以便我可以多次调用给定作业/对象上的执行,并确信它不会重新应用破坏性操作。 然后更新rake task / delayed_job以释放TERM上的日志。

当进程重新启动时,它将按预期继续。