diff --git a/perf/server_latency.py b/perf/server_latency.py index 2b11d7e..bdcdaf2 100644 --- a/perf/server_latency.py +++ b/perf/server_latency.py @@ -31,7 +31,11 @@ def client(port): ping = sum(durations) / len(durations) print(1000 * ping) # <1ms - portal.setup(host='localhost') + portal.setup( + host='localhost', + serverkw=dict(idle_sleep=0.0), + clientkw=dict(idle_sleep=0.0), + ) port = portal.free_port() portal.run([ portal.Process(server, port), diff --git a/perf/socket_latency.py b/perf/socket_latency.py index b201d1e..5bb3d90 100644 --- a/perf/socket_latency.py +++ b/perf/socket_latency.py @@ -29,7 +29,11 @@ def client(port): ping = sum(durations) / len(durations) print(1000 * ping) # <1ms - portal.setup(host='localhost') + portal.setup( + host='localhost', + serverkw=dict(idle_sleep=0.0), + clientkw=dict(idle_sleep=0.0), + ) port = portal.free_port() portal.run([ portal.Process(server, port), diff --git a/portal/__init__.py b/portal/__init__.py index f3f5c87..fd6705e 100644 --- a/portal/__init__.py +++ b/portal/__init__.py @@ -1,4 +1,4 @@ -__version__ = '3.4.3' +__version__ = '3.4.4' import multiprocessing as mp try: diff --git a/portal/client_socket.py b/portal/client_socket.py index a6daa82..997ee57 100644 --- a/portal/client_socket.py +++ b/portal/client_socket.py @@ -30,7 +30,7 @@ class Options: logging: bool = True logging_color: str = 'yellow' connect_wait: float = 0.1 - loop_sleep: float = 0.0 + idle_sleep: float = 0.0001 class ClientSocket: @@ -130,33 +130,36 @@ def _loop(self): try: - # TODO: According to the py-spy profiler, the GIL is held during - # polling. Is there a way to avoid that? + idle = True pairs = poll.poll(0.2) - if not pairs: - continue - _, mask = pairs[0] - - if mask & select.POLLIN: - try: - recvbuf.recv(sock) - if recvbuf.done(): - if self.recvq.qsize() > self.options.max_recv_queue: - raise RuntimeError('Too many incoming messages enqueued') - msg = recvbuf.result() - self.recvq.put(msg) - [x(msg) for x in self.callbacks_recv] - recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size) - except BlockingIOError: - pass - - if self.sendq and mask & select.POLLOUT: - try: - self.sendq[0].send(sock) - if self.sendq[0].done(): - self.sendq.popleft() - except BlockingIOError: - pass + if pairs: + _, mask = pairs[0] + + if mask & select.POLLIN: + try: + recvbuf.recv(sock) + idle = False + if recvbuf.done(): + if self.recvq.qsize() > self.options.max_recv_queue: + raise RuntimeError('Too many incoming messages enqueued') + msg = recvbuf.result() + self.recvq.put(msg) + [x(msg) for x in self.callbacks_recv] + recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size) + except BlockingIOError: + pass + + if self.sendq and mask & select.POLLOUT: + try: + self.sendq[0].send(sock) + idle = False + if self.sendq[0].done(): + self.sendq.popleft() + except BlockingIOError: + pass + + if idle and self.options.idle_sleep: + time.sleep(self.options.idle_sleep) except OSError as e: detail = f'{type(e).__name__}' @@ -176,9 +179,6 @@ def _loop(self): [x() for x in self.callbacks_disc] continue - if self.options.loop_sleep: - time.sleep(self.options.loop_sleep) - if sock: sock.close() diff --git a/portal/server_socket.py b/portal/server_socket.py index 5b8ff0a..15a5cab 100644 --- a/portal/server_socket.py +++ b/portal/server_socket.py @@ -32,7 +32,7 @@ class Options: max_send_queue: int = 4096 logging: bool = True logging_color: str = 'blue' - loop_sleep: float = 0.0 + idle_sleep: float = 0.0001 class ServerSocket: @@ -105,15 +105,16 @@ def close(self, timeout=None): def _loop(self): try: while self.running or self._numsending(): + idle = True writeable = [] - # TODO: According to the py-spy profiler, the GIL is held during - # polling. Is there a way to avoid that? for key, mask in self.sel.select(timeout=0.2): if key.data is None and self.reading: assert mask & selectors.EVENT_READ self._accept(key.fileobj) + idle = False elif mask & selectors.EVENT_READ and self.reading: self._recv(key.data) + idle = False elif mask & selectors.EVENT_WRITE: writeable.append(key.data) for conn in writeable: @@ -121,6 +122,7 @@ def _loop(self): continue try: conn.sendbufs[0].send(conn.sock) + idle = False if conn.sendbufs[0].done(): conn.sendbufs.popleft() except BlockingIOError: @@ -129,8 +131,8 @@ def _loop(self): # The client is gone but we may have buffered messages left to # read, so we keep the socket open until recv() fails. pass - if self.options.loop_sleep: - time.sleep(self.options.loop_sleep) + if idle and self.options.idle_sleep: + time.sleep(self.options.idle_sleep) except Exception as e: self.error = e