Skip to content

Commit

Permalink
process organic queries first and then make synthetic query
Browse files Browse the repository at this point in the history
  • Loading branch information
acer-king committed Sep 20, 2024
1 parent 5d9ab9f commit cb74d79
Showing 1 changed file with 42 additions and 7 deletions.
49 changes: 42 additions & 7 deletions validators/dendrite.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,72 @@
import asyncio
from collections import defaultdict

from typing import Union, AsyncGenerator, Any, List
from enum import Enum

from pydantic import BaseModel
import bittensor
from bittensor import dendrite, axon
import bittensor as bt
from cortext import ALL_SYNAPSE_TYPE, MIN_REQUEST_PERIOD


class RequestType(str, Enum): # Inherit from str to enforce the value type as string
organic_type = 'organic'
synthetic_type = 'synthetic'


class Request(BaseModel):
target_axon: Union[bittensor.AxonInfo, bittensor.axon]
synapse: ALL_SYNAPSE_TYPE = bittensor.Synapse(),
timeout: float = 12.0,
synapse: ALL_SYNAPSE_TYPE = bittensor.Synapse()
timeout: float = 12.0
deserialize: bool = True
type: RequestType
stream: False


class Dendrite(dendrite):
# class variable to store all status of miners.
hotkey_to_uid_capacity = defaultdict(tuple)
requests_queue: List[Request] = []
synthetic_requests_queue: List[Request] = []
organic_requests_queue: List[Request] = []

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

@classmethod
def push_request_queue(cls, request):
cls.push_request_queue(request)
def push_request_queue(cls, request: Request):
if request.type == RequestType.organic_type:
cls.organic_requests_queue.append(request)
if request.type == RequestType.synthetic_type:
cls.synthetic_requests_queue.append(request)

@classmethod
def process_requests(cls):
while True:
if cls.organic_requests_queue:
# distribute organic queries to miners according to bandwidth.
bt.logging.info("# distribute organic queries to miners according to bandwidth.")
organic_tasks = []
for request in cls.organic_requests_queue:
uid, cap = cls.get_remaining_capacity(request)
if cap > 0:
if request.stream:
task = super().call_stream(target_axon=request.target_axon, synapse=request.synapse,
timeout=request.timeout,
deserialize=request.deserialize)
else:
task = super().call(target_axon=request.target_axon, synapse=request.synapse,
timeout=request.timeout,
deserialize=request.deserialize)
results = asyncio.gather(*organic_tasks)

@classmethod
def get_remaining_capacity(cls, target_axon: axon, synapse: ALL_SYNAPSE_TYPE):
def get_remaining_capacity(cls, request):
target_axon = request.target_axon
synapse = request.synapse
hotkey = target_axon.info().hotkey
uid, cap = cls.miners_to_capacity[hotkey]
uid, cap = cls.hotkey_to_uid_capacity[hotkey]
provider = synapse.provider
model = synapse.model
return uid, cap.get(provider).get(model)
Expand Down

0 comments on commit cb74d79

Please sign in to comment.