diff --git a/perf/server_latency.py b/perf/server_latency.py index bdcdaf2..2b11d7e 100644 --- a/perf/server_latency.py +++ b/perf/server_latency.py @@ -31,11 +31,7 @@ def client(port): ping = sum(durations) / len(durations) print(1000 * ping) # <1ms - portal.setup( - host='localhost', - serverkw=dict(idle_sleep=0.0), - clientkw=dict(idle_sleep=0.0), - ) + portal.setup(host='localhost') port = portal.free_port() portal.run([ portal.Process(server, port), diff --git a/perf/socket_latency.py b/perf/socket_latency.py index 5bb3d90..b201d1e 100644 --- a/perf/socket_latency.py +++ b/perf/socket_latency.py @@ -29,11 +29,7 @@ def client(port): ping = sum(durations) / len(durations) print(1000 * ping) # <1ms - portal.setup( - host='localhost', - serverkw=dict(idle_sleep=0.0), - clientkw=dict(idle_sleep=0.0), - ) + portal.setup(host='localhost') port = portal.free_port() portal.run([ portal.Process(server, port), diff --git a/perf/socket_throughput.py b/perf/socket_throughput.py index ead5758..5b41648 100644 --- a/perf/socket_throughput.py +++ b/perf/socket_throughput.py @@ -9,7 +9,7 @@ def main(): size = 1024 ** 3 // 4 parts = 64 prefetch = 8 - twoway = True # False + twoway = False assert size % parts == 0 def server(port): diff --git a/portal/__init__.py b/portal/__init__.py index fd6705e..f07b1e5 100644 --- a/portal/__init__.py +++ b/portal/__init__.py @@ -1,4 +1,4 @@ -__version__ = '3.4.4' +__version__ = '3.5.0' import multiprocessing as mp try: diff --git a/portal/client_socket.py b/portal/client_socket.py index 997ee57..2187e06 100644 --- a/portal/client_socket.py +++ b/portal/client_socket.py @@ -1,5 +1,6 @@ import collections import dataclasses +import os import queue import select import socket @@ -30,7 +31,6 @@ class Options: logging: bool = True logging_color: str = 'yellow' connect_wait: float = 0.1 - idle_sleep: float = 0.0001 class ClientSocket: @@ -52,6 +52,7 @@ def __init__(self, addr, name='Client', start=True, **kwargs): self.wantconn = threading.Event() self.sendq = collections.deque() self.recvq = queue.Queue() + self.get_signal, self.set_signal = os.pipe() self.running = True self.thread = thread.Thread(self._loop, name=f'{name}Loop') @@ -76,6 +77,7 @@ def send(self, *data, timeout=None): self.require_connection(timeout) maxsize = self.options.max_msg_size self.sendq.append(buffers.SendBuffer(*data, maxsize=maxsize)) + os.write(self.set_signal, bytes(1)) def recv(self, timeout=None): assert self.running @@ -98,6 +100,8 @@ def close(self, timeout=None): self.running = False self.thread.join(timeout) self.thread.kill() + os.close(self.get_signal) + os.close(self.set_signal) def require_connection(self, timeout): if self.connected: @@ -111,7 +115,9 @@ def _loop(self): recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size) sock = None poll = select.poll() + poll.register(self.get_signal, select.POLLIN) isconn = False # Local mirror of self.isconn without the lock. + writing = False while self.running or (self.sendq and isconn): @@ -121,7 +127,7 @@ def _loop(self): sock = self._connect() if not sock: break - poll.register(sock, select.POLLIN | select.POLLOUT) + poll.register(sock, select.POLLIN) self.isconn.set() isconn = True if not self.options.autoconn: @@ -130,36 +136,37 @@ def _loop(self): try: - idle = True - pairs = poll.poll(0.2) - if pairs: - _, mask = pairs[0] - - if mask & select.POLLIN: - try: - recvbuf.recv(sock) - idle = False - if recvbuf.done(): - if self.recvq.qsize() > self.options.max_recv_queue: - raise RuntimeError('Too many incoming messages enqueued') - msg = recvbuf.result() - self.recvq.put(msg) - [x(msg) for x in self.callbacks_recv] - recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size) - except BlockingIOError: - pass - - if self.sendq and mask & select.POLLOUT: - try: - self.sendq[0].send(sock) - idle = False - if self.sendq[0].done(): - self.sendq.popleft() - except BlockingIOError: - pass - - if idle and self.options.idle_sleep: - time.sleep(self.options.idle_sleep) + if not writing: + fds = [fd for fd, _ in poll.poll(0.2)] + if self.get_signal in fds: + writing = True + os.read(self.get_signal, 1) + + try: + recvbuf.recv(sock) + if recvbuf.done(): + if self.recvq.qsize() > self.options.max_recv_queue: + raise RuntimeError('Too many incoming messages enqueued') + msg = recvbuf.result() + self.recvq.put(msg) + [x(msg) for x in self.callbacks_recv] + recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size) + except BlockingIOError: + pass + + if self.sendq: + try: + self.sendq[0].send(sock) + if self.sendq[0].done(): + self.sendq.popleft() + if not self.sendq: + writing = False + except BlockingIOError: + pass + except ConnectionResetError: + # The server is gone but we may have buffered messages left to + # read, so we keep the socket open until recv() fails. + pass except OSError as e: detail = f'{type(e).__name__}' diff --git a/portal/server_socket.py b/portal/server_socket.py index 15a5cab..de17bff 100644 --- a/portal/server_socket.py +++ b/portal/server_socket.py @@ -1,9 +1,9 @@ import collections import dataclasses +import os import queue import selectors import socket -import time from . import buffers from . import contextlib @@ -32,7 +32,6 @@ class Options: max_send_queue: int = 4096 logging: bool = True logging_color: str = 'blue' - idle_sleep: float = 0.0001 class ServerSocket: @@ -56,7 +55,9 @@ def __init__(self, port, name='Server', **kwargs): self.sock.bind(self.addr) self.sock.setblocking(False) self.sock.listen(8192) + self.get_signal, self.set_signal = os.pipe() self.sel = selectors.DefaultSelector() + self.sel.register(self.get_signal, selectors.EVENT_READ, data='signal') self.sel.register(self.sock, selectors.EVENT_READ, data=None) self._log(f'Listening at {self.addr[0]}:{self.addr[1]}') self.conns = {} @@ -89,6 +90,7 @@ def send(self, addr, *data): try: self.conns[addr].sendbufs.append( buffers.SendBuffer(*data, maxsize=maxsize)) + os.write(self.set_signal, bytes(1)) except KeyError: self._log('Dropping message to disconnected client') @@ -101,38 +103,38 @@ def close(self, timeout=None): [conn.sock.close() for conn in self.conns.values()] self.sock.close() self.sel.close() + os.close(self.get_signal) + os.close(self.set_signal) def _loop(self): + writing = False try: while self.running or self._numsending(): - idle = True - writeable = [] for key, mask in self.sel.select(timeout=0.2): - if key.data is None and self.reading: + if key.data == 'signal': + writing = True + os.read(self.get_signal, 1) + elif key.data is None and self.reading: assert mask & selectors.EVENT_READ self._accept(key.fileobj) - idle = False elif mask & selectors.EVENT_READ and self.reading: self._recv(key.data) - idle = False - elif mask & selectors.EVENT_WRITE: - writeable.append(key.data) - for conn in writeable: - if not conn.sendbufs: - continue + if not writing: + continue + pending = [conn for conn in self.conns.values() if conn.sendbufs] + for conn in pending: try: conn.sendbufs[0].send(conn.sock) - idle = False if conn.sendbufs[0].done(): conn.sendbufs.popleft() + if not any(conn.sendbufs for conn in pending): + writing = False except BlockingIOError: pass except ConnectionResetError: - # The client is gone but we may have buffered messages left to - # read, so we keep the socket open until recv() fails. + # The client is gone but we may have buffered messages left + # to read, so we keep the socket open until recv() fails. pass - if idle and self.options.idle_sleep: - time.sleep(self.options.idle_sleep) except Exception as e: self.error = e @@ -141,8 +143,7 @@ def _accept(self, sock): self._log(f'Accepted connection from {addr[0]}:{addr[1]}') sock.setblocking(False) conn = Connection(sock, addr) - self.sel.register( - sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=conn) + self.sel.register(sock, selectors.EVENT_READ, data=conn) self.conns[addr] = conn def _recv(self, conn):