diff --git a/organic.py b/organic.py
index 202d84fe..f91afec1 100644
--- a/organic.py
+++ b/organic.py
@@ -4,6 +4,44 @@ import traceback
 from cortext.protocol import StreamPrompting
 from cortext.dendrite import CortexDendrite
+import aiohttp
+from validators.services.cache import cache_service
+
+
+async def load_entire_questions():
+    # Fetch one URL and decode the JSON body; returns None on failure.
+    async def fetch(session, url):
+        async with session.get(url) as response:
+            try:
+                return await response.json()
+            except Exception as err:
+                bt.logging.error(f"{err} {traceback.format_exc()}")
+
+    # Issue every page request concurrently over one shared session.
+    async def gather_requests(urls):
+        async with aiohttp.ClientSession() as session:
+            tasks = []
+            for url in urls:
+                tasks.append(fetch(session, url))  # create a task for each URL
+            results = await asyncio.gather(*tasks)  # run all tasks concurrently
+            return results
+
+    urls = []
+    for q_id in range(0, 80000, 100):
+        url = f"https://datasets-server.huggingface.co/rows?dataset=microsoft%2Fms_marco&config=v1.1&split=train&offset={q_id}&length=100"
+        urls.append(url)
+    # main() already runs inside an event loop, so await directly rather than
+    # calling loop.run_until_complete(), which would raise RuntimeError here.
+    responses = await gather_requests(urls)
+    queries = []
+    for response in responses:
+        if response is None:
+            continue
+        for row in response.get('rows', []):
+            query = row['row']['query']
+            queries.append(query)
+
+    return queries
 
 
 async def generate_prompts(num_prompts=100):
     subjects = [
@@ -87,38 +125,41 @@ async def handle_response(resp):
 
 
 async def main():
     print("synching metagraph, this takes way too long.........")
-    subtensor = bt.subtensor(network="finney")
-    meta = subtensor.metagraph(netuid=18)
+    subtensor = bt.subtensor(network="test")
+    meta = subtensor.metagraph(netuid=196)
     print("metagraph synched!")
 
     # This needs to be your validator wallet that is running your subnet 18 validator
-    wallet = bt.wallet(name="default", hotkey="default")
+    wallet = bt.wallet(name="miner", hotkey="default")
     dendrite = CortexDendrite(wallet=wallet)
     vali_uid = meta.hotkeys.index(wallet.hotkey.ss58_address)
     axon_to_use = meta.axons[vali_uid]
     print(f"axon to use: {axon_to_use}")
 
     num_prompts = 10
-    prompts = await generate_prompts(num_prompts)
+    prompts = await load_entire_questions()
+    prompts = prompts[:10000]
     synapses = [StreamPrompting(
         messages=[{"role": "user", "content": prompt}],
         provider="OpenAI",
         model="gpt-4o"
     ) for prompt in prompts]
+    an_synapses = [StreamPrompting(
+        messages=[{"role": "user", "content": prompt}],
+        provider="Anthropic",
+        model="claude-3-5-sonnet-20240620"
+    ) for prompt in prompts]
+    synapses += an_synapses
+
     async def query_and_log(synapse):
         return await query_miner(dendrite, axon_to_use, synapse)
 
     responses = await asyncio.gather(*[query_and_log(synapse) for synapse in synapses])
 
-    import csv
-    with open('miner_responses.csv', 'w', newline='', encoding='utf-8') as file:
-        writer = csv.writer(file)
-        writer.writerow(['Prompt', 'Response'])
-        for prompt, response in zip(prompts, responses):
-            writer.writerow([prompt, response])
+    cache_service.set_cache_in_batch(synapses)
 
-    print("Responses saved to miner_responses.csv")
+    print("Responses saved to cache database")
 
 
 if __name__ == "__main__":
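
For context, below is a minimal standalone sketch of the paging pattern load_entire_questions() uses against the HuggingFace datasets-server rows endpoint. The names PAGE_URL, fetch_page, and demo are illustrative, not part of the diff; the endpoint URL and the rows -> row -> query layout are taken from the code above.

import asyncio

import aiohttp

PAGE_URL = ("https://datasets-server.huggingface.co/rows"
            "?dataset=microsoft%2Fms_marco&config=v1.1&split=train"
            "&offset={offset}&length=100")

async def fetch_page(session: aiohttp.ClientSession, offset: int) -> list[str]:
    # Fetch one 100-row page and pull the query text out of each row.
    async with session.get(PAGE_URL.format(offset=offset)) as resp:
        resp.raise_for_status()
        data = await resp.json()
    return [row["row"]["query"] for row in data.get("rows", [])]

async def demo(pages: int = 3) -> list[str]:
    # Same fan-out as gather_requests(): one shared session, one task per page.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch_page(session, offset) for offset in range(0, pages * 100, 100)))
    return [q for page in results for q in page]

if __name__ == "__main__":
    print(asyncio.run(demo())[:5])  # first five MS MARCO queries

Sharing a single ClientSession across every page request, as gather_requests() does, reuses connections instead of opening a new one per page; with 800 pages that difference is substantial.
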
diff --git a/validators/services/cache.py b/validators/services/cache.py
index a5ebab82..d9d80783 100644
--- a/validators/services/cache.py
+++ b/validators/services/cache.py
@@ -50,7 +50,7 @@ def set_cache_in_batch(self, syns: List[StreamPrompting], ttl=3600 * 24):
         expires_at = time.time()
         for syn in syns:
             p_key = self.generate_hash(str(expires_at) + str(syn.json()))
-            datas.append((p_key, syn.json(exclude={"dendrite"}), syn.completion, syn.provider, syn.model, expires_at))
+            datas.append((p_key, syn.json(exclude={"dendrite", "completion"}), syn.completion, syn.provider, syn.model, expires_at))
 
         # Insert multiple records
         cursor = self.conn.cursor()
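
To make the storage side concrete, here is a hypothetical, self-contained sketch of the batch insert that set_cache_in_batch() performs. The sqlite table name and schema and the sha256-based generate_hash are assumptions, not taken from the diff; the six-column row shape mirrors the datas tuples above. Note the sketch adds ttl to expires_at, which the hunk's context line (expires_at = time.time()) appears to omit.

import hashlib
import sqlite3
import time

conn = sqlite3.connect(":memory:")  # assumed table name and schema, for illustration
conn.execute("""CREATE TABLE IF NOT EXISTS cache (
    p_key TEXT PRIMARY KEY, question TEXT, answer TEXT,
    provider TEXT, model TEXT, expires_at REAL)""")

def generate_hash(value: str) -> str:
    # Assumed implementation; the real one lives on the cache service class.
    return hashlib.sha256(value.encode()).hexdigest()

def set_cache_in_batch(rows, ttl=3600 * 24):
    expires_at = time.time() + ttl
    datas = []
    for question_json, completion, provider, model in rows:
        # The answer goes in its own column; excluding "completion" from the
        # serialized synapse (the one-line change above) avoids storing it twice.
        p_key = generate_hash(str(expires_at) + question_json)
        datas.append((p_key, question_json, completion, provider, model, expires_at))
    cursor = conn.cursor()
    cursor.executemany("INSERT OR REPLACE INTO cache VALUES (?, ?, ?, ?, ?, ?)", datas)
    conn.commit()

set_cache_in_batch([('{"messages": [...]}', "a cached answer", "OpenAI", "gpt-4o")])
print(conn.execute("SELECT provider, model, answer FROM cache").fetchall())

Storing the completion in its own column while stripping it from the JSON blob keeps each answer in exactly one place, so later lookups can read it without re-parsing the serialized synapse.
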