forked from jo-nike/clipplex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
clipplexAPI.py
286 lines (239 loc) · 10.7 KB
/
clipplexAPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
from cmath import e
from datetime import datetime, timedelta
import os
import subprocess
import xml.etree.ElementTree as ET
import ffmpeg
import requests
MEDIA_STATIC_PATH = "app/static/media"
class ServerInfo:
def __init__(self, platform, username):
self.platform = os.environ.get("SERVER_PLATFORM")
self.plex_url = os.environ.get("SERVER_URL")
self.plex_token = os.environ.get("SERVER_TOKEN")
self.params = (("X-Plex-Token", {self.plex_token}),)
self.sessions_xml = self._get_current_sessions_xml()
self.username = username
self.session_id = self._get_session_id(username)
self.media_key = self._get_media_key()
self.media_path_xml = self._get_media_path_xml()
self.media_path = self._get_file_path()
self.media_fps = self._get_media_fps()
self.media_type = self._get_file_type()
self.media_title = self._get_file_title()
self.current_media_time_int = self._get_current_media_time()
self.current_media_time_str = Utils(offset=self.current_media_time_int).offset_to_time
def _get_media_fps(self) -> float:
"""Get the frame rate of the video currently played by the user.
Returns:
float: Frame Rate of the video
"""
framerate = None
if self.platform == "plex":
media_dict = list(list(list(list(list(self.media_path_xml)[0]))[0])[0])[0].attrib
framerate = float(media_dict["frameRate"])
else:
framerate = -1.
return framerate
def _get_current_media_time(self) -> int:
"""Get the offset between the start of the video and the current view position of the user.
Returns:
int: Offset between start of the video and current view time
"""
media_time = None
if self.platform == "plex":
media_dict = list(list(self.sessions_xml))[self.session_id].attrib
media_time = int(media_dict["viewOffset"])
else:
media_time = -1
return media_time
def _get_current_sessions_xml(self) -> ET:
"""Get the XML from plex for the current user session.
Returns:
ET: XML tree of the current user session
"""
xml_content = None
if self.platform == "plex":
response = requests.get(f"{self.plex_url}/status/sessions", params=self.params)
xml_content = ET.fromstring(response.content)
return xml_content
def _get_file_path(self) -> str:
"""Get the file path of the video currently played by the user.
Returns:
str: Path of the video being played
"""
file_path = None
if self.platform == "plex":
media_dict = list(list(list(list(self.media_path_xml)[0]))[0])[0].attrib # REPLACE THAT BY A FIND PART TAG
file_path = media_dict["file"]
return file_path
def _get_file_title(self) -> str:
"""Get the title of the video currently played by the user.
Returns:
str: If TV show, returns show + episode name, if movie, returns movie name.
"""
file_title = None
if self.platform == "plex":
if self.media_type == "episode":
video_dict = list(list(self.media_path_xml))[0].attrib
title = video_dict["title"]
show_name = video_dict["grandparentTitle"]
file_title = f"{show_name} - {title}"
else:
video_dict = list(list(self.media_path_xml))[0].attrib
file_title = video_dict["title"]
return file_title
def _get_file_type(self) -> str:
"""Get the type of file of the video currently played by the user.
Returns:
str: File type of the video being played
"""
file_type = None
if self.platform == "plex":
video_dict = list(list(self.media_path_xml))[0].attrib
file_type = video_dict["type"]
return file_type
def _get_media_path_xml(self) -> ET:
"""Get the XML from plex for the current user session.
Returns:
ET: XML tree of the current video being played
"""
xml_content = None
if self.platform == "plex":
response = requests.get(f"{self.plex_url}{self.media_key}", params=self.params)
xml_content = ET.fromstring(response.content)
return xml_content
def _get_media_key(self) -> str:
"""Get the plex media key of the video being played.
Returns:
str: Plex media key
"""
media_key = None
if self.platform == "plex":
media_info = list(list(self.sessions_xml))[self.session_id].attrib
media_key = media_info["key"]
return media_key
def _get_session_id(self, username: str) -> int:
"""Get the plex session id of the current session for the current user.
Args:
username (str): Username of the queried user.
Returns:
int: Return the index of the session
"""
session_id = None
if self.platform == "plex":
for sessions in list(self.sessions_xml):
for session in sessions:
if session.tag == "User" and session.attrib["title"] == username:
session_id = list(self.sessions_xml).index(sessions)
break
raise Exception(f"No stream running for user {username}")
return session_id
class Snapshot:
def __init__(self, media_path: str, time: str, fps: float):
self.media_path = media_path
self.time = time
self.fps = int(fps)
def _download_frames(self):
cmd = f"ffmpeg -ss {self.time} -i {self.media_path} -vframes {self.fps} -qscale:v 2 {MEDIA_STATIC_PATH}/images/{self.time.replace(':','_')}_%03d.jpg"
a = subprocess.call(cmd, shell=True, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
class Video:
def __init__(self, server_data: ServerInfo, time: str, duration, file_name: str):
self.media_path = server_data.media_path
server_attributes = list(list(server_data.media_path_xml))[0].attrib
self.metadata_title = server_attributes["title"]
self.metadata_current_media_time = server_data.current_media_time_str
print(server_data.username)
self.metadata_username = server_data.username
if server_data.media_type == "episode":
self.metadata_season = server_attributes["parentIndex"]
self.metadata_episode_number = server_attributes["index"]
self.metadata_showname = server_attributes["grandparentTitle"]
else:
self.metadata_season = ""
self.metadata_episode_number = ""
self.metadata_showname = ""
self.time = time
self.duration = duration
self.file_name = file_name
def extract_video(self):
(
ffmpeg
.input(self.media_path, ss=self.time, t=self.duration)
.output(f"{MEDIA_STATIC_PATH}/videos/{self.file_name}.mp4",
map_metadata=-1,
vcodec="libx264",
acodec="libvorbis",
pix_fmt="yuv420p",
crf=18,
**{"metadata:g:0":f"title={self.metadata_title}",
"metadata:g:1":f"season_number={self.metadata_season}",
"metadata:g:2":f"show={self.metadata_showname}",
"metadata:g:3":f"episode_id={self.metadata_episode_number}",
"metadata:g:4":f"comment={self.metadata_current_media_time}",
"metadata:g:5":f"artist={self.metadata_username}"})
.run(capture_stdout=True)
)
class Utils:
def __init__(self, offset: int=0):
self.offset_to_time = self.milli_to_string(offset)
def milli_to_string(self, millisec: int) -> str:
time = str(timedelta(milliseconds=millisec))
if len(time.split(":")[0]) < 2:
time = f"0{time}"
return time.split(".")[0]
def add_time(self, current_time: str, time_to_add: int) -> str:
time_obj = datetime.strptime(current_time, "%H:%M:%S")
time_obj_with_time_added = time_obj + timedelta(seconds=time_to_add)
return time_obj_with_time_added.strftime("%H:%M:%S")
def _pad_time(self, time) -> str:
if len(str(time)) < 2:
time = f"0{time}"
return time
def calculate_clip_time(self, start, end) -> int:
start = start.split(":")
start_total_sec = (int(start[0])*3600) + (int(start[1])*60) + (int(start[2]))
end = end.split(":")
end_total_sec = (int(end[0])*3600) + (int(end[1])*60) + (int(end[2]))
total_second = end_total_sec - start_total_sec
return total_second
def get_images_in_folder() -> list:
folder = os.path.join(MEDIA_STATIC_PATH, "images")
folder_list = []
for a in os.listdir(folder):
a = f"{folder}/{a}"
folder_list.append(f"{a.split('/')[-4]}/{a.split('/')[-3]}/{a.split('/')[-2]}/{a.split('/')[-1]}")
return sorted(folder_list)
def get_videos_in_folder() -> list:
folder = os.path.join(MEDIA_STATIC_PATH, "videos")
folder_list = []
for file in os.listdir(folder):
file_dict = {}
file = os.path.join(folder, file)
metadata = ffmpeg.probe(file)["format"]["tags"]
file_dict["file_path"] = "/".join(file.split("/")[1:])
file_dict["title"] = metadata.get("title") or ""
file_dict["original_start_time"] = metadata.get("comment") or ""
file_dict["username"] = metadata.get("artist") or ""
file_dict["show"] = metadata.get("show") or ""
file_dict["episode_number"] = metadata.get("episode_id") or ""
file_dict["season_number"] = metadata.get("season_number") or ""
folder_list.append(file_dict)
return folder_list
def delete_file(self, file_path) -> bool:
try:
os.remove(f"./app/{file_path}")
return True
except:
return False
def streamable_upload(self, file_path) -> str:
file_processed = {
"file": (file_path.split("/")[-1], open(f"app/{file_path}", "rb")),
}
email = os.environ.get("STREAMABLE_LOGIN") or ""
password = os.environ.get("STREAMABLE_PASSWORD") or ""
try:
response = requests.post("https://api.streamable.com/upload", auth=(email, password), files=file_processed).json()
return response
except Exception as e:
raise Exception(f"Problem uploading to streamable: {e}")