Can't make socket accept non-blocking in Ruby 2.2

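I have been looking for a way to make socket accept non-blocking. I found a non-blocking recv, but that doesn't help me here. My script first creates a new Socket instance and binds it using an IP and port 6112 for clients. Then it starts multithreading: the thread needs @sock.accept, and that call blocks. So I switched to accept_nonblock. However, that gives me the following error: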

IO::EWOULDBLOCKWaitReadable : A non-blocking socket operation could not be completed immediately. - accept(2) would block 

I'm using Ruby 2.2. Note: I'm not looking to use Rails to solve my problem, or to be handed a shortcut. I'm sticking with pure Ruby 2.2. Here is my script:

    require 'socket'
    include Socket::Constants

    @sock = Socket.new(AF_INET, SOCK_STREAM, 0)
    @sockaddr = Socket.sockaddr_in(6112, '')
    @sock.bind(@sockaddr)
    @sock.listen(5)
    Thread.new(@sock.accept_nonblock) do |connection|
      @client = Client.new(ip, connection, self)
      @clients.push(@client)
      begin
        while connection
          packet = connection.recv(55555)
          if packet == nil
            DeleteClient(connection)
          else
            @toput = "[RECV]: #{packet}"
            puts @toput
          end
        end
      rescue Exception => e
        if e.class != IOError
          line1 = e.backtrace[0].split(".rb").last
          line = line1.split(":")[1]
          @Log.Error(e, e.class, e.backtrace[0].split(".rb").first + ".rb",line)
          puts "#{ e } (#{ e.class })"
        end
      end

    def DeleteClient(connection)
      @clients.delete(@client)
      connection.close
    end


accept_nonblock raises an exception when it can't accept a connection immediately. You are expected to rescue this exception and then IO.select the socket.

    begin # emulate blocking accept
      client_socket, client_addrinfo = socket.accept_nonblock
    rescue IO::WaitReadable, Errno::EINTR
      IO.select([socket])
      retry
    end
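To show the rescue-and-select pattern end to end, here is a minimal sketch applied to a listening socket set up like the one in the question. The helper name `accept_blocking`, the optional timeout, and the use of port 0 (any free port) are my own assumptions for illustration and are not part of the original script.

```ruby
require 'socket'

# Hypothetical helper: waits until a client connects (or the timeout expires),
# using accept_nonblock + IO.select instead of a blocking accept.
# Returns [client_socket, client_addrinfo], or nil on timeout.
def accept_blocking(server, timeout = nil)
  begin
    server.accept_nonblock
  rescue IO::WaitReadable, Errno::EINTR
    # Nothing to accept yet: wait until the listening socket becomes readable.
    ready = IO.select([server], nil, nil, timeout)
    return nil unless ready
    retry
  end
end

# Same kind of setup as in the question (port 0 asks the OS for any free port).
server = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
server.bind(Socket.sockaddr_in(0, '127.0.0.1'))
server.listen(5)
port = server.local_address.ip_port

client = TCPSocket.new('127.0.0.1', port) # simulate an incoming client
connection, addrinfo = accept_blocking(server, 5)
puts "accepted connection from #{addrinfo.ip_address}"
```

In a threaded server, the returned `connection` could then be handed to `Thread.new(connection) { |conn| ... }`, so that only the per-client work runs in the thread.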

A patch was recently accepted that adds an `exception: false` option to accept_nonblock, which lets you use it without relying on exceptions for flow control. I don't know whether it has shipped yet, though.
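For reference, a rough sketch of what the `exception: false` style looks like. It assumes a Ruby version where accept_nonblock supports that option (as far as I know, Ruby 2.2 does not), and the helper name `accept_without_exceptions` is made up for this example.

```ruby
require 'socket'

# Assumes a Ruby version where accept_nonblock accepts `exception: false`.
# Returns the accepted socket, or nil if nobody connected within `timeout` seconds.
def accept_without_exceptions(server, timeout = nil)
  loop do
    result = server.accept_nonblock(exception: false)
    # :wait_readable means "no pending connection", anything else is the socket.
    return result unless result == :wait_readable
    return nil unless IO.select([server], nil, nil, timeout)
  end
end

server = TCPServer.new('127.0.0.1', 0) # port 0: let the OS pick a free port
port = server.addr[1]

idle = accept_without_exceptions(server, 0.05) # nobody has connected yet
client = TCPSocket.new('127.0.0.1', port)
connection = accept_without_exceptions(server, 5)
```

The control flow is the same as the rescue/retry version; the only difference is that "would block" is reported as a return value instead of an exception.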





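So I'm posting a Ruby module that acts as a server with a reactor, using a limited number of threads rather than thousands of threads, one per connection (12 threads will give you better performance than 100). Once all of its active events have been handled, the reactor calls IO.select with a timeout.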
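To make the idea concrete, here is a stripped-down sketch of a single reactor pass built on IO.select with a timeout. It is only an illustration: `poll_once` is a hypothetical name, it runs in one thread, and it leaves out the thread pool and event queue entirely.

```ruby
require 'socket'

# One pass of a hypothetical reactor: wait up to `timeout` seconds for activity,
# accept new clients and echo back whatever connected clients have sent.
# Returns false when the select timed out, true otherwise.
def poll_once(server, clients, timeout = 0.1)
  readable, = IO.select([server] + clients, nil, nil, timeout)
  return false unless readable # timed out, nothing to do
  readable.each do |io|
    if io == server
      begin
        clients << io.accept_nonblock
      rescue IO::WaitReadable, Errno::EINTR
        # the pending client went away; try again on the next pass
      end
    else
      begin
        data = io.recv_nonblock(4096)
        if data.nil? || data.empty? # peer closed the connection
          clients.delete(io)
          io.close
        else
          io.write("echoing: #{data}")
        end
      rescue IO::WaitReadable
        # spurious wakeup, nothing to read after all
      rescue Errno::ECONNRESET
        clients.delete(io)
        io.close
      end
    end
  end
  true
end

server = TCPServer.new('127.0.0.1', 0) # port 0: any free port
clients = []
client = TCPSocket.new('127.0.0.1', server.addr[1])
poll_once(server, clients, 1) # accepts the pending client
client.write("Hi!\n")
poll_once(server, clients, 1) # reads the line and echoes it
reply = client.gets
```

A real server would call `poll_once` in a loop; the timeout keeps that loop from blocking forever, so it can also notice shutdown flags or queued work.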






    > telnet localhost 3000
    Hi!
    # => Hi!
    bye
    #=> will disconnect

Here is the code. Good luck!

    require 'socket'
    require 'openssl' # fire_event rescues OpenSSL::SSL::SSLError

    module SmallServer

      module_function

      ####
      # Replace this method with your actual server logic.
      #
      # this code will be called when a socket receives data.
      #
      # For now, we will just echo.
      def got_data io, io_params
        begin
          got = io.recv_nonblock( 1048576 ) # with maximum number of bytes to read at a time...
          puts "echoing: #{got}"
          if got.match /^(exit|bye|q)\R/
            puts 'closing connection.'
            io.puts "bye bye!"
            remove_connection io
          else
            io.puts "echoing: #{got}"
          end
        rescue => e
          # should also log error
          remove_connection io
        end
      end

      #########
      # main loop and activation code
      #
      # This will create a thread pool and set them running.
      def start
        # prepare threads
        exit_flag = false
        max_threads = 12
        threads = []
        thread_cycle = Proc.new do
          io_review rescue false
          true while fire_event
        end
        # keep a reference to every worker so shutdown can join them
        (max_threads).times { threads << Thread.new { thread_cycle.call until exit_flag } }
        # set signal traps
        trap('INT'){ exit_flag = true; raise "close!" }
        trap('TERM'){ exit_flag = true; raise "close!" }
        puts "Services running. Press ^C to stop"
        # sleep until trap raises exception (cycling might cause the main thread
        # to lose signals that might be caught inside rescue clauses)
        (sleep unless SERVICES.empty?) rescue true
        # start shutdown.
        exit_flag = true
        # set fallback traps
        trap('INT'){ puts 'Forced exit.'; Kernel.exit }
        trap('TERM'){ puts 'Forced exit.'; Kernel.exit }
        puts 'Started shutdown process. Press ^C to force quit.'
        # shut down listening sockets
        stop_services
        # disconnect active connections
        stop_connections
        # cycle down threads
        puts "Waiting for workers to cycle down"
        threads.each {|t| t.join if t.alive?}
        # rundown any active events
        thread_cycle.call
      end

      #######################
      ## Events (Callbacks) / Multi-tasking Platform
      EVENTS = []
      E_LOCKER = Mutex.new

      # returns true if there are any unhandled events
      def events?
        E_LOCKER.synchronize {!EVENTS.empty?}
      end

      # pushes an event to the event's stack
      # if a block is passed along, it will be used as a callback: the block will
      # be called with the values returned by the handler's `call` method.
      def push_event handler, *args, &block
        if block
          E_LOCKER.synchronize {EVENTS << [(Proc.new {|a| push_event block, handler.call(*a)} ), args]}
        else
          E_LOCKER.synchronize {EVENTS << [handler, args]}
        end
      end

      # Runs the block asynchronously by pushing it as an event to the event's stack
      #
      def run_async *args, &block
        E_LOCKER.synchronize {EVENTS << [ block, args ]} if block
        !block.nil?
      end

      # creates an asynchronous call to a method, with an optional callback (shortcut)
      def callback object, method, *args, &block
        push_event object.method(method), *args, &block
      end

      # event handling FIFO
      def fire_event
        event = E_LOCKER.synchronize {EVENTS.shift}
        return false unless event
        begin
          event[0].call(*event[1])
        rescue OpenSSL::SSL::SSLError => e
          puts "SSL Bump - SSL Certificate refused?"
        rescue Exception => e
          raise if e.is_a?(SignalException) || e.is_a?(SystemExit)
          # log the failure and keep the worker thread alive
          warn "#{e.class}: #{e.message}"
        end
        true
      end

      #####
      # Reactor
      #
      # IO review code will review the connections and sockets
      # it will accept new connections and react to socket input
      IO_LOCKER = Mutex.new
      def io_review
        IO_LOCKER.synchronize do
          return false unless EVENTS.empty?
          united = SERVICES.keys + IO_CONNECTION_DIC.keys
          return false if united.empty?
          io_r = (IO.select(united, nil, united, 0.1) )
          if io_r
            io_r[0].each do |io|
              if SERVICES[io]
                begin
                  callback self, :add_connection, io.accept_nonblock, SERVICES[io]
                rescue Errno::EWOULDBLOCK => e
                rescue => e
                  # log
                end
              elsif IO_CONNECTION_DIC[io]
                callback(self, :got_data, io, IO_CONNECTION_DIC[io] )
              else
                puts "what?!"
                remove_connection(io)
                SERVICES.delete(io)
              end
            end
            io_r[2].each { |io| (remove_connection(io) || SERVICES.delete(io)).close rescue true }
          end
        end
        callback self, :clear_connections
        true
      end

      #######################
      # IO - listening sockets (services)
      SERVICES = {}
      S_LOCKER = Mutex.new
      def add_service port = 3000, parameters = {}
        parameters[:port] ||= port
        parameters.update port if port.is_a?(Hash)
        service = TCPServer.new(parameters[:port])
        S_LOCKER.synchronize {SERVICES[service] = parameters}
        callback Kernel, :puts, "Started listening on port #{port}."
        true
      end

      def stop_services
        puts 'Stopping services'
        S_LOCKER.synchronize {SERVICES.each {|s, p| (s.close rescue true); puts "Stopped listening on port #{p[:port]}"}; SERVICES.clear }
      end

      #####################
      # IO - Active connections handling
      IO_CONNECTION_DIC = {}
      C_LOCKER = Mutex.new

      def stop_connections
        C_LOCKER.synchronize {IO_CONNECTION_DIC.each {|io, params| io.close rescue true} ; IO_CONNECTION_DIC.clear}
      end

      def add_connection io, more_data
        C_LOCKER.synchronize {IO_CONNECTION_DIC[io] = more_data} if io
      end

      def remove_connection io
        C_LOCKER.synchronize { IO_CONNECTION_DIC.delete io; io.close rescue true }
      end

      # clears closed connections from the stack
      def clear_connections
        C_LOCKER.synchronize { IO_CONNECTION_DIC.delete_if {|c| c.closed? } }
      end
    end


    SmallServer.add_service(3000)
    SmallServer.start