Merge pull request #109 from Datura-ai/upgrade-version
Upgrade version
surcyf123 authored Dec 18, 2024
2 parents c0315b3 + 66ed153 commit cfa1026
Showing 11 changed files with 87 additions and 392 deletions.
2 changes: 1 addition & 1 deletion cortext/axon.py
@@ -1,7 +1,7 @@
 import bittensor
 import bittensor as bt
 from substrateinterface import Keypair
-from bittensor.errors import SynapseDendriteNoneException
+from bittensor.core.errors import SynapseDendriteNoneException


 class CortexAxon(bt.axon):
2 changes: 1 addition & 1 deletion cortext/dendrite.py
@@ -56,7 +56,7 @@ async def call_stream(
         async with session.post(
             url,
             headers=synapse.to_headers(),
-            json=synapse.dict(),
+            json=synapse.model_dump(),
         ) as response:
             # Use synapse subclass' process_streaming_response method to yield the response chunks
             try:
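Context for the .dict() -> .model_dump() edits in this commit: bittensor 8.x moves to pydantic v2, where BaseModel.dict() is deprecated in favor of model_dump(). A minimal sketch (the Synapse model below is a hypothetical stand-in, not the repo's class):

    from pydantic import BaseModel

    class Synapse(BaseModel):  # stand-in for bt.StreamingSynapse
        name: str
        timeout: float = 12.0

    s = Synapse(name="StreamPrompting")
    print(s.model_dump())  # v2 API: {'name': 'StreamPrompting', 'timeout': 12.0}
    # s.dict() still works on v2 but emits a DeprecationWarning

Both calls return the same plain dict, so session.post(json=...) sends an identical payload.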
12 changes: 6 additions & 6 deletions cortext/protocol.py
@@ -232,7 +232,7 @@ class StreamPrompting(bt.StreamingSynapse):
         "This attribute is immutable and cannot be updated.",
     )

-    completion: str = pydantic.Field(
+    completion: Optional[str] = pydantic.Field(
         None,
         title="Completion",
         description="Completion status of the current StreamPrompting object. "
@@ -347,15 +347,15 @@ def to_headers(self) -> dict:
         headers.update(
             {
                 f"bt_header_axon_{k}": str(v)
-                for k, v in self.axon.dict().items()
+                for k, v in self.axon.model_dump().items()
                 if v is not None
             }
         )
         if self.dendrite:
             headers.update(
                 {
                     f"bt_header_dendrite_{k}": str(v)
-                    for k, v in self.dendrite.dict().items()
+                    for k, v in self.dendrite.model_dump().items()
                     if v is not None
                 }
             )
@@ -379,9 +379,9 @@ async def process_streaming_response(self, response: StreamingResponse, organic=
                 self.completion += tokens
                 yield tokens
             except asyncio.TimeoutError as err:
-                self.completion += remain_chunk
-                yield remain_chunk
-
+                # self.completion += remain_chunk
+                # yield remain_chunk
+                pass

     def extract_response_json(self, response: StreamingResponse) -> dict:
         headers = {
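The hunk above changes timeout handling: on a stalled stream the partial remain_chunk is now dropped instead of being appended to the completion and yielded. A minimal sketch of the resulting behavior, assuming the incoming stream is modeled as an asyncio.Queue with a None end-of-stream sentinel:

    import asyncio

    async def read_chunks(stream: asyncio.Queue, per_chunk_timeout: float = 5.0):
        completion = ""
        while True:
            try:
                chunk = await asyncio.wait_for(stream.get(), timeout=per_chunk_timeout)
            except asyncio.TimeoutError:
                return  # old behavior appended the partial chunk; new behavior discards it
            if chunk is None:  # end-of-stream sentinel
                return
            completion += chunk
            yield chunk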
2 changes: 1 addition & 1 deletion cursor/app/core/query_to_validator.py
@@ -4,7 +4,7 @@
 from cursor.app.models import ChatRequest
 from cursor.app.core.protocol import StreamPrompting
 from cursor.app.core.config import config
-from cursor.app.core.dendrite import CortexDendrite
+from cortext.dendrite import CortexDendrite
 import traceback

 subtensor = bt.subtensor(network="finney")
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
-bittensor==6.9.4
+bittensor==8.4.3
 datasets==2.*
 envparse==0.2.0
 openai>=1.3.2, ==1.*
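The pin jumps two major versions (6.9.4 -> 8.4.3), which is what forces the import-path and pydantic changes elsewhere in this commit. A quick post-upgrade sanity check (the version string is assumed to match the pin):

    import bittensor as bt
    from bittensor.core.errors import SynapseDendriteNoneException  # new 8.x path

    print(bt.__version__)  # expected: 8.4.3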
21 changes: 20 additions & 1 deletion server/app/database.py
@@ -1,9 +1,11 @@
 import time
 import psycopg2
 import os
 from contextlib import asynccontextmanager

 DATABASE_URL = os.getenv("DATABASE_URL")
 TABEL_NAME = 'query_resp_data'
+TIME_EXPIRATION = 3600 * 24 * 10  # delete records after 10 days since it's creation.
+LAST_EXECUTION = time.time()
 # PostgreSQL connection parameters
 conn = psycopg2.connect(DATABASE_URL)
@@ -49,6 +51,7 @@ async def create_table(app):
             CREATE INDEX IF NOT EXISTS miner_id_index ON {TABEL_NAME} (miner_uid);
             CREATE INDEX IF NOT EXISTS miner_hot_key_index ON {TABEL_NAME} (miner_hot_key);
             CREATE INDEX IF NOT EXISTS idx_score_sim_timestamp ON {TABEL_NAME} (score, similarity, timestamp);
+            CREATE INDEX IF NOT EXISTS idx_block__cycle_epoch_num_ ON {TABEL_NAME} (block_num, cycle_num, epoch_num);
         """
         cur.execute(create_index_query)
         conn.commit()
@@ -57,4 +60,20 @@ async def create_table(app):
     except Exception as e:
         print(f"Error creating table: {e}")

+
+def delete_records():
+    global TIME_EXPIRATION, LAST_EXECUTION
+    if (time.time() - LAST_EXECUTION) < TIME_EXPIRATION:
+        return
+    LAST_EXECUTION = time.time()
+    timestamp = time.time() - TIME_EXPIRATION
+    conn = psycopg2.connect(DATABASE_URL)
+    cur = conn.cursor()
+    query_str = f"""
+        delete from query_resp_data where timestamp <= {timestamp}
+    """
+    cur.execute(query_str)
+    conn.commit()
+
+
 create_table(None)
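The new delete_records throttles itself: it returns immediately unless TIME_EXPIRATION seconds have passed since the last run, then purges rows older than ten days. A self-contained sketch of the same pattern with the cursor and connection explicitly closed (the cleanup is added here; the diff leaves them open):

    import os
    import time

    import psycopg2

    DATABASE_URL = os.getenv("DATABASE_URL")
    TIME_EXPIRATION = 3600 * 24 * 10  # ten days, as in the diff
    LAST_EXECUTION = time.time()

    def delete_expired_records() -> None:
        """Run at most once per expiration window, then purge old rows."""
        global LAST_EXECUTION
        if (time.time() - LAST_EXECUTION) < TIME_EXPIRATION:
            return
        LAST_EXECUTION = time.time()
        cutoff = time.time() - TIME_EXPIRATION
        conn = psycopg2.connect(DATABASE_URL)
        try:
            with conn.cursor() as cur:
                # Parameterized query instead of the diff's f-string interpolation.
                cur.execute("DELETE FROM query_resp_data WHERE timestamp <= %s", (cutoff,))
            conn.commit()
        finally:
            conn.close()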
9 changes: 8 additions & 1 deletion server/app/main.py
@@ -1,10 +1,17 @@
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.cron import CronTrigger
 from contextlib import asynccontextmanager
 from fastapi import FastAPI, Depends, HTTPException
 from fastapi.middleware.cors import CORSMiddleware

 from . import curd, models, schemas
-from .database import create_table, conn, cur
+from .database import create_table, conn, cur, delete_records
 from typing import List

+scheduler = BackgroundScheduler()
+scheduler.add_job(delete_records, CronTrigger(day_of_week="mon", hour=10, minute=0))
+scheduler.start()
+
+
 @asynccontextmanager
 async def lifespan(app: FastAPI):
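As added, the scheduler starts at import time, before the FastAPI lifespan runs. A minimal sketch of an alternative wiring that ties scheduler startup and shutdown to the app's lifespan instead (the hook placement is an assumption, not the repo's code):

    from contextlib import asynccontextmanager

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.triggers.cron import CronTrigger
    from fastapi import FastAPI

    def delete_records() -> None:  # stand-in for server.app.database.delete_records
        print("purging expired rows")

    scheduler = BackgroundScheduler()
    scheduler.add_job(delete_records, CronTrigger(day_of_week="mon", hour=10, minute=0))

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        scheduler.start()               # begin firing cron jobs with the app
        yield
        scheduler.shutdown(wait=False)  # stop the scheduler thread on exit

    app = FastAPI(lifespan=lifespan)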
3 changes: 2 additions & 1 deletion server/requirements.txt
@@ -2,4 +2,5 @@ fastapi
 uvicorn[standard]
 psycopg2-binary
 sqlalchemy
-pydantic
+pydantic
+apscheduler
