无法使套接字接受非阻塞ruby 2.2

我一直在寻找socket接受非阻塞。 我发现recv没有阻塞,但无论如何这对我没有好处。 我的脚本首先启动一个新的套接字类。 它使用ip 127.0.0.1和端口6112绑定到客户端。然后它启动multithreading。 multithreading需要@sock.accept。 “那就是封锁。 然后我使用了accept_nonblock。 虽然,这会给我带来以下错误:

IO::EWOULDBLOCKWaitReadable : A non-blocking socket operation could not be completed immediately. - accept(2) would block 

我使用的是Ruby 2.2。 注意:我不打算使用Rails来解决我的问题,或者给我一个捷径。 我坚持使用纯Ruby 2.2。 这是我的脚本:

 require 'socket' include Socket::Constants @sock = Socket.new(AF_INET, SOCK_STREAM, 0) @sockaddr = Socket.sockaddr_in(6112, '127.0.0.1') @sock.bind(@sockaddr) @sock.listen(5) Thread.new(@sock.accept_nonblock) do |connection| @client = Client.new(ip, connection, self) @clients.push(@client) begin while connection packet = connection.recv(55555) if packet == nil DeleteClient(connection) else @toput = "[RECV]: #{packet}" puts @toput end end rescue Exception => e if e.class != IOError line1 = e.backtrace[0].split(".rb").last line = line1.split(":")[1] @Log.Error(e, e.class, e.backtrace[0].split(".rb").first + ".rb",line) puts "#{ e } (#{ e.class })" end end def DeleteClient(connection) @clients.delete(@client) connection.close end 

http://docs.ruby-lang.org/en/2.2.0/Socket.html#method-i-accept_nonblock

accept_nonblock在无法立即接受连接时引发exception。 您需要抢救此exception,然后IO.select插槽。

 begin # emulate blocking accept client_socket, client_addrinfo = socket.accept_nonblock rescue IO::WaitReadable, Errno::EINTR IO.select([socket]) retry end 

最近接受了一个补丁,它将添加一个exception: false accept_nonblock exception: false选项,允许您在不使用流控制exception的情况下使用它。 不过,我不知道它已经发货了。

我在这里,并发布了大量的代码。

我希望它能回答你的问题以及其他读这个答案的人可能提出的相关问题。

如果我过火了,我很抱歉,我只是觉得这几乎都是相关的。

循环事件堆栈,使用IO.selectIO.select方式推送事件以及其他性能问题等问题都与套接字编程的非阻塞概念有关(在我看来)。

所以我发布了一个ruby模块,它充当一个带有反应器的服务器,使用有限数量的线程,而不是每个连接的数千个线程(12个线程将提供比100个更好的性能)。 一旦处理IO.select所有活动事件,反应器就会使用IO.select方法并超时。

该模块可以设置多个使用#accept_nonblock侦听套接字,它们目前都充当回显服务器。

它基本上与我用于Plezi框架核心的代码相同……具有一些精简function。

以下是一个包​​含12个工作线程的线程池+主线程(将睡眠并等待"TERM"信号)…

…这是一个带有exception处理和线程池的accept_nonblock示例。

它是一个简单的套接字回送服务器,使用telnet将其作为远程客户端进行测试:

 > telnet localhost 3000 Hi! # => Hi! bye #=> will disconnect 

