diff --git a/.gitignore b/.gitignore index e8cec8aa..cb1ad171 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,8 @@ wandb/ validators/.ipynb_checkpoints/ **/validator.ipynb **/.env -**/Cortex.t.egg-info \ No newline at end of file +**/Cortex.t.egg-info +**/test.ipynb +.env +**/server/**/*.py +cache.db \ No newline at end of file diff --git a/README.md b/README.md index dd17332b..0d3b7980 100644 --- a/README.md +++ b/README.md @@ -200,6 +200,7 @@ Before starting make sure update your system and have pm2 installed to run the s ```bash apt update -y && apt-get install git -y && apt install python3-pip -y + ``` Download the repository, navigate to the folder and then create virtual env and install the necessary requirements with the following chained command. @@ -217,9 +218,11 @@ After installing it, copy `env.example` to `.env` and substitute all env vars with values appropriate for your accounts. ## Mining - +# step1. +go to cortext/constants.py and change bandwidth_to_model value as per limit of api. +currently we support only 3 models: "gpt-4o", "claude-3-5-sonnet-20240620", "llama-3.1-70b-versatile". +so don't add more than that. You can launch your miners via python3 using the following command. - ```bash bash start_miner.sh ``` diff --git a/cache.db b/cache.db new file mode 100644 index 00000000..125bc855 Binary files /dev/null and b/cache.db differ diff --git a/cortext/__init__.py b/cortext/__init__.py index c073a18f..0002530f 100644 --- a/cortext/__init__.py +++ b/cortext/__init__.py @@ -19,7 +19,7 @@ # version must stay on line 22 -__version__ = "4.0.10" +__version__ = "4.0.11" version_split = __version__.split(".") __spec_version__ = ( (1000 * int(version_split[0])) @@ -52,8 +52,6 @@ IMAGE_BLACKLIST_STAKE = 5000 EMBEDDING_BLACKLIST_STAKE = 5000 ISALIVE_BLACKLIST_STAKE = min(PROMPT_BLACKLIST_STAKE, IMAGE_BLACKLIST_STAKE, EMBEDDING_BLACKLIST_STAKE) -MIN_REQUEST_PERIOD = 2 -MAX_REQUESTS = 20 # must have the test_key whitelisted to avoid a global blacklist testnet_key = ["5EhEZN6soubtKJm8RN7ANx9FGZ2JezxBUFxr45cdsHtDp3Uk"] test_key = ["5DcRHcCwD33YsHfj4PX5j2evWLniR1wSWeNmpf5RXaspQT6t"] @@ -3769,3 +3767,6 @@ ALL_SYNAPSE_TYPE = Union[StreamPrompting, Embeddings, ImageResponse, IsAlive] + +REDIS_RESULT_STREAM = 'result_stream' +REDIS_RESULT = 'result' \ No newline at end of file diff --git a/cortext/constants.py b/cortext/constants.py new file mode 100644 index 00000000..d90cad6e --- /dev/null +++ b/cortext/constants.py @@ -0,0 +1,87 @@ +TEXT_MODEL = "gpt-4-turbo-2024-04-09" +TEXT_PROVIDER = "OpenAI" +TEXT_MAX_TOKENS = 4096 +TEXT_TEMPERATURE = 0.001 +TEXT_WEIGHT = 1 +TEXT_TOP_P = 0.01 +TEXT_TOP_K = 1 +VISION_MODELS = ["gpt-4o", "claude-3-opus-20240229", "anthropic.claude-3-sonnet-20240229-v1:0", + "claude-3-5-sonnet-20240620"] +TEXT_VALI_MODELS_WEIGHTS = { + # from https://openai.com/api/pricing/ + "OpenAI": { + "gpt-4o": 15.00, + # "gpt-3.5-turbo": 2.00, + # "o1-preview": 60.00, + # "o1-mini": 12.00, + }, + # from https://ai.google.dev/pricing + # "Gemini": { + # "gemini-1.5-flash": 0.30, + # "gemini-1.5-pro": 10.50, + # }, + # + "Anthropic": { + "claude-3-5-sonnet-20240620": 15.00, + # "claude-3-opus-20240229": 75, + # "claude-3-haiku-20240307": 1.25, + }, + # model IDs from https://console.groq.com/docs/tool-use?hss_channel=tw-842860575289819136 + # prices not available yet, default to bedrock pricing + # free tier: 30 rpm + "Groq": { + # "gemma2-9b-it": 0.22, + # "llama-3.1-8b-instant": 0.22, + "llama-3.1-70b-versatile": .99, + # "llama-3.1-405b-reasoning": 16, + # "mixtral-8x7b-32768": 0.7, + }, + # from https://aws.amazon.com/bedrock/pricing/ + # model IDs from https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html#model-ids-arns + "Bedrock": { + # "mistral.mixtral-8x7b-instruct-v0:1": 0.7, + # "mistral.mistral-large-2402-v1:0": 24, + # "meta.llama3-1-8b-instruct-v1:0": 0.22, + # "meta.llama3-1-70b-instruct-v1:0": 0.99, + # "meta.llama3-1-405b-instruct-v1:0": 16, + } +} + +bandwidth_to_model = { + "OpenAI": { + "gpt-4o": 2, + # "gpt-3.5-turbo": 1, + # "o1-preview": 1, + # "o1-mini": 1, + }, + # from https://ai.google.dev/pricing + # "Gemini": { + # "gemini-1.5-flash": 1, + # "gemini-1.5-pro": 1, + # }, + # + "Anthropic": { + "claude-3-5-sonnet-20240620": 2, + # "claude-3-opus-20240229": 1, + # "claude-3-haiku-20240307": 1, + }, + # model IDs from https://console.groq.com/docs/tool-use?hss_channel=tw-842860575289819136 + # prices not available yet, default to bedrock pricing + # free tier: 30 rpm + "Groq": { + # "gemma2-9b-it": 1, + # "llama-3.1-8b-instant": 1, + "llama-3.1-70b-versatile": 1, + # "llama-3.1-405b-reasoning": 16, + # "mixtral-8x7b-32768": 1, + }, + # from https://aws.amazon.com/bedrock/pricing/ + # model IDs from https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html#model-ids-arns + # "Bedrock": { + # "mistral.mixtral-8x7b-instruct-v0:1": 1, + # "mistral.mistral-large-2402-v1:0": 1, + # "meta.llama3-1-8b-instruct-v1:0": 1, + # "meta.llama3-1-70b-instruct-v1:0": 1, + # "meta.llama3-1-405b-instruct-v1:0": 16, + # } +} diff --git a/cortext/protocol.py b/cortext/protocol.py index 4fec3371..1b03f3c2 100644 --- a/cortext/protocol.py +++ b/cortext/protocol.py @@ -15,7 +15,8 @@ class IsAlive(bt.Synapse): class Bandwidth(bt.Synapse): - bandwidth_rpm: Optional[Dict[str, int]] = None + bandwidth_rpm: Optional[Dict[str, dict]] = None + class ImageResponse(bt.Synapse): """ A class to represent the response for an image-related request. """ @@ -124,6 +125,15 @@ class ImageResponse(bt.Synapse): description="A list of fields required for the hash." ) + process_time: int = pydantic.Field( + default=9999, + title="process time", + description="processed time of querying dendrite.", + ) + task_id: str = pydantic.Field( + default="9999" + ) + def deserialize(self) -> Optional[Dict]: """ Deserialize the completion data of the image response. """ return self.completion @@ -286,6 +296,14 @@ class StreamPrompting(bt.StreamingSynapse): title="streaming", description="whether to stream the output", ) + deserialize_flag: bool = pydantic.Field( + default=True + ) + task_id: str = pydantic.Field( + default="9999", + title="task_id", + description="task id of the request from this syanpse." + ) async def process_streaming_response(self, response: StreamingResponse) -> AsyncIterator[str]: if self.completion is None: @@ -297,9 +315,6 @@ async def process_streaming_response(self, response: StreamingResponse) -> Async self.completion += token yield tokens - def deserialize(self) -> str: - return self.completion - def extract_response_json(self, response: StreamingResponse) -> dict: headers = { k.decode("utf-8"): v.decode("utf-8") @@ -332,4 +347,4 @@ def extract_info(prefix: str) -> dict[str, str]: "timeout": self.timeout, "streaming": self.streaming, "uid": self.uid, - } \ No newline at end of file + } diff --git a/cortext/reward.py b/cortext/reward.py index 24bd52b8..1469a592 100644 --- a/cortext/reward.py +++ b/cortext/reward.py @@ -18,8 +18,8 @@ # DEALINGS IN THE SOFTWARE. from __future__ import annotations from transformers import logging as hf_logging -hf_logging.set_verbosity_error() +hf_logging.set_verbosity_error() import re import io @@ -37,10 +37,13 @@ from sklearn.feature_extraction.text import TfidfVectorizer from transformers import CLIPProcessor, CLIPModel + # ==== TEXT ==== def calculate_text_similarity(text1: str, text2: str): try: + text1 = str(text1).lower() + text2 = str(text2).lower() # Initialize the TF-IDF Vectorizer vectorizer = TfidfVectorizer() @@ -50,12 +53,12 @@ def calculate_text_similarity(text1: str, text2: str): # Calculate the Cosine Similarity similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0] - bt.logging.debug(f"Similarity: {similarity}") return similarity except Exception as e: bt.logging.error(f"Error in calculate_text_similarity: {traceback.format_exc()}") raise + async def api_score(api_answer: str, response: str, weight: float, temperature: float, provider: str) -> float: try: if api_answer is None or response is None: @@ -153,6 +156,7 @@ def calculate_image_similarity(image, description, max_length: int = 77): # Calculate cosine similarity return torch.cosine_similarity(image_embedding, text_embedding, dim=1).item() + async def dalle_score(uid, url, desired_size, description, weight, similarity_threshold=0.21) -> float: """Calculate the image score based on similarity and size asynchronously.""" @@ -191,11 +195,11 @@ async def dalle_score(uid, url, desired_size, description, weight, similarity_th return 0 - # IMAGES ---- DETERMINISTIC async def deterministic_score(uid: int, syn, weight: float): - vali_b64s = await utils.call_stability(syn.messages, syn.seed, syn.steps, syn.cfg_scale, syn.width, syn.height, syn.samples, syn.sampler) + vali_b64s = await utils.call_stability(syn.messages, syn.seed, syn.steps, syn.cfg_scale, syn.width, syn.height, + syn.samples, syn.sampler) for miner_b64, vali_b64 in zip(syn.completion["b64s"], vali_b64s): if miner_b64[:50] != vali_b64[:50]: @@ -206,7 +210,6 @@ async def deterministic_score(uid: int, syn, weight: float): return weight - # ==== Embeddings ===== async def embeddings_score(openai_answer: list, response: list, weight: float, threshold: float = .95) -> float: @@ -214,7 +217,6 @@ async def embeddings_score(openai_answer: list, response: list, weight: float, t bt.logging.info("The number of embeddings in openai_answer and response do not match.") return 0 - # Calculate similarity for each pair of embeddings similarities = [] for oa_emb, resp_emb in zip(openai_answer, response): diff --git a/cortext/utils.py b/cortext/utils.py index 0ded5311..f97a3119 100644 --- a/cortext/utils.py +++ b/cortext/utils.py @@ -310,7 +310,7 @@ async def get_item_from_list(items, vision): async with list_update_lock: items = state[category][item_type] - bt.logging.debug(f"Queue for {list_type}: {len(items) if items else 0} items") + bt.logging.trace(f"Queue for {list_type}: {len(items) if items else 0} items") item = await get_item_from_list(items, vision) @@ -319,7 +319,7 @@ async def get_item_from_list(items, vision): items = await get_items(category, item_type, theme) bt.logging.trace(f"Items generated: {items}") state[category][item_type] = items - bt.logging.debug(f"Fetched new list for {list_type}, containing {len(items)} items") + bt.logging.trace(f"Fetched new list for {list_type}, containing {len(items)} items") item = await get_item_from_list(items, vision) @@ -457,8 +457,8 @@ def extract_python_list(text: str): async def call_openai(messages, temperature, model, seed=1234, max_tokens=2048, top_p=1): for _ in range(2): - bt.logging.debug( - f"Calling Openai. Temperature = {temperature}, Model = {model}, Seed = {seed}, Messages = {messages}" + bt.logging.trace( + f"Calling Openai to get answer. Temperature = {temperature}, Model = {model}, Seed = {seed}, Messages = {messages}" ) try: message = messages[0] @@ -503,7 +503,7 @@ async def call_openai(messages, temperature, model, seed=1234, max_tokens=2048, async def call_gemini(messages, temperature, model, max_tokens, top_p, top_k): - bt.logging.debug(f"Calling Gemini. Temperature = {temperature}, Model = {model}, Messages = {messages}") + bt.logging.trace(f"Calling Gemini. Temperature = {temperature}, Model = {model}, Messages = {messages}") try: model = genai.GenerativeModel(model) response = model.generate_content( @@ -554,7 +554,7 @@ async def call_gemini(messages, temperature, model, max_tokens, top_p, top_k): async def call_anthropic_bedrock(prompt, temperature, model, max_tokens=2048, top_p=1, top_k=10000): try: - bt.logging.debug( + bt.logging.trace( f"Calling Bedrock via Anthropic. Model = {model}, Prompt = {prompt}, Temperature = {temperature}, Max Tokens = {max_tokens}" ) completion = await anthropic_bedrock_client.completions.create( @@ -610,7 +610,7 @@ async def generate_messages_to_claude(messages): async def call_anthropic(messages, temperature, model, max_tokens, top_p, top_k): try: - bt.logging.info( + bt.logging.trace( f"calling Anthropic for {messages} with temperature: {temperature}, model: {model}, max_tokens: {max_tokens}, top_p: {top_p}, top_k: {top_k}" ) filtered_messages, system_prompt = await generate_messages_to_claude(messages) @@ -633,7 +633,7 @@ async def call_anthropic(messages, temperature, model, max_tokens, top_p, top_k) async def call_groq(messages, temperature, model, max_tokens, top_p, seed): try: - bt.logging.info( + bt.logging.trace( f"calling groq for {messages} with temperature: {temperature}, model: {model}, max_tokens: {max_tokens}, top_p: {top_p}" ) @@ -655,7 +655,7 @@ async def call_groq(messages, temperature, model, max_tokens, top_p, seed): async def call_bedrock(messages, temperature, model, max_tokens, top_p, seed): try: - bt.logging.info( + bt.logging.trace( f"calling AWS Bedrock for {messages} with temperature: {temperature}, model: {model}, max_tokens: {max_tokens}, top_p: {top_p}" ) @@ -746,7 +746,7 @@ async def extract_message(message): async def call_stability(prompt, seed, steps, cfg_scale, width, height, samples, sampler): # bt.logging.info(f"calling stability for {prompt, seed, steps, cfg_scale, width, height, samples, sampler}") - bt.logging.info(f"calling stability for {prompt[:50]}...") + bt.logging.trace(f"calling stability for {prompt[:50]}...") # Run the synchronous stability_api.generate function in a separate thread meta = await asyncio.to_thread( diff --git a/miner/config.py b/miner/config.py index 7cdcb40f..68478ec1 100644 --- a/miner/config.py +++ b/miner/config.py @@ -36,7 +36,8 @@ def __init__(self): self.BT_SUBTENSOR_NETWORK = 'test' if self.ENV == 'test' else 'finney' self.WANDB_OFF = False if self.ENV == 'prod' else True - self.LOGGING_TRACE = False if self.ENV == 'prod' else True + # still can use the --logging.debug and --logging.trace to turn on logging + self.LOGGING_TRACE = False # if self.ENV == 'prod' else True self.BLACKLIST_AMT = 5000 if self.ENV == 'prod' else 0 self.BLOCKS_PER_EPOCH = int(os.getenv('BLOCKS_PER_EPOCH', 100)) self.WAIT_NEXT_BLOCK_TIME = int(os.getenv('WAIT_NEXT_BLOCK_TIME', 1)) diff --git a/miner/constants.py b/miner/constants.py deleted file mode 100644 index ef7371a5..00000000 --- a/miner/constants.py +++ /dev/null @@ -1,28 +0,0 @@ -from cortext import ImageResponse, StreamPrompting -from miner.providers import OpenAI, Anthropic, AnthropicBedrock, Groq, Gemini, Bedrock - -task_image = ImageResponse.__name__ -task_stream = StreamPrompting.__name__ - -openai_provider = OpenAI.__name__ -anthropic_provider = Anthropic.__name__ -anthropic_bedrock_provider = AnthropicBedrock.__name__ -groq_provider = Groq.__name__ -gemini_provider = Gemini.__name__ -bedrock_provider = Bedrock.__name__ - -capacity_to_task_and_provider = { - f"{task_image}_{openai_provider}": 1, - f"{task_image}_{anthropic_provider}": 1, - f"{task_image}_{anthropic_bedrock_provider}": 1, - f"{task_image}_{groq_provider}": 1, - f"{task_image}_{gemini_provider}": 1, - f"{task_image}_{bedrock_provider}": 1, - - f"{task_stream}_{openai_provider}": 1, - f"{task_stream}_{anthropic_provider}": 1, - f"{task_stream}_{anthropic_bedrock_provider}": 1, - f"{task_stream}_{groq_provider}": 1, - f"{task_stream}_{gemini_provider}": 1, - f"{task_stream}_{bedrock_provider}": 1, -} diff --git a/miner/miner.py b/miner/miner.py index 5e1b46b0..290acdc8 100644 --- a/miner/miner.py +++ b/miner/miner.py @@ -124,7 +124,7 @@ def run(self): f"on network: {self.subtensor.chain_endpoint} " f"with netuid: {self.config.netuid}" ) - self.axon.serve(self.config.netuid, subtensor=self.subtensor) + # self.axon.serve(self.config.netuid, subtensor=self.subtensor) bt.logging.info(f"Starting axon server on port: {self.config.axon.port}") self.axon.start() self.last_epoch_block = self.subtensor.get_current_block() diff --git a/miner/providers/anthropicbedrock.py b/miner/providers/anthropicbedrock.py index b1de716d..dabe552b 100644 --- a/miner/providers/anthropicbedrock.py +++ b/miner/providers/anthropicbedrock.py @@ -11,10 +11,9 @@ class AnthropicBedrock(Provider): def __init__(self, synapse): super().__init__(synapse) bedrock_client_parameters = { - "service_name": 'bedrock-runtime', - "aws_access_key_id": config.AWS_ACCESS_KEY, - "aws_secret_access_key": config.AWS_SECRET_KEY, - "region_name": "us-east-1" + "aws_secret_key": config.AWS_ACCESS_KEY, + "aws_access_key": config.AWS_SECRET_KEY, + "aws_region": "us-east-1" } self.anthropic_bedrock_client = AsyncAnthropicBedrock(timeout=config.ASYNC_TIME_OUT, @@ -22,15 +21,21 @@ def __init__(self, synapse): @error_handler async def _prompt(self, synapse: StreamPrompting, send: Send): - stream = await self.anthropic_bedrock_client.completions.create( - prompt=f"\n\nHuman: {self.messages}\n\nAssistant:", - max_tokens_to_sample=self.max_tokens, - temperature=self.temperature, # must be <= 1.0 - top_k=self.top_k, - top_p=self.top_p, - model=self.model, - stream=True, - ) + stream = [] + try: + stream = await self.anthropic_bedrock_client.completions.create( + prompt=f"\n\nHuman: {self.messages}\n\nAssistant:", + max_tokens_to_sample=self.max_tokens, + temperature=self.temperature, # must be <= 1.0 + top_k=self.top_k, + top_p=self.top_p, + model=self.model, + stream=True, + ) + except Exception as err: + bt.logging.exception(err) + await send({"type": "http.response.body", "body": b'', "more_body": False}) + async for completion in stream: if completion.completion: diff --git a/miner/services/__init__.py b/miner/services/__init__.py index 0823ea8d..f7388167 100644 --- a/miner/services/__init__.py +++ b/miner/services/__init__.py @@ -2,9 +2,8 @@ from .prompt import PromptService from .image import ImageService from .embedding import EmbeddingService -from .text import TextService from .check_status import IsAliveService from .capacity import CapacityService -ALL_SERVICE_TYPE = Union[PromptService, ImageService, EmbeddingService, TextService, IsAliveService, CapacityService] +ALL_SERVICE_TYPE = Union[PromptService, ImageService, EmbeddingService, IsAliveService, CapacityService] __all__ = [PromptService, ImageService, EmbeddingService, CapacityService, ALL_SERVICE_TYPE] diff --git a/miner/services/base.py b/miner/services/base.py index 6affce99..41b724f9 100644 --- a/miner/services/base.py +++ b/miner/services/base.py @@ -60,28 +60,6 @@ def base_blacklist(self, synapse) -> Tuple[bool, str]: if stake < self.blacklist_amt: return True, f"Blacklisted a low stake {synapse_type} request: {stake} < {self.blacklist_amt} from {hotkey}" - time_window = cortext.MIN_REQUEST_PERIOD * 60 - current_time = time.time() - - if hotkey not in BaseService.request_timestamps: - BaseService.request_timestamps[hotkey] = deque() - - # Remove timestamps outside the current time window - while (BaseService.request_timestamps[hotkey] and - current_time - BaseService.request_timestamps[hotkey][0] > time_window): - BaseService.request_timestamps[hotkey].popleft() - - # Check if the number of requests exceeds the limit - if len(BaseService.request_timestamps[hotkey]) >= cortext.MAX_REQUESTS: - return ( - True, - f"Request frequency for {hotkey} exceeded: " - f"{len(BaseService.request_timestamps[hotkey])} requests in {cortext.MIN_REQUEST_PERIOD} minutes. " - f"Limit is {cortext.MAX_REQUESTS} requests." - ) - - BaseService.request_timestamps[hotkey].append(current_time) - return False, f"accepting {synapse_type} request from {hotkey}" except Exception: diff --git a/miner/services/capacity.py b/miner/services/capacity.py index eab539b9..d64b0c48 100644 --- a/miner/services/capacity.py +++ b/miner/services/capacity.py @@ -5,7 +5,7 @@ from .base import BaseService from cortext import ISALIVE_BLACKLIST_STAKE -from miner.constants import capacity_to_task_and_provider +from cortext.constants import bandwidth_to_model class CapacityService(BaseService): @@ -14,7 +14,7 @@ def __init__(self, metagraph, blacklist_amt=ISALIVE_BLACKLIST_STAKE): async def forward_fn(self, synapse: Bandwidth): bt.logging.debug("capacity request is being processed") - synapse.bandwidth_rpm = capacity_to_task_and_provider + synapse.bandwidth_rpm = bandwidth_to_model bt.logging.info("check status is executed.") return synapse diff --git a/organic.py b/organic.py index e3e7e210..e71012bb 100644 --- a/organic.py +++ b/organic.py @@ -5,6 +5,7 @@ from starlette.responses import StreamingResponse import asyncio import random +import traceback class StreamPrompting(bt.StreamingSynapse): @@ -175,15 +176,16 @@ async def handle_response(responses): async def main(): print("synching metagraph, this takes way too long.........") - subtensor = bt.subtensor( network="finney" ) - meta = subtensor.metagraph( netuid=18 ) + subtensor = bt.subtensor( network="test" ) + meta = subtensor.metagraph( netuid=37 ) print("metagraph synched!") # This needs to be your validator wallet that is running your subnet 18 validator - wallet = bt.wallet( name="validator", hotkey="default" ) + wallet = bt.wallet( name="1", hotkey="1" ) dendrite = bt.dendrite( wallet=wallet ) vali_uid = meta.hotkeys.index( wallet.hotkey.ss58_address) axon_to_use = meta.axons[vali_uid] + print(f"axon to use: {axon_to_use}") # This is the question to send your validator to send your miner. prompt = "Give me a long story about a cat" @@ -191,14 +193,12 @@ async def main(): # You can edit this to pick a specific miner uid, just change miner_uid to the uid that you desire. # Currently, it just picks a random miner form the top 100 uids. Or it can be hardcoded to a specific uid. - miner_uid = 202 - + # miner_uid = 2 synapse = StreamPrompting( messages = messages, # get available providers and models from : https://github.com/corcel-api/cortex.t/blob/2807988d66523a432f6159d46262500b060f13dc/cortext/protocol.py#L238 - provider = "OpenAI", - model = "gpt-3.5-turbo", - uid = miner_uid, + provider = "OpenAI", + model = "gpt-4o", ) timeout = 60 streaming = True diff --git a/validators/models/__init__.py b/validators/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/validators/models/enum.py b/validators/models/enum.py new file mode 100644 index 00000000..ae2b1a3d --- /dev/null +++ b/validators/models/enum.py @@ -0,0 +1,6 @@ +from enum import Enum + + +class QueryType(str, Enum): # Inherit from str to enforce the value type as string + organic_type = 'organic' + synthetic_type = 'synthetic' diff --git a/validators/services/cache.py b/validators/services/cache.py new file mode 100644 index 00000000..d153b551 --- /dev/null +++ b/validators/services/cache.py @@ -0,0 +1,72 @@ +import sqlite3 +import time +import hashlib + + +class QueryResponseCache: + def __init__(self): + # Connect to (or create) the SQLite database + conn = sqlite3.connect('cache.db') + cursor = conn.cursor() + + # Create a table for caching (key, value, and expiry time) + cursor.execute(''' + CREATE TABLE IF NOT EXISTS cache ( + p_key TEXT PRIMARY KEY, + question TEXT, + answer TEXT, + provider TEXT, + model TEXT, + timestamp REAL + ) + ''') + cursor.execute(''' + CREATE INDEX IF NOT EXISTS idx_provider_model ON cache (provider, model); + ''') + conn.commit() + self.conn = conn + + @staticmethod + def generate_hash(input_string): + return hashlib.sha256(input_string.encode('utf-8')).hexdigest() + + def set_cache(self, question, answer, provider, model, ttl=3600*24): + p_key = self.generate_hash(str(question) + str(provider) + str(model)) + expires_at = time.time() + ttl + cursor = self.conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO cache (p_key, question, answer, provider, model, timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ''', (p_key, question, answer, provider, model, expires_at)) + self.conn.commit() + + def get_answer(self, question, provider, model): + p_key = self.generate_hash(str(question) + str(provider) + str(model)) + cursor = self.conn.cursor() + cursor.execute(''' + SELECT answer FROM cache WHERE p_key = ? + ''', (p_key,)) + result = cursor.fetchone() + return result[0] if result else None + + def get_cache(self, key): + cursor = self.conn.cursor() + cursor.execute(''' + SELECT * FROM cache WHERE p_key = ? + ''', key) + result = cursor.fetchone() + return result[0] if result else None + + def get_all_question_to_answers(self, provider, model): + cursor = self.conn.cursor() + cursor.execute(''' + SELECT question, answer FROM cache WHERE provider = ? AND model = ? + ''', (provider, model)) + results = [(row[0], row[1]) for row in cursor.fetchall()] + return results + + def close(self): + self.conn.close() + + +cache_service = QueryResponseCache() \ No newline at end of file diff --git a/validators/services/capacity.py b/validators/services/capacity.py index 51a26f42..6b3e2ed4 100644 --- a/validators/services/capacity.py +++ b/validators/services/capacity.py @@ -1,4 +1,6 @@ import asyncio +from copy import deepcopy +from typing import List from cortext.protocol import Bandwidth import bittensor as bt @@ -9,6 +11,8 @@ def __init__(self, metagraph, dendrite): self.metagraph = metagraph self.dendrite: bt.dendrite = dendrite self.timeout = 4 + self.uid_to_capacity = {} + self.remain_uid_to_capacity = {} async def query_capacity_to_miners(self, available_uids): capacity_query_tasks = [] @@ -22,11 +26,12 @@ async def query_capacity_to_miners(self, available_uids): capacity_query_tasks.append(task) # Query responses is (uid. syn) - query_responses = await asyncio.gather(*capacity_query_tasks, return_exceptions=True) + query_responses: List[Bandwidth] = await asyncio.gather(*capacity_query_tasks, return_exceptions=True) uid_to_capacity = {} for uid, resp in zip(available_uids, query_responses): if isinstance(resp, Exception): bt.logging.error(f"exception happens while querying capacity to miner {uid}, {resp}") else: - uid_to_capacity[uid] = resp + uid_to_capacity[uid] = resp.bandwidth_rpm + self.uid_to_capacity = deepcopy(uid_to_capacity) return uid_to_capacity diff --git a/validators/services/validators/base_validator.py b/validators/services/validators/base_validator.py index 50e7d7cf..e19caca2 100644 --- a/validators/services/validators/base_validator.py +++ b/validators/services/validators/base_validator.py @@ -1,13 +1,16 @@ from abc import abstractmethod import asyncio -from datasets import load_dataset +import json +from collections import defaultdict + import random -from typing import List, Tuple +from typing import Tuple import bittensor as bt from cortext.metaclasses import ValidatorRegistryMeta -from cortext import utils +from validators.utils import error_handler, get_bandwidth +from cortext.constants import TEXT_VALI_MODELS_WEIGHTS dataset = None @@ -28,37 +31,6 @@ def __init__(self, config, metagraph): self.num_samples = 100 self.wandb_data = {} - def get_random_texts(self) -> list[str]: - global dataset - if dataset is None: - dataset = load_dataset('wikitext', 'wikitext-2-v1') - texts = [item['text'] for item in dataset['train']] - return random.sample(texts, self.num_samples) - - async def load_questions(self, available_uids, item_type: str = "text", vision=False): - self.uid_to_questions = dict() - - for index, uid in enumerate(available_uids): - - if item_type == "images": - content = await utils.get_question("images", len(available_uids)) - self.uid_to_questions[uid] = content # Store messages for each UID - elif item_type == "text": - question = await utils.get_question("text", len(available_uids), vision) - if isinstance(question, str): - bt.logging.info(f"Question is str, dict expected: {question}") - prompt = question.get("prompt") - image_url = question.get("image") - self.uid_to_questions[uid] = {"prompt": prompt} - self.uid_to_questions[uid]["image"] = image_url - else: - random_texts = self.get_random_texts() - num_texts_per_uid = len(random_texts) // len(available_uids) - start_index = index * num_texts_per_uid - end_index = start_index + num_texts_per_uid - prompt = random_texts[start_index:end_index] - self.uid_to_questions[uid] = prompt - async def query_miner(self, metagraph, uid, syn): try: responses = await self.dendrite([metagraph.axons[uid]], syn, deserialize=False, timeout=self.timeout, @@ -69,44 +41,52 @@ async def query_miner(self, metagraph, uid, syn): bt.logging.error(f"Exception during query for uid {uid}: {e}") return uid, None + @abstractmethod + def select_random_provider_and_model(self): + pass + + def get_provider_to_models(self): + pass + async def handle_response(self, uid, response) -> Tuple[int, bt.Synapse]: if type(response) == list and response: response = response[0] return uid, response @abstractmethod - async def start_query(self, available_uids: List[int]) -> bt.Synapse: + async def create_query(self, uid): pass @abstractmethod async def build_wandb_data(self, scores, responses): pass - def should_i_score(self): - return True - @abstractmethod - async def get_answer_task(self, uid, synapse=None): + async def get_answer_task(self, uid, synapse, response): pass @abstractmethod async def get_scoring_task(self, uid, answer, response): pass - async def score_responses(self, responses): + @staticmethod + def get_synapse_from_json(data): + pass + + @error_handler + async def score_responses(self, uid_to_query_resps, uid_to_capacity): answering_tasks = [] scoring_tasks = [] - uid_scores_dict = {} scored_response = [] - for uid, syn in responses: - task = self.get_answer_task(uid, syn) + for uid, query_resp in uid_to_query_resps: + task = self.get_answer_task(uid, query_resp.get("query"), query_resp.get("response")) answering_tasks.append((uid, task)) answers_results = await asyncio.gather(*[task for _, task in answering_tasks]) - for (uid, response), answer in zip(responses, answers_results): - task = self.get_scoring_task(uid, answer, response) + for (uid, query_resp), answer in zip(uid_to_query_resps, answers_results): + task = self.get_scoring_task(uid, answer, query_resp.get("response")) scoring_tasks.append((uid, task)) # Await all scoring tasks @@ -115,20 +95,65 @@ async def score_responses(self, responses): scored_responses) if scored_responses else 0 bt.logging.debug(f"scored responses = {scored_responses}, average score = {average_score}") - for (uid, _), scored_response in zip(scoring_tasks, scored_responses): - if scored_response is not None: - uid_scores_dict[uid] = float(scored_response) - else: - uid_scores_dict[uid] = 0 + uid_scores_dict = self.get_uid_to_scores_dict(uid_to_query_resps, scored_responses, uid_to_capacity) - if uid_scores_dict != {}: - bt.logging.info(f"text_scores is {uid_scores_dict}") bt.logging.trace("score_responses process completed.") - return uid_scores_dict, scored_response, responses + return uid_scores_dict, scored_response, uid_to_query_resps + + def get_uid_to_scores_dict(self, uid_to_query_resps, scored_responses: tuple[float], uid_to_capacity): + uid_provider_model_scores_dict = defaultdict(list) - async def get_and_score(self, available_uids: List[int]): - bt.logging.trace("starting query") - query_responses = await self.start_query(available_uids) - bt.logging.trace("scoring query with query responses") - return await self.score_responses(query_responses) + # collect all scores per each uid, provider, model + for (uid, query_resp), scored_response in zip(uid_to_query_resps, scored_responses): + synapse = query_resp.get('query') + provider = synapse.provider + model = synapse.model + if scored_response is not None: + bt.logging.trace(f"scored response is {scored_response} for uid {uid} for provider {provider} " + f"and for model {model}") + uid_provider_model_scores_dict[f"{uid}::{provider}::{model}"].append(float(scored_response)) + else: + uid_provider_model_scores_dict[f"{uid}::{provider}::{model}"].append(0) + + # get avg score value for each uid, provider, model + uid_provider_model_scores_avg_dict = {} + for key, scores in uid_provider_model_scores_dict.items(): + if len(scores) == 0: + bt.logging.debug(f"no scores found for this uid {key}") + avg_score = sum(scores) / len(scores) + uid_provider_model_scores_avg_dict[key] = avg_score + + # apply weight for each model and calculate score based on weight of models. + uid_scores_dict = defaultdict(float) + uid_model_to_scores_dict = defaultdict(dict) + for key, avg_score in uid_provider_model_scores_avg_dict.items(): + uid = int(str(key).split("::")[0]) + provider = str(key).split("::")[1] + model = str(key).split("::")[2] + model_weight = TEXT_VALI_MODELS_WEIGHTS.get(provider).get(model) + if model_weight is None: + bt.logging.debug(f"not weight found for this provider {provider} and model {model}") + model_weight = 0 + + band_width = get_bandwidth(uid_to_capacity, uid, provider, model) + + if band_width is None: + bt.logging.debug(f"no band_width found for this uid {uid}") + band_width = 1 + weighted_score = avg_score * model_weight * band_width + uid_scores_dict[uid] += weighted_score + uid_model_to_scores_dict[uid][model] = weighted_score + bt.logging.debug(f""" + score details for all miners: + {json.dumps(uid_model_to_scores_dict, indent=4)} + """) + + if not len(uid_scores_dict): + validator_type = self.__class__.__name__ + bt.logging.debug(f"{validator_type} scores is {uid_scores_dict}") + return uid_scores_dict + + @classmethod + def get_task_type(cls): + pass diff --git a/validators/services/validators/constants.py b/validators/services/validators/constants.py deleted file mode 100644 index fb751637..00000000 --- a/validators/services/validators/constants.py +++ /dev/null @@ -1,48 +0,0 @@ -TEXT_MODEL = "gpt-4-turbo-2024-04-09" -TEXT_PROVIDER = "OpenAI" -TEXT_MAX_TOKENS = 4096 -TEXT_TEMPERATURE = 0.001 -TEXT_WEIGHT = 1 -TEXT_TOP_P = 0.01 -TEXT_TOP_K = 1 -VISION_MODELS = ["gpt-4o", "claude-3-opus-20240229", "anthropic.claude-3-sonnet-20240229-v1:0", - "claude-3-5-sonnet-20240620"] -TEXT_VALI_MODELS_WEIGHTS = { - "AnthropicBedrock": { - "anthropic.claude-v2:1": 1 - }, - "OpenAI": { - "gpt-4o": 1, - "gpt-3.5-turbo": 1000, - "o1-preview": 1, - "o1-mini": 1, - }, - "Gemini": { - "gemini-pro": 1, - "gemini-1.5-flash": 1, - "gemini-1.5-pro": 1, - }, - "Anthropic": { - "claude-3-5-sonnet-20240620": 1, - "claude-3-opus-20240229": 1, - "claude-3-sonnet-20240229": 1, - "claude-3-haiku-20240307": 1000, - }, - "Groq": { - "gemma-7b-it": 500, - "llama3-70b-8192": 1, - "llama3-8b-8192": 500, - "mixtral-8x7b-32768": 1, - }, - "Bedrock": { - # "anthropic.claude-3-sonnet-20240229-v1:0": 1, - "cohere.command-r-v1:0": 1, - # "meta.llama2-70b-chat-v1": 1, - # "amazon.titan-text-express-v1": 1, - "mistral.mistral-7b-instruct-v0:2": 1, - "ai21.j2-mid-v1": 1, - # "anthropic.claude-3-5-sonnet-20240620-v1:0": 1, - # "anthropic.claude-3-opus-20240229-v1:0": 1, - # "anthropic.claude-3-haiku-20240307-v1:0": 1 - } -} diff --git a/validators/services/validators/embeddings_validator.py b/validators/services/validators/embeddings_validator.py index 53d89260..02390a56 100644 --- a/validators/services/validators/embeddings_validator.py +++ b/validators/services/validators/embeddings_validator.py @@ -8,9 +8,9 @@ from validators.services.validators.base_validator import BaseValidator -class EmbeddingsValidator(BaseValidator): - def __init__(self, config): - super().__init__(config) +class EmbeddingsValidator: + def __init__(self, config, metagraph=None): + super().__init__(config, metagraph) self.streaming = False self.config = config self.query_type = "embeddings" diff --git a/validators/services/validators/image_validator.py b/validators/services/validators/image_validator.py index 0bcd2432..27975ee3 100644 --- a/validators/services/validators/image_validator.py +++ b/validators/services/validators/image_validator.py @@ -1,17 +1,16 @@ import asyncio import random -import traceback import wandb import cortext.reward from cortext.protocol import ImageResponse -from validators.services.validators.base_validator import BaseValidator from validators import utils -from validators.utils import error_handler +from validators.utils import error_handler, save_or_get_answer_from_cache +from cortext.utils import get_question import bittensor as bt -class ImageValidator(BaseValidator): +class ImageValidator: def __init__(self, config, metagraph=None): super().__init__(config, metagraph) self.num_uids_to_pick = 30 @@ -48,46 +47,43 @@ def select_random_provider_and_model(self): elif self.provider == "OpenAI": self.model = "dall-e-3" - async def start_query(self, available_uids): - try: - query_tasks = [] + def get_provider_to_models(self): + return [("OpenAI", "dall-e-3")] - self.select_random_provider_and_model() - await self.load_questions(available_uids, "images") + async def get_question(self): + question = await get_question("images", 1) + return question - # Query all images concurrently - for uid, content in self.uid_to_questions.items(): - syn = ImageResponse(messages=content, model=self.model, size=self.size, quality=self.quality, - style=self.style, provider=self.provider, seed=self.seed, steps=self.steps) - bt.logging.info(f"uid = {uid}, syn = {syn}") - task = self.query_miner(self.metagraph, uid, syn) - query_tasks.append(task) - - # Query responses is (uid. syn) - query_responses = await asyncio.gather(*query_tasks) - return query_responses - except: - bt.logging.error(f"error in start_query {traceback.format_exc()}") + async def create_query(self, uid, provider=None, model=None) -> bt.Synapse: + question = await self.get_question() + syn = ImageResponse(messages=question, model=model, size=self.size, quality=self.quality, + style=self.style, provider=provider, seed=self.seed, steps=self.steps) + bt.logging.info(f"uid = {uid}, syn = {syn}") + return syn def should_i_score(self): rand = random.random() return rand < 1 / 1 async def get_scoring_task(self, uid, answer, response: ImageResponse): - if answer is None: + if response is None: + bt.logging.trace(f"response is None. so return score with 0 for this uid {uid}.") return 0 if response.provider == "OpenAI": - completion = answer.completion + completion = response.completion if completion is None: + bt.logging.trace(f"response completion is None for uid {uid}. so return score with 0") return 0 image_url = completion["url"] score = await cortext.reward.dalle_score(uid, image_url, self.size, response.messages, - self.weight) + self.weight) else: + bt.logging.trace(f"not found provider type {response.provider}") score = 0 # cortext.reward.deterministic_score(uid, syn, self.weight) return score - async def get_answer_task(self, uid, synapse=None): + @save_or_get_answer_from_cache + async def get_answer_task(self, uid, synapse: ImageResponse, response): return synapse @error_handler @@ -112,3 +108,12 @@ async def build_wandb_data(self, scores, responses): self.wandb_data["images"][uid] = wandb.Image(image) if image is not None else '' self.wandb_data["prompts"][uid] = self.uid_to_questions[uid] return self.wandb_data + + @classmethod + def get_task_type(cls): + return ImageResponse.__name__ + + @staticmethod + def get_synapse_from_json(data): + synapse = ImageResponse.parse_raw(data) + return synapse \ No newline at end of file diff --git a/validators/services/validators/text_validator.py b/validators/services/validators/text_validator.py index 8a14cf3c..cffe1705 100644 --- a/validators/services/validators/text_validator.py +++ b/validators/services/validators/text_validator.py @@ -1,10 +1,9 @@ -import asyncio import random import bittensor as bt from typing import AsyncIterator from cortext.reward import model -from . import constants +from cortext import constants import cortext.reward from validators.services.validators.base_validator import BaseValidator from validators.utils import error_handler @@ -12,11 +11,10 @@ from cortext.protocol import StreamPrompting from cortext.utils import (call_anthropic_bedrock, call_bedrock, call_anthropic, call_gemini, call_groq, call_openai, get_question) -from validators.utils import get_should_i_score_arr_for_text +from validators.utils import save_or_get_answer_from_cache, get_query_synapse_from_cache class TextValidator(BaseValidator): - gen_should_i_score = get_should_i_score_arr_for_text() def __init__(self, config, provider: str = None, model: str = None, metagraph=None): super().__init__(config, metagraph) self.streaming = True @@ -71,50 +69,25 @@ async def organic(self, metagraph, query: dict[str, list[dict[str, str]]]) -> As bt.logging.trace(resp) yield uid, resp - async def handle_response(self, uid: str, responses) -> tuple[str, str]: - full_response = "" - for resp in responses: - async for chunk in resp: - if isinstance(chunk, str): - bt.logging.trace(chunk) - full_response += chunk - bt.logging.trace(f"full_response for uid {uid}: {full_response}") - break - return uid, full_response - - async def start_query(self, available_uids): - try: - self.select_random_provider_and_model() - is_vision_model = self.model in constants.VISION_MODELS - await self.load_questions(available_uids, "text", is_vision_model) - - query_tasks = [] - bt.logging.trace(f"provider = {self.provider} model = {self.model}") - for uid, question in self.uid_to_questions.items(): - prompt = question.get("prompt") - image = question.get("image") - if image: - messages = [{'role': 'user', 'content': prompt, "image": image}] - else: - messages = [{'role': 'user', 'content': prompt}] - - syn = StreamPrompting(messages=messages, model=self.model, seed=self.seed, max_tokens=self.max_tokens, - temperature=self.temperature, provider=self.provider, top_p=self.top_p, - top_k=self.top_k) - - image = image if image else '' - bt.logging.info( - f"Sending {syn.model} {self.query_type} request to uid: {uid}, " - f"timeout {self.timeout}: {syn.messages[0]['content']} {image}" - ) - task = self.query_miner(self.metagraph, uid, syn) - query_tasks.append(task) - - query_responses = await asyncio.gather(*query_tasks) - - return query_responses - except Exception as err: - bt.logging.exception(err) + async def get_question(self, miner_cnt=1): + is_vision_model = self.model in constants.VISION_MODELS + question = await get_question("text", miner_cnt, is_vision_model) + return question + + @get_query_synapse_from_cache + async def create_query(self, uid, provider=None, model=None) -> bt.Synapse: + question = await self.get_question() + prompt = question.get("prompt") + image = question.get("image") + if image: + messages = [{'role': 'user', 'content': prompt, "image": image}] + else: + messages = [{'role': 'user', 'content': prompt}] + + syn = StreamPrompting(messages=messages, model=model, seed=self.seed, max_tokens=self.max_tokens, + temperature=self.temperature, provider=provider, top_p=self.top_p, + top_k=self.top_k) + return syn def select_random_provider_and_model(self): # AnthropicBedrock should only be used if a validators' anthropic account doesn't work @@ -126,9 +99,13 @@ def select_random_provider_and_model(self): self.model = random.choices(list(model_to_weights.keys()), weights=list(model_to_weights.values()), k=1)[0] - @classmethod - def should_i_score(cls): - return next(cls.gen_should_i_score) + def get_provider_to_models(self): + provider_models = [] + for provider in constants.TEXT_VALI_MODELS_WEIGHTS: + models = constants.TEXT_VALI_MODELS_WEIGHTS.get(provider).keys() + for model_ in models: + provider_models.append((provider, model_)) + return provider_models @error_handler async def build_wandb_data(self, uid_to_score, responses): @@ -140,7 +117,9 @@ async def build_wandb_data(self, uid_to_score, responses): self.wandb_data["responses"][uid] = response return self.wandb_data - async def call_api(self, prompt: str, image_url: Optional[str], provider: str) -> str: + async def call_api(self, prompt: str, image_url: Optional[str], query_syn: StreamPrompting) -> str: + provider = query_syn.provider + self.model = query_syn.model if provider == "OpenAI": return await call_openai( [{"role": "user", "content": prompt, "image": image_url}], self.temperature, self.model, self.seed, @@ -181,11 +160,23 @@ async def call_api(self, prompt: str, image_url: Optional[str], provider: str) - else: bt.logging.error(f"provider {provider} not found") - async def get_answer_task(self, uid: int, syn=None): - question = self.uid_to_questions[uid] - prompt = question.get("prompt") - image_url = question.get("image") - return await self.call_api(prompt, image_url, self.provider) + @save_or_get_answer_from_cache + async def get_answer_task(self, uid: int, query_syn: StreamPrompting, response): + prompt = query_syn.messages[0].get("content") + image_url = query_syn.messages[0].get("image") + answer = await self.call_api(prompt, image_url, query_syn) + return answer async def get_scoring_task(self, uid, answer, response): - return await cortext.reward.api_score(answer, response, self.weight, self.temperature, self.provider) + response_str, _ = response + return await cortext.reward.api_score(answer, response_str, self.weight, self.temperature, self.provider) + + @classmethod + def get_task_type(cls): + return StreamPrompting.__name__ + + @staticmethod + def get_synapse_from_json(data): + synapse = StreamPrompting.parse_raw(data) + return synapse + diff --git a/validators/task_manager.py b/validators/task_manager.py new file mode 100644 index 00000000..d2a10e33 --- /dev/null +++ b/validators/task_manager.py @@ -0,0 +1,73 @@ +import asyncio +import random +from copy import deepcopy +import bittensor as bt + +from cortext import ALL_SYNAPSE_TYPE +from validators.utils import error_handler +from validators import utils + + +class TaskMgr: + def __init__(self, uid_to_capacities, dendrite, metagraph, loop): + # Initialize Redis client + self.remain_resources = deepcopy(uid_to_capacities) + self.uid_to_capacity = deepcopy(uid_to_capacities) + self.dendrite = dendrite + self.metagraph = metagraph + self.loop = loop + + def restore_capacities_for_all_miners(self): + self.remain_resources = deepcopy(self.uid_to_capacity) + bt.logging.debug(f"resource is restored. remain_resources = {self.remain_resources}") + + def get_remaining_bandwidth(self, uid, provider, model): + if self.remain_resources.get(uid): + if self.remain_resources.get(uid).get(provider): + return self.remain_resources.get(uid).get(provider).get(model) + + + def update_remain_capacity_based_on_new_capacity(self, new_uid_to_capacity): + for uid, capacity in new_uid_to_capacity.items(): + if not capacity: + continue + for provider, model_to_cap in capacity.items(): + for model, cap in model_to_cap.items(): + if self.get_remaining_bandwidth(uid, provider, model) is None: + utils.update_nested_dict(self.remain_resources, keys=[uid, provider, model], value=cap) + else: + diff = self.uid_to_capacity[uid][provider][model] - cap + if diff: + bt.logging.debug(f"diff {diff} found in {uid}, {provider}, {model}") + self.remain_resources[uid][provider][model] -= diff + + bt.logging.debug(f"remain_resources after epoch = {self.remain_resources}") + self.uid_to_capacity = deepcopy(self.remain_resources) + + @error_handler + def assign_task(self, synapse: ALL_SYNAPSE_TYPE): + # find miner which bandwidth > 0. + uid = self.choose_miner(synapse) # Example: Assign to worker with max remaining bandwidth + if uid is None: + bt.logging.debug(f"no available resources to process this request.") + return None + bt.logging.trace(f"Assigning task to miner {uid}") + return uid + + def get_axon_from_uid(self, uid): + uid = int(uid) + return self.metagraph.axons[uid] + + def choose_miner(self, synapse: ALL_SYNAPSE_TYPE): + provider = synapse.provider + model = synapse.model + available_uids = [] + for uid in self.remain_resources: + capacity = self.remain_resources.get(uid) + bandwidth = capacity.get(provider).get(model) + if bandwidth is not None and bandwidth > 0: + # decrease resource by one after choosing this miner for the request. + available_uids.append(uid) + uid = random.choice(available_uids) + self.remain_resources[uid][provider][model] -= 1 + return uid diff --git a/validators/utils.py b/validators/utils.py index 7d4f8b5b..62388ca8 100644 --- a/validators/utils.py +++ b/validators/utils.py @@ -1,13 +1,18 @@ +import random import aiohttp import asyncio import base64 -import itertools +import hashlib +import inspect import bittensor as bt from PIL import Image from io import BytesIO from functools import wraps -import logging +import traceback + +from cortext import ImageResponse, ALL_SYNAPSE_TYPE +from validators.services.cache import QueryResponseCache async def download_image(url): @@ -30,20 +35,149 @@ def error_handler(func): async def wrapper(*args, **kwargs): try: result = await func(*args, **kwargs) + except GeneratorExit as err: + bt.logging.error(f"{err}. {traceback.format_exc()}") except Exception as err: - logging.exception(err) + bt.logging.error(f"{err}. {traceback.format_exc()}") return None + else: + return result + + @wraps(func) + def wrapper_sync(*args, **kwargs): + try: + result = func(*args, **kwargs) + except Exception as err: + bt.logging.error(f"{err}. {traceback.format_exc()}") + return None + else: + return result + + if inspect.iscoroutine(func): + return wrapper + else: + return wrapper_sync + + +async def handle_response_stream(responses) -> tuple[str, str]: + full_response = "" + async for chunk in responses: + if isinstance(chunk, str): + bt.logging.trace(chunk) + full_response += chunk + return full_response - return result + +def save_or_get_answer_from_cache(func): + @wraps(func) + async def wrapper(*args, **kwargs): + query_syn: ALL_SYNAPSE_TYPE = args[2] + provider = query_syn.provider + model = query_syn.model + + cache_service = QueryResponseCache() + answer = cache_service.get_answer(question=str(query_syn.json()), provider=provider, model=model) + if answer: + return answer + + answer = await func(*args, **kwargs) + try: + cache_service.set_cache(question=str(query_syn.json()), answer=str(answer), provider=provider, model=model) + except Exception as err: + bt.logging.error(f"Exception during cache for uid {args[1]}, {err}") + else: + bt.logging.trace(f"saved answer to cache successfully.") + finally: + return answer return wrapper -def get_should_i_score_arr_for_text(): - for i in itertools.count(): - yield (i % 3) == 0 +def get_query_synapse_from_cache(func): + @wraps(func) + async def wrapper(*args, **kwargs): + cache_service = QueryResponseCache() + vali = args[0] + provider = args[2] + model = args[3] + questions_answers = cache_service.get_all_question_to_answers(provider=provider, model=model) + if not questions_answers or random.random() > 0: + query_syn = await func(*args, **kwargs) + return query_syn + # select one of questions_answers + query, answer = random.choice(questions_answers) + query_syn = vali.get_synapse_from_json(query) + return query_syn + + return wrapper + + +def create_hash_value(input_string): + # Create a SHA-256 hash object based on random and synpase + input_string = str(input_string) + str(random.Random().random()) + hash_object = hashlib.sha256() + # Encode the string to bytes and update the hash object + hash_object.update(input_string.encode('utf-8')) + # Get the hexadecimal representation of the hash + hash_value = hash_object.hexdigest() + return hash_value + + +@error_handler +async def get_result_entry_from_redis(redis_client, stream_name, last_id, max_try_cnt): + result_entries = None + while max_try_cnt: + result_entries = redis_client.xread({stream_name: last_id}, block=100) + await asyncio.sleep(0.1) + if result_entries: + break + else: + max_try_cnt -= 1 + return result_entries + + +def find_positive_values(data: dict): + positive_values = {} + + for key, value in data.items(): + if isinstance(value, dict): + # Recursively handle nested dictionaries + nested_result = find_positive_values(value) + if nested_result: + positive_values[key] = nested_result + elif isinstance(value, (int, float)) and value > 0: + # Store key-value pairs where the value is greater than 0 + positive_values[key] = value + + return positive_values + + +def update_nested_dict(data, keys, value): + """ + Updates the value in the nested dictionary or creates the key path if it doesn't exist. + + :param data: The dictionary to update. + :param keys: A list of keys representing the path in the nested dictionary. + :param value: The value to set at the specified key path. + """ + if len(keys) == 1: + data[keys[0]] = value + else: + if keys[0] not in data or not isinstance(data[keys[0]], dict): + data[keys[0]] = {} + update_nested_dict(data[keys[0]], keys[1:], value) + + +def setup_max_capacity(item): + for key, value in item.items(): + if isinstance(value, dict): # If the value is another dictionary, recurse + setup_max_capacity(value) + elif isinstance(value, (int, float)): # If the value is a number, increment by 5 + item[key] = min(value, 50) -def get_should_i_score_arr_for_image(): - for i in itertools.count(): - yield (i % 1) != 0 +def get_bandwidth(data, uid, provider, model): + if data is None: + return 0 + value = (data.get(uid, {}) or {}).get(provider, {}).get(model, 0) + return value diff --git a/validators/validator.py b/validators/validator.py index 9ec4507f..8ead33d9 100644 --- a/validators/validator.py +++ b/validators/validator.py @@ -10,11 +10,13 @@ import cortext from cortext import utils from validators.weight_setter import WeightSetter +from validators.services.cache import cache_service # Load environment variables from .env file load_dotenv() random.seed(time.time()) + class NestedNamespace(argparse.Namespace): def __setattr__(self, name, value): if '.' in name: @@ -53,7 +55,7 @@ def get(self, key, default=None): def parse_arguments(): parser = argparse.ArgumentParser(description="Validator Configuration") - parser.add_argument("--subtensor.chain_endpoint", type=str, default="wss://entrypoint-finney.opentensor.ai:443") + parser.add_argument("--subtensor.chain_endpoint", type=str, default="wss://entrypoint-finney.opentensor.ai:443") #for testnet: wss://test.finney.opentensor.ai:443 parser.add_argument("--wallet.name", type=str, default="default") parser.add_argument("--wallet.hotkey", type=str, default="default") parser.add_argument("--netuid", type=int, default=18) @@ -89,8 +91,7 @@ def init_wandb(config): if not config.wandb_on: return - wallet = bt.wallet(name=config.wallet.name, hotkey=config.wallet.hotkey) - run_name = f"validator-{wallet.hotkey.ss58_address}-{cortext.__version__}" + run_name = f"validator-{config.wallet.hotkey.ss58_address}-{cortext.__version__}" config.run_name = run_name config.version = cortext.__version__ config.type = "validator" @@ -104,7 +105,7 @@ def init_wandb(config): reinit=True ) - signature = wallet.hotkey.sign(run.id.encode()).hex() + signature = config.wallet.hotkey.sign(run.id.encode()).hex() config.signature = signature wandb.config.update(config.__dict__, allow_val_change=True) @@ -115,15 +116,17 @@ def main(): Config.check_required_env_vars() args = parse_arguments() config = Config(args) + + setup_logging(config) + config.wallet = bt.wallet(name=config.wallet.name, hotkey=config.wallet.hotkey) config.dendrite = bt.dendrite(wallet=config.wallet) - setup_logging(config) bt.logging.info(f"Config: {vars(config)}") init_wandb(config) loop = asyncio.get_event_loop() - weight_setter = WeightSetter(config=config) + weight_setter = WeightSetter(config=config, cache=cache_service, loop=loop) state_path = os.path.join(config.full_path, "state.json") utils.get_state(state_path) try: @@ -136,6 +139,8 @@ def main(): bt.logging.info("updating status before exiting validator") state = utils.get_state(state_path) utils.save_state_to_file(state, state_path) + bt.logging.info("closing connection of cache database.") + cache_service.close() if config.wandb_on: wandb.finish() diff --git a/validators/weight_setter.py b/validators/weight_setter.py index e8987001..22e53cfb 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -2,30 +2,43 @@ import concurrent import random import torch -import traceback +import time + +from black.trans import defaultdict +from click.core import batch from substrateinterface import SubstrateInterface from functools import partial -from typing import Tuple -import wandb +from typing import Tuple, List import bittensor as bt - +from bittensor import StreamingSynapse import cortext from starlette.types import Send from cortext.protocol import IsAlive, StreamPrompting, ImageResponse, Embeddings from cortext.metaclasses import ValidatorRegistryMeta -from validators.services import BaseValidator, TextValidator, CapacityService +from validators.services import CapacityService, BaseValidator, TextValidator, ImageValidator +from validators.services.cache import QueryResponseCache +from validators.utils import error_handler, setup_max_capacity +from validators.task_manager import TaskMgr scoring_organic_timeout = 60 - +NUM_INTERVALS_PER_CYCLE = 10 class WeightSetter: - def __init__(self, config): + def __init__(self, config, cache: QueryResponseCache, loop=None): + + # Cache object using sqlite3. + self.next_block_to_wait = None + self.synthetic_task_done = False + self.task_mgr: TaskMgr = None + self.in_cache_processing = False + self.batch_size = config.max_miners_cnt + self.cache = cache + self.uid_to_capacity = {} - self.available_uids = None - self.NUM_QUERIES_PER_UID = 10 - self.remaining_queries = [] + self.available_uid_to_axons = {} + self.uids_to_query = [] bt.logging.info("Initializing WeightSetter") self.config = config self.wallet = config.wallet @@ -36,37 +49,61 @@ def __init__(self, config): self.my_uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address) bt.logging.info(f"Running validator on subnet: {self.netuid} with uid: {self.my_uid}") - # Initialize scores + # Scoring and querying parameters + self.max_score_cnt_per_model = 1 + + # Initialize scores and counts self.total_scores = {} self.score_counts = {} self.moving_average_scores = None - + # Set up axon and dendrite self.axon = bt.axon(wallet=self.wallet, config=self.config) bt.logging.info(f"Axon server started on port {self.config.axon.port}") self.dendrite = config.dendrite - # Set up async-related attributes - self.lock = asyncio.Lock() - self.loop = asyncio.get_event_loop() - self.request_timestamps = {} - self.organic_scoring_tasks = set() - - # Initialize prompt cache - self.prompt_cache = {} - # Get network tempo self.tempo = self.subtensor.tempo(self.netuid) self.weights_rate_limit = self.node_query('SubtensorModule', 'WeightsSetRateLimit', [self.netuid]) + # Set up async-related attributes + self.lock = asyncio.Lock() + self.loop = loop or asyncio.get_event_loop() + + # Initialize shared query database + self.query_database = [] + + # initialize uid and capacities. + asyncio.run(self.initialize_uids_and_capacities()) + self.set_up_next_block_to_wait() # Set up async tasks self.thread_executor = concurrent.futures.ThreadPoolExecutor(thread_name_prefix='asyncio') - self.loop.create_task(self.consume_organic_scoring()) - self.loop.create_task(self.perform_synthetic_scoring_and_update_weights()) + self.loop.create_task(self.consume_organic_queries()) + self.loop.create_task(self.perform_synthetic_queries()) + self.loop.create_task(self.process_queries_from_database()) async def run_sync_in_async(self, fn): - return await self.loop.run_in_executor(self.thread_executor, fn) - + return await self.loop.run_in_executor(None, fn) + + async def refresh_metagraph(self): + await self.run_sync_in_async(lambda: self.metagraph.sync()) + + async def initialize_uids_and_capacities(self): + self.available_uid_to_axons = await self.get_available_uids() + self.uids_to_query = list(self.available_uid_to_axons.keys()) + bt.logging.info(f"Available UIDs: {list(self.available_uid_to_axons.keys())}") + self.uid_to_capacity = await self.get_capacities_for_uids(self.available_uid_to_axons) + bt.logging.info(f"Capacities for miners: {self.uid_to_capacity}") + # Initialize total_scores, score_counts. + self.total_scores = {uid: 0.0 for uid in self.available_uid_to_axons.keys()} + self.score_counts = {uid: 0 for uid in self.available_uid_to_axons.keys()} + + # update task_mgr after synthetic query at the end of iterator. + if self.task_mgr: + self.task_mgr.update_remain_capacity_based_on_new_capacity(self.uid_to_capacity) + else: + self.task_mgr = TaskMgr(uid_to_capacities=self.uid_to_capacity, dendrite=self.dendrite, + metagraph=self.metagraph, loop=self.loop) def node_query(self, module, method, params): try: @@ -76,98 +113,167 @@ def node_query(self, module, method, params): # reinitilize node self.node = SubstrateInterface(url=self.config.subtensor.chain_endpoint) result = self.node.query(module, method, params).value - + return result def get_blocks_til_epoch(self, block): return self.tempo - (block + 19) % (self.tempo + 1) - async def refresh_metagraph(self): - await self.run_sync_in_async(lambda: self.metagraph.sync()) + def is_epoch_end(self): + current_block = self.node_query('System', 'Number', []) + last_update = current_block - self.node_query('SubtensorModule', 'LastUpdate', [self.netuid])[self.my_uid] + bt.logging.info(f"last update: {last_update} blocks ago") + if last_update >= self.tempo * 2 or ( + self.get_blocks_til_epoch(current_block) < 10 and last_update >= self.weights_rate_limit): + return True + return False - async def initialize_uids_and_capacities(self): - self.available_uids = await self.get_available_uids() - bt.logging.info(f"Available UIDs: {list(self.available_uids.keys())}") - # self.uid_to_capacity = await self.get_capacities_for_uids(self.available_uids) - # bt.logging.info(f"Capacities for miners: {self.uid_to_capacity}") - self.total_scores = {uid: 0.0 for uid in self.available_uids.keys()} - self.score_counts = {uid: 0 for uid in self.available_uids.keys()} - self.remaining_queries = self.shuffled(list(self.available_uids.keys()) * self.NUM_QUERIES_PER_UID) - - async def update_and_refresh(self, last_update): - bt.logging.info(f"setting weights, last update {last_update} blocks ago") + async def update_and_refresh(self): await self.update_weights() - bt.logging.info("Refreshing metagraph...") await self.refresh_metagraph() + await self.initialize_uids_and_capacities() + bt.logging.info("Metagraph refreshed.") - bt.logging.info("Refreshing available UIDs...") - self.available_uids = await self.get_available_uids() - bt.logging.info(f"Available UIDs: {list(self.available_uids.keys())}") + async def query_miner(self, uid, query_syn: cortext.ALL_SYNAPSE_TYPE): + if query_syn.streaming: + if uid is None: + bt.logging.error("Can't create task.") + return + bt.logging.trace(f"synthetic task is created and uid is {uid}") - # bt.logging.info("Refreshing capacities...") - # self.uid_to_capacity = await self.get_capacities_for_uids(self.available_uids) - - self.total_scores = {uid: 0 for uid in self.available_uids.keys()} - self.score_counts = {uid: 0 for uid in self.available_uids.keys()} - self.remaining_queries = self.shuffled(list(self.available_uids.keys()) * self.NUM_QUERIES_PER_UID) + async def handle_response(responses): + start_time = time.time() + response_text = '' + for resp in responses: + async for chunk in resp: + if isinstance(chunk, str): + response_text += chunk + bt.logging.trace(f"Streamed text: {chunk}") + # Store the query and response in the shared database + async with self.lock: + self.query_database.append({ + 'uid': uid, + 'synapse': query_syn, + 'response': (response_text, time.time() - start_time), + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, + metagraph=self.metagraph) + }) + + axon = self.metagraph.axons[uid] + responses = await self.dendrite( + axons=[axon], + synapse=query_syn, + deserialize=False, + timeout=query_syn.timeout, + streaming=True, + ) + await handle_response(responses) + else: + pass + + async def create_query_syns_for_remaining_bandwidth(self): + query_tasks = [] + for uid, provider_to_cap in self.task_mgr.remain_resources.items(): + for provider, model_to_cap in provider_to_cap.items(): + for model, bandwidth in model_to_cap.items(): + if bandwidth > 0: + # create task and send remaining requests to the miner + vali = self.choose_validator_from_model(model) + query_task = vali.create_query(uid, provider, model) + query_tasks += [query_task] * bandwidth + else: + continue + query_synapses = await asyncio.gather(*query_tasks) + random.shuffle(query_synapses) + return query_synapses + + def set_up_next_block_to_wait(self): + # score all miners based on uid. + if self.next_block_to_wait: + current_block = self.next_block_to_wait + else: + current_block = self.node_query('System', 'Number', []) + next_block = current_block + (self.tempo / NUM_INTERVALS_PER_CYCLE) # 36 blocks per cycle. + self.next_block_to_wait = next_block + + def is_cycle_end(self): + current_block = self.node_query('System', 'Number', []) + bt.logging.info(current_block, self.next_block_to_wait) + if current_block >= self.next_block_to_wait: + return True + else: + return False - async def perform_synthetic_scoring_and_update_weights(self): + async def perform_synthetic_queries(self): while True: - if self.available_uids is None: - await self.initialize_uids_and_capacities() - - current_block = self.node_query('System', 'Number', []) - last_update = current_block - self.node_query('SubtensorModule', 'LastUpdate', [self.netuid])[self.my_uid] - bt.logging.info(f"last update: {last_update} blocks ago") - - if last_update >= self.tempo * 2 or ( - self.get_blocks_til_epoch(current_block) < 10 and last_update >= self.weights_rate_limit): - - await self.update_and_refresh(last_update) - - if not self.remaining_queries: - bt.logging.info("No more queries to perform until next epoch.") + if not self.is_cycle_end(): + await asyncio.sleep(12) continue - - bt.logging.debug(f"not setting weights, last update {last_update} blocks ago, " - f"{self.get_blocks_til_epoch(current_block)} blocks til epoch") - - selected_validator = self.select_validator() - num_uids_to_query = min(self.config.max_miners_cnt, len(self.remaining_queries)) - - # Pop UIDs to query from the remaining_queries list - uids_to_query = [self.remaining_queries.pop() for _ in range(num_uids_to_query)] - uid_to_scores = await self.process_modality(selected_validator, uids_to_query) - - bt.logging.info(f"Remaining queries: {len(self.remaining_queries)}") - - if uid_to_scores is None: - bt.logging.trace("uid_to_scores is None.") + self.set_up_next_block_to_wait() + start_time = time.time() + # don't process any organic query while processing synthetic queries. + synthetic_tasks = [] + async with self.lock: + # check available bandwidth and send synthetic requests to all miners. + query_synapses = await self.create_query_syns_for_remaining_bandwidth() + for query_syn in query_synapses: + uid = self.task_mgr.assign_task(query_syn) + synthetic_tasks.append((uid, self.query_miner(uid, query_syn))) + + bt.logging.debug(f"{time.time() - start_time} elapsed for creating and submitting synthetic queries.") + + # restore capacities immediately after synthetic query consuming all bandwidth. + self.task_mgr.restore_capacities_for_all_miners() + + batched_tasks, remain_tasks = self.pop_synthetic_tasks_max_100_per_miner(synthetic_tasks) + while batched_tasks: + start_time = time.time() + await self.dendrite.aclose_session() + await asyncio.gather(*batched_tasks) + batched_tasks, remain_tasks = self.pop_synthetic_tasks_max_100_per_miner(remain_tasks) + + self.synthetic_task_done = True + bt.logging.info( + f"synthetic queries has been processed successfully." + f"total queries are {len(query_synapses)}") + + def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks): + batch_size = 300 + max_query_cnt_per_miner = 50 + batch_tasks = [] + remain_tasks = [] + uid_to_task_cnt = defaultdict(int) + for uid, synthetic_task in synthetic_tasks: + if uid_to_task_cnt[uid] < max_query_cnt_per_miner: + batch_tasks.append(synthetic_task) + if len(batch_tasks) > batch_size: + break + uid_to_task_cnt[uid] += 1 continue + else: + remain_tasks.append((uid, synthetic_task)) + continue + return batch_tasks, remain_tasks - for uid, score in uid_to_scores.items(): - async with self.lock: - self.total_scores[uid] += score - self.score_counts[uid] += 1 - - # Slow down the validator steps if necessary - await asyncio.sleep(1) - - def select_validator(self): - rand = random.random() + def choose_validator_from_model(self, model): text_validator = ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, metagraph=self.metagraph) - image_validator = ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config, - metagraph=self.metagraph) - if rand > self.config.image_validator_probability: + # image_validator = ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config, + # metagraph=self.metagraph) + if model != 'dall-e-3': + text_validator.model = model return text_validator - else: - return image_validator + # else: + # return image_validator async def get_capacities_for_uids(self, uids): capacity_service = CapacityService(metagraph=self.metagraph, dendrite=self.dendrite) uid_to_capacity = await capacity_service.query_capacity_to_miners(uids) + # apply limit on max_capacity for each miner. + setup_max_capacity(uid_to_capacity) return uid_to_capacity async def get_available_uids(self): @@ -180,8 +286,6 @@ async def get_available_uids(self): # Create a dictionary of UID to axon info for active UIDs available_uids = {uid: axon_info for uid, axon_info in zip(tasks.keys(), results) if axon_info is not None} - bt.logging.info(f"Available UIDs: {list(available_uids.keys())}") - return available_uids async def check_uid(self, axon, uid): @@ -199,43 +303,19 @@ async def check_uid(self, axon, uid): bt.logging.error(f"Error checking UID {uid}: {err}") return None - @staticmethod - def shuffled(list_: list) -> list: - list_ = list_.copy() - random.shuffle(list_) - return list_ - - async def process_modality(self, selected_validator: BaseValidator, available_uids): - if not available_uids: - bt.logging.info("No available uids.") - return None - bt.logging.info(f"starting query {selected_validator.__class__.__name__} for miners {available_uids}") - query_responses = await selected_validator.start_query(available_uids) - - if not selected_validator.should_i_score(): - bt.logging.info("we don't score this time.") - return None - - bt.logging.debug(f"scoring query with query responses for " - f"these uids: {available_uids}") - uid_scores_dict, scored_responses, responses = await selected_validator.score_responses(query_responses) - wandb_data = await selected_validator.build_wandb_data(uid_scores_dict, responses) - if self.config.wandb_on and not wandb_data: - wandb.log(wandb_data) - bt.logging.success("wandb_log successful") - return uid_scores_dict - async def update_weights(self): - """Update weights based on average scores, using min-max normalization.""" + """Update weights based on average scores.""" bt.logging.info("Updating weights...") avg_scores = {} - for uid in self.total_scores: - count = self.score_counts[uid] - if count > 0: - avg_scores[uid] = self.total_scores[uid] / count - else: - avg_scores[uid] = 0.0 + # Compute average scores per UID + async with self.lock: + for uid in self.total_scores: + count = self.score_counts[uid] + if count > 0: + avg_scores[uid] = self.total_scores[uid] / count + else: + avg_scores[uid] = 0.0 bt.logging.info(f"Average scores = {avg_scores}") @@ -244,15 +324,10 @@ async def update_weights(self): for uid, score in avg_scores.items(): weights[uid] = score - # Check if all weights are zero - if torch.all(weights == 0): - bt.logging.warning("All weights are zero. Setting all weights to 1 to prevent error.") - weights = torch.ones(len(self.metagraph.uids)) - await self.set_weights(weights) async def set_weights(self, scores): - # alpha of .3 means that each new score replaces 30% of the weight of the previous weights + # Alpha of .3 means that each new score replaces 30% of the weight of the previous weights alpha = .3 if self.moving_average_scores is None: self.moving_average_scores = scores.clone() @@ -304,31 +379,70 @@ def base_blacklist(self, synapse, blacklist_amt=20000) -> Tuple[bool, str]: bt.logging.exception(err) async def images(self, synapse: ImageResponse) -> ImageResponse: - bt.logging.info(f"received {synapse}") - - synapse = await self.dendrite(self.metagraph.axons[synapse.uid], synapse, deserialize=False, - timeout=synapse.timeout) - - bt.logging.info(f"new synapse = {synapse}") - return synapse + bt.logging.info(f"Received {synapse}") + + axon = self.metagraph.axons[synapse.uid] + start_time = time.time() + synapse_response: ImageResponse = await self.dendrite(axon, synapse, deserialize=False, + timeout=synapse.timeout) + synapse_response.process_time = time.time() - start_time + + bt.logging.info(f"New synapse = {synapse_response}") + # Store the query and response in the shared database + async with self.lock: + self.query_database.append({ + 'uid': synapse.uid, + 'synapse': synapse, + 'response': synapse_response, + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config, + metagraph=self.metagraph) + }) + + return synapse_response async def embeddings(self, synapse: Embeddings) -> Embeddings: - bt.logging.info(f"received {synapse}") - - synapse = await self.dendrite(self.metagraph.axons[synapse.uid], synapse, deserialize=False, - timeout=synapse.timeout) - - bt.logging.info(f"new synapse = {synapse}") - return synapse - - async def prompt(self, synapse: StreamPrompting) -> StreamPrompting: - bt.logging.info(f"received {synapse}") - - # Return the streaming response as before - async def _prompt(synapse, send: Send): + bt.logging.info(f"Received {synapse}") + + axon = self.metagraph.axons[synapse.uid] + synapse_response = await self.dendrite(axon, synapse, deserialize=False, + timeout=synapse.timeout) + + bt.logging.info(f"New synapse = {synapse_response}") + # Store the query and response in the shared database + async with self.lock: + self.query_database.append({ + 'uid': synapse.uid, + 'synapse': synapse, + 'response': synapse_response, + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('EmbeddingsValidator')(config=self.config, + metagraph=self.metagraph) + }) + + return synapse_response + + async def prompt(self, synapse: StreamPrompting) -> StreamingSynapse.BTStreamingResponse: + bt.logging.info(f"Received {synapse}") + + # Return the streaming response + async def _prompt(query_synapse: StreamPrompting, send: Send): bt.logging.info(f"Sending {synapse} request to uid: {synapse.uid}") + synapse.deserialize_flag = False + synapse.streaming = True + uid = self.task_mgr.assign_task(query_synapse) + if uid is None: + bt.logging.error("Can't create task.") + await send({"type": "http.response.body", "body": b'', "more_body": False}) + return + bt.logging.trace(f"task is created and uid is {uid}") + async def handle_response(responses): + start_time = time.time() + response_text = '' for resp in responses: async for chunk in resp: if isinstance(chunk, str): @@ -337,10 +451,25 @@ async def handle_response(responses): "body": chunk.encode("utf-8"), "more_body": True, }) - bt.logging.info(f"Streamed text: {chunk}") + response_text += chunk + bt.logging.trace(f"Streamed text: {chunk}") + + # Store the query and response in the shared database + async with self.lock: + self.query_database.append({ + 'uid': synapse.uid, + 'synapse': synapse, + 'response': (response_text, time.time() - start_time), + 'query_type': 'organic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, + metagraph=self.metagraph) + }) + await send({"type": "http.response.body", "body": b'', "more_body": False}) - axon = self.metagraph.axons[synapse.uid] + axon = self.metagraph.axons[uid] + await self.dendrite.aclose_session() responses = await self.dendrite( axons=[axon], synapse=synapse, @@ -353,7 +482,7 @@ async def handle_response(responses): token_streamer = partial(_prompt, synapse) return synapse.create_streaming_response(token_streamer) - async def consume_organic_scoring(self): + async def consume_organic_queries(self): bt.logging.info("Attaching forward function to axon.") self.axon.attach( forward_fn=self.prompt, @@ -365,13 +494,129 @@ async def consume_organic_scoring(self): forward_fn=self.embeddings, blacklist_fn=self.blacklist_embeddings, ) - self.axon.serve(netuid=self.netuid) + self.axon.serve(netuid=self.netuid, subtensor=self.subtensor) + print(f"axon: {self.axon}") self.axon.start() bt.logging.info(f"Running validator on uid: {self.my_uid}") + + def get_scoring_tasks_from_query_responses(self, queries_to_process): + + grouped_query_resps = defaultdict(list) + validator_to_query_resps = defaultdict(list) + type_to_validator = {} + + # Process queries outside the lock to prevent blocking + for query_data in queries_to_process: + uid = query_data['uid'] + synapse = query_data['synapse'] + response = query_data['response'] + validator = query_data['validator'] + vali_type = type(validator).__name__ + type_to_validator[vali_type] = validator + + provider = synapse.provider + model = synapse.model + + grouped_key = f"{vali_type}:{uid}:{provider}:{model}" + grouped_query_resps[grouped_key].append( + (uid, {'query': synapse, 'response': response})) + + for key, uid_to_query_resps in grouped_query_resps.items(): + vali_type = str(key).split(":")[0] + if not uid_to_query_resps: + continue + query_resp_to_score_for_uids = random.choices(uid_to_query_resps, k=self.max_score_cnt_per_model) + validator_to_query_resps[vali_type] += query_resp_to_score_for_uids + + score_tasks = [] + for vali_type in type_to_validator: + validator = type_to_validator[vali_type] + text_score_task = validator.score_responses(validator_to_query_resps[vali_type], self.uid_to_capacity) + score_tasks.append(text_score_task) + return score_tasks + + async def process_queries_from_database(self): while True: - try: - # Check for organic scoring tasks here - await asyncio.sleep(60) - except Exception as err: - bt.logging.exception(err) - await asyncio.sleep(10) \ No newline at end of file + await asyncio.sleep(1) # Adjust the sleep time as needed + # accumulate all query results for 36 blocks + if not self.synthetic_task_done or not self.is_epoch_end(): + bt.logging.trace("no data in query_database. so continue...") + continue + + bt.logging.info(f"start scoring process...") + + async with self.lock: + queries_to_process = self.query_database.copy() + self.query_database.clear() + + self.synthetic_task_done = False + + # with all query_respones, select one per uid, provider, model randomly and score them. + score_tasks = self.get_scoring_tasks_from_query_responses(queries_to_process) + + resps = await asyncio.gather(*score_tasks) + resps = [item for item in resps if item is not None] + # Update total_scores and score_counts + async with self.lock: + for uid_scores_dict, _, _ in resps: + for uid, score in uid_scores_dict.items(): + self.total_scores[uid] += score + self.score_counts[uid] += 1 + bt.logging.info(f"current total score are {self.total_scores}") + await self.update_and_refresh() + + @property + def batch_list_of_all_uids(self): + uids = list(self.available_uid_to_axons.keys()) + batch_size = self.batch_size + batched_list = [] + for i in range(0, len(uids), batch_size): + batched_list.append(uids[i:i + batch_size]) + return batched_list + + async def score_miners_based_cached_answer(self, vali, query, answer): + total_query_resps = [] + provider = query.provider + model = query.model + + async def mock_create_query(*args, **kwargs): + return query + + origin_ref_create_query = vali.create_query + origin_ref_provider_to_models = vali.get_provider_to_models + + for batch_uids in self.batch_list_of_all_uids: + vali.create_query = mock_create_query + vali.get_provider_to_models = lambda: [(provider, model)] + query_responses = await self.perform_queries(vali, batch_uids) + total_query_resps += query_responses + + vali.create_query = origin_ref_create_query + vali.get_provider_to_models = origin_ref_provider_to_models + + bt.logging.debug(f"total cached query_resps: {len(total_query_resps)}") + queries_to_process = [] + for uid, response_data in total_query_resps: + # Decide whether to score this query + queries_to_process.append({ + 'uid': uid, + 'synapse': response_data['query'], + 'response': response_data['response'], + 'query_type': 'synthetic', + 'timestamp': asyncio.get_event_loop().time(), + 'validator': vali + }) + + async def mock_answer(*args, **kwargs): + return answer + + origin_ref_answer_task = vali.get_answer_task + vali.get_answer_task = mock_answer + score_tasks = self.get_scoring_tasks_from_query_responses(queries_to_process) + responses = await asyncio.gather(*score_tasks) + vali.get_answer_task = origin_ref_answer_task + + responses = [item for item in responses if item is not None] + for uid_scores_dict, _, _ in responses: + for uid, score in uid_scores_dict.items(): + self.total_scores[uid] += score