使用sidekiq处理两个单独的redis实例?

下午好,

我有两个独立但相关的应用程序。 它们都应该有自己的后台队列(读取:单独的Sidekiq和Redis进程) 。 但是,我偶尔能够从app1将作业推送到app2的队列中。

从简单的队列/推送角度来看,如果app1没有现有的Sidekiq / Redis堆栈,那么很容易做到这一点:

 # In a process, far far away # Configure client Sidekiq.configure_client do |config| config.redis = { :url => 'redis://redis.example.com:7372/12', :namespace => 'mynamespace' } end # Push jobs without class definition Sidekiq::Client.push('class' => 'Example::Workers::Trace', 'args' => ['hello!']) # Push jobs overriding default's Sidekiq::Client.push('queue' => 'example', 'retry' => 3, 'class' => 'Example::Workers::Trace', 'args' => ['hello!']) 

但是考虑到我已经从app1调用了Sidekiq.configure_clientSidekiq.configure_server ,可能会在这里介入需要发生的事情。

显然,我可以直接从Sidekiq内部获取序列化和规范化代码,并手动推送到app2的redis队列,但这似乎是一个脆弱的解决方案。 我希望能够使用Client.pushfunction。

我想我的理想解决方案就像:

SidekiqTWO.configure_client { remote connection..... } SidekiqTWO::Client.push(job....)

甚至:

$redis_remote = remote_connection.....

Sidekiq::Client.push(job, $redis_remote)

显然有点滑稽,但那是我理想的用例。

谢谢!

所以有一点是根据FAQ ,“Sidekiq消息格式非常简单和稳定 :它只是一种JSON格式的哈希。” 强调我的 – 我不认为向sidekiq发送JSON太脆弱了。 特别是当你想要对你发送作业的Redis实例进行细粒度控制时,就像OP的情况一样,我可能只是写一个小包装器,让我指示一个Redis实例以及正在排队的作业。

对于Kevin Bedell更常见的情况来将作业循环到Redis实例中,我想你不想控制使用哪个Redis实例 – 你只想排队并自动管理分发。 看起来到目前为止只有一个人请求了这个 , 他们想出了一个使用Redis::Distributed 的解决方案 :

 datastore_config = YAML.load(ERB.new(File.read(File.join(Rails.root, "config", "redis.yml"))).result) datastore_config = datastore_config["defaults"].merge(datastore_config[::Rails.env]) if datastore_config[:host].is_a?(Array) if datastore_config[:host].length == 1 datastore_config[:host] = datastore_config[:host].first else datastore_config = datastore_config[:host].map do |host| host_has_port = host =~ /:\d+\z/ if host_has_port "redis://#{host}/#{datastore_config[:db] || 0}" else "redis://#{host}:#{datastore_config[:port] || 6379}/#{datastore_config[:db] || 0}" end end end end Sidekiq.configure_server do |config| config.redis = ::ConnectionPool.new(:size => Sidekiq.options[:concurrency] + 2, :timeout => 2) do redis = if datastore_config.is_a? Array Redis::Distributed.new(datastore_config) else Redis.new(datastore_config) end Redis::Namespace.new('resque', :redis => redis) end end 

为了获得高可用性和故障转移,需要考虑的另一件事是让Sidekiq Pro包含可靠性function:“Sidekiq Pro客户端可以承受瞬态Redis中断。它会在出错时将本地作业排入队列并尝试提供这些作业一旦恢复连接。“ 由于sidekiq无论如何都是用于后台进程,如果Redis实例出现故障,短暂延迟不应影响您的应用程序。 如果您的两个Redis实例中的一个出现故障并且您正在使用循环法,那么除非您使用此function,否则您仍然会丢失一些工作。

正如carols10cents所说的非常简单,但我总是喜欢封装这个function并能够在其他项目中重复使用它,我从Hotel Tonight的博客更新了一个想法。 以下解决方案改进了Hotel Tonight,它无法在Rails 4.1和Spring预加载器中生存。

目前我将以下文件添加到lib/remote_sidekiq/

remote_sidekiq.rb

 class RemoteSidekiq class_attribute :redis_pool end 

remote_sidekiq_worker.rb

 require 'sidekiq' require 'sidekiq/client' module RemoteSidekiqWorker def client pool = RemoteSidekiq.redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool Sidekiq::Client.new(pool) end def push(worker_name, attrs = [], queue_name = "default") client.push('args' => attrs, 'class' => worker_name, 'queue' => queue_name) end end 

您需要创建一个设置redis_pool的初始化程序

配置/初始化/ remote_sidekiq.rb

 url = ENV.fetch("REDISCLOUD_URL") namespace = 'primary' redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url)) RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis } 

然后,您可以根据需要随意include RemoteSidekiqWorker模块。 任务完成!

**** 更大的环境 ****

添加RemoteWorker模型可以增加额外的好处:

  1. 您可以在任何地方重复使用RemoteWorkers,包括可以访问目标sidekiq worker的系统。 这对来电者来说是透明的。 要在目标sidekiq系统中使用“RemoteWorkers”表单,只需不使用初始化程序,因为它默认使用本地Sidekiq客户端。
  2. 使用RemoteWorkers确保始终发送正确的参数(代码=文档)
  3. 通过创建更复杂的Sidekiq架构进行扩展对于调用者来说是透明的。

这是一个示例RemoteWorker

 class RemoteTraceWorker include RemoteSidekiqWorker include ActiveModel::Model attr_accessor :message validates :message, presence: true def perform_async if valid? push(worker_name, worker_args) else raise ActiveModel::StrictValidationFailed, errors.full_messages end end private def worker_name :TraceWorker.to_s end def worker_args [message] end end 

我遇到了这个并遇到了一些问题,因为我正在使用ActiveJob ,这会使消息从队列中读出变得复杂。

基于ARO的答案,您仍然需要redis_pool设置:

remote_sidekiq.rb

 class RemoteSidekiq class_attribute :redis_pool end 

配置/初始化/ remote_sidekiq.rb

 url = ENV.fetch("REDISCLOUD_URL") namespace = 'primary' redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url)) RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis } 

现在我们将创建一个ActiveJob适配器来排队请求,而不是工作者:

LIB / active_job / queue_adapters / remote_sidekiq_adapter.rb

 require 'sidekiq' module ActiveJob module QueueAdapters class RemoteSidekiqAdapter def enqueue(job) #Sidekiq::Client does not support symbols as keys job.provider_job_id = client.push \ "class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper, "wrapped" => job.class.to_s, "queue" => job.queue_name, "args" => [ job.serialize ] end def enqueue_at(job, timestamp) job.provider_job_id = client.push \ "class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper, "wrapped" => job.class.to_s, "queue" => job.queue_name, "args" => [ job.serialize ], "at" => timestamp end def client @client ||= ::Sidekiq::Client.new(RemoteSidekiq.redis_pool) end end end end 

我现在可以使用适配器对事件进行排队:

 require 'active_job/queue_adapters/remote_sidekiq_adapter' class RemoteJob < ActiveJob::Base self.queue_adapter = :remote_sidekiq queue_as :default def perform(_event_name, _data) fail " This job should not run here; intended to hook into ActiveJob and run in another system " end end 

我现在可以使用普通的ActiveJob api对作业进行排队。 无论什么应用程序从队列中读取此内容,都需要具有匹配的RemoteJob来执行操作。