Skip to content

Commit

Permalink
Clean up logging and add batched questions for each UID
Browse files Browse the repository at this point in the history
  • Loading branch information
acer-king committed Oct 3, 2024
1 parent bfbdb89 commit dcaccc8
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
2 changes: 2 additions & 0 deletions miner/providers/groq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from .base import Provider
from miner.config import config
from cortext.protocol import StreamPrompting
from ..error_handler import error_handler


class Groq(Provider):
def __init__(self, synapse):
super().__init__(synapse)
self.groq_client = AsyncGroq(timeout=config.ASYNC_TIME_OUT, api_key=config.GROQ_API_KEY)

@error_handler
async def _prompt(self, synapse: StreamPrompting, send: Send):
stream_kwargs = {
"messages": self.messages,
Expand Down
8 changes: 4 additions & 4 deletions validators/services/validators/text_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ async def organic(self, metagraph, query: dict[str, list[dict[str, str]]]) -> As
bt.logging.trace(resp)
yield uid, resp

async def get_question(self, miner_cnt=1):
async def get_question(self, batch_size=1):
is_vision_model = self.model in constants.VISION_MODELS
question = await get_question("text", miner_cnt, is_vision_model)
question = await get_question("text", batch_size, is_vision_model)
return question

@get_query_synapse_from_cache
async def create_query(self, uid, provider=None, model=None) -> bt.Synapse:
question = await self.get_question()
async def create_query(self, uid, provider=None, model=None, batch_size=1) -> bt.Synapse:
question = await self.get_question(batch_size=batch_size)
prompt = question.get("prompt")
image = question.get("image")
if image:
Expand Down
5 changes: 3 additions & 2 deletions validators/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ async def wrapper(*args, **kwargs):
provider = args[2]
model = args[3]
questions_answers = cache_service.get_all_question_to_answers(provider=provider, model=model)
if not questions_answers or random.random() > 0:
if not questions_answers or random.random() > 0.1:
# create question using openai service
query_syn = await func(*args, **kwargs)
return query_syn
# select one of questions_answers
# select one of questions_answers from cache database.
query, answer = random.choice(questions_answers)
query_syn = vali.get_synapse_from_json(query)
return query_syn
Expand Down
11 changes: 7 additions & 4 deletions validators/weight_setter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class WeightSetter:
def __init__(self, config, cache: QueryResponseCache, loop=None):

# Cache object using sqlite3.
self.current_block = None
self.next_block_to_wait = None
self.synthetic_task_done = False
self.task_mgr: TaskMgr = None
Expand Down Expand Up @@ -122,7 +123,9 @@ def get_blocks_til_epoch(self, block):
def is_epoch_end(self):
current_block = self.node_query('System', 'Number', [])
last_update = current_block - self.node_query('SubtensorModule', 'LastUpdate', [self.netuid])[self.my_uid]
bt.logging.info(f"last update: {last_update} blocks ago")
if self.current_block != current_block:
bt.logging.info(f"last update: {last_update} blocks ago")
self.current_block = current_block
if last_update >= self.tempo * 2 or (
self.get_blocks_til_epoch(current_block) < 10 and last_update >= self.weights_rate_limit):
return True
Expand Down Expand Up @@ -185,7 +188,7 @@ async def create_query_syns_for_remaining_bandwidth(self):
if bandwidth > 0:
# create task and send remaining requests to the miner
vali = self.choose_validator_from_model(model)
query_task = vali.create_query(uid, provider, model)
query_task = vali.create_query(uid, provider, model, batch_size=bandwidth)
query_tasks += [query_task] * bandwidth
else:
continue
Expand All @@ -199,7 +202,7 @@ def set_up_next_block_to_wait(self):
current_block = self.next_block_to_wait
else:
current_block = self.node_query('System', 'Number', [])
next_block = current_block + (self.tempo / NUM_INTERVALS_PER_CYCLE) # 36 blocks per cycle.
next_block = current_block + (self.tempo / NUM_INTERVALS_PER_CYCLE) # 36 blocks per cycle.
self.next_block_to_wait = next_block

def is_cycle_end(self):
Expand All @@ -218,8 +221,8 @@ async def perform_synthetic_queries(self):
self.set_up_next_block_to_wait()
start_time = time.time()
# don't process any organic query while processing synthetic queries.
synthetic_tasks = []
async with self.lock:
synthetic_tasks = []
# check available bandwidth and send synthetic requests to all miners.
query_synapses = await self.create_query_syns_for_remaining_bandwidth()
for query_syn in query_synapses:
Expand Down

0 comments on commit dcaccc8

Please sign in to comment.