From b08f4c32d89186c0f57974ead8a8de571ed4399e Mon Sep 17 00:00:00 2001
From: acer-king
Date: Fri, 8 Nov 2024 03:36:10 -0800
Subject: [PATCH 01/16] add more logging

---
 cortext/protocol.py                              | 1 +
 validators/services/validators/base_validator.py | 2 ++
 validators/weight_setter.py                      | 2 +-
 3 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/cortext/protocol.py b/cortext/protocol.py
index 559bcd3e..1761f872 100644
--- a/cortext/protocol.py
+++ b/cortext/protocol.py
@@ -379,6 +379,7 @@ async def process_streaming_response(self, response: StreamingResponse, organic=
                 self.completion += tokens
                 yield tokens
         except asyncio.TimeoutError as err:
+            self.completion += remain_chunk
             yield remain_chunk

diff --git a/validators/services/validators/base_validator.py b/validators/services/validators/base_validator.py
index 35a84434..5cd93915 100644
--- a/validators/services/validators/base_validator.py
+++ b/validators/services/validators/base_validator.py
@@ -130,6 +130,7 @@ def get_uid_to_scores_dict(self, uid_to_query_resps, scored_responses: tuple[flo
             uid_provider_model_scores_avg_dict[key] = (avg_score, syns)

         # apply weight for each model and calculate score based on weight of models.
+        bt.logging.info("building table data.")
         uid_scores_dict = defaultdict(float)
         table_data = [
             ["uid", "provider", "model", 'similarity', 'weight', 'bandwidth', 'weighted_score']
@@ -157,6 +158,7 @@ def get_uid_to_scores_dict(self, uid_to_query_resps, scored_responses: tuple[flo
                 syn.similarity = avg_score
                 syn.score = weighted_score

+        bt.logging.info("showing pretty formated table scores.")
         self.show_pretty_table_score(table_data)

         if not len(uid_scores_dict):

diff --git a/validators/weight_setter.py b/validators/weight_setter.py
index aa6260a5..f18c0145 100644
--- a/validators/weight_setter.py
+++ b/validators/weight_setter.py
@@ -522,7 +522,7 @@ async def handle_response(resp):
         responses = self.dendrite.call_stream(
             target_axon=axon,
             synapse=synapse,
-            timeout=synapse.timeout,
+            timeout=synapse.timeout
         )
         return await handle_response(responses)

From d9fa0b8305a16e2845987f0319ccbc2a6a6e63ae Mon Sep 17 00:00:00 2001
From: acer-king
Date: Fri, 8 Nov 2024 03:47:37 -0800
Subject: [PATCH 02/16] run scoring task in different thread.

--- validators/weight_setter.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index f18c0145..f6db94e1 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -86,7 +86,8 @@ def __init__(self, config, cache: QueryResponseCache, loop=None): bt.logging.info(f"total loaded questions are {len(self.queries)}") self.set_up_next_block_to_wait() # Set up async tasks - self.loop.create_task(self.process_queries_from_database()) + score_thread = threading.Thread(target=self.start_scoring_process) + score_thread.start() self.saving_datas = [] self.url = "http://ec2-3-239-8-190.compute-1.amazonaws.com:8000/items" @@ -102,6 +103,9 @@ def __init__(self, config, cache: QueryResponseCache, loop=None): def start_axon_server(self): asyncio.run(self.consume_organic_queries()) + def start_scoring_process(self): + asyncio.run(self.process_queries_from_database()) + def process_synthetic_tasks(self): bt.logging.info("starting synthetic tasks.") asyncio.run(self.perform_synthetic_queries()) From 4dd657404ebbc89b501583fac6f552d8321a4ca7 Mon Sep 17 00:00:00 2001 From: acer-king Date: Fri, 8 Nov 2024 06:28:02 -0800 Subject: [PATCH 03/16] revert back to the main thread --- validators/weight_setter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index f6db94e1..88db96d1 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -86,8 +86,9 @@ def __init__(self, config, cache: QueryResponseCache, loop=None): bt.logging.info(f"total loaded questions are {len(self.queries)}") self.set_up_next_block_to_wait() # Set up async tasks - score_thread = threading.Thread(target=self.start_scoring_process) - score_thread.start() + # score_thread = threading.Thread(target=self.start_scoring_process) + # score_thread.start() + self.loop.create_task(self.process_queries_from_database()) self.saving_datas = [] self.url = "http://ec2-3-239-8-190.compute-1.amazonaws.com:8000/items" From f92e19c73f76f37cb9bf7df449786c0e4cecff03 Mon Sep 17 00:00:00 2001 From: acer-king Date: Fri, 8 Nov 2024 09:49:27 -0800 Subject: [PATCH 04/16] too big request issue has been fixed --- validators/weight_setter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 88db96d1..9d9ab471 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -481,7 +481,8 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings: async def prompt(self, synapse: StreamPrompting) -> StreamingSynapse.BTStreamingResponse: bt.logging.info(f"Received {synapse}") - if len(json.dumps(synapse.messages)) > 1024: + contents = "".join([message.get("content") for message in synapse.messages]) + if len(contents) > 1024: raise HTTPException(status_code=413, detail="Request entity too large") async def _prompt(query_synapse: StreamPrompting, send: Send): From 9ca2b6f025f77a994326d0d2553e3f74b6882206 Mon Sep 17 00:00:00 2001 From: acer-king Date: Fri, 8 Nov 2024 11:15:47 -0800 Subject: [PATCH 05/16] add more log --- validators/weight_setter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 9d9ab471..9796f330 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -116,7 +116,8 @@ def saving_resp_answers_from_miners(self): self.cache.set_vali_info(vali_uid=self.my_uid, 
vali_hotkey=self.wallet.hotkey.ss58_address) while True: if not self.saving_datas: - time.sleep(1) + time.sleep(10) + bt.logging.trace("no datas for sending to central server") else: bt.logging.info(f"saving responses...") start_time = time.time() @@ -615,7 +616,7 @@ async def process_queries_from_database(self): # with all query_respones, select one per uid, provider, model randomly and score them. score_tasks = self.get_scoring_tasks_from_query_responses(queries_to_process) - resps = await asyncio.gather(*score_tasks) + resps = await asyncio.gather(*score_tasks,return_exceptions=True) resps = [item for item in resps if item is not None] # Update total_scores and score_counts for uid_scores_dict, _, _ in resps: From a9942ea6a311aaa5aae4c633eac4ac51e4453b9e Mon Sep 17 00:00:00 2001 From: acer-king Date: Sat, 9 Nov 2024 02:06:19 -0800 Subject: [PATCH 06/16] run set_weights inside same thread. --- validators/weight_setter.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 9796f330..3f074e7f 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -120,7 +120,6 @@ def saving_resp_answers_from_miners(self): bt.logging.trace("no datas for sending to central server") else: bt.logging.info(f"saving responses...") - start_time = time.time() self.cache.set_cache_in_batch(self.url, [item.get('synapse') for item in self.saving_datas], block_num=self.current_block or 0, cycle_num=(self.current_block or 0) // 36, @@ -393,15 +392,14 @@ async def set_weights(self, scores): # Update the moving average scores self.moving_average_scores = alpha * scores + (1 - alpha) * self.moving_average_scores bt.logging.info(f"Updated moving average of weights: {self.moving_average_scores}") - await self.run_sync_in_async( - lambda: self.subtensor.set_weights( - netuid=self.config.netuid, - wallet=self.wallet, - uids=self.metagraph.uids, - weights=self.moving_average_scores, - wait_for_inclusion=True, - version_key=cortext.__weights_version__, - ) + + self.subtensor.set_weights( + netuid=self.config.netuid, + wallet=self.wallet, + uids=self.metagraph.uids, + weights=self.moving_average_scores, + wait_for_inclusion=True, + version_key=cortext.__weights_version__, ) bt.logging.success("Successfully included weights in block.") @@ -616,7 +614,7 @@ async def process_queries_from_database(self): # with all query_respones, select one per uid, provider, model randomly and score them. 
score_tasks = self.get_scoring_tasks_from_query_responses(queries_to_process) - resps = await asyncio.gather(*score_tasks,return_exceptions=True) + resps = await asyncio.gather(*score_tasks, return_exceptions=True) resps = [item for item in resps if item is not None] # Update total_scores and score_counts for uid_scores_dict, _, _ in resps: From 325f9d180e83fdb2640ca5bb44bc398e94c4818f Mon Sep 17 00:00:00 2001 From: acer-king Date: Sun, 10 Nov 2024 07:09:22 -0800 Subject: [PATCH 07/16] update weights --- validators/weight_setter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 3f074e7f..e652cfc7 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -392,7 +392,7 @@ async def set_weights(self, scores): # Update the moving average scores self.moving_average_scores = alpha * scores + (1 - alpha) * self.moving_average_scores bt.logging.info(f"Updated moving average of weights: {self.moving_average_scores}") - + start_time = time.time() self.subtensor.set_weights( netuid=self.config.netuid, wallet=self.wallet, @@ -401,7 +401,7 @@ async def set_weights(self, scores): wait_for_inclusion=True, version_key=cortext.__weights_version__, ) - bt.logging.success("Successfully included weights in block.") + bt.logging.success(f"Successfully included weights in block. {time.time() - start_time} elaspsed for updating weights.") def blacklist_prompt(self, synapse: StreamPrompting) -> Tuple[bool, str]: blacklist = self.base_blacklist(synapse, cortext.PROMPT_BLACKLIST_STAKE) From bd90cf9fd5b06d33c79727324ebfabc7efaa02cc Mon Sep 17 00:00:00 2001 From: acer-king Date: Sun, 10 Nov 2024 20:03:44 -0800 Subject: [PATCH 08/16] disable table score showing for test --- validators/services/validators/base_validator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validators/services/validators/base_validator.py b/validators/services/validators/base_validator.py index 5cd93915..0535850b 100644 --- a/validators/services/validators/base_validator.py +++ b/validators/services/validators/base_validator.py @@ -158,8 +158,8 @@ def get_uid_to_scores_dict(self, uid_to_query_resps, scored_responses: tuple[flo syn.similarity = avg_score syn.score = weighted_score - bt.logging.info("showing pretty formated table scores.") - self.show_pretty_table_score(table_data) + # bt.logging.info("showing pretty formated table scores.") + # self.show_pretty_table_score(table_data) if not len(uid_scores_dict): validator_type = self.__class__.__name__ From 0bfcff13c85085f64e1860cee83aaa0897f8fe99 Mon Sep 17 00:00:00 2001 From: acer-king Date: Mon, 11 Nov 2024 05:30:23 -0800 Subject: [PATCH 09/16] min_similarity works --- server/app/curd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/app/curd.py b/server/app/curd.py index 4572e5e0..9da7644c 100644 --- a/server/app/curd.py +++ b/server/app/curd.py @@ -62,7 +62,7 @@ def get_items(req_body: models.RequestBody): limit = req_body.limit filter_by_miner_score = f"score>={req_body.filters.min_score}" if req_body.filters.min_score else "" - filter_by_miner_similarity = f"score>={req_body.filters.min_similarity}" if req_body.filters.min_similarity else "" + filter_by_miner_similarity = f"similarity>={req_body.filters.min_similarity}" if req_body.filters.min_similarity else "" filter_by_provider = f"provider='{req_body.filters.provider}'" if req_body.filters.provider else "" filter_by_model = f"model='{req_body.filters.model}'" if 
req_body.filters.model else "" filter_by_min_timestamp = f"timestamp>={req_body.filters.min_timestamp}" if req_body.filters.min_timestamp else "" From 2951acfd09a825ad450413bad6c8ba565214080f Mon Sep 17 00:00:00 2001 From: acer-king Date: Mon, 11 Nov 2024 06:49:13 -0800 Subject: [PATCH 10/16] enable table logging and disable is_cycle_end --- validators/services/validators/base_validator.py | 4 ++-- validators/weight_setter.py | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/validators/services/validators/base_validator.py b/validators/services/validators/base_validator.py index 0535850b..5cd93915 100644 --- a/validators/services/validators/base_validator.py +++ b/validators/services/validators/base_validator.py @@ -158,8 +158,8 @@ def get_uid_to_scores_dict(self, uid_to_query_resps, scored_responses: tuple[flo syn.similarity = avg_score syn.score = weighted_score - # bt.logging.info("showing pretty formated table scores.") - # self.show_pretty_table_score(table_data) + bt.logging.info("showing pretty formated table scores.") + self.show_pretty_table_score(table_data) if not len(uid_scores_dict): validator_type = self.__class__.__name__ diff --git a/validators/weight_setter.py b/validators/weight_setter.py index e652cfc7..474fea55 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -295,10 +295,11 @@ async def perform_synthetic_queries_one_cycle(self): async def perform_synthetic_queries(self): while True: - if not self.is_cycle_end(): - await asyncio.sleep(12) - continue - self.set_up_next_block_to_wait() + # if not self.is_cycle_end(): + # await asyncio.sleep(12) + # continue + # self.set_up_next_block_to_wait() + await asyncio.sleep(432) self.loop.create_task(self.perform_synthetic_queries_one_cycle()) def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks): From 5010077a7c2eb8a9ded0e30abbb9ff2764fba086 Mon Sep 17 00:00:00 2001 From: acer-king Date: Mon, 11 Nov 2024 08:06:54 -0800 Subject: [PATCH 11/16] fix name filter issue --- server/app/curd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/app/curd.py b/server/app/curd.py index 9da7644c..7eb32e19 100644 --- a/server/app/curd.py +++ b/server/app/curd.py @@ -70,7 +70,7 @@ def get_items(req_body: models.RequestBody): filter_by_epoch_num = f"epoch_num={req_body.filters.epoch_num}" if req_body.filters.epoch_num else "" filter_by_block_num = f"block_num={req_body.filters.block_num}" if req_body.filters.block_num else "" filter_by_cycle_num = f"cycle_num={req_body.filters.cycle_num}" if req_body.filters.cycle_num else "" - filter_by_name = f"name={req_body.filters.name}" if req_body.filters.name else "" + filter_by_name = f"name='{req_body.filters.name}'" if req_body.filters.name else "" search_by_uid_or_hotkey = (f"miner_uid=%s" if str(req_body.search).isdigit() else f"miner_hot_key like %s") if req_body.search else "" conditions = [filter_by_miner_score, filter_by_miner_similarity, filter_by_provider, filter_by_model, From ec235c608d194475051b863d80b918f6c46dd72d Mon Sep 17 00:00:00 2001 From: acer-king Date: Mon, 11 Nov 2024 09:32:05 -0800 Subject: [PATCH 12/16] detail logs in updating weight_setter --- validators/weight_setter.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index 474fea55..b17fd273 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -295,11 +295,11 @@ async def perform_synthetic_queries_one_cycle(self): async def 
perform_synthetic_queries(self): while True: - # if not self.is_cycle_end(): - # await asyncio.sleep(12) - # continue - # self.set_up_next_block_to_wait() - await asyncio.sleep(432) + if not self.is_cycle_end(): + await asyncio.sleep(12) + continue + self.set_up_next_block_to_wait() + # await asyncio.sleep(432) self.loop.create_task(self.perform_synthetic_queries_one_cycle()) def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks): @@ -394,7 +394,7 @@ async def set_weights(self, scores): self.moving_average_scores = alpha * scores + (1 - alpha) * self.moving_average_scores bt.logging.info(f"Updated moving average of weights: {self.moving_average_scores}") start_time = time.time() - self.subtensor.set_weights( + success, msg = self.subtensor.set_weights( netuid=self.config.netuid, wallet=self.wallet, uids=self.metagraph.uids, @@ -402,7 +402,7 @@ async def set_weights(self, scores): wait_for_inclusion=True, version_key=cortext.__weights_version__, ) - bt.logging.success(f"Successfully included weights in block. {time.time() - start_time} elaspsed for updating weights.") + bt.logging.info(f"done setting weights: {success}, {msg}. {time.time() - start_time} elaspsed for updating weights.") def blacklist_prompt(self, synapse: StreamPrompting) -> Tuple[bool, str]: blacklist = self.base_blacklist(synapse, cortext.PROMPT_BLACKLIST_STAKE) From 0073cea5bf5d64fcbd6dcec6bdb56373f8c69255 Mon Sep 17 00:00:00 2001 From: acer-king Date: Mon, 11 Nov 2024 11:24:09 -0800 Subject: [PATCH 13/16] increase token limit --- validators/weight_setter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index b17fd273..64644bcc 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -481,8 +481,8 @@ async def embeddings(self, synapse: Embeddings) -> Embeddings: async def prompt(self, synapse: StreamPrompting) -> StreamingSynapse.BTStreamingResponse: bt.logging.info(f"Received {synapse}") - contents = "".join([message.get("content") for message in synapse.messages]) - if len(contents) > 1024: + contents = " ".join([message.get("content") for message in synapse.messages]) + if len(contents.split()) > 2048: raise HTTPException(status_code=413, detail="Request entity too large") async def _prompt(query_synapse: StreamPrompting, send: Send): From 5f050da7f71b8d595624edbf4e50739ab6ab911f Mon Sep 17 00:00:00 2001 From: acer-king Date: Tue, 12 Nov 2024 03:58:31 -0800 Subject: [PATCH 14/16] add exception handler in updating capacity module --- validators/task_manager.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/validators/task_manager.py b/validators/task_manager.py index 7a8014f9..ac0af80e 100644 --- a/validators/task_manager.py +++ b/validators/task_manager.py @@ -1,5 +1,6 @@ import asyncio import random +import traceback from copy import deepcopy import bittensor as bt @@ -28,21 +29,24 @@ def get_remaining_bandwidth(self, uid, provider, model): def update_remain_capacity_based_on_new_capacity(self, new_uid_to_capacity): - for uid, capacity in new_uid_to_capacity.items(): - if not capacity: - continue - for provider, model_to_cap in capacity.items(): - for model, cap in model_to_cap.items(): - if self.get_remaining_bandwidth(uid, provider, model) is None: - utils.update_nested_dict(self.remain_resources, keys=[uid, provider, model], value=cap) - else: - diff = self.uid_to_capacity[uid][provider][model] - cap - if diff: - bt.logging.debug(f"diff {diff} found 
in {uid}, {provider}, {model}") - self.remain_resources[uid][provider][model] -= diff + try: + for uid, capacity in new_uid_to_capacity.items(): + if not capacity: + continue + for provider, model_to_cap in capacity.items(): + for model, cap in model_to_cap.items(): + if self.get_remaining_bandwidth(uid, provider, model) is None: + utils.update_nested_dict(self.remain_resources, keys=[uid, provider, model], value=cap) + else: + diff = self.uid_to_capacity[uid][provider][model] - cap + if diff: + bt.logging.debug(f"diff {diff} found in {uid}, {provider}, {model}") + self.remain_resources[uid][provider][model] -= diff - bt.logging.debug(f"remain_resources after epoch = {self.remain_resources}") - self.uid_to_capacity = deepcopy(self.remain_resources) + bt.logging.debug(f"remain_resources after epoch = {self.remain_resources}") + self.uid_to_capacity = deepcopy(self.remain_resources) + except Exception as err: + bt.logging.info(f"{err}, {traceback.format_exc()}") @error_handler def assign_task(self, synapse: ALL_SYNAPSE_TYPE): From 51780d294bf43138397cebc514f2332af94d9c8b Mon Sep 17 00:00:00 2001 From: acer-king Date: Tue, 12 Nov 2024 05:22:40 -0800 Subject: [PATCH 15/16] cursor support --- cursor/Dockerfile | 18 ++++ cursor/app.py | 30 ------ cursor/app/__init__.py | 0 cursor/app/core/__init__.py | 0 cursor/app/core/middleware.py | 33 ++++++ cursor/app/database.py | 21 ++++ cursor/app/endpoints/__init__.py | 0 cursor/app/endpoints/text.py | 173 +++++++++++++++++++++++++++++++ cursor/app/main.py | 49 +++++++++ cursor/app/models.py | 5 + cursor/docker-compose.yml | 30 ++++++ cursor/requirements.txt | 5 + 12 files changed, 334 insertions(+), 30 deletions(-) create mode 100644 cursor/Dockerfile delete mode 100644 cursor/app.py create mode 100644 cursor/app/__init__.py create mode 100644 cursor/app/core/__init__.py create mode 100644 cursor/app/core/middleware.py create mode 100644 cursor/app/database.py create mode 100644 cursor/app/endpoints/__init__.py create mode 100644 cursor/app/endpoints/text.py create mode 100644 cursor/app/main.py create mode 100644 cursor/app/models.py create mode 100644 cursor/docker-compose.yml create mode 100644 cursor/requirements.txt diff --git a/cursor/Dockerfile b/cursor/Dockerfile new file mode 100644 index 00000000..3e928ffe --- /dev/null +++ b/cursor/Dockerfile @@ -0,0 +1,18 @@ +# Use official Python image +FROM python:3.10 + +# Set working directory +WORKDIR /app + +# Copy and install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the app files to the container +COPY . . 
+ +# Expose the FastAPI port +EXPOSE 8000 + +# Start FastAPI app +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file diff --git a/cursor/app.py b/cursor/app.py deleted file mode 100644 index 543e00ba..00000000 --- a/cursor/app.py +++ /dev/null @@ -1,30 +0,0 @@ -from fastapi import FastAPI, HTTPException -from fastapi.responses import StreamingResponse -import httpx -import asyncio - -app = FastAPI() -organic_server_url = "https://0.0.0.0:8000" - - -@app.post("/apis.datura.ai/s18_sigma") -async def stream_prompt(): - try: - # Create an asynchronous HTTP client session - async with httpx.AsyncClient() as client: - # Make a streaming GET request to the external server - response = await client.stream("GET", organic_server_url) - response.raise_for_status() # Raise an error for bad responses (4xx, 5xx) - - # Define an async generator to read chunks of data from the external server - async def stream_generator(): - async for chunk in response.aiter_bytes(): - yield chunk - await asyncio.sleep(0) # Allow other tasks to run - - # Return a StreamingResponse that forwards the data from the external server - return StreamingResponse(stream_generator(), media_type="application/json") - except httpx.HTTPStatusError as e: - raise HTTPException(status_code=e.response.status_code, detail=str(e)) - except httpx.RequestError as e: - raise HTTPException(status_code=500, detail="Failed to fetch data from external server") diff --git a/cursor/app/__init__.py b/cursor/app/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cursor/app/core/__init__.py b/cursor/app/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cursor/app/core/middleware.py b/cursor/app/core/middleware.py new file mode 100644 index 00000000..e16641a4 --- /dev/null +++ b/cursor/app/core/middleware.py @@ -0,0 +1,33 @@ +import time +from fastapi import HTTPException + + +async def verify_api_key_rate_limit(config, api_key): + # NOTE: abit dangerous but very useful + if not config.prod: + if api_key == "test": + return True + + rate_limit_key = f"rate_limit:{api_key}" + rate_limit = await config.redis_db.get(rate_limit_key) + if rate_limit is None: + async with await config.psql_db.connection() as connection: + # rate_limit = await get_api_key_rate_limit(connection, api_key) + if rate_limit is None: + raise HTTPException(status_code=403, detail="Invalid API key") + await config.redis_db.set(rate_limit_key, rate_limit, ex=30) + else: + rate_limit = int(rate_limit) + + minute = time.time() // 60 + current_rate_limit_key = f"current_rate_limit:{api_key}:{minute}" + current_rate_limit = await config.redis_db.get(current_rate_limit_key) + if current_rate_limit is None: + current_rate_limit = 0 + await config.redis_db.expire(current_rate_limit_key, 60) + else: + current_rate_limit = int(current_rate_limit) + + await config.redis_db.incr(current_rate_limit_key) + if current_rate_limit >= rate_limit: + raise HTTPException(status_code=429, detail="Too many requests") diff --git a/cursor/app/database.py b/cursor/app/database.py new file mode 100644 index 00000000..92f1c93f --- /dev/null +++ b/cursor/app/database.py @@ -0,0 +1,21 @@ +import psycopg2 +import os + +DATABASE_URL = os.getenv("DATABASE_URL") +TABEL_NAME = 'query_resp_data' +# PostgreSQL connection parameters +conn = psycopg2.connect(DATABASE_URL) + +# Create a cursor object to interact with the database +cur = conn.cursor() + + +async def create_table(app): + global conn, cur, TABEL_NAME + try: + pass + 
+ except Exception as e: + print(f"Error creating table: {e}") + +create_table(None) diff --git a/cursor/app/endpoints/__init__.py b/cursor/app/endpoints/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cursor/app/endpoints/text.py b/cursor/app/endpoints/text.py new file mode 100644 index 00000000..18e7bcca --- /dev/null +++ b/cursor/app/endpoints/text.py @@ -0,0 +1,173 @@ +import json +from typing import Any, AsyncGenerator +import uuid +from fastapi import Depends, HTTPException +from fastapi.responses import JSONResponse, StreamingResponse +from redis.asyncio import Redis +from fastapi.routing import APIRouter +from cursor.app.models import RequestModel +import asyncio +from redis.asyncio.client import PubSub +import time + +COUNTER_TEXT_GENERATION_ERROR = metrics.get_meter(__name__).create_counter("validator.entry_node.text.error") +COUNTER_TEXT_GENERATION_SUCCESS = metrics.get_meter(__name__).create_counter("validator.entry_node.text.success") +GAUGE_TOKENS_PER_SEC = metrics.get_meter(__name__).create_gauge( + "validator.entry_node.text.tokens_per_sec", + description="Average tokens per second metric for LLM streaming for an organic LLM query" +) + + +def _construct_organic_message(payload: dict, job_id: str, task: str) -> str: + return json.dumps({"query_type": gcst.ORGANIC, "query_payload": payload, "task": task, "job_id": job_id}) + + +async def _wait_for_acknowledgement(pubsub: PubSub, job_id: str) -> bool: + async for message in pubsub.listen(): + channel = message["channel"].decode() + if channel == f"{gcst.ACKNLOWEDGED}:{job_id}" and message["type"] == "message": + logger.info(f"Job {job_id} confirmed by worker") + break + await pubsub.unsubscribe(f"{gcst.ACKNLOWEDGED}:{job_id}") + return True + + +async def _stream_results(pubsub: PubSub, job_id: str, task: str, first_chunk: str, start_time: float) -> \ + AsyncGenerator[str, str]: + yield first_chunk + num_tokens = 0 + async for message in pubsub.listen(): + channel = message["channel"].decode() + + if channel == f"{rcst.JOB_RESULTS}:{job_id}" and message["type"] == "message": + result = json.loads(message["data"].decode()) + if gcst.ACKNLOWEDGED in result: + continue + status_code = result[gcst.STATUS_CODE] + if status_code >= 400: + COUNTER_TEXT_GENERATION_ERROR.add(1, {"task": task, "kind": "nth_chunk_timeout", + "status_code": status_code}) + raise HTTPException(status_code=status_code, detail=result[gcst.ERROR_MESSAGE]) + + content = result[gcst.CONTENT] + num_tokens += 1 + yield content + if "[DONE]" in content: + break + COUNTER_TEXT_GENERATION_SUCCESS.add(1, {"task": task, "status_code": 200}) + completion_time = time.time() - start_time + + tps = num_tokens / completion_time + GAUGE_TOKENS_PER_SEC.set(tps, {"task": task}) + logger.info(f"Tokens per second for job_id: {job_id}, task: {task}: {tps}") + + await pubsub.unsubscribe(f"{rcst.JOB_RESULTS}:{job_id}") + + +async def _get_first_chunk(pubsub: PubSub, job_id: str) -> str | None: + async for message in pubsub.listen(): + if message["type"] == "message" and message["channel"].decode() == f"{rcst.JOB_RESULTS}:{job_id}": + result = json.loads(message["data"].decode()) + if gcst.STATUS_CODE in result and result[gcst.STATUS_CODE] >= 400: + raise HTTPException(status_code=result[gcst.STATUS_CODE], detail=result[gcst.ERROR_MESSAGE]) + return result[gcst.CONTENT] + return None + + +async def make_stream_organic_query( + redis_db: Redis, + payload: dict[str, Any], + task: str, +) -> AsyncGenerator[str, str]: + job_id = uuid.uuid4().hex + 
organic_message = _construct_organic_message(payload=payload, job_id=job_id, task=task) + + pubsub = redis_db.pubsub() + await pubsub.subscribe(f"{gcst.ACKNLOWEDGED}:{job_id}") + await redis_db.lpush(rcst.QUERY_QUEUE_KEY, organic_message) # type: ignore + + first_chunk = None + try: + await asyncio.wait_for(_wait_for_acknowledgement(pubsub, job_id), timeout=1) + except asyncio.TimeoutError: + logger.error( + f"Query node down? No confirmation received for job {job_id} within timeout period. Task: {task}, model: {payload['model']}" + ) + COUNTER_TEXT_GENERATION_ERROR.add(1, + {"task": task, "kind": "redis_acknowledgement_timeout", "status_code": 500}) + raise HTTPException(status_code=500, detail="Unable to process request ; redis_acknowledgement_timeout") + + await pubsub.subscribe(f"{rcst.JOB_RESULTS}:{job_id}") + logger.info("Here waiting for a message!") + start_time = time.time() + try: + first_chunk = await asyncio.wait_for(_get_first_chunk(pubsub, job_id), timeout=2) + except asyncio.TimeoutError: + logger.error( + f"Query node down? Timed out waiting for the first chunk of results for job {job_id}. Task: {task}, model: {payload['model']}" + ) + COUNTER_TEXT_GENERATION_ERROR.add(1, {"task": task, "kind": "first_chunk_timeout", "status_code": 500}) + raise HTTPException(status_code=500, detail="Unable to process request ; first_chunk_timeout") + + if first_chunk is None: + COUNTER_TEXT_GENERATION_ERROR.add(1, {"task": task, "kind": "first_chunk_missing", "status_code": 500}) + raise HTTPException(status_code=500, detail="Unable to process request ; first_chunk_missing") + return _stream_results(pubsub, job_id, task, first_chunk, start_time) + + +async def _handle_no_stream(text_generator: AsyncGenerator[str, str]) -> JSONResponse: + all_content = "" + async for chunk in text_generator: + chunks = load_sse_jsons(chunk) + if isinstance(chunks, list): + for chunk in chunks: + content = chunk["choices"][0]["delta"]["content"] + all_content += content + if content == "": + break + + return JSONResponse({"choices": [{"delta": {"content": all_content}}]}) + + +async def chat( + chat_request: request_models.ChatRequest, + config: Config = Depends(get_config), +) -> StreamingResponse | JSONResponse: + payload = request_models.chat_to_payload(chat_request) + payload.temperature = 0.5 + + try: + text_generator = await make_stream_organic_query( + redis_db=config.redis_db, + payload=payload.model_dump(), + task=payload.model, + ) + + logger.info("Here returning a response!") + + if chat_request.stream: + return StreamingResponse(text_generator, media_type="text/event-stream") + else: + return await _handle_no_stream(text_generator) + + except HTTPException as http_exc: + COUNTER_TEXT_GENERATION_ERROR.add(1, + {"task": payload.model, "kind": type(http_exc).__name__, "status_code": 500}) + logger.info(f"HTTPException in chat endpoint: {str(http_exc)}") + raise http_exc + + except Exception as e: + COUNTER_TEXT_GENERATION_ERROR.add(1, {"task": payload.model, "kind": type(e).__name__, "status_code": 500}) + logger.error(f"Unexpected error in chat endpoint: {str(e)}") + raise HTTPException(status_code=500, detail="An unexpected error occurred") + + +router = APIRouter() +router.add_api_route( + "/v1/chat/completions", + chat, + methods=["POST", "OPTIONS"], + tags=["StreamPrompting"], + response_model=None, + dependencies=[Depends(verify_api_key_rate_limit)], +) diff --git a/cursor/app/main.py b/cursor/app/main.py new file mode 100644 index 00000000..86e3d3e2 --- /dev/null +++ b/cursor/app/main.py @@ 
-0,0 +1,49 @@ +from contextlib import asynccontextmanager +from fastapi import FastAPI, Depends, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from . import curd, models, schemas +from .database import create_table, conn, cur +from typing import List +from .endpoints.text import router as chat_router + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Load the ML model + await create_table(None) + yield + + +app = FastAPI(lifespan=lifespan) +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Allows all origins +) +app.include_router(chat_router) + + +@app.on_event("shutdown") +async def shutdown_event(): + cur.close() + conn.close() + + +# Create an item +@app.post("/items") +def create_item(items: List[schemas.ItemCreate]): + return curd.create_items(items=items) + + +# Read all items +@app.post("/items/search") +def read_items(req_body: models.RequestBody): + items = curd.get_items(req_body) + return items + + +# Read a single item by ID +@app.get("/items/{p_key}", response_model=schemas.Item) +def read_item(p_key: int): + db_item = curd.get_item(p_key=p_key) + if db_item is None: + raise HTTPException(status_code=404, detail="Item not found") + return db_item diff --git a/cursor/app/models.py b/cursor/app/models.py new file mode 100644 index 00000000..f8bab3a1 --- /dev/null +++ b/cursor/app/models.py @@ -0,0 +1,5 @@ +from cortext.protocol import StreamPrompting + + +class RequestModel(StreamPrompting): + pass diff --git a/cursor/docker-compose.yml b/cursor/docker-compose.yml new file mode 100644 index 00000000..ec1b10b9 --- /dev/null +++ b/cursor/docker-compose.yml @@ -0,0 +1,30 @@ +version: "3.9" + +services: + db: + image: postgres:13 + restart: always + environment: + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_DB: ${POSTGRES_DB} + volumes: + - postgres_data_score:/var/lib/postgresql/data + ports: + - "5432:5432" + + web: + build: . 
+    restart: always
+    ports:
+      - "8000:8000"
+    depends_on:
+      - db
+    environment:
+      DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
+      POSTGRES_DB: ${POSTGRES_DB}
+    volumes:
+      - .:/app
+
+volumes:
+  postgres_data_score:
\ No newline at end of file
diff --git a/cursor/requirements.txt b/cursor/requirements.txt
new file mode 100644
index 00000000..5c1bda28
--- /dev/null
+++ b/cursor/requirements.txt
@@ -0,0 +1,5 @@
+fastapi
+uvicorn[standard]
+psycopg2-binary
+sqlalchemy
+pydantic
\ No newline at end of file

From 5f881680d2db4409b6465118631b0d06ac202424 Mon Sep 17 00:00:00 2001
From: acer-king
Date: Tue, 12 Nov 2024 05:59:02 -0800
Subject: [PATCH 16/16] validating capacity before being used

---
 validators/services/capacity.py | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/validators/services/capacity.py b/validators/services/capacity.py
index 6bfb1acf..4d92c8bd 100644
--- a/validators/services/capacity.py
+++ b/validators/services/capacity.py
@@ -32,6 +32,25 @@ async def query_capacity_to_miners(self, available_uids):
             if isinstance(resp, Exception):
                 bt.logging.error(f"exception happens while querying capacity to miner {uid}, {resp}")
             else:
-                uid_to_capacity[uid] = resp.bandwidth_rpm
+                uid_to_capacity[uid] = self.validate_capacity(resp.bandwidth_rpm)
         self.uid_to_capacity = deepcopy(uid_to_capacity)
         return uid_to_capacity
+
+    def validate_capacity(self, bandwidth):
+        try:
+            open_ai_cap = bandwidth.get("OpenAI").get("gpt-4o")
+            anthropic_cap = bandwidth.get("Anthropic").get("claude-3-5-sonnet-20240620")
+            groq_cap = bandwidth.get("Groq").get("llama-3.1-70b-versatile")
+            return {
+                "OpenAI": {
+                    "gpt-4o": int(open_ai_cap)
+                },
+                "Anthropic": {
+                    "claude-3-5-sonnet-20240620": int(anthropic_cap)
+                },
+                "Groq": {
+                    "llama-3.1-70b-versatile": int(groq_cap)
+                }
+            }
+        except Exception as err:
+            return None
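In practice, PATCH 16's validate_capacity normalizes whatever a miner reports into exactly the three provider/model pairs the validator tracks, coercing counts with int(), while any malformed or incomplete report collapses to None. The sketch below is illustrative only and rests on assumptions not confirmed by the patch: that the enclosing class in validators/services/capacity.py is named CapacityService, and that the method can be called unbound because it never touches self.

# Illustrative sketch (assumptions: class name CapacityService; unbound call works
# because validate_capacity() does not use self).
from validators.services.capacity import CapacityService

well_formed = {
    "OpenAI": {"gpt-4o": "30"},  # string counts are coerced by int()
    "Anthropic": {"claude-3-5-sonnet-20240620": 20},
    "Groq": {"llama-3.1-70b-versatile": 10},
}
incomplete = {"OpenAI": {"gpt-4o": 30}}  # missing providers -> .get() chain raises -> None

assert CapacityService.validate_capacity(None, well_formed) == {
    "OpenAI": {"gpt-4o": 30},
    "Anthropic": {"claude-3-5-sonnet-20240620": 20},
    "Groq": {"llama-3.1-70b-versatile": 10},
}
assert CapacityService.validate_capacity(None, incomplete) is None

Because a None entry can now land in uid_to_capacity, downstream code leans on the existing "if not capacity: continue" guard shown in PATCH 14's update_remain_capacity_based_on_new_capacity.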
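The cursor service added in PATCH 15 wires its router to POST /v1/chat/completions and, when streaming is requested, relays results as a text/event-stream that ends with a "[DONE]" chunk. A hypothetical client call is sketched below under stated assumptions: the host/port come from the docker-compose mapping (8000), and the ChatRequest field names (messages, model, stream) mirror the OpenAI-style payload the endpoint appears to expect but are not spelled out in the patch; the endpoint also depends on verify_api_key_rate_limit, and how the API key is transported is not shown here.

# Hypothetical client sketch; field names and the absence of an auth header are assumptions.
import httpx

payload = {
    "messages": [{"role": "user", "content": "hello"}],  # assumed ChatRequest shape
    "model": "gpt-4o",
    "stream": True,
}

with httpx.Client(base_url="http://localhost:8000", timeout=60) as client:
    with client.stream("POST", "/v1/chat/completions", json=payload) as resp:
        resp.raise_for_status()
        for chunk in resp.iter_text():
            print(chunk, end="")  # SSE chunks; the stream closes after a "[DONE]" marker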