这是代码 – 祝你好运!

 require 'socket' module SmallServer module_function #### # Replace this method with your actual server logic. # # this code will be called when a socket recieves data. # # For now, we will just echo. def got_data io, io_params begin got = io.recv_nonblock( 1048576 ) # with maximum number of bytes to read at a time... puts "echoing: #{got}" if got.match /^(exit|bye|q)\R/ puts 'closing connection.' io.puts "bye bye!" remove_connection io else io.puts "echoing: #{got}" end rescue => e # should also log error remove_connection io end end ######### # main loop and activation code # # This will create a thread pool and set them running. def start # prepare threads exit_flag = false max_threads = 12 threads = [] thread_cycle = Proc.new do io_review rescue false true while fire_event end (max_threads).times { Thread.new { thread_cycle.call until exit_flag } } # set signal tarps trap('INT'){ exit_flag = true; raise "close!" } trap('TERM'){ exit_flag = true; raise "close!" } puts "Services running. Press ^C to stop" # sleep until trap raises exception (cycling might cause the main thread to loose signals that might be caught inside rescue clauses) (sleep unless SERVICES.empty?) rescue true # start shutdown. exit_flag = true # set fallback tarps trap('INT'){ puts 'Forced exit.'; Kernel.exit } trap('TERM'){ puts 'Forced exit.'; Kernel.exit } puts 'Started shutdown process. Press ^C to force quit.' # shut down listening sockets stop_services # disconnect active connections stop_connections # cycle down threads puts "Waiting for workers to cycle down" threads.each {|t| t.join if t.alive?} # rundown any active events thread_cycle.call end ####################### ## Events (Callbacks) / Multi-tasking Platform EVENTS = [] E_LOCKER = Mutex.new # returns true if there are any unhandled events def events? E_LOCKER.synchronize {!EVENTS.empty?} end # pushes an event to the event's stack # if a block is passed along, it will be used as a callback: the block will be called with the values returned by the handler's `call` method. def push_event handler, *args, &block if block E_LOCKER.synchronize {EVENTS << [(Proc.new {|a| push_event block, handler.call(*a)} ), args]} else E_LOCKER.synchronize {EVENTS << [handler, args]} end end # Runs the block asynchronously by pushing it as an event to the event's stack # def run_async *args, &block E_LOCKER.synchronize {EVENTS << [ block, args ]} if block !block.nil? end # creates an asynchronous call to a method, with an optional callback (shortcut) def callback object, method, *args, &block push_event object.method(method), *args, &block end # event handling FIFO def fire_event event = E_LOCKER.synchronize {EVENTS.shift} return false unless event begin event[0].call(*event[1]) rescue OpenSSL::SSL::SSLError => e puts "SSL Bump - SSL Certificate refused?" rescue Exception => e raise if e.is_a?(SignalException) || e.is_a?(SystemExit) error e end true end ##### # Reactor # # IO review code will review the connections and sockets # it will accept new connections and react to socket input IO_LOCKER = Mutex.new def io_review IO_LOCKER.synchronize do return false unless EVENTS.empty? united = SERVICES.keys + IO_CONNECTION_DIC.keys return false if united.empty? io_r = (IO.select(united, nil, united, 0.1) ) if io_r io_r[0].each do |io| if SERVICES[io] begin callback self, :add_connection, io.accept_nonblock, SERVICES[io] rescue Errno::EWOULDBLOCK => e rescue => e # log end elsif IO_CONNECTION_DIC[io] callback(self, :got_data, io, IO_CONNECTION_DIC[io] ) else puts "what?!" remove_connection(io) SERVICES.delete(io) end end io_r[2].each { |io| (remove_connection(io) || SERVICES.delete(io)).close rescue true } end end callback self, :clear_connections true end ####################### # IO - listening sockets (services) SERVICES = {} S_LOCKER = Mutex.new def add_service port = 3000, parameters = {} parameters[:port] ||= port parameters.update port if port.is_a?(Hash) service = TCPServer.new(parameters[:port]) S_LOCKER.synchronize {SERVICES[service] = parameters} callback Kernel, :puts, "Started listening on port #{port}." true end def stop_services puts 'Stopping services' S_LOCKER.synchronize {SERVICES.each {|s, p| (s.close rescue true); puts "Stoped listening on port #{p[:port]}"}; SERVICES.clear } end ##################### # IO - Active connections handling IO_CONNECTION_DIC = {} C_LOCKER = Mutex.new def stop_connections C_LOCKER.synchronize {IO_CONNECTION_DIC.each {|io, params| io.close rescue true} ; IO_CONNECTION_DIC.clear} end def add_connection io, more_data C_LOCKER.synchronize {IO_CONNECTION_DIC[io] = more_data} if io end def remove_connection io C_LOCKER.synchronize { IO_CONNECTION_DIC.delete io; io.close rescue true } end # clears closed connections from the stack def clear_connections C_LOCKER.synchronize { IO_CONNECTION_DIC.delete_if {|c| c.closed? } } end end 

irb启动echo服务器:

 SmallServer.add_service(3000) ; SmallServer.start