From 5fed04ec911967495b7618b3e9486dea8946185b Mon Sep 17 00:00:00 2001 From: Brett Date: Sat, 2 Mar 2024 16:48:30 +1000 Subject: [PATCH] Add async listener --- README.md | 23 ++++++++++------------ setup.py | 2 +- teslemetry_stream/__init__.py | 37 ++++++++++++++++++++++++++++------- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index ef9c238..b22a598 100644 --- a/README.md +++ b/README.md @@ -11,17 +11,13 @@ The TeslemetryStream class requires: - session: an aiohttp.ClientSession - access_token: an access token from the [Teslemetry console](https://teslemetry.com/console) -- vin: your Tesla's Vehicle Identification Number - -The TeslemetryStream instance can then be configured with: -- `add_field(field, interval)` -- `remove_field(field)` -- `add_alert(alert)` -- `remove_alert(alert)` +- One or both: + - vin: your Tesla's Vehicle Identification Number + - server: The Teslemetry server to connect to The full list of possible values are provided in `TeslemetryStream.Fields` and `TeslemetryStream.Alerts` -To connect, either use `async with` on the instance, call `connect()`, or register an async callback on `listen()`, which will connect automatically. +To connect, either use `async with` on the instance, call `connect()`, or register an callback with `async_add_listener`, which will connect when added and disconnect when removed. Using `connect()` or `listen()` will require you to close the session manually using `close()`. @@ -37,11 +33,12 @@ async def main(): ) await stream.connect() - async def callback(event): - print(event) + def callback(event): + print(event["data"]) - asyncio.create_task(stream.listen(callback)) + remove = stream.async_add_listener(callback) - await asyncio.sleep(20) - await stream.close() + print("Running") + await asyncio.sleep(60) + remove() ``` \ No newline at end of file diff --git a/setup.py b/setup.py index 5145928..599b3b8 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="teslemetry_stream", - version="0.0.5", + version="0.1.0", author="Brett Adams", author_email="hello@teslemetry.com", description="Teslemetry Streaming API library for Python", diff --git a/teslemetry_stream/__init__.py b/teslemetry_stream/__init__.py index 3ab1c5c..cdf183a 100644 --- a/teslemetry_stream/__init__.py +++ b/teslemetry_stream/__init__.py @@ -1,9 +1,11 @@ +from collections.abc import Callable, Awaitable import aiohttp import asyncio import json import logging from .const import TelemetryFields, TelemetryAlerts + LOGGER = logging.getLogger(__package__) @@ -35,6 +37,7 @@ class TeslemetryStream: alerts: list[TelemetryAlerts] _update_lock = asyncio.Lock() _response: aiohttp.ClientResponse | None = None + _listeners: dict[Callable, Callable] = {} def __init__( self, @@ -144,12 +147,7 @@ async def connect(self) -> None: "Connected to %s with status %s", self._response.url, self._response.status ) - async def listen(self, callback): - """Listen to the telemetry stream.""" - async for event in self: - await callback(event) - - async def close(self) -> None: + def close(self) -> None: """Close connection.""" if self._response is not None: self._response.close() @@ -162,7 +160,7 @@ async def __aenter__(self) -> "TeslemetryStream": async def __aexit__(self, *exc): """Close connection.""" - await self.close() + self.close() def __aiter__(self): """Return""" @@ -181,3 +179,28 @@ async def __anext__(self) -> dict: continue except aiohttp.ClientConnectionError as error: raise StopAsyncIteration from error + + def async_add_listener(self, callback: Callable) -> Callable[[], None]: + """Listen for data updates.""" + schedule_refresh = not self._listeners + + def remove_listener() -> None: + """Remove update listener.""" + self._listeners.pop(remove_listener) + if not self._listeners: + self.close() + + self._listeners[remove_listener] = callback + + # This is the first listener, set up task. + if schedule_refresh: + asyncio.create_task(self.listen()) + + return remove_listener + + async def listen(self): + """Listen to the telemetry stream.""" + async for event in self: + if event: + for listener in self._listeners.values(): + listener(event)