Skip to content

Commit

Permalink
Optimize CPU and GIL usage
Browse files Browse the repository at this point in the history
  • Loading branch information
danijar committed Sep 21, 2024
1 parent cf6543e commit 7c50a21
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 63 deletions.
6 changes: 1 addition & 5 deletions perf/server_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ def client(port):
ping = sum(durations) / len(durations)
print(1000 * ping) # <1ms

portal.setup(
host='localhost',
serverkw=dict(idle_sleep=0.0),
clientkw=dict(idle_sleep=0.0),
)
portal.setup(host='localhost')
port = portal.free_port()
portal.run([
portal.Process(server, port),
Expand Down
6 changes: 1 addition & 5 deletions perf/socket_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ def client(port):
ping = sum(durations) / len(durations)
print(1000 * ping) # <1ms

portal.setup(
host='localhost',
serverkw=dict(idle_sleep=0.0),
clientkw=dict(idle_sleep=0.0),
)
portal.setup(host='localhost')
port = portal.free_port()
portal.run([
portal.Process(server, port),
Expand Down
2 changes: 1 addition & 1 deletion perf/socket_throughput.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def main():
size = 1024 ** 3 // 4
parts = 64
prefetch = 8
twoway = True # False
twoway = False
assert size % parts == 0

def server(port):
Expand Down
2 changes: 1 addition & 1 deletion portal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '3.4.4'
__version__ = '3.5.0'

import multiprocessing as mp
try:
Expand Down
71 changes: 39 additions & 32 deletions portal/client_socket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import dataclasses
import os
import queue
import select
import socket
Expand Down Expand Up @@ -30,7 +31,6 @@ class Options:
logging: bool = True
logging_color: str = 'yellow'
connect_wait: float = 0.1
idle_sleep: float = 0.0001


class ClientSocket:
Expand All @@ -52,6 +52,7 @@ def __init__(self, addr, name='Client', start=True, **kwargs):
self.wantconn = threading.Event()
self.sendq = collections.deque()
self.recvq = queue.Queue()
self.get_signal, self.set_signal = os.pipe()

self.running = True
self.thread = thread.Thread(self._loop, name=f'{name}Loop')
Expand All @@ -76,6 +77,7 @@ def send(self, *data, timeout=None):
self.require_connection(timeout)
maxsize = self.options.max_msg_size
self.sendq.append(buffers.SendBuffer(*data, maxsize=maxsize))
os.write(self.set_signal, bytes(1))

def recv(self, timeout=None):
assert self.running
Expand All @@ -98,6 +100,8 @@ def close(self, timeout=None):
self.running = False
self.thread.join(timeout)
self.thread.kill()
os.close(self.get_signal)
os.close(self.set_signal)

def require_connection(self, timeout):
if self.connected:
Expand All @@ -111,7 +115,9 @@ def _loop(self):
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
sock = None
poll = select.poll()
poll.register(self.get_signal, select.POLLIN)
isconn = False # Local mirror of self.isconn without the lock.
writing = False

while self.running or (self.sendq and isconn):

Expand All @@ -121,7 +127,7 @@ def _loop(self):
sock = self._connect()
if not sock:
break
poll.register(sock, select.POLLIN | select.POLLOUT)
poll.register(sock, select.POLLIN)
self.isconn.set()
isconn = True
if not self.options.autoconn:
Expand All @@ -130,36 +136,37 @@ def _loop(self):

try:

idle = True
pairs = poll.poll(0.2)
if pairs:
_, mask = pairs[0]

if mask & select.POLLIN:
try:
recvbuf.recv(sock)
idle = False
if recvbuf.done():
if self.recvq.qsize() > self.options.max_recv_queue:
raise RuntimeError('Too many incoming messages enqueued')
msg = recvbuf.result()
self.recvq.put(msg)
[x(msg) for x in self.callbacks_recv]
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
except BlockingIOError:
pass

if self.sendq and mask & select.POLLOUT:
try:
self.sendq[0].send(sock)
idle = False
if self.sendq[0].done():
self.sendq.popleft()
except BlockingIOError:
pass

if idle and self.options.idle_sleep:
time.sleep(self.options.idle_sleep)
if not writing:
fds = [fd for fd, _ in poll.poll(0.2)]
if self.get_signal in fds:
writing = True
os.read(self.get_signal, 1)

try:
recvbuf.recv(sock)
if recvbuf.done():
if self.recvq.qsize() > self.options.max_recv_queue:
raise RuntimeError('Too many incoming messages enqueued')
msg = recvbuf.result()
self.recvq.put(msg)
[x(msg) for x in self.callbacks_recv]
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
except BlockingIOError:
pass

if self.sendq:
try:
self.sendq[0].send(sock)
if self.sendq[0].done():
self.sendq.popleft()
if not self.sendq:
writing = False
except BlockingIOError:
pass
except ConnectionResetError:
# The server is gone but we may have buffered messages left to
# read, so we keep the socket open until recv() fails.
pass

except OSError as e:
detail = f'{type(e).__name__}'
Expand Down
39 changes: 20 additions & 19 deletions portal/server_socket.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import collections
import dataclasses
import os
import queue
import selectors
import socket
import time

from . import buffers
from . import contextlib
Expand Down Expand Up @@ -32,7 +32,6 @@ class Options:
max_send_queue: int = 4096
logging: bool = True
logging_color: str = 'blue'
idle_sleep: float = 0.0001


class ServerSocket:
Expand All @@ -56,7 +55,9 @@ def __init__(self, port, name='Server', **kwargs):
self.sock.bind(self.addr)
self.sock.setblocking(False)
self.sock.listen(8192)
self.get_signal, self.set_signal = os.pipe()
self.sel = selectors.DefaultSelector()
self.sel.register(self.get_signal, selectors.EVENT_READ, data='signal')
self.sel.register(self.sock, selectors.EVENT_READ, data=None)
self._log(f'Listening at {self.addr[0]}:{self.addr[1]}')
self.conns = {}
Expand Down Expand Up @@ -89,6 +90,7 @@ def send(self, addr, *data):
try:
self.conns[addr].sendbufs.append(
buffers.SendBuffer(*data, maxsize=maxsize))
os.write(self.set_signal, bytes(1))
except KeyError:
self._log('Dropping message to disconnected client')

Expand All @@ -101,38 +103,38 @@ def close(self, timeout=None):
[conn.sock.close() for conn in self.conns.values()]
self.sock.close()
self.sel.close()
os.close(self.get_signal)
os.close(self.set_signal)

def _loop(self):
writing = False
try:
while self.running or self._numsending():
idle = True
writeable = []
for key, mask in self.sel.select(timeout=0.2):
if key.data is None and self.reading:
if key.data == 'signal':
writing = True
os.read(self.get_signal, 1)
elif key.data is None and self.reading:
assert mask & selectors.EVENT_READ
self._accept(key.fileobj)
idle = False
elif mask & selectors.EVENT_READ and self.reading:
self._recv(key.data)
idle = False
elif mask & selectors.EVENT_WRITE:
writeable.append(key.data)
for conn in writeable:
if not conn.sendbufs:
continue
if not writing:
continue
pending = [conn for conn in self.conns.values() if conn.sendbufs]
for conn in pending:
try:
conn.sendbufs[0].send(conn.sock)
idle = False
if conn.sendbufs[0].done():
conn.sendbufs.popleft()
if not any(conn.sendbufs for conn in pending):
writing = False
except BlockingIOError:
pass
except ConnectionResetError:
# The client is gone but we may have buffered messages left to
# read, so we keep the socket open until recv() fails.
# The client is gone but we may have buffered messages left
# to read, so we keep the socket open until recv() fails.
pass
if idle and self.options.idle_sleep:
time.sleep(self.options.idle_sleep)
except Exception as e:
self.error = e

Expand All @@ -141,8 +143,7 @@ def _accept(self, sock):
self._log(f'Accepted connection from {addr[0]}:{addr[1]}')
sock.setblocking(False)
conn = Connection(sock, addr)
self.sel.register(
sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=conn)
self.sel.register(sock, selectors.EVENT_READ, data=conn)
self.conns[addr] = conn

def _recv(self, conn):
Expand Down

0 comments on commit 7c50a21

Please sign in to comment.