Skip to content

Commit

Permalink
Add retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Bre77 committed Mar 8, 2024
1 parent 504bad1 commit 3732844
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 69 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="teslemetry_stream",
version="0.1.3",
version="0.1.4",
author="Brett Adams",
author_email="hello@teslemetry.com",
description="Teslemetry Streaming API library for Python",
Expand Down
100 changes: 32 additions & 68 deletions teslemetry_stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@


LOGGER = logging.getLogger(__package__)
DELAY = 10


class TeslemetryStreamError(Exception):
Expand Down Expand Up @@ -36,11 +37,10 @@ class TeslemetryStream:

fields: dict[TelemetryFields, dict[str, int]]
alerts: list[TelemetryAlerts]
_update_lock = asyncio.Lock()
_response: aiohttp.ClientResponse | None = None
_listeners: dict[Callable, Callable] = {}
connected = False
active = False
delay = DELAY
active = None

def __init__(
self,
Expand All @@ -62,6 +62,11 @@ def __init__(
self._headers = {"Authorization": f"Bearer {access_token}"}
self.parse_timestamp = parse_timestamp

@property
def connected(self) -> bool:
"""Return if connected."""
return self._response is not None

async def get_config(self, vin: str | None = None) -> None:
"""Get the current stream config."""

Expand All @@ -79,8 +84,9 @@ async def get_config(self, vin: str | None = None) -> None:
response = (await req.json()).get("response")

if (
response and (config := response.get("config"))
# and config["hostname"].endswith(".teslemetry.com")
response
and (config := response.get("config"))
and config["hostname"].endswith(".teslemetry.com")
):
self.server = config["hostname"]
self.fields = config["fields"]
Expand All @@ -90,36 +96,6 @@ async def get_config(self, vin: str | None = None) -> None:
if not response.get("synced"):
LOGGER.warning("Vehicle configuration not active")

async def add_field(
self, field: TelemetryFields, interval: int, update: bool = True
) -> None:
"""Add field to telemetry stream."""
if not self.fields.get(field, {}).get("interval_seconds") == interval:
self.fields[field] = {"interval_seconds": interval}
if update:
await self.update()

async def remove_field(self, field: TelemetryFields, update: bool = True) -> None:
"""Remove field from telemetry stream."""
if field in self.fields:
del self.fields[field]
if update:
await self.update()

async def add_alert(self, alert: TelemetryAlerts, update: bool = True) -> None:
"""Add alert to telemetry stream."""
if alert not in self.alerts:
self.alerts.append(alert)
if update:
await self.update()

async def remove_alert(self, alert: TelemetryAlerts, update: bool = True) -> None:
"""Remove alert from telemetry stream."""
if alert in self.alerts:
self.alerts.remove(alert)
if update:
await self.update()

@property
def config(self) -> dict:
"""Return current configuration."""
Expand All @@ -129,20 +105,9 @@ def config(self) -> dict:
"alerts": self.alerts,
}

async def update(self, wait: int = 1) -> None:
"""Update the telemetry stream."""
if self._update_lock.locked():
return
with self._update_lock:
await asyncio.sleep(wait)
await self._session.post(
f"https://api.teslemetry.com/api/telemetry/{self.vin}",
headers=self._headers,
json=self.config,
)

async def connect(self) -> None:
"""Connect to the telemetry stream."""
self.active = True
if not self.server:
await self.get_config()

Expand All @@ -151,9 +116,10 @@ async def connect(self) -> None:
f"https://{self.server}/sse/{self.vin or ''}",
headers=self._headers,
raise_for_status=True,
timeout=aiohttp.ClientTimeout(connect=5, sock_read=30, total=None),
timeout=aiohttp.ClientTimeout(
connect=5, sock_connect=5, sock_read=30, total=None
),
)
self.connected = True
LOGGER.debug(
"Connected to %s with status %s", self._response.url, self._response.status
)
Expand All @@ -164,27 +130,21 @@ def close(self) -> None:
LOGGER.debug("Disconnecting from %s", self.server)
self._response.close()
self._response = None
self.connected = False

async def __aenter__(self) -> "TeslemetryStream":
"""Connect and listen Server-Sent Event."""
await self.connect()
return self

async def __aexit__(self, *exc):
"""Close connection."""
self.close()

def __aiter__(self):
"""Return"""
return self

async def __anext__(self) -> dict:
"""Return next event."""
self.active = True
if not self._response:
await self.connect()
try:
if self.active is False:
# Stop the stream and loop
self.close()
raise StopAsyncIteration
if not self._response:
# Connect to the stream
await self.connect()
async for line_in_bytes in self._response.content:
line = line_in_bytes.decode("utf8")
if line.startswith("data:"):
Expand All @@ -197,12 +157,14 @@ async def __anext__(self) -> dict:
.timestamp()
) * 1000 + int(ns[:3])
LOGGER.debug("event %s", json.dumps(data))
self.delay = DELAY
return data
continue
except aiohttp.ClientConnectionError as error:
raise StopAsyncIteration from error
finally:
self.active = False
except aiohttp.ClientError as error:
LOGGER.warning("Connection error: %s", error)
self.close()
LOGGER.debug("Reconnecting in %s seconds", self.delay)
await asyncio.sleep(self.delay)
self.delay += DELAY

def async_add_listener(self, callback: Callable) -> Callable[[], None]:
"""Listen for data updates."""
Expand All @@ -212,7 +174,7 @@ def remove_listener() -> None:
"""Remove update listener."""
self._listeners.pop(remove_listener)
if not self._listeners:
self.close()
self.active = False

self._listeners[remove_listener] = callback

Expand All @@ -224,7 +186,9 @@ def remove_listener() -> None:

async def listen(self):
"""Listen to the telemetry stream."""

async for event in self:
if event:
for listener in self._listeners.values():
listener(event)
LOGGER.debug("Ending Loop")

0 comments on commit 3732844

Please sign in to comment.