diff --git a/cortext/protocol.py b/cortext/protocol.py
index 559bcd3e..1761f872 100644
--- a/cortext/protocol.py
+++ b/cortext/protocol.py
@@ -379,6 +379,7 @@ async def process_streaming_response(self, response: StreamingResponse, organic=
                     self.completion += tokens
                     yield tokens
             except asyncio.TimeoutError as err:
+                self.completion += remain_chunk
                 yield remain_chunk
diff --git a/cursor/Dockerfile b/cursor/Dockerfile
new file mode 100644
index 00000000..3e928ffe
--- /dev/null
+++ b/cursor/Dockerfile
@@ -0,0 +1,18 @@
+# Use official Python image
+FROM python:3.10
+
+# Set working directory
+WORKDIR /app
+
+# Copy and install dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the app files to the container
+COPY . .
+
+# Expose the FastAPI port
+EXPOSE 8000
+
+# Start FastAPI app
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
diff --git a/cursor/app.py b/cursor/app.py
deleted file mode 100644
index 543e00ba..00000000
--- a/cursor/app.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from fastapi import FastAPI, HTTPException
-from fastapi.responses import StreamingResponse
-import httpx
-import asyncio
-
-app = FastAPI()
-organic_server_url = "https://0.0.0.0:8000"
-
-
-@app.post("/apis.datura.ai/s18_sigma")
-async def stream_prompt():
-    try:
-        # Create an asynchronous HTTP client session
-        async with httpx.AsyncClient() as client:
-            # Make a streaming GET request to the external server
-            response = await client.stream("GET", organic_server_url)
-            response.raise_for_status()  # Raise an error for bad responses (4xx, 5xx)
-
-            # Define an async generator to read chunks of data from the external server
-            async def stream_generator():
-                async for chunk in response.aiter_bytes():
-                    yield chunk
-                    await asyncio.sleep(0)  # Allow other tasks to run
-
-            # Return a StreamingResponse that forwards the data from the external server
-            return StreamingResponse(stream_generator(), media_type="application/json")
-    except httpx.HTTPStatusError as e:
-        raise HTTPException(status_code=e.response.status_code, detail=str(e))
-    except httpx.RequestError as e:
-        raise HTTPException(status_code=500, detail="Failed to fetch data from external server")
diff --git a/cursor/app/__init__.py b/cursor/app/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/cursor/app/core/__init__.py b/cursor/app/core/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/cursor/app/core/middleware.py b/cursor/app/core/middleware.py
new file mode 100644
index 00000000..e16641a4
--- /dev/null
+++ b/cursor/app/core/middleware.py
@@ -0,0 +1,36 @@
+import time
+from fastapi import HTTPException
+
+
+async def verify_api_key_rate_limit(config, api_key):
+    # NOTE: a bit dangerous, but very useful for local testing
+    if not config.prod:
+        if api_key == "test":
+            return True
+
+    rate_limit_key = f"rate_limit:{api_key}"
+    rate_limit = await config.redis_db.get(rate_limit_key)
+    if rate_limit is None:
+        async with await config.psql_db.connection() as connection:
+            # NOTE: get_api_key_rate_limit is assumed importable from the
+            # project's database helpers; with it commented out, every cache
+            # miss turned into a 403.
+            rate_limit = await get_api_key_rate_limit(connection, api_key)
+            if rate_limit is None:
+                raise HTTPException(status_code=403, detail="Invalid API key")
+        await config.redis_db.set(rate_limit_key, rate_limit, ex=30)
+    else:
+        rate_limit = int(rate_limit)
+
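+    # Fixed-window rate limiting: the per-key limit is cached above for 30s,
+    # and usage is tracked in a per-minute counter key that expires on its
+    # own, so no cleanup pass is needed.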
+    minute = time.time() // 60
+    current_rate_limit_key = f"current_rate_limit:{api_key}:{minute}"
+    # INCR creates the counter atomically on first use; the TTL is set right
+    # after creation so stale per-minute counters do not pile up in Redis.
+    current_rate_limit = await config.redis_db.incr(current_rate_limit_key)
+    if current_rate_limit == 1:
+        await config.redis_db.expire(current_rate_limit_key, 60)
+    if current_rate_limit > rate_limit:
+        raise HTTPException(status_code=429, detail="Too many requests")
diff --git a/cursor/app/database.py b/cursor/app/database.py
new file mode 100644
index 00000000..92f1c93f
--- /dev/null
+++ b/cursor/app/database.py
@@ -0,0 +1,23 @@
+import psycopg2
+import os
+
+DATABASE_URL = os.getenv("DATABASE_URL")
+TABLE_NAME = 'query_resp_data'
+# PostgreSQL connection parameters
+conn = psycopg2.connect(DATABASE_URL)
+
+# Create a cursor object to interact with the database
+cur = conn.cursor()
+
+
+async def create_table(app):
+    # Schema creation is still a stub; errors are logged rather than raised.
+    global conn, cur, TABLE_NAME
+    try:
+        pass
+
+    except Exception as e:
+        print(f"Error creating table: {e}")
+
+# NOTE: create_table is awaited from the FastAPI lifespan hook in main.py;
+# calling it here without await would only create an unawaited coroutine.
diff --git a/cursor/app/endpoints/__init__.py b/cursor/app/endpoints/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/cursor/app/endpoints/text.py b/cursor/app/endpoints/text.py
new file mode 100644
index 00000000..18e7bcca
--- /dev/null
+++ b/cursor/app/endpoints/text.py
@@ -0,0 +1,195 @@
+import json
+from typing import Any, AsyncGenerator
+import uuid
+from fastapi import Depends, HTTPException
+from fastapi.responses import JSONResponse, StreamingResponse
+from redis.asyncio import Redis
+from fastapi.routing import APIRouter
+from cursor.app.models import RequestModel
+from cursor.app.core.middleware import verify_api_key_rate_limit
+import asyncio
+from redis.asyncio.client import PubSub
+import time
+
+# NOTE: `metrics`, `gcst`, `rcst`, `logger`, `request_models`, `Config`,
+# `get_config` and `load_sse_jsons` are project-internal helpers whose imports
+# are not part of this change; they are assumed to be available elsewhere.
+
+COUNTER_TEXT_GENERATION_ERROR = metrics.get_meter(__name__).create_counter("validator.entry_node.text.error")
+COUNTER_TEXT_GENERATION_SUCCESS = metrics.get_meter(__name__).create_counter("validator.entry_node.text.success")
+GAUGE_TOKENS_PER_SEC = metrics.get_meter(__name__).create_gauge(
+    "validator.entry_node.text.tokens_per_sec",
+    description="Average tokens per second metric for LLM streaming for an organic LLM query"
+)
+
+
+def _construct_organic_message(payload: dict, job_id: str, task: str) -> str:
+    return json.dumps({"query_type": gcst.ORGANIC, "query_payload": payload, "task": task, "job_id": job_id})
+
+
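+# The organic query flow is a three-step Redis handshake: the query is pushed
+# onto a work queue, a worker acknowledges the job id on a pub/sub channel,
+# and result chunks are then published on a per-job channel until "[DONE]".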
logger.info(f"Tokens per second for job_id: {job_id}, task: {task}: {tps}") + + await pubsub.unsubscribe(f"{rcst.JOB_RESULTS}:{job_id}") + + +async def _get_first_chunk(pubsub: PubSub, job_id: str) -> str | None: + async for message in pubsub.listen(): + if message["type"] == "message" and message["channel"].decode() == f"{rcst.JOB_RESULTS}:{job_id}": + result = json.loads(message["data"].decode()) + if gcst.STATUS_CODE in result and result[gcst.STATUS_CODE] >= 400: + raise HTTPException(status_code=result[gcst.STATUS_CODE], detail=result[gcst.ERROR_MESSAGE]) + return result[gcst.CONTENT] + return None + + +async def make_stream_organic_query( + redis_db: Redis, + payload: dict[str, Any], + task: str, +) -> AsyncGenerator[str, str]: + job_id = uuid.uuid4().hex + organic_message = _construct_organic_message(payload=payload, job_id=job_id, task=task) + + pubsub = redis_db.pubsub() + await pubsub.subscribe(f"{gcst.ACKNLOWEDGED}:{job_id}") + await redis_db.lpush(rcst.QUERY_QUEUE_KEY, organic_message) # type: ignore + + first_chunk = None + try: + await asyncio.wait_for(_wait_for_acknowledgement(pubsub, job_id), timeout=1) + except asyncio.TimeoutError: + logger.error( + f"Query node down? No confirmation received for job {job_id} within timeout period. Task: {task}, model: {payload['model']}" + ) + COUNTER_TEXT_GENERATION_ERROR.add(1, + {"task": task, "kind": "redis_acknowledgement_timeout", "status_code": 500}) + raise HTTPException(status_code=500, detail="Unable to process request ; redis_acknowledgement_timeout") + + await pubsub.subscribe(f"{rcst.JOB_RESULTS}:{job_id}") + logger.info("Here waiting for a message!") + start_time = time.time() + try: + first_chunk = await asyncio.wait_for(_get_first_chunk(pubsub, job_id), timeout=2) + except asyncio.TimeoutError: + logger.error( + f"Query node down? Timed out waiting for the first chunk of results for job {job_id}. 
Task: {task}, model: {payload['model']}" + ) + COUNTER_TEXT_GENERATION_ERROR.add(1, {"task": task, "kind": "first_chunk_timeout", "status_code": 500}) + raise HTTPException(status_code=500, detail="Unable to process request ; first_chunk_timeout") + + if first_chunk is None: + COUNTER_TEXT_GENERATION_ERROR.add(1, {"task": task, "kind": "first_chunk_missing", "status_code": 500}) + raise HTTPException(status_code=500, detail="Unable to process request ; first_chunk_missing") + return _stream_results(pubsub, job_id, task, first_chunk, start_time) + + +async def _handle_no_stream(text_generator: AsyncGenerator[str, str]) -> JSONResponse: + all_content = "" + async for chunk in text_generator: + chunks = load_sse_jsons(chunk) + if isinstance(chunks, list): + for chunk in chunks: + content = chunk["choices"][0]["delta"]["content"] + all_content += content + if content == "": + break + + return JSONResponse({"choices": [{"delta": {"content": all_content}}]}) + + +async def chat( + chat_request: request_models.ChatRequest, + config: Config = Depends(get_config), +) -> StreamingResponse | JSONResponse: + payload = request_models.chat_to_payload(chat_request) + payload.temperature = 0.5 + + try: + text_generator = await make_stream_organic_query( + redis_db=config.redis_db, + payload=payload.model_dump(), + task=payload.model, + ) + + logger.info("Here returning a response!") + + if chat_request.stream: + return StreamingResponse(text_generator, media_type="text/event-stream") + else: + return await _handle_no_stream(text_generator) + + except HTTPException as http_exc: + COUNTER_TEXT_GENERATION_ERROR.add(1, + {"task": payload.model, "kind": type(http_exc).__name__, "status_code": 500}) + logger.info(f"HTTPException in chat endpoint: {str(http_exc)}") + raise http_exc + + except Exception as e: + COUNTER_TEXT_GENERATION_ERROR.add(1, {"task": payload.model, "kind": type(e).__name__, "status_code": 500}) + logger.error(f"Unexpected error in chat endpoint: {str(e)}") + raise HTTPException(status_code=500, detail="An unexpected error occurred") + + +router = APIRouter() +router.add_api_route( + "/v1/chat/completions", + chat, + methods=["POST", "OPTIONS"], + tags=["StreamPrompting"], + response_model=None, + dependencies=[Depends(verify_api_key_rate_limit)], +) diff --git a/cursor/app/main.py b/cursor/app/main.py new file mode 100644 index 00000000..86e3d3e2 --- /dev/null +++ b/cursor/app/main.py @@ -0,0 +1,49 @@ +from contextlib import asynccontextmanager +from fastapi import FastAPI, Depends, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from . 
+router = APIRouter()
+router.add_api_route(
+    "/v1/chat/completions",
+    chat,
+    methods=["POST", "OPTIONS"],
+    tags=["StreamPrompting"],
+    response_model=None,
+    dependencies=[Depends(verify_api_key_rate_limit)],
+)
diff --git a/cursor/app/main.py b/cursor/app/main.py
new file mode 100644
index 00000000..86e3d3e2
--- /dev/null
+++ b/cursor/app/main.py
@@ -0,0 +1,54 @@
+from contextlib import asynccontextmanager
+from fastapi import FastAPI, Depends, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+from . import curd, models, schemas
+from .database import create_table, conn, cur
+from typing import List
+from .endpoints.text import router as chat_router
+
+# NOTE: the curd and schemas modules (and models.RequestBody) are not part of
+# this change; they are assumed to mirror their equivalents in server/app.
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Create the database table on startup
+    await create_table(None)
+    yield
+    # Close the PostgreSQL cursor and connection on shutdown. A custom
+    # lifespan replaces the deprecated @app.on_event hooks, which FastAPI
+    # ignores once a lifespan is passed in.
+    cur.close()
+    conn.close()
+
+
+app = FastAPI(lifespan=lifespan)
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],  # Allows all origins
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+app.include_router(chat_router)
+
+
+# Create an item
+@app.post("/items")
+def create_item(items: List[schemas.ItemCreate]):
+    return curd.create_items(items=items)
+
+
+# Read all items
+@app.post("/items/search")
+def read_items(req_body: models.RequestBody):
+    items = curd.get_items(req_body)
+    return items
+
+
+# Read a single item by ID
+@app.get("/items/{p_key}", response_model=schemas.Item)
+def read_item(p_key: int):
+    db_item = curd.get_item(p_key=p_key)
+    if db_item is None:
+        raise HTTPException(status_code=404, detail="Item not found")
+    return db_item
diff --git a/cursor/app/models.py b/cursor/app/models.py
new file mode 100644
index 00000000..f8bab3a1
--- /dev/null
+++ b/cursor/app/models.py
@@ -0,0 +1,5 @@
+from cortext.protocol import StreamPrompting
+
+
+class RequestModel(StreamPrompting):
+    pass
diff --git a/cursor/docker-compose.yml b/cursor/docker-compose.yml
new file mode 100644
index 00000000..ec1b10b9
--- /dev/null
+++ b/cursor/docker-compose.yml
@@ -0,0 +1,32 @@
+version: "3.9"
+
+services:
+  db:
+    image: postgres:13
+    restart: always
+    environment:
+      POSTGRES_USER: ${POSTGRES_USER}
+      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
+      POSTGRES_DB: ${POSTGRES_DB}
+    volumes:
+      - postgres_data_score:/var/lib/postgresql/data
+    ports:
+      - "5432:5432"
+
+  web:
+    build: .
+    restart: always
+    ports:
+      - "8000:8000"
+    depends_on:
+      - db
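+    # The "db" hostname in DATABASE_URL resolves over the default compose
+    # network, so Postgres is reachable without relying on the published port.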
+    environment:
+      DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
+      POSTGRES_DB: ${POSTGRES_DB}
+    volumes:
+      - .:/app
+
+volumes:
+  postgres_data_score:
\ No newline at end of file
diff --git a/cursor/requirements.txt b/cursor/requirements.txt
new file mode 100644
index 00000000..5c1bda28
--- /dev/null
+++ b/cursor/requirements.txt
@@ -0,0 +1,6 @@
+fastapi
+uvicorn[standard]
+psycopg2-binary
+sqlalchemy
+pydantic
+redis
\ No newline at end of file
diff --git a/server/app/curd.py b/server/app/curd.py
index 4572e5e0..7eb32e19 100644
--- a/server/app/curd.py
+++ b/server/app/curd.py
@@ -62,7 +62,7 @@ def get_items(req_body: models.RequestBody):
     limit = req_body.limit
 
     filter_by_miner_score = f"score>={req_body.filters.min_score}" if req_body.filters.min_score else ""
-    filter_by_miner_similarity = f"score>={req_body.filters.min_similarity}" if req_body.filters.min_similarity else ""
+    filter_by_miner_similarity = f"similarity>={req_body.filters.min_similarity}" if req_body.filters.min_similarity else ""
     filter_by_provider = f"provider='{req_body.filters.provider}'" if req_body.filters.provider else ""
     filter_by_model = f"model='{req_body.filters.model}'" if req_body.filters.model else ""
     filter_by_min_timestamp = f"timestamp>={req_body.filters.min_timestamp}" if req_body.filters.min_timestamp else ""
@@ -70,7 +70,7 @@ def get_items(req_body: models.RequestBody):
     filter_by_epoch_num = f"epoch_num={req_body.filters.epoch_num}" if req_body.filters.epoch_num else ""
     filter_by_block_num = f"block_num={req_body.filters.block_num}" if req_body.filters.block_num else ""
     filter_by_cycle_num = f"cycle_num={req_body.filters.cycle_num}" if req_body.filters.cycle_num else ""
-    filter_by_name = f"name={req_body.filters.name}" if req_body.filters.name else ""
+    filter_by_name = f"name='{req_body.filters.name}'" if req_body.filters.name else ""
     search_by_uid_or_hotkey = (f"miner_uid=%s" if str(req_body.search).isdigit() else f"miner_hot_key like %s") if req_body.search else ""
 
     conditions = [filter_by_miner_score, filter_by_miner_similarity, filter_by_provider, filter_by_model,
diff --git a/validators/services/capacity.py b/validators/services/capacity.py
index 6bfb1acf..4d92c8bd 100644
--- a/validators/services/capacity.py
+++ b/validators/services/capacity.py
@@ -32,6 +32,29 @@ async def query_capacity_to_miners(self, available_uids):
             if isinstance(resp, Exception):
                 bt.logging.error(f"exception happens while querying capacity to miner {uid}, {resp}")
             else:
-                uid_to_capacity[uid] = resp.bandwidth_rpm
+                uid_to_capacity[uid] = self.validate_capacity(resp.bandwidth_rpm)
         self.uid_to_capacity = deepcopy(uid_to_capacity)
         return uid_to_capacity
+
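+    # Only the provider/model pairs the validator scores are kept; any
+    # malformed bandwidth payload invalidates the miner's whole capacity
+    # response instead of raising mid-query.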
+    def validate_capacity(self, bandwidth):
+        try:
+            open_ai_cap = bandwidth.get("OpenAI").get("gpt-4o")
+            anthropic_cap = bandwidth.get("Anthropic").get("claude-3-5-sonnet-20240620")
+            groq_cap = bandwidth.get("Groq").get("llama-3.1-70b-versatile")
+            return {
+                "OpenAI": {
+                    "gpt-4o": int(open_ai_cap)
+                },
+                "Anthropic": {
+                    "claude-3-5-sonnet-20240620": int(anthropic_cap)
+                },
+                "Groq": {
+                    "llama-3.1-70b-versatile": int(groq_cap)
+                }
+            }
+        except Exception as err:
+            bt.logging.error(f"invalid capacity response from a miner: {err}")
+            return None
diff --git a/validators/services/validators/base_validator.py b/validators/services/validators/base_validator.py
index 35a84434..5cd93915 100644
--- a/validators/services/validators/base_validator.py
+++ b/validators/services/validators/base_validator.py
@@ -130,6 +130,7 @@ def get_uid_to_scores_dict(self, uid_to_query_resps, scored_responses: tuple[flo
             uid_provider_model_scores_avg_dict[key] = (avg_score, syns)
 
         # apply weight for each model and calculate score based on weight of models.
+        bt.logging.info("building table data.")
         uid_scores_dict = defaultdict(float)
         table_data = [
             ["uid", "provider", "model", 'similarity', 'weight', 'bandwidth', 'weighted_score']
@@ -157,6 +158,7 @@ def get_uid_to_scores_dict(self, uid_to_query_resps, scored_responses: tuple[flo
             syn.similarity = avg_score
             syn.score = weighted_score
 
+        bt.logging.info("showing pretty formatted table scores.")
         self.show_pretty_table_score(table_data)
 
         if not len(uid_scores_dict):
diff --git a/validators/task_manager.py b/validators/task_manager.py
index 7a8014f9..ac0af80e 100644
--- a/validators/task_manager.py
+++ b/validators/task_manager.py
@@ -1,5 +1,6 @@
 import asyncio
 import random
+import traceback
 from copy import deepcopy
 
 import bittensor as bt
@@ -28,21 +29,27 @@ def get_remaining_bandwidth(self, uid, provider, model):
 
     def update_remain_capacity_based_on_new_capacity(self, new_uid_to_capacity):
-        for uid, capacity in new_uid_to_capacity.items():
-            if not capacity:
-                continue
-            for provider, model_to_cap in capacity.items():
-                for model, cap in model_to_cap.items():
-                    if self.get_remaining_bandwidth(uid, provider, model) is None:
-                        utils.update_nested_dict(self.remain_resources, keys=[uid, provider, model], value=cap)
-                    else:
-                        diff = self.uid_to_capacity[uid][provider][model] - cap
-                        if diff:
-                            bt.logging.debug(f"diff {diff} found in {uid}, {provider}, {model}")
-                        self.remain_resources[uid][provider][model] -= diff
+        try:
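+            # Consumed bandwidth carries over between epochs: the drop between
+            # the previous and the newly reported capacity is subtracted from
+            # the remaining pool instead of resetting it.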
+            for uid, capacity in new_uid_to_capacity.items():
+                if not capacity:
+                    continue
+                for provider, model_to_cap in capacity.items():
+                    for model, cap in model_to_cap.items():
+                        if self.get_remaining_bandwidth(uid, provider, model) is None:
+                            utils.update_nested_dict(self.remain_resources, keys=[uid, provider, model], value=cap)
+                        else:
+                            diff = self.uid_to_capacity[uid][provider][model] - cap
+                            if diff:
+                                bt.logging.debug(f"diff {diff} found in {uid}, {provider}, {model}")
+                            self.remain_resources[uid][provider][model] -= diff
 
-        bt.logging.debug(f"remain_resources after epoch = {self.remain_resources}")
-        self.uid_to_capacity = deepcopy(self.remain_resources)
+            bt.logging.debug(f"remain_resources after epoch = {self.remain_resources}")
+            self.uid_to_capacity = deepcopy(self.remain_resources)
+        except Exception as err:
+            bt.logging.error(f"error while updating remaining capacity: {err}, {traceback.format_exc()}")
 
     @error_handler
     def assign_task(self, synapse: ALL_SYNAPSE_TYPE):
diff --git a/validators/weight_setter.py b/validators/weight_setter.py
index aa6260a5..64644bcc 100644
--- a/validators/weight_setter.py
+++ b/validators/weight_setter.py
@@ -86,6 +86,8 @@ def __init__(self, config, cache: QueryResponseCache, loop=None):
         bt.logging.info(f"total loaded questions are {len(self.queries)}")
         self.set_up_next_block_to_wait()
         # Set up async tasks
+        # score_thread = threading.Thread(target=self.start_scoring_process)
+        # score_thread.start()
         self.loop.create_task(self.process_queries_from_database())
         self.saving_datas = []
@@ -102,6 +104,9 @@ def __init__(self, config, cache: QueryResponseCache, loop=None):
     def start_axon_server(self):
         asyncio.run(self.consume_organic_queries())
 
+    def start_scoring_process(self):
+        asyncio.run(self.process_queries_from_database())
+
     def process_synthetic_tasks(self):
         bt.logging.info("starting synthetic tasks.")
         asyncio.run(self.perform_synthetic_queries())
@@ -111,10 +116,10 @@ def saving_resp_answers_from_miners(self):
         self.cache.set_vali_info(vali_uid=self.my_uid, vali_hotkey=self.wallet.hotkey.ss58_address)
         while True:
             if not self.saving_datas:
-                time.sleep(1)
+                time.sleep(10)
+                bt.logging.trace("no data to send to the central server")
             else:
                 bt.logging.info(f"saving responses...")
-                start_time = time.time()
                 self.cache.set_cache_in_batch(self.url, [item.get('synapse') for item in self.saving_datas],
                                               block_num=self.current_block or 0,
                                               cycle_num=(self.current_block or 0) // 36,
@@ -294,6 +299,7 @@ async def perform_synthetic_queries(self):
                 await asyncio.sleep(12)
                 continue
             self.set_up_next_block_to_wait()
+            # await asyncio.sleep(432)
             self.loop.create_task(self.perform_synthetic_queries_one_cycle())
 
     def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks):
@@ -387,17 +393,16 @@ async def set_weights(self, scores):
         # Update the moving average scores
         self.moving_average_scores = alpha * scores + (1 - alpha) * self.moving_average_scores
         bt.logging.info(f"Updated moving average of weights: {self.moving_average_scores}")
-        await self.run_sync_in_async(
-            lambda: self.subtensor.set_weights(
-                netuid=self.config.netuid,
-                wallet=self.wallet,
-                uids=self.metagraph.uids,
-                weights=self.moving_average_scores,
-                wait_for_inclusion=True,
-                version_key=cortext.__weights_version__,
-            )
+        start_time = time.time()
+        success, msg = self.subtensor.set_weights(
+            netuid=self.config.netuid,
+            wallet=self.wallet,
+            uids=self.metagraph.uids,
+            weights=self.moving_average_scores,
+            wait_for_inclusion=True,
+            version_key=cortext.__weights_version__,
         )
-        bt.logging.success("Successfully included weights in block.")
+        bt.logging.info(f"done setting weights: {success}, {msg}. {time.time() - start_time}s elapsed while updating weights.")
 
     def blacklist_prompt(self, synapse: StreamPrompting) -> Tuple[bool, str]:
         blacklist = self.base_blacklist(synapse, cortext.PROMPT_BLACKLIST_STAKE)
@@ -476,7 +481,8 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings:
 
     async def prompt(self, synapse: StreamPrompting) -> StreamingSynapse.BTStreamingResponse:
         bt.logging.info(f"Received {synapse}")
-        if len(json.dumps(synapse.messages)) > 1024:
+        contents = " ".join([message.get("content") for message in synapse.messages])
+        if len(contents.split()) > 2048:
             raise HTTPException(status_code=413, detail="Request entity too large")
 
         async def _prompt(query_synapse: StreamPrompting, send: Send):
@@ -522,7 +528,7 @@ async def handle_response(resp):
             responses = self.dendrite.call_stream(
                 target_axon=axon,
                 synapse=synapse,
-                timeout=synapse.timeout,
+                timeout=synapse.timeout
             )
             return await handle_response(responses)
 
@@ -609,7 +615,9 @@ async def process_queries_from_database(self):
 
         # with all query_respones, select one per uid, provider, model randomly and score them.
         score_tasks = self.get_scoring_tasks_from_query_responses(queries_to_process)
-        resps = await asyncio.gather(*score_tasks)
-        resps = [item for item in resps if item is not None]
+        resps = await asyncio.gather(*score_tasks, return_exceptions=True)
+        # with return_exceptions=True, failures come back as Exception objects
+        # rather than being raised, so filter them out alongside empty results.
+        resps = [item for item in resps if item is not None and not isinstance(item, Exception)]
         # Update total_scores and score_counts
         for uid_scores_dict, _, _ in resps: