如何处理ZeroMQ + Ruby中的线程问题?

阅读关于线程安全的ZeroMQ常见问题时偶然发现。

我的multithreading程序在ZeroMQ库中的奇怪位置不断崩溃。 我究竟做错了什么?

ZeroMQ套接字不是线程安全的。 “指南”中对此进行了详细介绍。

简短版本是不应在线程之间共享套接字。 我们建议为每个线程创建一个专用套接字。

对于每个线程的专用套接字不可行的情况,当且仅当每个线程在访问套接字之前执行完整的内存屏障时,才可以共享套接字。 大多数语言都支持Mutex或Spinlock,它将代表您执行完整的内存屏障。

我的multithreading程序在ZeroMQ库中的奇怪位置不断崩溃。
我究竟做错了什么?

以下是我的以下代码:

Celluloid::ZMQ.init module Scp module DataStore class DataSocket include Celluloid::ZMQ def pull_socket(socket) @read_socket = Socket::Pull.new.tap do |read_socket| ## IPC socket read_socket.connect(socket) end end def push_socket(socket) @write_socket = Socket::Push.new.tap do |write_socket| ## IPC socket write_socket.connect(socket) end end def run pull_socket and push_socket and loopify! end def loopify! loop { async.evaluate_response(read_socket.read_multipart) } end def evaluate_response(data) return_response(message_id,routing,Parser.parser(data)) end def return_response(message_id,routing,object) data = object.to_response write_socket.send([message_id,routing,data]) end end end end DataSocket.new.run 

现在,有几件我不清楚的事情:

1)假设async生成一个新Thread (每次)并且write_socket在所有线程之间共享,而ZeroMQ表示它们的套接字不是线程安全的。 我当然看到write_socket运行到线程安全问题。
(顺便说一句,到目前为止,在所有端到端测试中都没有遇到过这个问题。)

问题1 :我的理解是否正确?

为了解决这个问题,ZeroMQ要求我们使用Mutex,Semaphore实现这一目标。

这导致问题2

2)上下文切换。

鉴于线程应用程序可以随时切换上下文。 查看ffi-rzmq代码Celluloid::ZMQ .send()内部调用send_strings() ,内部调用send_multiple()

问题2:上下文切换可以在任何地方发生(甚至在关键部分)(这里)[ https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L510 ]

这也可能导致数据排序问题。

我的以下观察是否正确?

注意:

 Operating system ( MacOS, Linux and CentOS ) Ruby - MRI 2.2.2/2.3.0 

没有人应该把应用程序的稳健性放在薄薄的冰上

原谅这个故事是一个相当长的阅读,但作者的终身经验表明, 为什么比任何少数SLOC(可能是可疑的或神秘的或根本原因无知的)试图通过实验找到

初步说明

虽然ZeroMQ几十年来一直被推广为零共享(零阻塞,(几乎) – 零延迟和更多设计准则。阅读优缺点的最佳地点是Pieter HINTJENS的书籍,而不仅仅是神话般的“Code Connected,第1卷”,以及真正的社交领域的高级设计和工程)哲学,最近的API文档引入并宣传了一些恕我直言的function,放松了与分布式计算的这些基石原则的关系,Zero-Sharing上没有那么尖锐的哨声如此响亮。 这就是说,我仍然是一个零分享的家伙,所以请从这个角度看这篇文章的其余部分。

答案1:
不,先生。 – 或者更好 – 是和否,先生。

ZeroMQ不要求人们使用Mutex / Semaphore障碍。 这与ZeroMQ设计格言相矛盾。

是的,最近的API更改开始提到(在某些附加条件下)可能会开始使用共享套接字……有(许多)其他措施……所以暗示相反。 如果一个人“想要”,那个人还会采取所有额外的步骤和措施(并支付所有最初隐藏的设计和实施成本,以“允许”共享玩具(希望)在与其他人的主要(非必要)战斗中生存无法控制的分布式系统环境 – 因此突然也承担了失败的风险(由于许多明智的原因而不是初始ZeroMQ零共享福音传播的情况) – 因此,用户决定走哪条路。这是公平的。)

声音和强大的设计根据最初的ZeroMQ API和传福音,恕我直言仍然有更好的发展,其中零共享是一个原则。

答案2:
对于ZeroMQ数据流排序,设计总是存在一个主要的不确定性,其中一个ZeroMQ设计 – 格言使设计者不再依赖于对消息排序的不支持的假设以及许多其他假设(例外情况适用)。 可以肯定的是,分配到ZeroMQ基础架构的任何消息都是作为完整消息传递的,或者根本不传递。 因此,人们可以确定这样一个事实,即交付时不会出现零碎的残骸。 有关详细信息,请阅读以下内容。


ThreadId没有任何证据(除非使用了inproc传输类)

鉴于ZeroMQ数据泵浦引擎的内部设计,实例化了一个
zmq.Context( number_of_IO_threads )决定生成多少个线程来处理未来的数据流。 这可能是{0,1:default,2,..}的任何地方,几乎耗尽了内核修复的最大线程数。 0的值给出了一个合理的选择,在不浪费资源的情况下,其中inproc:// transport-class实际上是一个直接内存区域映射处理数据流(实际上从来没有流动直接固定到登陆 -接收套接字抽象的填充:o))并且此类作业不需要任何线程。
接下来, .setsockopt( zmq.AFFINITY, )允许微调与数据相关的IO-“液压”,以便优先处理,负载平衡,性能调整线程 -加载到zmq.Context()zmq.Context()的IO线程的枚举池中,并从上面列出的设计和数据流操作方面的更好和最佳设置中获益。


基石元素是Context() s’实例,
不是Socket()

一旦Context()的实例被实例化和配置(参见上面的原因和方法),它(几乎)可以自由共享(如果设计无法抵抗共享或需要避免设置完全成熟的分布式计算基础设施)。

换句话说, 大脑总是在zmq.Context()的实例中 – 所有与套接字相关的dFSA引擎都在那里设置/配置/操作(是的,即使语法是.setsockopt(...)这种效果在大脑内部实现 – 在相应的zmq.Context – 而不是在从A到B的某些线路中。

最好不要共享 (即使API-4.2.2 +承诺你也可以)

到目前为止,人们可能已经看到了很多代码片段,其中ZeroMQ Context和它的套接字被实例化并快速处理掉,只连续服务几个SLOC-s,但是 – 这并不意味着,这样的实践是明智的或通过任何其他需要进行调整,而不是一个非常学术的例子(由于图书出版商的政策,仅需要在尽可能少的SLOC中打印)。

即使在这种情况下,也应该存在关于zmq.Context基础设施设置/拆除的巨大成本的公平警告,因此为了避免任何泛化,这种代码的任何复制/粘贴副本的使用就越少,仅仅是用于短时间的出于说明的目的。

想象一下为任何单个Context实例发生的现实设置 – 准备好各自的dFSA引擎池,维护它们各自的配置设置以及所有与套接字端点池相关的传输类特定硬件+外部O / S服务处理程序,循环事件扫描程序,缓冲区内存池分配+动态分配器等等。这都需要时间和O / S资源,因此明智地处理这些(自然)成本如果性能不受影响,则需要考虑调整后的管理费用。

如果仍然有疑问为什么要提这个,想象一下,如果有人坚持在发送数据包之后立即拆除所有LAN电缆,并且需要等到需要发送下一个数据包之前安装新的电缆出现。 希望这个“合理实例化”视图现在可以被更好地识别,并且可以分享(如果有的话) zmq.Context() -instance(s),而不需要再尝试共享ZeroMQ套接字实例(即使新成为(几乎)线程安全的本身)。

如果将ZeroMQ理念视为高性能分布式计算基础架构的高级设计传播,那么它就是强大的。 仅调整一个(次要)方面通常不会调整所有的努力和成本,就像如何设计安全和高性能系统的全局观点一样,结果不会更好一点(甚至是绝对可分享的风险 – 免费(如果可能的话)套接字实例不会改变这一点,而声音设计,清洁代码和合理可行的测试能力和调试的所有好处将会丢失)如果这一个细节发生变化 – 所以而是将现有大脑中的另一根电线拉到这样一个新线程,或者装备一个新线程,用它自己的大脑,在本地处理它的资源,并允许它将自己的电线连接回所有其他大脑 – 根据需要进行通信 – 在分布式系统中)。

如果还有疑问的话,试着想象一下,如果你们在比赛期间只分享一个单曲棍球棒,那么你的全国奥运会曲棍球队会发生什么。 或者你想怎么样,如果你家乡的所有邻居都会分享同一个电话号码来接听所有来电(是的,同时拨打所有电话和手机,共享同一号码)。 这有多好?


语言绑定无需反映所有可用的APIfunction

