Skip to content

Commit

Permalink
Fixes the refresh token process (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjaliaga authored Apr 1, 2024
1 parent a8562de commit 0d394b5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
1 change: 1 addition & 0 deletions aioaquarea/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
AQUAREA_SERVICE_CONTRACT = "remote/contract"
AQUAREA_SERVICE_A2W_STATUS_DISPLAY = "remote/a2wStatusDisplay"
AQUAREA_SERVICE_AUTH_CLIENT_ID = "vf2i6hW5hA2BB2BQGfTHXM4YFyW4I06K"
AQUAREA_SERVICE_AUTH0_CLIENT = "eyJuYW1lIjoiYXV0aDAuanMtdWxwIiwidmVyc2lvbiI6IjkuMjMuMiJ9"

PANASONIC = "Panasonic"

Expand Down
42 changes: 35 additions & 7 deletions aioaquarea/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from .const import (
AQUAREA_SERVICE_A2W_STATUS_DISPLAY,
AQUAREA_SERVICE_AUTH0_CLIENT,
AQUAREA_SERVICE_BASE,
AQUAREA_SERVICE_CONSUMPTION,
AQUAREA_SERVICE_CONTRACT,
Expand Down Expand Up @@ -219,7 +220,7 @@ async def request(
data = await resp.json()

# let's check for access token and expiration time
if self._access_token and "accessToken" in data:
if self._access_token and self.__contains_valid_token(data):
self._access_token = data["accessToken"]["token"]
self._token_expiration = dt.datetime.strptime(
data["accessToken"]["expires"], "%Y-%m-%dT%H:%M:%S%z"
Expand All @@ -239,6 +240,14 @@ async def request(

return resp

def __contains_valid_token(self, data: dict) -> bool:
"""Check if the data contains a valid token."""
return (
"accessToken" in data
and "token" in data["accessToken"]
and "expires" in data["accessToken"]
)

async def look_for_errors(
self, data: dict
) -> list[FaultError]:
Expand All @@ -260,22 +269,22 @@ async def login(self) -> None:
return

if self._environment is AquareaEnvironment.DEMO:
await self._login_demo()
await self.__login_demo()
else:
await self._login_production()

await self.__login_production()
self._last_login = dt.datetime.now()

finally:
self._login_lock.release()

async def _login_demo(self) -> None:
async def __login_demo(self) -> None:
_ = await self.request("GET", "", referer=self._base_url)
self._token_expiration = dt.datetime.astimezone(
dt.datetime.utcnow(), tz=dt.timezone.utc
) + dt.timedelta(days=1)

async def _login_production(self) -> None:
async def __login_production(self) -> None:
response: aiohttp.ClientResponse = await self.request(
"POST",
AQUAREA_SERVICE_LOGIN,
Expand Down Expand Up @@ -304,6 +313,12 @@ async def _login_production(self) -> None:
allow_redirects=False)

location = response.headers.get("Location")

# We might be already on the last step of the login process if we're refreshing the token
if self.__is_final_step(location):
return await self.__complete_login(location)

# We continue with the login process
parsed_url = urllib.parse.urlparse(location)

# Extract the value of the 'state' query parameter
Expand Down Expand Up @@ -350,7 +365,7 @@ async def _login_production(self) -> None:
referer=f"https://authglb.digital.panasonic.com/login?{urllib.parse.urlencode(query_params)}",
content_type="application/json; charset=UTF-8",
headers={
"Auth0-Client": "eyJuYW1lIjoiYXV0aDAuanMtdWxwIiwidmVyc2lvbiI6IjkuMjMuMiJ9"
"Auth0-Client": AQUAREA_SERVICE_AUTH0_CLIENT,
},
allow_redirects=False,
json=data,
Expand Down Expand Up @@ -395,6 +410,14 @@ async def _login_production(self) -> None:

location = response.headers.get("Location")

if self.__is_final_step(location):
return await self.__complete_login(location)

raise AuthenticationError(AuthenticationErrorCodes.INVALID_USERNAME_OR_PASSWORD, "Invalid username or password")

async def __complete_login(self, location: str) -> None:
"""Complete the login process."""

response: aiohttp.ClientResponse = await self.request(
"GET",
external_url=location,
Expand All @@ -408,6 +431,11 @@ async def _login_production(self) -> None:
f"Login successful for {self.username}. Access Token Expiration: {self._token_expiration}"
)

def __is_final_step(self, location: str) -> bool:
"""Check if the location is the final step of the login process."""
parsed = urllib.parse.urlparse(location)
return parsed.hostname == "aquarea-smart.panasonic.com" and parsed.path == "/authorizationCallback" and "code" in urllib.parse.parse_qs(parsed.query)

@auth_required
async def get_devices(self, include_long_id=False) -> list[DeviceInfo]:
"""Get list of devices and its configuration, without status."""
Expand Down

0 comments on commit 0d394b5

Please sign in to comment.