Sidekiq:确保队列中的所有作业都是唯一的

我有一些更新触发器,将作业推送到Sidekiq队列。 因此,在某些情况下,可以有多个作业来处理同一个对象。

有几个独特的插件( “中间件” , 独特的工作 ),它们没有太多记录,但它们似乎更像是节流器,以防止重复处理 ; 我想要的是一个阻止重复创建相同工作的节流器。 这样,对象将始终以最新鲜的状态进行处理。 是否有插件或技术?


更新:我没有时间制作中间件,但我最终得到了一个相关的清理function,以确保队列是唯一的: https : //gist.github.com/mahemoff/bf419c568c525f0af903

我的建议是在安排新的标准之前,根据一些选择标准搜索先前的预定作业并删除。 当我想要一个特定的Object和/或其中一个方法的单个预定作业时,这对我很有用。

此上下文中的一些示例方法:

find_jobs_for_object_by_method(klass, method) jobs = Sidekiq::ScheduledSet.new jobs.select { |job| job.klass == 'Sidekiq::Extensions::DelayedClass' && ((job_klass, job_method, args) = YAML.load(job.args[0])) && job_klass == klass && job_method == method } end ## # delete job(s) specific to a particular class,method,particular record # will only remove djs on an object for that method # def self.delete_jobs_for_object_by_method(klass, method, id) jobs = Sidekiq::ScheduledSet.new jobs.select do |job| job.klass == 'Sidekiq::Extensions::DelayedClass' && ((job_klass, job_method, args) = YAML.load(job.args[0])) && job_klass == klass && job_method == method && args[0] == id end.map(&:delete) end ## # delete job(s) specific to a particular class and particular record # will remove any djs on that Object # def self.delete_jobs_for_object(klass, id) jobs = Sidekiq::ScheduledSet.new jobs.select do |job| job.klass == 'Sidekiq::Extensions::DelayedClass' && ((job_klass, job_method, args) = YAML.load(job.args[0])) && job_klass == klass && args[0] == id end.map(&:delete) end 

那个简单的客户端中间件怎么样?

 module Sidekiq class UniqueMiddleware def call(worker_class, msg, queue_name, redis_pool) if msg["unique"] queue = Sidekiq::Queue.new(queue_name) queue.each do |job| if job.klass == msg['class'] && job.args == msg['args'] return false end end end yield end end end 

只需注册它

  Sidekiq.configure_client do |config| config.client_middleware do |chain| chain.add Sidekiq::UniqueMiddleware end end 

然后在你的工作中设置unique: true在需要时在sidekiq_options中为unique: true

看看这个: https : //github.com/mhenrixon/sidekiq-unique-jobs

这是sidekiq添加了独特的工作

也许你可以使用Queue Classic将Postgres数据库中的作业排列(以一种非常开放的方式),因此可以扩展(开源)以检查唯一性,然后再这样做。

Interesting Posts