-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.py
89 lines (69 loc) · 2.83 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: leeyoshinari
import os
import socket
import traceback
from urllib.parse import unquote
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from tortoise.contrib.fastapi import register_tortoise
import settings
from karaoke.responses import StreamResponse
from karaoke.results import Result
import karaoke.urls as my_urls
# URL prefix prepended to every route path declared in this module and passed
# to include_router below; empty means the routes live at the site root.
prefix = ''
# The ASGI application object; uvicorn is pointed at it as "main:app".
app = FastAPI()
# Hook Tortoise ORM into the application using the project's ORM config.
register_tortoise(app=app, config=settings.TORTOISE_ORM)
# Template loader for the HTML pages rendered by the route handlers.
templates = Jinja2Templates(directory="templates")
# Serve the "static" directory under /static.
# NOTE(review): this mount path is a literal and does not include `prefix`.
app.mount("/static", StaticFiles(directory="static"), name="static")
def get_local_ip():
    """Return the local IPv4 address used for outbound traffic.

    A UDP socket is connected to a fixed public address and the local end of
    that socket is read back.

    Returns:
        str: the local address, or "0.0.0.0" when it cannot be determined.
    """
    try:
        # Creating the socket inside the ``try`` and closing it through the
        # context manager means a failed ``socket()`` call falls back to the
        # default as well, instead of failing on an unbound name in cleanup.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("114.114.114.114", 80))
            return s.getsockname()[0]
    except OSError:
        # Best effort: any socket-level failure yields the wildcard address.
        return "0.0.0.0"
async def read_file(file_path, start_index=0):
    """Asynchronously yield the contents of a file in 64 KiB pieces.

    Args:
        file_path: path of the file to stream; it is opened in binary mode.
        start_index: byte offset at which reading starts (default 0).

    Yields:
        bytes: consecutive chunks of at most 65536 bytes, until end of file.
    """
    with open(file_path, 'rb') as stream:
        stream.seek(start_index)
        # iter(callable, sentinel) keeps calling read() until it returns b''.
        for piece in iter(lambda: stream.read(65536), b''):
            yield piece
@app.get(prefix + "/")
async def index(request: Request):
    """Render ``index.html`` for the root path under the URL prefix."""
    context = {"request": request, "prefix": prefix}
    return templates.TemplateResponse("index.html", context)
@app.get(prefix + "/dealVideo")
async def index(request: Request):
    """Render ``tool.html`` for the ``/dealVideo`` path."""
    context = {"request": request, "prefix": prefix}
    return templates.TemplateResponse("tool.html", context)
@app.get(prefix + "/sing")
async def play(request: Request):
    """Render ``playing.html`` for the ``/sing`` path."""
    context = {"request": request, "prefix": prefix}
    return templates.TemplateResponse("playing.html", context)
@app.get(prefix + "/song")
async def index(request: Request):
    """Render ``client.html`` for the ``/song`` path."""
    context = {"request": request, "prefix": prefix}
    return templates.TemplateResponse("client.html", context)
@app.get(prefix + "/download/{file_name}", summary="Download file (获取文件)")
async def download_file(file_name: str):
    """Stream a file stored under ``settings.FILE_PATH`` to the client.

    Args:
        file_name: URL path segment naming the file. It is URL-unquoted once
            more here before being used.

    Returns:
        A ``StreamResponse`` over the file contents on success, otherwise
        ``Result(code=1, msg="System Error")``.
    """
    try:
        file_name = unquote(file_name)
        base_dir = os.path.abspath(settings.FILE_PATH)
        file_path = os.path.abspath(os.path.join(base_dir, file_name))
        # The name is client-controlled and decoded a second time above, so a
        # double-encoded "../" can arrive here. Refuse anything that resolves
        # outside the configured download directory.
        if os.path.commonpath([base_dir, file_path]) != base_dir:
            return Result(code=1, msg="System Error")
        # Fail before any headers are built when the target is a directory or
        # does not exist, rather than once streaming has already started.
        if not os.path.isfile(file_path):
            raise FileNotFoundError(file_path)
        file_format = file_name.split('.')[-1]
        # NOTE(review): Accept-Ranges is advertised, but the file is always
        # streamed from offset 0 here and no Range header is read.
        headers = {'Accept-Ranges': 'bytes', 'Content-Length': str(os.path.getsize(file_path)),
                   'Content-Disposition': f'inline;filename="{file_name}"'}
        media_type = settings.CONTENT_TYPE.get(file_format, 'application/octet-stream')
        return StreamResponse(read_file(file_path), media_type=media_type, headers=headers)
    except Exception:
        # Boundary handler: log the failure and answer with a generic error.
        print(traceback.format_exc())
        return Result(code=1, msg="System Error")
# Attach the routes declared in karaoke.urls under the same URL prefix.
app.include_router(my_urls.router, prefix=prefix)

if __name__ == "__main__":
    import uvicorn

    # Use the configured host when one is set; otherwise detect the local address.
    bind_host = settings.get_config('host') or get_local_ip()
    bind_port = int(settings.get_config('port'))
    uvicorn.run(app="main:app", host=bind_host, port=bind_port, reload=False)