在这里,人们可以提出并且在某些情况下是正确的,并非所有ZeroMQ语言绑定或所有流行的框架包装器都将所有API细节都暴露给用户进行应用程序级编程(这篇文章的作者已经挣扎了很长时间有这样的遗留冲突,这仍然是无法解决的,并且不得不为了找到任何可行的方法来解决这个问题而抓挠头脑 – 所以它(几乎)总是可行的


结语:

值得注意的是,ZeroMQ API 4.2.2+的最新版本开始悄悄传播福音传播原则。

尽管如此,值得记住忧郁的纪念品

(重点补充,不是大写)

线程安全

ØMQ 既有线程安全套接字类型,也没有线程安全套接字类型。 除了将套接字从一个线程迁移到另一个具有“全栅栏”内存屏障的线程之外,应用程序绝不能使用来自多个线程的非线程安全套接字。

以下是线程安全套接字:* ZMQ_CLIENT * ZMQ_SERVER * ZMQ_DISH * ZMQ_RADIO * ZMQ_SCATTER * ZMQ_GATHER

虽然这篇文章可能听起来很有希望,但在设计高性能分布式计算系统时,人们可以做的最糟糕的事情就是设置服务障碍。

人们希望看到的最后一件事就是阻止一个人自己的代码,因为这样的代理进入了一个主要无法控制的阻塞状态,在这种状态下,没有人可以跟踪它(内部代理本身也不是外部的任何人),如果远程代理从不提供正常预期的事件(在分布式系统中可能由于很多原因或在很多情况下不在一个人的控制范围内)。

构建一个易于自我挂起的系统(具有支持(但天真使用)语法可能性的广泛微笑)确实没什么好开心的,不是一个严肃的设计工作。

在这里,人们也不会感到惊讶,许多额外的(最初不可见的)限制适用于使用共享新动作 – {hockey-stick | 电话} API:

ZMQ_CLIENT套接字线程安全的。 它们不接受发送时的ZMQ_SNDMORE选项而不接受 ZMQ_SNDMORE这将它们限制为单个零件数据 。 目的是扩展API以允许分散/收集多部分数据。

C / A

Celluloid::ZMQ在其支持的套接字类型的部分中没有报告任何这些新的API-(共享几乎宽容的Celluloid::ZMQ )套接字类型因此没有预期的先验和Celluloid::ZMQ主活动似乎没有好消息在2015年已经逐渐消失,因此从这个角落来看,预期应该是现实的。

这就是说,通知背后可能会发现一个有趣的观点:

在您使用Celluloid::ZMQ构建自己的分布式赛璐珞系统Celluloid::ZMQ ,请务必先看看DCell并确定它是否适合您的目的。


最后但同样重要的是,在另一个事件循环中组合事件循环系统是一项痛苦的工作。 试图将嵌入式硬实时系统集成到另一个硬实时系统中甚至可以在数学上certificate自己是不可能的。

类似地,使用另一个基于代理的组件构建多代理系统会带来其他类型的冲突和竞争条件,如果遇到相同的资源,则可以利用这些资源(无论是有意识的还是“只是”某些function性副作用)(多个)基于代理的框架。

不可挽回的相互死锁只是这些碰撞中的一种,它引发了一些不知不觉的设计尝试中最初未见过的麻烦。 单一代理系统设计之外的第一步使得人们失去了更多的保证,这些保证在进入多智能体(分布式)之前就没有被注意到,因此开放思想并准备好学习许多“新”概念和注意力关于需要仔细观察并努力避免的许多新问题是非常重要的先决条件,以免(无意识地)引入模式,现在实际上是分布式系统(多智能体)域中的反模式。

至少
你被警告了
:O)

这个答案对你的问题不是一个好的解决方案,并且肯定与user3666197建议的一致。 我认为这种解决方案有可能发挥作用,但是由于互斥拥塞,可能会出现大规模的性能成本。

问题1:假设异步生成新线程(每次)和write_socket在所有线程之间共享,并且zeromq说它们的套接字没有线程安全。 我当然看到write_socket运行到线程安全问题。 (顺便说一下到目前为止,在所有端到端测试中都没有遇到过这个问题。)我的理解是否正确?

根据我对文档的理解,是的,这可能是一个问题,因为套接字不是线程安全的。 即使您没有遇到此问题,也可能会在以后弹出。

问题2:上下文切换可以在任何地方发生(甚至在关键部分)

是的,所以我们可以解决这个问题的一种方法是使用互斥锁/信号量来确保我们没有在错误的时间发生上下文切换。

我会做这样的事情,但根据所调用的方法不是线程安全的,可能会有更好的方法:

 Celluloid::ZMQ.init module Scp module DataStore class DataSocket include Celluloid::ZMQ def initialize @mutex = Mutex.new end def pull_socket(socket) Thread.new do @mutex.synchronize do @read_socket = Socket::Pull.new.tap do |read_socket| ## IPC socket read_socket.connect(socket) end end end.join end def push_socket(socket) Thread.new do @mutex.synchronize do @write_socket = Socket::Push.new.tap do |write_socket| ## IPC socket write_socket.connect(socket) end end end.join end def run # Missing socket arguments here pull_socket and push_socket and loopify! end def loopify! Thread.new do @mutex.synchronize do loop { async.evaluate_response(read_socket.read_multipart) } end end.join end def evaluate_response(data) return_response(message_id,routing,Parser.parser(data)) end def return_response(message_id,routing,object) data = object.to_response write_socket.send([message_id,routing,data]) end end end end DataSocket.new.run