Skip to content

Commit

Permalink
Add idle sleep to reduce CPU load
Browse files Browse the repository at this point in the history
  • Loading branch information
danijar committed Sep 20, 2024
1 parent b54d44c commit cf6543e
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 38 deletions.
6 changes: 5 additions & 1 deletion perf/server_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def client(port):
ping = sum(durations) / len(durations)
print(1000 * ping) # <1ms

portal.setup(host='localhost')
portal.setup(
host='localhost',
serverkw=dict(idle_sleep=0.0),
clientkw=dict(idle_sleep=0.0),
)
port = portal.free_port()
portal.run([
portal.Process(server, port),
Expand Down
6 changes: 5 additions & 1 deletion perf/socket_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ def client(port):
ping = sum(durations) / len(durations)
print(1000 * ping) # <1ms

portal.setup(host='localhost')
portal.setup(
host='localhost',
serverkw=dict(idle_sleep=0.0),
clientkw=dict(idle_sleep=0.0),
)
port = portal.free_port()
portal.run([
portal.Process(server, port),
Expand Down
2 changes: 1 addition & 1 deletion portal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '3.4.3'
__version__ = '3.4.4'

import multiprocessing as mp
try:
Expand Down
60 changes: 30 additions & 30 deletions portal/client_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Options:
logging: bool = True
logging_color: str = 'yellow'
connect_wait: float = 0.1
loop_sleep: float = 0.0
idle_sleep: float = 0.0001


class ClientSocket:
Expand Down Expand Up @@ -130,33 +130,36 @@ def _loop(self):

try:

# TODO: According to the py-spy profiler, the GIL is held during
# polling. Is there a way to avoid that?
idle = True
pairs = poll.poll(0.2)
if not pairs:
continue
_, mask = pairs[0]

if mask & select.POLLIN:
try:
recvbuf.recv(sock)
if recvbuf.done():
if self.recvq.qsize() > self.options.max_recv_queue:
raise RuntimeError('Too many incoming messages enqueued')
msg = recvbuf.result()
self.recvq.put(msg)
[x(msg) for x in self.callbacks_recv]
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
except BlockingIOError:
pass

if self.sendq and mask & select.POLLOUT:
try:
self.sendq[0].send(sock)
if self.sendq[0].done():
self.sendq.popleft()
except BlockingIOError:
pass
if pairs:
_, mask = pairs[0]

if mask & select.POLLIN:
try:
recvbuf.recv(sock)
idle = False
if recvbuf.done():
if self.recvq.qsize() > self.options.max_recv_queue:
raise RuntimeError('Too many incoming messages enqueued')
msg = recvbuf.result()
self.recvq.put(msg)
[x(msg) for x in self.callbacks_recv]
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
except BlockingIOError:
pass

if self.sendq and mask & select.POLLOUT:
try:
self.sendq[0].send(sock)
idle = False
if self.sendq[0].done():
self.sendq.popleft()
except BlockingIOError:
pass

if idle and self.options.idle_sleep:
time.sleep(self.options.idle_sleep)

except OSError as e:
detail = f'{type(e).__name__}'
Expand All @@ -176,9 +179,6 @@ def _loop(self):
[x() for x in self.callbacks_disc]
continue

if self.options.loop_sleep:
time.sleep(self.options.loop_sleep)

if sock:
sock.close()

Expand Down
12 changes: 7 additions & 5 deletions portal/server_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Options:
max_send_queue: int = 4096
logging: bool = True
logging_color: str = 'blue'
loop_sleep: float = 0.0
idle_sleep: float = 0.0001


class ServerSocket:
Expand Down Expand Up @@ -105,22 +105,24 @@ def close(self, timeout=None):
def _loop(self):
try:
while self.running or self._numsending():
idle = True
writeable = []
# TODO: According to the py-spy profiler, the GIL is held during
# polling. Is there a way to avoid that?
for key, mask in self.sel.select(timeout=0.2):
if key.data is None and self.reading:
assert mask & selectors.EVENT_READ
self._accept(key.fileobj)
idle = False
elif mask & selectors.EVENT_READ and self.reading:
self._recv(key.data)
idle = False
elif mask & selectors.EVENT_WRITE:
writeable.append(key.data)
for conn in writeable:
if not conn.sendbufs:
continue
try:
conn.sendbufs[0].send(conn.sock)
idle = False
if conn.sendbufs[0].done():
conn.sendbufs.popleft()
except BlockingIOError:
Expand All @@ -129,8 +131,8 @@ def _loop(self):
# The client is gone but we may have buffered messages left to
# read, so we keep the socket open until recv() fails.
pass
if self.options.loop_sleep:
time.sleep(self.options.loop_sleep)
if idle and self.options.idle_sleep:
time.sleep(self.options.idle_sleep)
except Exception as e:
self.error = e

Expand Down

0 comments on commit cf6543e

Please sign in to comment.