From aaac5189e46637ff7b2707f35e2351944ad32180 Mon Sep 17 00:00:00 2001 From: Danijar Hafner Date: Tue, 17 Sep 2024 09:25:51 +0000 Subject: [PATCH] Delay multiprocessing context creation --- portal/__init__.py | 2 +- portal/client_socket.py | 4 ++-- portal/contextlib.py | 7 +++++-- portal/server.py | 6 ++++-- portal/server_socket.py | 5 +++-- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/portal/__init__.py b/portal/__init__.py index b4e53c3..3734bea 100644 --- a/portal/__init__.py +++ b/portal/__init__.py @@ -1,4 +1,4 @@ -__version__ = '3.4.0' +__version__ = '3.4.1' import multiprocessing as mp try: diff --git a/portal/client_socket.py b/portal/client_socket.py index 94812a6..df1d702 100644 --- a/portal/client_socket.py +++ b/portal/client_socket.py @@ -190,7 +190,6 @@ def _connect(self): port = int(port) addr = (host, port, 0, 0) if self.options.ipv6 else (host, port) sock = self._create() - start = time.time() error = None try: sock.settimeout(10) @@ -214,9 +213,10 @@ def _connect(self): def _create(self): if self.options.ipv6: sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) else: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TODO after = self.options.keepalive_after every = self.options.keepalive_every diff --git a/portal/contextlib.py b/portal/contextlib.py index f5100db..a3678b2 100644 --- a/portal/contextlib.py +++ b/portal/contextlib.py @@ -21,10 +21,13 @@ def __init__(self): self.interval = 20 self.clientkw = {} self.serverkw = {} + self.printlock = threading.Lock() self.done = threading.Event() self.watcher = None - self.mp = mp.get_context() - self.printlock = self.mp.Lock() + + @property + def mp(self): + return mp.get_context('spawn') def options(self): return { diff --git a/portal/server.py b/portal/server.py index b64aa85..45a977c 100644 --- a/portal/server.py +++ b/portal/server.py @@ -124,14 +124,16 @@ def _loop(self): data = job.result() if job.postfn: data, info = data - else: - job.active.release() + del info data = packlib.pack(data) status = int(0).to_bytes(8, 'little', signed=False) self.socket.send(job.addr, job.reqnum, status, *data) self.metrics['send'] += 1 except Exception as e: self._error(job.addr, job.reqnum, 4, f'Error in server method: {e}') + finally: + if not job.postfn: + job.active.release() if completed: while self.postfn_inp and self.postfn_inp[0].done(): job = self.postfn_inp.popleft() diff --git a/portal/server_socket.py b/portal/server_socket.py index 8b21b11..75930c1 100644 --- a/portal/server_socket.py +++ b/portal/server_socket.py @@ -43,16 +43,17 @@ def __init__(self, port, name='Server', **kwargs): self.options = Options(**{**contextlib.context.serverkw, **kwargs}) if self.options.ipv6: self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) self.addr = (self.options.host or '::', port, 0, 0) else: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.addr = (self.options.host or '0.0.0.0', port) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - # self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TODO self._log(f'Binding to {self.addr[0]}:{self.addr[1]}') self.sock.bind(self.addr) self.sock.setblocking(False) - self.sock.listen() + self.sock.listen(8192) self.sel = selectors.DefaultSelector() self.sel.register(self.sock, selectors.EVENT_READ, data=None) self._log(f'Listening at {self.addr[0]}:{self.addr[1]}')