Skip to content

Commit

Permalink
send max 100 request for each miner per each interation
Browse files Browse the repository at this point in the history
  • Loading branch information
acer-king committed Oct 2, 2024
1 parent da99a9f commit 8f6d34f
Showing 1 changed file with 25 additions and 5 deletions.
30 changes: 25 additions & 5 deletions validators/weight_setter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time

from black.trans import defaultdict
from click.core import batch
from substrateinterface import SubstrateInterface
from functools import partial
from typing import Tuple, List
Expand Down Expand Up @@ -221,24 +222,43 @@ async def perform_synthetic_queries(self):
query_synapses = await self.create_query_syns_for_remaining_bandwidth()
for query_syn in query_synapses:
uid = self.task_mgr.assign_task(query_syn)
synthetic_tasks.append(self.query_miner(uid, query_syn))
synthetic_tasks.append((uid, self.query_miner(uid, query_syn)))

bt.logging.debug(f"{time.time() - start_time} elapsed for creating and submitting synthetic queries.")

# restore capacities immediately after synthetic query consuming all bandwidth.
self.task_mgr.restore_capacities_for_all_miners()

batch_size = 30
for batched_queries in [synthetic_tasks[i:i + batch_size] for i in
range(0, len(synthetic_tasks), batch_size)]:
batched_tasks, remain_tasks = self.pop_synthetic_tasks_max_100_per_miner(synthetic_tasks)
while batched_tasks:
start_time = time.time()
await self.dendrite.aclose_session()
await asyncio.gather(*batched_queries)
await asyncio.gather(*batched_tasks)
batched_tasks, remain_tasks = self.pop_synthetic_tasks_max_100_per_miner(remain_tasks)

self.synthetic_task_done = True
bt.logging.info(
f"synthetic queries has been processed successfully."
f"total queries are {len(query_synapses)}")

def pop_synthetic_tasks_max_100_per_miner(self, synthetic_tasks):
batch_size = 300
max_query_cnt_per_miner = 100
batch_tasks = []
remain_tasks = []
uid_to_task_cnt = defaultdict(int)
for uid, synthetic_task in synthetic_tasks:
if uid_to_task_cnt[uid] < max_query_cnt_per_miner:
batch_tasks.append(synthetic_task)
if len(batch_tasks) > batch_size:
break
uid_to_task_cnt[uid] += 1
continue
else:
remain_tasks.append((uid, synthetic_task))
continue
return batch_tasks, remain_tasks

def choose_validator_from_model(self, model):
text_validator = ValidatorRegistryMeta.get_class('TextValidator')(config=self.config, metagraph=self.metagraph)
# image_validator = ValidatorRegistryMeta.get_class('ImageValidator')(config=self.config,
Expand Down

0 comments on commit 8f6d34f

Please sign in to comment.