diff --git a/server/model/schema.py b/server/model/schema.py index 70ec87b1..ae3cd27f 100644 --- a/server/model/schema.py +++ b/server/model/schema.py @@ -96,12 +96,13 @@ class Base(db.Model): class User(Base): __tablename__ = "user" + github_id = db.Column(db.String(128), nullable=True, comment="GitHub ID, 作为唯一标识") email = db.Column(db.String(128), nullable=True, comment="邮箱,这里考虑一下如何做唯一的用户") telephone = db.Column(db.String(128), nullable=True, comment="手机号") name = db.Column(db.String(128), nullable=True, comment="用户名") avatar = db.Column(db.String(128), nullable=True, comment="头像") extra = db.Column( - JSONStr(1024), nullable=True, server_default=text("'{}'"), comment="用户其他字段" + JSONStr(2048), nullable=True, server_default=text("'{}'"), comment="用户其他字段" ) diff --git a/server/routes/github.py b/server/routes/github.py index 156fd306..446d85ff 100644 --- a/server/routes/github.py +++ b/server/routes/github.py @@ -2,57 +2,28 @@ import os from app import app -from flask import Blueprint, abort, redirect, request -from utils.github import get_installation_token, get_jwt, register_by_code +from flask import Blueprint, abort, redirect, request, session +from model.schema import Team +from utils.auth import authenticated +from utils.github.common import get_installation_token, get_jwt, verify_github_signature +from utils.user import register bp = Blueprint("github", __name__, url_prefix="/api/github") @bp.route("/install", methods=["GET"]) +@authenticated def github_install(): """Install GitHub App. - If not `installation_id`, redirect to install page. - If `installation_id`, get installation token. - - If `code`, register by code. + Redirect to GitHub App installation page. """ installation_id = request.args.get("installation_id", None) - if installation_id is None: return redirect( f"https://github.com/apps/{os.environ.get('GITHUB_APP_NAME')}/installations/new" ) - logging.debug(f"installation_id: {installation_id}") - - jwt = get_jwt( - os.environ.get("GITHUB_APP_PRIVATE_KEY_PATH"), - os.environ.get("GITHUB_APP_ID"), - ) - - installation_token = get_installation_token(jwt, installation_id) - if installation_token is None: - logging.debug("Failed to get installation token.") - - # TODO: 统一解决各类 http 请求失败的情况 - abort(500) - logging.debug(f"installation_token: {installation_token}") - - # 如果有 code 参数,则为该用户注册 - code = request.args.get("code", None) - if code is not None: - logging.debug(f"code: {code}") - - user_token = register_by_code(code) - if user_token is None: - logging.debug("Failed to register by code.") - abort(500) - - logging.debug(f"user_token: {user_token}") - - return "Success!" - @bp.route("/register", methods=["GET"]) def github_register(): @@ -62,18 +33,35 @@ def github_register(): If `code`, register by code. """ code = request.args.get("code", None) + if code is None: return redirect( f"https://github.com/login/oauth/authorize?client_id={os.environ.get('GITHUB_CLIENT_ID')}" ) - logging.debug(f"code: {code}") - user_token = register_by_code(code) - if user_token is None: + # 通过 code 注册;如果 user 已经存在,则一样会返回 user_id + user_id = register(code) + if user_id is None: return "Failed to register by code." - logging.debug(f"user_token: {user_token}") - return user_token + # 保存用户注册状态 + session["user_id"] = user_id + + return "Success!" + + +@bp.route("/hook", methods=["POST"]) +@verify_github_signature(os.environ.get("GITHUB_WEBHOOK_SECRET")) +def github_hook(): + """Receive GitHub webhook.""" + + x_github_event = request.headers.get("x-github-event", None) + + logging.info(x_github_event) + + logging.debug(request.json) + + return "Receive Success!" app.register_blueprint(bp) diff --git a/server/utils/github/__init__.py b/server/utils/github/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/server/utils/github/common.py b/server/utils/github/common.py new file mode 100644 index 00000000..4b3e811f --- /dev/null +++ b/server/utils/github/common.py @@ -0,0 +1,173 @@ +import hashlib +import hmac +import logging +import os +import time +from functools import wraps +from urllib.parse import parse_qs + +import httpx +from flask import abort, request +from jwt import JWT, jwk_from_pem + + +def get_jwt(pem_path: str, app_id: str) -> str: + """Generate a JSON Web Token (JWT) for authentication. + + Args: + pem_path (str): path to the private key file. + app_id (str): GitHub App's identifier. + + Returns: + str: JWT. + """ + + # Open PEM + with open(pem_path, "rb") as pem_file: + signing_key = jwk_from_pem(pem_file.read()) + + payload = { + # Issued at time + "iat": int(time.time()), + # JWT expiration time (10 minutes maximum) + "exp": int(time.time()) + 600, + # GitHub App's identifier + "iss": app_id, + } + + # Create JWT + jwt_instance = JWT() + encoded_jwt = jwt_instance.encode(payload, signing_key, alg="RS256") + + return encoded_jwt + + +def get_installation_token(jwt: str, installation_id: str) -> str | None: + """Get installation access token + + Args: + jwt (str): The JSON Web Token used for authentication. + installation_id (str): The ID of the installation. + + Returns: + str: The installation access token. + """ + + with httpx.Client() as client: + response = client.post( + f"https://api.github.com/app/installations/{installation_id}/access_tokens", + headers={ + "Accept": "application/vnd.github+json", + "Authorization": f"Bearer {jwt}", + "X-GitHub-Api-Version": "2022-11-28", + }, + ) + if response.status_code != 200: + logging.debug(f"Failed to get installation token. {response.text}") + return None + + installation_token = response.json().get("token", None) + return installation_token + + return None + + +def oauth_by_code(code: str) -> dict | None: + """Register by code + + Args: + code (str): The code returned by GitHub OAuth. + + Returns: + str: The user access token. + """ + + with httpx.Client() as client: + response = client.post( + "https://github.com/login/oauth/access_token", + params={ + "client_id": os.environ.get("GITHUB_CLIENT_ID"), + "client_secret": os.environ.get("GITHUB_CLIENT_SECRET"), + "code": code, + }, + ) + if response.status_code != 200: + return None + + try: + oauth_info = parse_qs(response.text) + except Exception as e: + logging.debug(e) + return None + + return oauth_info + + +def verify_github_signature( + secret: str = os.environ.get("GITHUB_WEBHOOK_SECRET", "secret") +): + """Decorator to verify the signature of a GitHub webhook request. + + Args: + secret (str): The secret key used to sign the webhook request. + + Returns: + function: The decorated function. + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + signature = request.headers.get("x-hub-signature-256") + if not signature: + abort(400, "No signature provided.") + + # Verify the signature + body = request.get_data() + + hash_object = hmac.new( + secret.encode("utf-8"), + msg=body, + digestmod=hashlib.sha256, + ) + expected_signature = "sha256=" + hash_object.hexdigest() + + logging.debug(f"{expected_signature} {signature}") + + if not hmac.compare_digest(expected_signature, signature): + logging.debug("Invalid signature.") + abort(403, "Invalid signature.") + + return func(*args, **kwargs) + + return wrapper + + return decorator + + +def get_user_info(access_token: str): + """Get user info by access token. + + Args: + access_token (str): The user access token. + + Returns: + dict: User info. + """ + + with httpx.Client() as client: + response = client.get( + "https://api.github.com/user", + headers={ + "Accept": "application/vnd.github.v3+json", + "Authorization": f"token {access_token}", + }, + ) + if response.status_code != 200: + logging.debug(f"Failed to get user info. {response.text}") + return None + + user_info = response.json() + return user_info + + return None diff --git a/server/utils/user.py b/server/utils/user.py new file mode 100644 index 00000000..36ff98fa --- /dev/null +++ b/server/utils/user.py @@ -0,0 +1,53 @@ +from app import app, db +from model.schema import BindUser, ObjID, User +from utils.github.common import get_user_info, oauth_by_code + + +def register(code: str) -> str | None: + """GitHub OAuth register. + + If `code`, register by code. + """ + + oauth_info = oauth_by_code(code) # 获取 access token + + access_token = oauth_info.get("access_token", None)[0] # 这里要考虑取哪个,为什么会有多个? + + # 使用 oauth_info 中的 access_token 获取用户信息 + user_info = get_user_info(access_token) + + # 查询 github_id 是否已经存在,若存在,则返回 user_id + github_id = user_info.get("id", None) + if github_id is not None: + user = User.query.filter_by(github_id=github_id).first() + if user is not None: + return user.id + + new_user = User( + id=ObjID.new_id(), + github_id=github_id, + email=user_info.get( + "email", None + ), # 这里的邮箱其实是公开邮箱,可能会获取不到 TODO: 换成使用用户邮箱 API 来获取 + name=user_info.get("login", None), + avatar=user_info.get("avatar_url", None), + extra=user_info, + ) + + db.session.add(new_user) + db.session.commit() + + new_bind_user = BindUser( + id=ObjID.new_id(), + user_id=new_user.id, + platform="github", + email=user_info.get("email", None), + avatar=user_info.get("avatar_url", None), + extra=oauth_info, + ) + + db.session.add(new_bind_user) + + db.session.commit() + + return new_user.id