Rename package and update readme
danijar committed Sep 4, 2024
1 parent c9a28d0 commit aef425e
Showing 31 changed files with 339 additions and 369 deletions.
104 changes: 38 additions & 66 deletions README.md
@@ -1,20 +1,32 @@
[![PyPI](https://img.shields.io/pypi/v/zerofun.svg)](https://pypi.python.org/pypi/zerofun/#history)
[![PyPI](https://img.shields.io/pypi/v/portal.svg)](https://pypi.python.org/pypi/portal/#history)

# 🙅 Zerofun
# 🌀 Portal

Remote function calls for array data using [ZMQ](https://zeromq.org/).
Fast and reliable distributed systems in Python.

## Overview
## Features

Zerofun provides a `Server` that you can bind functions to and a `Client` that
can call these functions and receive their results. The function inputs and
results are both flat **dicts of Numpy arrays**. The data is sent efficiently
without serialization to maximize throughput.
- 📡 **Communication:** Portal lets you bind functions to a `Server` and call
them from a `Client`. Wait on results via `Future` objects. Clients can
automatically restore broken connections.
- 🚀 **Performance:** Optimized for throughput and latency. Array data is
zero-copy serialized and deserialized for throughput near the hardware limit.
- 🤸 **Flexibility:** Function inputs and outputs can be nested dicts and lists
of numbers, strings, bytes, None values, and Numpy arrays. Bytes allow
applications to choose their own serialization, such as `pickle`.
- 🚨 **Error handling:** Provides `Process` and `Thread` objects that can
reliably be killed by the parent. Unhandled exceptions in threads stop
the program. Error files can be used to stop distributed systems.
- 📦 **Request batching:** Use `BatchServer` to collect multiple incoming
requests and process them at once, for example for AI inference servers.
Batching and dispatching happen in a separate process to free the GIL; a short
sketch follows this list.
- **Correctness:** Covered by over 100 unit tests for common usage and edge
cases, and used for large-scale distributed AI systems.
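
For inference-style workloads, requests can also be served through a
`BatchServer`, which collects several incoming calls and hands them to the
bound function together. The sketch below is only illustrative: the `batch`
argument to `bind` and the exact stacking behavior are assumptions about the
API rather than documented usage.

```python
def batch_server():
  import portal
  server = portal.BatchServer(2222)
  # Assumed behavior: collect four requests, stack their inputs along a
  # leading batch axis, call the function once, and split the result back
  # to the individual callers.
  server.bind('double', lambda x: x * 2, batch=4)  # `batch` kwarg is assumed
  server.start()

def batch_client():
  import numpy as np
  import portal
  client = portal.Client('localhost', 2222)
  futures = [client.double(np.arange(3)) for _ in range(4)]
  for future in futures:
    print(future.result())  # [0 2 4]

if __name__ == '__main__':
  import portal
  server_proc = portal.Process(batch_server, start=True)
  client_proc = portal.Process(batch_client, start=True)
  client_proc.join()
  server_proc.kill()
```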

## Installation

```sh
pip install zerofun
pip install portal
```

## Example
@@ -25,70 +37,30 @@ different machines.

```python
def server():
import zerofun
server = zerofun.Server('tcp://*:2222')
server.bind('add', lambda data: {'result': data['foo'] + data['bar']})
server.bind('msg', lambda data: print('Message from client:', data['msg']))
server.run()
import portal
server = portal.Server(2222)
server.bind('add', lambda x, y: x + y)
server.bind('greet', lambda msg: print('Message from client:', msg))
server.start()

def client():
import zerofun
client = zerofun.Client('tcp://localhost:2222')
client.connect()
future = client.add({'foo': 1, 'bar': 1})
import portal
client = portal.Client('localhost', 2222)
future = client.add(12, 42)
result = future.result()
print(result) # {'result': 2}
client.msg({'msg': 'Hello World'})
print(result) # 54
client.greet('Hello World')

if __name__ == '__main__':
import zerofun
server_proc = zerofun.Process(server, start=True)
client_proc = zerofun.Process(client, start=True)
import portal
server_proc = portal.Process(server, start=True)
client_proc = portal.Process(client, start=True)
client_proc.join()
server_proc.terminate()
server_proc.kill()
print('Done')
```
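
The client does not have to wait for each result before sending the next
request. Below is a minimal sketch of overlapping requests, using the
`maxinflight` argument that also appears in the perf scripts further down.
It assumes a server with an `add` function bound, as in the example above,
and the limit of 8 is arbitrary.

```python
def pipelined_client():
  import collections
  import portal
  # Cap the number of outstanding requests so a slow server does not cause
  # requests to pile up on the client side.
  client = portal.Client('localhost', 2222, maxinflight=8)
  futures = collections.deque()
  for step in range(100):
    futures.append(client.call('add', step, step))
    if len(futures) >= 8:
      print(futures.popleft().result())  # resolve the oldest request first
  while futures:
    print(futures.popleft().result())
```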

## Features

Several productivity and performance features are available:

- **Request batching:** The server can batch requests together so that the user
function receives a dict of stacked arrays and the function result will be
split and sent back to the corresponding clients.
- **Multithreading:** Servers can use a thread pool to process multiple
requests in parallel. Optionally, each function can also request its own
thread pool to allow functions to block (e.g. for rate limiting) without
blocking other functions.
- **Async clients:** Clients can send multiple overlapping requests and wait
on the results when needed using `Future` objects. The maximum number of
inflight requests can be limited to avoid requests building up when the
server is slower than the client.
- **Error handling:** Exceptions raised in server functions are reported to the
client and raised in `future.result()` or, if the user did not store the
future object, on the next request. Worker exceptions can also be reraised in
the server application using `server.check()`.
- **Heartbeating:** Clients can send ping requests when they have not received
a result from the server for a while, which lets them wait for results that
take a long time to compute without assuming connection loss.
- **Concurrency:** `Thread` and `Process` implementations with exception
forwarding that can be forcefully terminated by the parent, which Python
threads do not natively support. Stoppable threads and processes are also
available for cooperative shutdown.
- **GIL load reduction:** The `ProcServer` behaves just like the normal
`Server` but uses a background process to batch requests and fan out results,
substantially reducing GIL load for the server workers in the main process.

## FAQ

### Popup on MacOS

If you see a popup *"Do you want the application "Python.app" to accept
incoming network connections?"* you can either restrict your server sockets to
the local machine using `zerofun.setup(hostname='localhost')` or [code sign
your Python binary](https://stackoverflow.com/a/61462541) with the [deep
flag](https://superuser.com/a/1281683).

## Questions

Please open a [GitHub issue](https://github.com/danijar/zerofun/issues) for
each question. Over time, we will add common questions to the README.
Please open a separate [GitHub issue](https://github.com/danijar/portal/issues)
for each question.
32 changes: 15 additions & 17 deletions example.py
@@ -1,24 +1,22 @@
# TODO

def server():
import zerofun
server = zerofun.Server('tcp://*:2222')
server.bind('add', lambda data: {'result': data['foo'] + data['bar']})
server.bind('msg', lambda data: print('Message from client:', data['msg']))
server.run()
import portal
server = portal.Server(2222)
server.bind('add', lambda x, y: x + y)
server.bind('greet', lambda msg: print('Message from client:', msg))
server.start()

def client():
import zerofun
client = zerofun.Client('tcp://localhost:2222')
client.connect()
future = client.add({'foo': 1, 'bar': 1})
import portal
client = portal.Client('localhost', 2222)
future = client.add(12, 42)
result = future.result()
print(result) # {'result': 2}
client.msg({'msg': 'Hello World'})
print(result) # 54
client.greet('Hello World')

if __name__ == '__main__':
import zerofun
server_proc = zerofun.Process(server, start=True)
client_proc = zerofun.Process(client, start=True)
import portal
server_proc = portal.Process(server, start=True)
client_proc = portal.Process(client, start=True)
client_proc.join()
server_proc.terminate()
server_proc.kill()
print('Done')
16 changes: 8 additions & 8 deletions perf/server_latency.py
@@ -1,15 +1,15 @@
import collections
import time

import zerofun
import portal


def main():

size = 1024

def server(port):
server = zerofun.Server(port)
server = portal.Server(port)
def fn(x):
assert len(x) == size
return b'ok'
@@ -18,7 +18,7 @@ def fn(x):

def client(port):
data = bytearray(size)
client = zerofun.Client('localhost', port)
client = portal.Client('localhost', port)
futures = collections.deque()
durations = collections.deque(maxlen=50)
while True:
@@ -31,11 +31,11 @@ def client(port):
ping = sum(durations) / len(durations)
print(1000 * ping) # <1ms

zerofun.setup(hostname='localhost')
port = zerofun.free_port()
zerofun.run([
zerofun.Process(server, port),
zerofun.Process(client, port),
portal.setup(hostname='localhost')
port = portal.free_port()
portal.run([
portal.Process(server, port),
portal.Process(client, port),
])


22 changes: 11 additions & 11 deletions perf/server_throughput.py
@@ -1,7 +1,7 @@
import collections
import time

import zerofun
import portal


def main():
@@ -11,10 +11,10 @@ def main():
twoway = False

def server(port):
server = zerofun.Server(port)
# server = zerofun.BatchServer(port)
# server = zerofun.BatchServer(port, process=False)
# server = zerofun.BatchServer(port, shmem=True)
server = portal.Server(port)
# server = portal.BatchServer(port)
# server = portal.BatchServer(port, process=False)
# server = portal.BatchServer(port, shmem=True)
def fn(x):
assert len(x) == size
return x if twoway else b'ok'
@@ -23,7 +23,7 @@ def fn(x):

def client(port):
data = bytearray(size)
client = zerofun.Client('localhost', port, maxinflight=prefetch + 1)
client = portal.Client('localhost', port, maxinflight=prefetch + 1)
futures = collections.deque()
for _ in range(prefetch):
futures.append(client.call('foo', data))
@@ -44,11 +44,11 @@ def client(port):
mbps *= 2 if twoway else 1
print(mbps) # 3700 oneway, 3000 twoway

zerofun.setup(hostname='localhost')
port = zerofun.free_port()
zerofun.run([
zerofun.Process(server, port),
zerofun.Process(client, port),
portal.setup(hostname='localhost')
port = portal.free_port()
portal.run([
portal.Process(server, port),
portal.Process(client, port),
])


16 changes: 8 additions & 8 deletions perf/socket_latency.py
@@ -1,7 +1,7 @@
import collections
import time

import zerofun
import portal


def main():
@@ -10,15 +10,15 @@ def main():
parts = 32

def server(port):
server = zerofun.ServerSocket(port)
server = portal.ServerSocket(port)
while True:
addr, data = server.recv()
server.send(addr, b'ok')
assert len(data) == size // parts * parts

def client(port):
data = [bytearray(size // parts) for _ in range(parts)]
client = zerofun.ClientSocket('localhost', port)
client = portal.ClientSocket('localhost', port)
durations = collections.deque(maxlen=10)
while True:
start = time.perf_counter()
@@ -29,11 +29,11 @@ def client(port):
ping = sum(durations) / len(durations)
print(1000 * ping) # <1ms

zerofun.setup(hostname='localhost')
port = zerofun.free_port()
zerofun.run([
zerofun.Process(server, port),
zerofun.Process(client, port),
portal.setup(hostname='localhost')
port = portal.free_port()
portal.run([
portal.Process(server, port),
portal.Process(client, port),
])


22 changes: 11 additions & 11 deletions perf/socket_proxy.py
@@ -2,7 +2,7 @@
import queue
import time

import zerofun
import portal


def main():
@@ -12,7 +12,7 @@ def main():
prefetch = 16

def server(port1):
server = zerofun.ServerSocket(port1)
server = portal.ServerSocket(port1)
durations = collections.deque(maxlen=50)
start = time.time()
while True:
@@ -27,8 +27,8 @@ def server(port1):
print(mbps) # ~3500

def proxy(port1, port2):
server = zerofun.ServerSocket(port2)
client = zerofun.ClientSocket('localhost', port1)
server = portal.ServerSocket(port2)
client = portal.ClientSocket('localhost', port1)
addrs = collections.deque()
while True:
try:
@@ -45,19 +45,19 @@

def client(port2):
data = [bytearray(size // parts) for _ in range(parts)]
client = zerofun.ClientSocket('localhost', port2)
client = portal.ClientSocket('localhost', port2)
for _ in range(prefetch):
client.send(*data)
while True:
client.send(*data)
assert client.recv() == b'ok'

port1 = zerofun.free_port()
port2 = zerofun.free_port()
zerofun.run([
zerofun.Process(server, port1),
zerofun.Process(proxy, port1, port2),
zerofun.Process(client, port2),
port1 = portal.free_port()
port2 = portal.free_port()
portal.run([
portal.Process(server, port1),
portal.Process(proxy, port1, port2),
portal.Process(client, port2),
])

