Remove PubSub (#233)
avgupta456 authored Nov 23, 2023
1 parent 892fafd commit 608321c
Showing 104 changed files with 553 additions and 1,066 deletions.
2 changes: 0 additions & 2 deletions backend/.env-template
@@ -7,6 +7,4 @@ OAUTH_REDIRECT_URI=abc123

GOOGLE_APPLICATION_CREDENTIALS=abc123

-PUBSUB_TOKEN=abc123
-
MONGODB_PASSWORD=abc123
6 changes: 3 additions & 3 deletions backend/delete_old_data.py
@@ -1,8 +1,8 @@
+import asyncio
+from datetime import datetime
from typing import Any

-import asyncio
from dotenv import find_dotenv, load_dotenv
-from datetime import datetime

load_dotenv(find_dotenv())

@@ -29,7 +29,7 @@ async def count_old_rows(cutoff_date: datetime) -> int:

async def delete_old_rows(cutoff_date: datetime):
    filters = get_filters(cutoff_date)
-    result = await USER_MONTHS.delete_many(filters) # type: ignore
+    result = await USER_MONTHS.delete_many(filters)
    print(f"Deleted {result.deleted_count} rows")


1 change: 0 additions & 1 deletion backend/deploy/cloudbuild.yaml
@@ -8,7 +8,6 @@ steps:
args: ["run", "create-env"]
dir: "backend"
env:
- "PUBSUB_TOKEN=${_PUBSUB_TOKEN}"
- "OAUTH_CLIENT_ID=${_OAUTH_CLIENT_ID}"
- "OAUTH_CLIENT_SECRET=${_OAUTH_CLIENT_SECRET}"
- "OAUTH_REDIRECT_URI=${_OAUTH_REDIRECT_URI}"
11 changes: 0 additions & 11 deletions backend/deploy/pubsub.Dockerfile

This file was deleted.

616 changes: 195 additions & 421 deletions backend/poetry.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion backend/pyproject.toml
@@ -15,7 +15,6 @@ motor = "^3.3.1"
aiofiles = "^23.2.1"
aiounittest = "^1.4.2"
coveralls = "^3.3.1"
-google-cloud-pubsub = "^2.18.4"
grpcio = "^1.59.2"
gunicorn = "^21.2.0"
pymongo = {extras = ["srv"], version = "^4.6.0"}
27 changes: 7 additions & 20 deletions backend/requirements.txt
@@ -2,8 +2,7 @@ aiofiles==23.2.1
aiounittest==1.4.2
annotated-types==0.6.0
anyio==3.7.1
-cachetools==5.3.2
-certifi==2023.7.22
+certifi==2023.11.17
charset-normalizer==3.3.2
click==8.1.7
colorama==0.4.6
@@ -12,39 +11,27 @@ coveralls==3.3.1
dnspython==2.4.2
docopt==0.6.2
fastapi==0.104.1
-google-api-core[grpc]==2.14.0
-google-auth==2.23.4
-google-cloud-pubsub==2.18.4
-googleapis-common-protos==1.61.0
-googleapis-common-protos[grpc]==1.61.0
-grpc-google-iam-v1==0.12.7
-grpcio-status==1.59.2
-grpcio==1.59.2
+grpcio==1.59.3
gunicorn==21.2.0
h11==0.14.0
httptools==0.6.1
idna==3.4
-motor==3.3.1
+motor==3.3.2
packaging==23.2
-proto-plus==1.22.3
-protobuf==4.25.0
-pyasn1-modules==0.3.0
-pyasn1==0.5.0
-pydantic-core==2.10.1
-pydantic==2.4.2
+pydantic-core==2.14.5
+pydantic==2.5.2
pymongo==4.6.0
pymongo[srv]==4.6.0
python-dotenv==1.0.0
pytz==2023.3.post1
pyyaml==6.0.1
requests==2.31.0
-rsa==4.9
-sentry-sdk==1.34.0
+sentry-sdk==1.36.0
sniffio==1.3.0
starlette==0.27.0
svgwrite==1.4.3
typing-extensions==4.8.0
-urllib3==2.0.7
+urllib3==2.1.0
uvicorn[standard]==0.24.0.post1
uvloop==0.19.0
watchfiles==0.21.0
3 changes: 3 additions & 0 deletions backend/src/aggregation/layer0/__init__.py
@@ -0,0 +1,3 @@
+from src.aggregation.layer0.package import get_user_data
+
+__all__ = ["get_user_data"]
@@ -4,6 +4,7 @@

import pytz

+from src.aggregation.layer0.languages import CommitLanguages, get_commit_languages
from src.constants import (
    GRAPHQL_NODE_CHUNK_SIZE,
    GRAPHQL_NODE_THREADS,
@@ -29,10 +30,6 @@
    get_repo_commits,
)
from src.models import UserContributions
-from src.subscriber.aggregation.user.languages import (
-    CommitLanguages,
-    get_commit_languages,
-)
from src.utils import date_to_datetime, gather


File renamed without changes.
File renamed without changes.
@@ -1,8 +1,8 @@
from datetime import date
from typing import Optional

+from src.aggregation.layer0.contributions import get_contributions
from src.models import UserPackage
-from src.subscriber.aggregation.user.contributions import get_contributions

# from src.subscriber.aggregation.user.follows import get_user_follows

3 changes: 3 additions & 0 deletions backend/src/aggregation/layer1/__init__.py
@@ -0,0 +1,3 @@
+from src.aggregation.layer1.user import query_user
+
+__all__ = ["query_user"]
@@ -1,5 +1,5 @@
from datetime import timedelta
-from typing import List
+from typing import List, Tuple

from src.constants import OWNER, REPO
from src.data.github.rest import (
@@ -35,7 +35,7 @@ async def get_valid_db_user(user_id: str) -> bool:
@alru_cache(ttl=timedelta(minutes=15))
async def get_repo_stargazers(
    owner: str = OWNER, repo: str = REPO, no_cache: bool = False
-) -> List[str]:
+) -> Tuple[bool, List[str]]:
    access_token = get_access_token()
    data: List[str] = []
    page = 0
@@ -45,7 +45,7 @@ async def get_repo_stargazers(
        data.extend(temp_data)
        page += 1

-    return (True, data) # type: ignore
+    return (True, data)


async def get_user_stars(user_id: str) -> List[str]:
@@ -1,20 +1,21 @@
from calendar import monthrange
from datetime import date, datetime, timedelta
-from typing import List, Optional
+from typing import List, Optional, Tuple

import requests

-from src.constants import API_VERSION, BACKEND_URL, DOCKER, LOCAL_PUBLISHER, PROD
+from src.aggregation.layer0.package import get_user_data
+from src.constants import API_VERSION, BACKEND_URL, PROD
from src.data.github.graphql import GraphQLErrorRateLimit
from src.data.mongo.secret import update_keys
from src.data.mongo.user_months import UserMonth, get_user_months, set_user_month
from src.models.user.main import UserPackage
-from src.subscriber.aggregation import get_user_data
from src.utils import alru_cache, date_to_datetime

s = requests.Session()

-# NOTE: query user from PubSub, not from subscriber user router
+
+# Formerly the subscriber, compute and save new data here


async def query_user_month(
@@ -72,7 +73,7 @@ async def query_user(
    start_date: date = date.today() - timedelta(365),
    end_date: date = date.today(),
    no_cache: bool = False,
-) -> UserPackage:
+) -> Tuple[bool, UserPackage]:
    # Return (possibly incomplete) within 45 seconds
    start_time = datetime.now()
    incomplete = False
@@ -85,17 +86,17 @@
    curr_months = [x.month for x in curr_data if x.complete]

    month, year = start_date.month, start_date.year
-    months: List[date] = []
+    new_months: List[date] = []
    while date(year, month, 1) <= end_date:
        start = date(year, month, 1)
        if date_to_datetime(start) not in curr_months:
-            months.append(start)
+            new_months.append(start)
        month = month % 12 + 1
        year = year + (month == 1)

    # Start with complete months and add any incomplete months
    all_user_packages: List[UserPackage] = [x.data for x in curr_data if x.complete]
-    for month in months:
+    for month in new_months:
        if datetime.now() - start_time < timedelta(seconds=40):
            temp = await query_user_month(user_id, access_token, private_access, month)
            if temp is not None:
@@ -112,14 +113,12 @@
        out += user_package
    out.incomplete = incomplete

-    if incomplete or len(months) > 1:
+    if incomplete or len(new_months) > 1:
        # cache buster for publisher
        if PROD:
            s.get(f"{BACKEND_URL}/user/{user_id}?no_cache=True")
-        elif DOCKER:
-            s.get(f"{LOCAL_PUBLISHER}/user/{user_id}?no_cache=True")

-        return (False, out) # type: ignore
+        return (False, out)

    # only cache if just the current month updated
-    return (True, out) # type: ignore
+    return (True, out)
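
As a standalone illustration of the month-iteration logic in query_user above, here is a minimal sketch; the dates and the pre-filled curr_months list are invented for the example, and the real code compares against datetimes from Mongo via date_to_datetime:

from datetime import date
from typing import List

# Illustrative inputs, not from the repo: a window from mid-September to late
# November where October is already stored as a complete month in the database.
start_date, end_date = date(2023, 9, 15), date(2023, 11, 23)
curr_months: List[date] = [date(2023, 10, 1)]

month, year = start_date.month, start_date.year
new_months: List[date] = []
while date(year, month, 1) <= end_date:
    start = date(year, month, 1)
    if start not in curr_months:  # real code: date_to_datetime(start) not in curr_months
        new_months.append(start)
    month = month % 12 + 1        # wrap December -> January
    year = year + (month == 1)    # bump the year on wrap

print(new_months)  # [datetime.date(2023, 9, 1), datetime.date(2023, 11, 1)]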
4 changes: 4 additions & 0 deletions backend/src/aggregation/layer2/__init__.py
@@ -0,0 +1,4 @@
+from src.aggregation.layer2.auth import get_is_valid_user
+from src.aggregation.layer2.user import get_user, get_user_demo
+
+__all__ = ["get_is_valid_user", "get_user", "get_user_demo"]
@@ -1,13 +1,14 @@
from datetime import timedelta
+from typing import Tuple

-from src.constants import OWNER, REPO
-from src.data.github.rest import RESTError
-from src.subscriber.aggregation import (
+from src.aggregation.layer1.auth import (
    get_repo_stargazers,
    get_user_stars,
    get_valid_db_user,
    get_valid_github_user,
)
+from src.constants import OWNER, REPO
+from src.data.github.rest import RESTError
from src.utils import alru_cache

USER_WHITELIST = [
@@ -43,17 +44,17 @@ async def check_user_starred_repo(


@alru_cache(ttl=timedelta(hours=1))
-async def get_is_valid_user(user_id: str) -> str:
+async def get_is_valid_user(user_id: str) -> Tuple[bool, str]:
    if user_id in USER_WHITELIST:
-        return (True, "Valid user") # type: ignore
+        return (True, "Valid user")

    valid_github_user = await check_github_user_exists(user_id)
    if not valid_github_user:
-        return (False, "GitHub user not found") # type: ignore
+        return (False, "GitHub user not found")

    valid_db_user = await check_db_user_exists(user_id)
    user_starred = await check_user_starred_repo(user_id)
    if not (user_starred or valid_db_user):
-        return (False, "Repo not starred") # type: ignore
+        return (False, "Repo not starred")

-    return (True, "Valid user") # type: ignore
+    return (True, "Valid user")
@@ -1,24 +1,15 @@
from datetime import date, timedelta
-from typing import Optional
+from typing import Optional, Tuple

+from src.aggregation.layer0 import get_user_data
from src.data.mongo.secret.functions import update_keys
from src.data.mongo.user import PublicUserModel, get_public_user as db_get_public_user
from src.data.mongo.user_months import get_user_months
from src.models import UserPackage
-from src.publisher.processing.pubsub import publish_user
-
-# TODO: replace with call to subscriber so compute not on publisher
-from src.subscriber.aggregation import get_user_data
+from src.models.background import UpdateUserBackgroundTask
from src.utils import alru_cache


-@alru_cache()
-async def update_user(
-    user_id: str, access_token: Optional[str], private_access: bool
-) -> bool:
-    """Sends a message to pubsub to request a user update (auto cache updates)"""
-    await publish_user(user_id, access_token, private_access)
-    return (True, True) # type: ignore
+# Formerly the publisher, loads existing data here


async def _get_user(
@@ -42,23 +33,26 @@ async def get_user(
    start_date: date,
    end_date: date,
    no_cache: bool = False,
-) -> Optional[UserPackage]:
+) -> Tuple[bool, Tuple[Optional[UserPackage], Optional[UpdateUserBackgroundTask]]]:
    user: Optional[PublicUserModel] = await db_get_public_user(user_id)
    if user is None:
-        return (False, None) # type: ignore
+        return (False, (None, None))

    private_access = user.private_access or False
-    await update_user(user_id, user.access_token, private_access)
    user_data = await _get_user(user_id, private_access, start_date, end_date)
-    return (user_data is not None, user_data) # type: ignore
+    background_task = UpdateUserBackgroundTask(
+        user_id=user_id, access_token=user.access_token, private_access=private_access
+    )
+    return (user_data is not None, (user_data, background_task))


@alru_cache(ttl=timedelta(minutes=15))
async def get_user_demo(
    user_id: str, start_date: date, end_date: date, no_cache: bool = False
-) -> UserPackage:
+) -> Tuple[bool, UserPackage]:
    await update_keys()
    timezone_str = "US/Eastern"
+    # recompute/cache but don't save to db
    data = await get_user_data(
        user_id=user_id,
        start_date=start_date,
@@ -67,4 +61,4 @@
        access_token=None,
        catch_errors=True,
    )
-    return (True, data) # type: ignore
+    return (True, data)
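
get_user now returns the data alongside an UpdateUserBackgroundTask instead of publishing to Pub/Sub, but the router that consumes it is not shown in this excerpt. A hedged sketch of how such a caller might schedule the recompute in-process with FastAPI's BackgroundTasks; the endpoint and the run_update helper below are hypothetical, not part of this diff:

from datetime import date, timedelta
from typing import Optional

from fastapi import APIRouter, BackgroundTasks

from src.aggregation.layer1 import query_user
from src.aggregation.layer2 import get_user
from src.models import UserPackage
from src.models.background import UpdateUserBackgroundTask

router = APIRouter()


async def run_update(task: UpdateUserBackgroundTask) -> None:
    # Recompute and persist the user's months via layer1 (replaces publish_user).
    await query_user(task.user_id, task.access_token, task.private_access)


@router.get("/user/{user_id}")
async def read_user(
    user_id: str, background_tasks: BackgroundTasks, no_cache: bool = False
) -> Optional[UserPackage]:
    start_date, end_date = date.today() - timedelta(365), date.today()
    user_data, background_task = await get_user(
        user_id, start_date, end_date, no_cache=no_cache
    )
    if background_task is not None:
        # Runs after the response is sent, so the request stays fast.
        background_tasks.add_task(run_update, background_task)
    return user_data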
16 changes: 7 additions & 9 deletions backend/src/constants.py
@@ -29,15 +29,13 @@
BLACKLIST = ["Jupyter Notebook", "HTML"] # languages to ignore

# OAUTH
-OAUTH_CLIENT_ID = os.getenv("OAUTH_CLIENT_ID", "") # client ID for GitHub OAuth App
-OAUTH_CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET", "") # client secret for App
-OAUTH_REDIRECT_URI = os.getenv("OAUTH_REDIRECT_URI", "") # redirect uri for App
-
-# PUBSUB
-PUBSUB_TOKEN = os.getenv("PUBSUB_TOKEN", "")
-# LOCAL_SUBSCRIBER = "http://" + ("subscriber" if DOCKER else "localhost") + ":8001"
-LOCAL_SUBSCRIBER = "http://backend:8000" if DOCKER else BACKEND_URL
-LOCAL_PUBLISHER = "http://backend:8000" if DOCKER else BACKEND_URL
+prefix = "PROD" if PROD else "DEV"
+# client ID for GitHub OAuth App
+OAUTH_CLIENT_ID = os.getenv(f"{prefix}_OAUTH_CLIENT_ID", "")
+# client secret for App
+OAUTH_CLIENT_SECRET = os.getenv(f"{prefix}_OAUTH_CLIENT_SECRET", "")
+# redirect uri for App
+OAUTH_REDIRECT_URI = os.getenv(f"{prefix}_OAUTH_REDIRECT_URI", "")


# MONGODB
2 changes: 1 addition & 1 deletion backend/src/data/github/auth/main.py
@@ -20,7 +20,7 @@ def get_unknown_user(access_token: str) -> Optional[str]:
    }

    r = s.get("https://api.github.com/user", params={}, headers=headers)
-    return r.json().get("login", None) # type: ignore
+    return r.json().get("login", None)


class OAuthError(Exception):
