diff --git a/cortext/axon.py b/cortext/axon.py
index 54389859..1c684119 100644
--- a/cortext/axon.py
+++ b/cortext/axon.py
@@ -1,7 +1,7 @@
 import bittensor
 import bittensor as bt
 from substrateinterface import Keypair
-from bittensor.errors import SynapseDendriteNoneException
+from bittensor.core.errors import SynapseDendriteNoneException
 
 
 class CortexAxon(bt.axon):
diff --git a/cortext/dendrite.py b/cortext/dendrite.py
index 14abe123..de885103 100644
--- a/cortext/dendrite.py
+++ b/cortext/dendrite.py
@@ -56,7 +56,7 @@ async def call_stream(
         async with session.post(
             url,
             headers=synapse.to_headers(),
-            json=synapse.dict(),
+            json=synapse.model_dump(),
         ) as response:
             # Use synapse subclass' process_streaming_response method to yield the response chunks
             try:
diff --git a/cortext/protocol.py b/cortext/protocol.py
index aeb92d34..eb731205 100644
--- a/cortext/protocol.py
+++ b/cortext/protocol.py
@@ -232,7 +232,7 @@ class StreamPrompting(bt.StreamingSynapse):
         "This attribute is immutable and cannot be updated.",
     )
 
-    completion: str = pydantic.Field(
+    completion: Optional[str] = pydantic.Field(
         None,
         title="Completion",
         description="Completion status of the current StreamPrompting object. "
@@ -347,7 +347,7 @@ def to_headers(self) -> dict:
         headers.update(
             {
                 f"bt_header_axon_{k}": str(v)
-                for k, v in self.axon.dict().items()
+                for k, v in self.axon.model_dump().items()
                 if v is not None
             }
         )
@@ -355,7 +355,7 @@
         headers.update(
             {
                 f"bt_header_dendrite_{k}": str(v)
-                for k, v in self.dendrite.dict().items()
+                for k, v in self.dendrite.model_dump().items()
                 if v is not None
             }
         )
@@ -379,9 +379,9 @@ async def process_streaming_response(self, response: StreamingResponse, organic=
                self.completion += tokens
                yield tokens
        except asyncio.TimeoutError as err:
-            self.completion += remain_chunk
-            yield remain_chunk
-
+            # self.completion += remain_chunk
+            # yield remain_chunk
+            pass
 
    def extract_response_json(self, response: StreamingResponse) -> dict:
        headers = {
diff --git a/cursor/app/core/query_to_validator.py b/cursor/app/core/query_to_validator.py
index bc6b4fc9..ab3cf516 100644
--- a/cursor/app/core/query_to_validator.py
+++ b/cursor/app/core/query_to_validator.py
@@ -4,7 +4,7 @@
 from cursor.app.models import ChatRequest
 from cursor.app.core.protocol import StreamPrompting
 from cursor.app.core.config import config
-from cursor.app.core.dendrite import CortexDendrite
+from cortext.dendrite import CortexDendrite
 import traceback
 
 subtensor = bt.subtensor(network="finney")
diff --git a/requirements.txt b/requirements.txt
index 29b72eed..25752284 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-bittensor==6.9.4
+bittensor==8.4.3
 datasets==2.*
 envparse==0.2.0
 openai>=1.3.2, ==1.*
diff --git a/server/app/database.py b/server/app/database.py
index 63f5e577..7403221d 100644
--- a/server/app/database.py
+++ b/server/app/database.py
@@ -1,9 +1,11 @@
+import time
 import psycopg2
 import os
-from contextlib import asynccontextmanager
 
 DATABASE_URL = os.getenv("DATABASE_URL")
 TABEL_NAME = 'query_resp_data'
+TIME_EXPIRATION = 3600 * 24 * 10  # delete records after 10 days since its creation.
+LAST_EXECUTION = time.time()
 
 # PostgreSQL connection parameters
 conn = psycopg2.connect(DATABASE_URL)
@@ -49,6 +51,7 @@ async def create_table(app):
            CREATE INDEX IF NOT EXISTS miner_id_index ON {TABEL_NAME} (miner_uid);
            CREATE INDEX IF NOT EXISTS miner_hot_key_index ON {TABEL_NAME} (miner_hot_key);
            CREATE INDEX IF NOT EXISTS idx_score_sim_timestamp ON {TABEL_NAME} (score, similarity, timestamp);
+           CREATE INDEX IF NOT EXISTS idx_block__cycle_epoch_num_ ON {TABEL_NAME} (block_num, cycle_num, epoch_num);
        """
        cur.execute(create_index_query)
        conn.commit()
@@ -57,4 +60,20 @@ async def create_table(app):
    except Exception as e:
        print(f"Error creating table: {e}")
 
+
+def delete_records():
+    global TIME_EXPIRATION, LAST_EXECUTION
+    if (time.time() - LAST_EXECUTION) < TIME_EXPIRATION:
+        return
+    LAST_EXECUTION = time.time()
+    timestamp = time.time() - TIME_EXPIRATION
+    conn = psycopg2.connect(DATABASE_URL)
+    cur = conn.cursor()
+    query_str = f"""
+    delete from query_resp_data where timestamp <= {timestamp}
+    """
+    cur.execute(query_str)
+    conn.commit()
+
+
 create_table(None)
diff --git a/server/app/main.py b/server/app/main.py
index 9ca90d31..754bc576 100644
--- a/server/app/main.py
+++ b/server/app/main.py
@@ -1,10 +1,17 @@
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.cron import CronTrigger
 from contextlib import asynccontextmanager
 from fastapi import FastAPI, Depends, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
+
 from . import curd, models, schemas
-from .database import create_table, conn, cur
+from .database import create_table, conn, cur, delete_records
 from typing import List
 
+scheduler = BackgroundScheduler()
+scheduler.add_job(delete_records, CronTrigger(day_of_week="mon", hour=10, minute=0))
+scheduler.start()
+
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
diff --git a/server/requirements.txt b/server/requirements.txt
index 5c1bda28..4d33355c 100644
--- a/server/requirements.txt
+++ b/server/requirements.txt
@@ -2,4 +2,5 @@ fastapi
 uvicorn[standard]
 psycopg2-binary
 sqlalchemy
-pydantic
\ No newline at end of file
+pydantic
+apscheduler
\ No newline at end of file
diff --git a/validators/core/axon.py b/validators/core/axon.py
index 5ab01b60..4d0f8753 100644
--- a/validators/core/axon.py
+++ b/validators/core/axon.py
@@ -19,8 +19,10 @@
 from starlette.requests import Request
 from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
 from typing import List, Optional, Tuple, Callable, Any, Dict
+from bittensor.core.synapse import Synapse
+from bittensor.core.settings import DEFAULTS, version_as_int
 
-from bittensor.errors import (
+from bittensor.core.errors import (
     InvalidRequestNameError,
     SynapseDendriteNoneException,
     SynapseParsingError,
@@ -31,10 +33,14 @@
     RunException,
     PostProcessException,
     InternalServerError,
+    SynapseException
 )
-from bittensor.threadpool import PriorityThreadPoolExecutor
+from bittensor.core.threadpool import PriorityThreadPoolExecutor
+from bittensor.core.axon import AxonMiddleware
 import bittensor
+from bittensor.utils import networking
 import bittensor as bt
+from bittensor_wallet import Wallet
 from substrateinterface import Keypair
 
 from cursor.app.core.config import config
@@ -135,8 +141,8 @@ def stop(self):
 class CortexAxon(bt.axon):
     def __init__(
         self,
-        wallet: Optional["bittensor.wallet"] = None,
-        config: Optional["bittensor.config"] = None,
+        wallet: Optional["Wallet"] = None,
+        config: Optional["Config"] = None,
         port: Optional[int] = None,
         ip: Optional[str] = None,
         external_ip: Optional[str] = None,
@@ -164,36 +170,30 @@ def __init__(
         if config is None:
             config = CortexAxon.config()
         config = copy.deepcopy(config)
-        config.axon.ip = ip or config.axon.get("ip")
-        config.axon.port = port or config.axon.get("port")
-        config.axon.external_ip = external_ip or config.axon.get(
-            "external_ip"
-        )
-        config.axon.external_port = external_port or config.axon.get(
-            "external_port"
-        )
-        config.axon.max_workers = max_workers or config.axon.get(
-            "max_workers"
-        )
+        config.axon.ip = ip or config.axon.ip
+        config.axon.port = port or config.axon.port
+        config.axon.external_ip = external_ip or config.axon.external_ip
+        config.axon.external_port = external_port or config.axon.external_port
+        config.axon.max_workers = max_workers or config.axon.max_workers
         CortexAxon.check_config(config)
         self.config = config
 
         # Get wallet or use default.
-        self.wallet = wallet or bittensor.wallet()
+        self.wallet = wallet or Wallet()
 
         # Build axon objects.
         self.uuid = str(uuid.uuid1())
         self.ip = self.config.axon.ip
         self.port = self.config.axon.port
         self.external_ip = (
-            self.config.axon.external_ip
-            if self.config.axon.external_ip != None
-            else bittensor.utils.networking.get_external_ip()
+            self.config.axon.external_ip  # type: ignore
+            if self.config.axon.external_ip is not None  # type: ignore
+            else networking.get_external_ip()
         )
         self.external_port = (
-            self.config.axon.external_port
-            if self.config.axon.external_port != None
-            else self.config.axon.port
+            self.config.axon.external_port  # type: ignore
+            if self.config.axon.external_port is not None  # type: ignore
+            else self.config.axon.port  # type: ignore
         )
         self.full_address = str(self.config.axon.ip) + ":" + str(self.config.axon.port)
         self.started = False
@@ -229,7 +229,8 @@ def ping(r: bittensor.Synapse) -> bittensor.Synapse:
         self.attach(
             forward_fn=ping, verify_fn=None, blacklist_fn=None, priority_fn=None
         )
-        self.app.add_middleware(CortexAxonMiddleware, axon=self)
+        self.middleware_cls = CortexAxonMiddleware
+        self.app.add_middleware(self.middleware_cls, axon=self)
         self.app.add_middleware(
             CORSMiddleware,
             allow_origins=["*"],  # Allows all origins
@@ -238,7 +239,6 @@ def ping(r: bittensor.Synapse) -> bittensor.Synapse:
             allow_headers=["*"],  # Allows all headers
         )
 
-
     def default_verify(self, synapse: bittensor.Synapse):
         if synapse.dendrite is not None:
             keypair = Keypair(ss58_address=synapse.dendrite.hotkey)
@@ -274,11 +274,12 @@ def create_error_response(synapse: bittensor.Synapse):
         content={"message": synapse.axon.status_message},
     )
 
+
 def log_and_handle_error(
-    synapse: bittensor.Synapse,
-    exception: Exception,
-    status_code: int,
-    start_time: float,
+        synapse: bittensor.Synapse,
+        exception: Exception,
+        status_code: int,
+        start_time: float,
 ):
     # Display the traceback for user clarity.
     bittensor.logging.trace(f"Forward exception: {traceback.format_exc()}")
@@ -303,7 +304,7 @@ def log_and_handle_error(
     return synapse
 
 
-class CortexAxonMiddleware(BaseHTTPMiddleware):
+class CortexAxonMiddleware(AxonMiddleware):
     """
     The `AxonMiddleware` class is a key component in the Axon server, responsible for processing all incoming requests.
 
@@ -324,17 +325,6 @@ class CortexAxonMiddleware(BaseHTTPMiddleware):
     then handling any postprocessing steps such as response header updating and logging.
     """
 
-    def __init__(self, app: "AxonMiddleware", axon: "bittensor.axon"):
-        """
-        Initialize the AxonMiddleware class.
-
-        Args:
-            app (object): An instance of the application where the middleware processor is used.
-            axon (object): The axon instance used to process the requests.
-        """
-        super().__init__(app)
-        self.axon = axon
-
     async def dispatch(
         self, request: Request, call_next: RequestResponseEndpoint
     ) -> Response:
@@ -366,8 +356,7 @@ async def dispatch(
         # Records the start time of the request processing.
         start_time = time.time()
 
-
-        if "v1/chat/completions" in request.url.path:
+        if "v1" in request.url.path:
             if request.method == "OPTIONS":
                 return await call_next(request)
             try:
@@ -384,7 +373,14 @@ async def dispatch(
 
         try:
             # Set up the synapse from its headers.
-            synapse: bittensor.Synapse = await self.preprocess(request)
+            try:
+                synapse: "Synapse" = await self.preprocess(request)
+            except Exception as exc:
+                if isinstance(exc, SynapseException) and exc.synapse is not None:
+                    synapse = exc.synapse
+                else:
+                    synapse = Synapse()
+                raise
 
             # Logs the start of the request processing
             if synapse.dendrite is not None:
@@ -408,9 +404,6 @@ async def dispatch(
             # Call the run function
             response = await self.run(synapse, call_next, request)
 
-            # Call the postprocess function
-            response = await self.postprocess(synapse, response, start_time)
-
         # Handle errors related to preprocess.
         except InvalidRequestNameError as e:
             if "synapse" not in locals():
@@ -479,328 +472,3 @@ async def dispatch(
 
         # Return the response to the requester.
         return response
-
-    async def preprocess(self, request: Request) -> bittensor.Synapse:
-        """
-        Performs the initial processing of the incoming request. This method is responsible for
-        extracting relevant information from the request and setting up the Synapse object, which
-        represents the state and context of the request within the Axon server.
-
-        Args:
-            request (Request): The incoming request to be preprocessed.
-
-        Returns:
-            bittensor.Synapse: The Synapse object representing the preprocessed state of the request.
-
-        The preprocessing involves:
-
-        1. Extracting the request name from the URL path.
-        2. Creating a Synapse instance from the request headers using the appropriate class type.
-        3. Filling in the Axon and Dendrite information into the Synapse object.
-        4. Signing the Synapse from the Axon side using the wallet hotkey.
-
-        This method sets the foundation for the subsequent steps in the request handling process,
-        ensuring that all necessary information is encapsulated within the Synapse object.
-        """
-        # Extracts the request name from the URL path.
-        try:
-            request_name = request.url.path.split("/")[1]
-        except:
-            raise InvalidRequestNameError(
-                f"Improperly formatted request. Could not parser request {request.url.path}."
-            )
-
-        # Creates a synapse instance from the headers using the appropriate forward class type
-        # based on the request name obtained from the URL path.
-        request_synapse = self.axon.forward_class_types.get(request_name)
-        if request_synapse is None:
-            raise UnknownSynapseError(
-                f"Synapse name '{request_name}' not found. Available synapses {list(self.axon.forward_class_types.keys())}"
-            )
-
-        try:
-            synapse = request_synapse.from_headers(request.headers)  # type: ignore
-        except Exception as e:
-            raise SynapseParsingError(
-                f"Improperly formatted request. Could not parse headers {request.headers} into synapse of type {request_name}."
-            )
-        synapse.name = request_name
-
-        # Fills the local axon information into the synapse.
-        synapse.axon.__dict__.update(
-            {
-                "version": str(bittensor.__version_as_int__),
-                "uuid": str(self.axon.uuid),
-                "nonce": f"{time.monotonic_ns()}",
-                "status_message": "Success",
-                "status_code": "100",
-            }
-        )
-
-        # Fills the dendrite information into the synapse.
-        synapse.dendrite.__dict__.update(
-            {"port": str(request.client.port), "ip": str(request.client.host)}  # type: ignore
-        )
-
-        # Signs the synapse from the axon side using the wallet hotkey.
-        message = f"{synapse.axon.nonce}.{synapse.dendrite.hotkey}.{synapse.axon.hotkey}.{synapse.axon.uuid}"
-        synapse.axon.signature = f"0x{self.axon.wallet.hotkey.sign(message).hex()}"
-
-        # Return the setup synapse.
-        return synapse
-
-    async def verify(self, synapse: bittensor.Synapse):
-        """
-        Verifies the authenticity and integrity of the request. This method ensures that the incoming
-        request meets the predefined security and validation criteria.
-
-        Args:
-            synapse (bittensor.Synapse): The Synapse object representing the request.
-
-        Raises:
-            Exception: If the verification process fails due to unmet criteria or security concerns.
-
-        The verification process involves:
-
-        1. Retrieving the specific verification function for the request's Synapse type.
-        2. Executing the verification function and handling any exceptions that arise.
-
-        Successful verification allows the request to proceed further in the processing pipeline, while
-        failure results in an appropriate exception being raised.
-        """
-        # Start of the verification process. Verification is the process where we ensure that
-        # the incoming request is from a trusted source or fulfills certain requirements.
-        # We get a specific verification function from 'verify_fns' dictionary that corresponds
-        # to our request's name. Each request name (synapse name) has its unique verification function.
-        verify_fn = (
-            self.axon.verify_fns.get(synapse.name) if synapse.name is not None else None
-        )
-
-        # If a verification function exists for the request's name
-        if verify_fn:
-            try:
-                # We attempt to run the verification function using the synapse instance
-                # created from the request. If this function runs without throwing an exception,
-                # it means that the verification was successful.
-                (
-                    await verify_fn(synapse)
-                    if inspect.iscoroutinefunction(verify_fn)
-                    else verify_fn(synapse)
-                )
-            except Exception as e:
-                # If there was an exception during the verification process, we log that
-                # there was a verification exception.
-                bittensor.logging.trace(f"Verify exception {str(e)}")
-
-                # Check if the synapse.axon object exists
-                if synapse.axon is not None:
-                    # We set the status code of the synapse to "401" which denotes an unauthorized access.
-                    synapse.axon.status_code = 401
-                else:
-                    # If the synapse.axon object doesn't exist, raise an exception.
-                    raise Exception("Synapse.axon object is None")
-
-                # We raise an exception to stop the process and return the error to the requester.
-                # The error message includes the original exception message.
-                raise NotVerifiedException(f"Not Verified with error: {str(e)}")
-
-    async def blacklist(self, synapse: bittensor.Synapse):
-        """
-        Checks if the request should be blacklisted. This method ensures that requests from disallowed
-        sources or with malicious intent are blocked from processing. This can be extremely useful for
-        preventing spam or other forms of abuse. The blacklist is a list of keys or identifiers that
-        are prohibited from accessing certain resources.
-
-        Args:
-            synapse (bittensor.Synapse): The Synapse object representing the request.
-
-        Raises:
-            Exception: If the request is found in the blacklist.
-
-        The blacklist check involves:
-
-        1. Retrieving the blacklist checking function for the request's Synapse type.
-        2. Executing the check and handling the case where the request is blacklisted.
-
-        If a request is blacklisted, it is blocked, and an exception is raised to halt further processing.
-        """
-        # A blacklist is a list of keys or identifiers
-        # that are prohibited from accessing certain resources.
-        # We retrieve the blacklist checking function from the 'blacklist_fns' dictionary
-        # that corresponds to the request's name (synapse name).
-        blacklist_fn = (
-            self.axon.blacklist_fns.get(synapse.name)
-            if synapse.name is not None
-            else None
-        )
-
-        # If a blacklist checking function exists for the request's name
-        if blacklist_fn:
-            # We execute the blacklist checking function using the synapse instance as input.
-            # If the function returns True, it means that the key or identifier is blacklisted.
-            blacklisted, reason = (
-                await blacklist_fn(synapse)
-                if inspect.iscoroutinefunction(blacklist_fn)
-                else blacklist_fn(synapse)
-            )
-            if blacklisted:
-                # We log that the key or identifier is blacklisted.
-                bittensor.logging.trace(f"Blacklisted: {blacklisted}, {reason}")
-
-                # Check if the synapse.axon object exists
-                if synapse.axon is not None:
-                    # We set the status code of the synapse to "403" which indicates a forbidden access.
-                    synapse.axon.status_code = 403
-                else:
-                    # If the synapse.axon object doesn't exist, raise an exception.
-                    raise Exception("Synapse.axon object is None")
-
-                # We raise an exception to halt the process and return the error message to the requester.
-                raise BlacklistedException(f"Forbidden. Key is blacklisted: {reason}.")
-
-    async def priority(self, synapse: bittensor.Synapse):
-        """
-        Executes the priority function for the request. This method assesses and assigns a priority
-        level to the request, determining its urgency and importance in the processing queue.
-
-        Args:
-            synapse (bittensor.Synapse): The Synapse object representing the request.
-
-        Raises:
-            Exception: If the priority assessment process encounters issues, such as timeouts.
-
-        The priority function plays a crucial role in managing the processing load and ensuring that
-        critical requests are handled promptly.
-        """
-        # Retrieve the priority function from the 'priority_fns' dictionary that corresponds
-        # to the request's name (synapse name).
-        priority_fn = self.axon.priority_fns.get(str(synapse.name), None)
-
-        async def submit_task(
-            executor: PriorityThreadPoolExecutor, priority: float
-        ) -> Tuple[float, Any]:
-            """
-            Submits the given priority function to the specified executor for asynchronous execution.
-            The function will run in the provided executor and return the priority value along with the result.
-
-            Args:
-                executor: The executor in which the priority function will be run.
-                priority: The priority function to be executed.
-
-            Returns:
-                tuple: A tuple containing the priority value and the result of the priority function execution.
-            """
-            loop = asyncio.get_event_loop()
-            future = loop.run_in_executor(executor, lambda: priority)
-            result = await future
-            return priority, result
-
-        # If a priority function exists for the request's name
-        if priority_fn:
-            try:
-                # Execute the priority function and get the priority value.
-                priority = (
-                    await priority_fn(synapse)
-                    if inspect.iscoroutinefunction(priority_fn)
-                    else priority_fn(synapse)
-                )
-
-                # Submit the task to the thread pool for execution with the given priority.
-                # The submit_task function will handle the execution and return the result.
-                _, result = await submit_task(self.axon.thread_pool, priority)
-
-            except TimeoutError as e:
-                # If the execution of the priority function exceeds the timeout,
-                # it raises an exception to handle the timeout error.
-                bittensor.logging.trace(f"TimeoutError: {str(e)}")
-
-                # Set the status code of the synapse to 408 which indicates a timeout error.
-                if synapse.axon is not None:
-                    synapse.axon.status_code = 408
-
-                # Raise an exception to stop the process and return an appropriate error message to the requester.
-                raise PriorityException(f"Response timeout after: {synapse.timeout}s")
-
-    async def run(
-        self,
-        synapse: bittensor.Synapse,
-        call_next: RequestResponseEndpoint,
-        request: Request,
-    ) -> Response:
-        """
-        Executes the requested function as part of the request processing pipeline. This method calls
-        the next function in the middleware chain to process the request and generate a response.
-
-        Args:
-            synapse (bittensor.Synapse): The Synapse object representing the request.
-            call_next (RequestResponseEndpoint): The next function in the middleware chain to process requests.
-            request (Request): The original HTTP request.
-
-        Returns:
-            Response: The HTTP response generated by processing the request.
-
-        This method is a critical part of the request lifecycle, where the actual processing of the
-        request takes place, leading to the generation of a response.
-        """
-        try:
-            # The requested function is executed by calling the 'call_next' function,
-            # passing the original request as an argument. This function processes the request
-            # and returns the response.
-            response = await call_next(request)
-
-        except Exception as e:
-            # If an exception occurs during the execution of the requested function,
-            # it is caught and handled here.
-
-            # Log the exception for debugging purposes.
-            bittensor.logging.trace(f"Run exception: {str(e)}")
-
-            # Set the status code of the synapse to "500" which indicates an internal server error.
-            if synapse.axon is not None:
-                synapse.axon.status_code = 500
-
-            # Raise an exception to stop the process and return an appropriate error message to the requester.
-            raise RunException(f"Internal server error with error: {str(e)}")
-
-        # Return the starlet response
-        return response
-
-    async def postprocess(
-        self, synapse: bittensor.Synapse, response: Response, start_time: float
-    ) -> Response:
-        """
-        Performs the final processing on the response before sending it back to the client. This method
-        updates the response headers and logs the end of the request processing.
-
-        Args:
-            synapse (bittensor.Synapse): The Synapse object representing the request.
-            response (Response): The response generated by processing the request.
-            start_time (float): The timestamp when the request processing started.
-
-        Returns:
-            Response: The final HTTP response, with updated headers, ready to be sent back to the client.
-
-        Postprocessing is the last step in the request handling process, ensuring that the response is
-        properly formatted and contains all necessary information.
-        """
-        # Set the status code of the synapse to "200" which indicates a successful response.
-        if synapse.axon is not None:
-            synapse.axon.status_code = 200
-
-        # Set the status message of the synapse to "Success".
-            synapse.axon.status_message = "Success"
-
-        try:
-            # Update the response headers with the headers from the synapse.
-            updated_headers = synapse.to_headers()
-            response.headers.update(updated_headers)
-        except Exception as e:
-            # If there is an exception during the response header update, we log the exception.
-            raise PostProcessException(
-                f"Error while parsing or updating response headers. Postprocess exception: {str(e)}."
-            )
-
-        # Calculate the processing time by subtracting the start time from the current time.
-        synapse.axon.process_time = str(time.time() - start_time)  # type: ignore
-
-        return response
diff --git a/validators/services/capacity.py b/validators/services/capacity.py
index 4d92c8bd..38038c71 100644
--- a/validators/services/capacity.py
+++ b/validators/services/capacity.py
@@ -17,7 +17,7 @@ def __init__(self, metagraph, dendrite):
     async def query_capacity_to_miners(self, available_uids):
         capacity_query_tasks = []
-        bt.logging.info(f"querying capacity to uid = {available_uids.keys()}")
+        bt.logging.info(f"querying capacity to uid = {available_uids}")
 
         # Query all images concurrently
         for uid in available_uids:
             syn = Bandwidth()
@@ -32,7 +32,9 @@ async def query_capacity_to_miners(self, available_uids):
             if isinstance(resp, Exception):
                 bt.logging.error(f"exception happens while querying capacity to miner {uid}, {resp}")
             else:
-                uid_to_capacity[uid] = self.validate_capacity(resp.bandwidth_rpm)
+                cap = self.validate_capacity(resp.bandwidth_rpm)
+                if cap:
+                    uid_to_capacity[int(uid)] = cap
         self.uid_to_capacity = deepcopy(uid_to_capacity)
         return uid_to_capacity
 
diff --git a/validators/weight_setter.py b/validators/weight_setter.py
index f6b54e72..672eb114 100644
--- a/validators/weight_setter.py
+++ b/validators/weight_setter.py
@@ -145,14 +145,12 @@ async def refresh_metagraph(self):
 
     async def initialize_uids_and_capacities(self):
         bt.logging.info("start initializing uids and capacities")
-        self.available_uid_to_axons = await self.get_available_uids()
-        self.uids_to_query = list(self.available_uid_to_axons.keys())
-        bt.logging.info(f"Available UIDs: {list(self.available_uid_to_axons.keys())}")
-        self.uid_to_capacity = await self.get_capacities_for_uids(self.available_uid_to_axons)
+        bt.logging.info(f"Available UIDs: {self.metagraph.uids}")
+        self.uid_to_capacity = await self.get_capacities_for_uids(self.metagraph.uids)
         bt.logging.info(f"Capacities for miners: {self.uid_to_capacity}")
         # Initialize total_scores, score_counts.
-        self.total_scores = {uid: 0.0 for uid in self.available_uid_to_axons.keys()}
-        self.score_counts = {uid: 0 for uid in self.available_uid_to_axons.keys()}
+        self.total_scores = {uid: 0.0 for uid in self.uid_to_capacity.keys()}
+        self.score_counts = {uid: 0 for uid in self.uid_to_capacity.keys()}
 
         # update task_mgr after synthetic query at the end of iterator.
         if self.task_mgr: