From 8f6d34faa4dd40c1a2da0d522f3c584cf3a50f36 Mon Sep 17 00:00:00 2001 From: acer-king Date: Wed, 2 Oct 2024 06:30:35 -0700 Subject: [PATCH] send max 100 request for each miner per each interation --- validators/weight_setter.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/validators/weight_setter.py b/validators/weight_setter.py index d9496f05..ebde2c64 100644 --- a/validators/weight_setter.py +++ b/validators/weight_setter.py @@ -5,6 +5,7 @@ import time from black.trans import defaultdict +from click.core import batch from substrateinterface import SubstrateInterface from functools import partial from typing import Tuple, List @@ -221,24 +222,43 @@ async def perform_synthetic_queries(self): query_synapses = await self.create_query_syns_for_remaining_bandwidth() for query_syn in query_synapses: uid = self.task_mgr.assign_task(query_syn) - synthetic_tasks.append(self.query_miner(uid, query_syn)) + synthetic_tasks.append((uid, self.query_miner(uid, query_syn))) bt.logging.debug(f"{time.time() - start_time} elapsed for creating and submitting synthetic queries.") # restore capacities immediately after synthetic query consuming all bandwidth. self.task_mgr.restore_capacities_for_all_miners() - batch_size = 30 - for batched_queries in [synthetic_tasks[i:i + batch_size] for i in - range(0, len(synthetic_tasks), batch_size)]: + batched_tasks, remain_tasks = self.pop_synthetic_tasks_max_100_per_miner(synthetic_tasks) + while batched_tasks: + start_time = time.time() await self.dendrite.aclose_session() - await asyncio.gather(*batched_queries) + await asyncio.gather(*batched_tasks) + batched_tasks, remain_tasks = self.pop_synthetic_tasks_max_100_per_miner(remain_tasks) self.synthetic_task_done = True bt.logging.info( f"synthetic queries has been processed successfully." f"total queries are {len(query_synapses)}") + def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks): + batch_size = 300 + max_query_cnt_per_miner = 100 + batch_tasks = [] + remain_tasks = [] + uid_to_task_cnt = defaultdict(int) + for uid, synthetic_task in synthetic_tasks: + if uid_to_task_cnt[uid] < max_query_cnt_per_miner: + batch_tasks.append(synthetic_task) + if len(batch_tasks) > batch_size: + break + uid_to_task_cnt[uid] += 1 + continue + else: + remain_tasks.append((uid, synthetic_task)) + continue + return batch_tasks, remain_tasks + def choose_validator_from_model(self, model): text_validator = ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, metagraph=self.metagraph) # image_validator = ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config,