Skip to content

Commit

Permalink
Client socket performance optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
danijar committed Sep 5, 2024
1 parent 18222b0 commit 7d7dde8
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion portal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '3.1.3'
__version__ = '3.1.4'

import multiprocessing as mp
try:
Expand Down
9 changes: 5 additions & 4 deletions portal/buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class RecvBuffer:

def __init__(self, maxsize):
self.maxsize = maxsize
self.lenbuf = bytearray(4)
self.lenbuf = b''
self.buffer = None
self.pos = 0

Expand All @@ -64,9 +64,10 @@ def __repr__(self):

def recv(self, sock):
if self.buffer is None:
size = sock.recv_into(memoryview(self.lenbuf)[self.pos:])
self.pos += max(0, size)
if self.pos == 4:
part = sock.recv(4 - len(self.lenbuf))
self.lenbuf += part
size = len(part)
if len(self.lenbuf) == 4:
length = int.from_bytes(self.lenbuf, 'little', signed=False)
assert 1 <= length <= self.maxsize, (1, length, self.maxsize)
# We use Numpy to allocate uninitialized memory because Python's
Expand Down
39 changes: 23 additions & 16 deletions portal/client_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,38 +111,44 @@ def _loop(self):
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
sel = selectors.DefaultSelector()
sock = None
isconn = False # Local mirror of self.isconn without the lock.

while self.running or (self.sendq and self.isconn.is_set()):
while self.running or (self.sendq and isconn):

if not self.isconn.is_set():
if not isconn:
if not self.options.autoconn and not self.wantconn.wait(timeout=0.2):
continue
sock = self._connect()
if not sock:
break
sel.register(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
self.isconn.set()
isconn = True
if not self.options.autoconn:
self.wantconn.clear()
[x() for x in self.callbacks_conn]

try:

sel.select(timeout=0.2)
ready = sel.select(timeout=0.2)
if not ready:
continue
_, mask = ready[0]

try:
recvbuf.recv(sock)
if recvbuf.done():
if self.recvq.qsize() > self.options.max_recv_queue:
raise RuntimeError('Too many incoming messages enqueued')
msg = recvbuf.result()
self.recvq.put(msg)
[x(msg) for x in self.callbacks_recv]
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
except BlockingIOError:
pass

if self.sendq:
if mask & selectors.EVENT_READ:
try:
recvbuf.recv(sock)
if recvbuf.done():
if self.recvq.qsize() > self.options.max_recv_queue:
raise RuntimeError('Too many incoming messages enqueued')
msg = recvbuf.result()
self.recvq.put(msg)
[x(msg) for x in self.callbacks_recv]
recvbuf = buffers.RecvBuffer(maxsize=self.options.max_msg_size)
except BlockingIOError:
pass

if self.sendq and mask & selectors.EVENT_WRITE:
try:
self.sendq[0].send(sock)
if self.sendq[0].done():
Expand All @@ -155,6 +161,7 @@ def _loop(self):
detail = f'{detail}: {e}' if str(e) else detail
self._log(f'Connection to server lost ({detail})')
self.isconn.clear()
isconn = False
sel.unregister(sock)
sock.close()
# Clear message queue on disconnect. There is no meaningful concept of
Expand Down

0 comments on commit 7d7dde8

Please sign in to comment.