Skip to content

Commit

Permalink
Delay multiprocessing context creation
Browse files Browse the repository at this point in the history
  • Loading branch information
danijar committed Sep 17, 2024
1 parent f7a26d0 commit aaac518
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
2 changes: 1 addition & 1 deletion portal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '3.4.0'
__version__ = '3.4.1'

import multiprocessing as mp
try:
Expand Down
4 changes: 2 additions & 2 deletions portal/client_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ def _connect(self):
port = int(port)
addr = (host, port, 0, 0) if self.options.ipv6 else (host, port)
sock = self._create()
start = time.time()
error = None
try:
sock.settimeout(10)
Expand All @@ -214,9 +213,10 @@ def _connect(self):
def _create(self):
if self.options.ipv6:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TODO

after = self.options.keepalive_after
every = self.options.keepalive_every
Expand Down
7 changes: 5 additions & 2 deletions portal/contextlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ def __init__(self):
self.interval = 20
self.clientkw = {}
self.serverkw = {}
self.printlock = threading.Lock()
self.done = threading.Event()
self.watcher = None
self.mp = mp.get_context()
self.printlock = self.mp.Lock()

@property
def mp(self):
return mp.get_context('spawn')

def options(self):
return {
Expand Down
6 changes: 4 additions & 2 deletions portal/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,16 @@ def _loop(self):
data = job.result()
if job.postfn:
data, info = data
else:
job.active.release()
del info
data = packlib.pack(data)
status = int(0).to_bytes(8, 'little', signed=False)
self.socket.send(job.addr, job.reqnum, status, *data)
self.metrics['send'] += 1
except Exception as e:
self._error(job.addr, job.reqnum, 4, f'Error in server method: {e}')
finally:
if not job.postfn:
job.active.release()
if completed:
while self.postfn_inp and self.postfn_inp[0].done():
job = self.postfn_inp.popleft()
Expand Down
5 changes: 3 additions & 2 deletions portal/server_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ def __init__(self, port, name='Server', **kwargs):
self.options = Options(**{**contextlib.context.serverkw, **kwargs})
if self.options.ipv6:
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
self.addr = (self.options.host or '::', port, 0, 0)
else:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addr = (self.options.host or '0.0.0.0', port)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TODO
self._log(f'Binding to {self.addr[0]}:{self.addr[1]}')
self.sock.bind(self.addr)
self.sock.setblocking(False)
self.sock.listen()
self.sock.listen(8192)
self.sel = selectors.DefaultSelector()
self.sel.register(self.sock, selectors.EVENT_READ, data=None)
self._log(f'Listening at {self.addr[0]}:{self.addr[1]}')
Expand Down

0 comments on commit aaac518

Please sign in to comment.