-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpodwhisper.py
143 lines (115 loc) · 4.46 KB
/
podwhisper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import sys
import os
import time
import requests
import statistics
import whisper
import json
import pathlib
import logging
from hash import hash
from dotenv import load_dotenv
from pyPodcastParser.Podcast import Podcast
from pathvalidate import sanitize_filename
def timestampstr_from_seconds(seconds):
    """Format a duration in seconds as a zero-padded ``HH:MM:SS`` string.

    Fractional seconds are truncated. Hours are not wrapped at 24, so very
    long durations simply produce a wider hours field (e.g. ``100:00:00``).
    """
    whole_hours = int(seconds // 3600)
    within_hour = seconds % 3600
    whole_minutes = int(within_hour // 60)
    whole_seconds = int(within_hour % 60)
    return ":".join(
        f"{part:02d}" for part in (whole_hours, whole_minutes, whole_seconds)
    )
def _fetch_podcast(feed_url, logger):
    """Download and parse the feed at *feed_url*; return ``None`` on failure."""
    try:
        response = requests.get(feed_url, timeout=30)
        # Without this, HTTP error statuses never raise and an error page
        # would be handed to the feed parser.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(e)
        return None
    return Podcast(response.content)


def _audio_extension(audio_url):
    """Return the file extension of *audio_url*, ignoring query and fragment."""
    from urllib.parse import urlsplit  # Local import keeps this block self-contained.

    # Taking the suffix of the raw URL would include any "?query" part
    # (e.g. ".mp3?dest-id=1"), which then ends up in the cache filename.
    return pathlib.PurePosixPath(urlsplit(audio_url).path).suffix


def _download_audio(audio_url, audio_path, logger, max_attempts=5):
    """Download *audio_url* to *audio_path*, retrying a bounded number of times.

    Returns ``True`` once the file has been written, ``False`` if every
    attempt failed (non-200 status or a network error).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            r = requests.get(audio_url, timeout=30)
        except requests.exceptions.RequestException as e:
            logger.warning(e)
        else:
            if r.status_code == 200:
                with open(audio_path, "wb") as f:
                    f.write(r.content)
                return True
        if attempt < max_attempts:
            logger.warning("Download failed. Retrying in 1 second...")
            time.sleep(1)
    return False


def _write_transcripts(result, heading, text_path, segment_path, timestamped_path):
    """Write the raw text, JSON segments and timestamped Markdown files."""
    # Explicit UTF-8 so transcripts with non-ASCII text do not depend on the
    # platform's default encoding.
    with open(text_path, "w", encoding="utf-8") as f:
        f.write(result["text"])

    with open(segment_path, "w", encoding="utf-8") as f:
        json.dump(result["segments"], f, indent=4)

    lines = [heading]
    for s in result["segments"]:
        lines.append(
            f"**{timestampstr_from_seconds(s['start'])}** {s['text'].strip()}"
        )
        # NOTE(review): indentation was lost in the reviewed copy; a blank
        # line after every segment (separate Markdown paragraphs) is assumed
        # here - confirm against the original layout.
        lines.append("")
    with open(timestamped_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))


def main():
    """Transcribe every episode of the podcast feed given as ``sys.argv[1]``.

    Configuration is read from the environment: ``LOG_LEVEL`` (default
    ``INFO``) and ``WHISPER_MODEL`` (required; may come from a ``.env`` file).
    Audio is cached under ``tmp/<podcast title>/`` and, per episode, a
    ``.txt``, ``.json`` and ``.md`` file is written to ``out/<podcast title>/``.
    Episodes whose three output files already exist are skipped.

    Exits with status 1 if the feed URL or model name is missing, or if the
    feed cannot be fetched.
    """
    logger = logging.getLogger(__name__)
    # NOTE(review): LOG_LEVEL is read before load_dotenv() runs, so a value
    # that is set only in .env does not influence the log level.
    logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))

    if len(sys.argv) < 2:
        logger.error("Missing URL. Aborting.")
        sys.exit(1)
    feed_url = sys.argv[1]
    logger.info("Processing feed: %s", feed_url)

    load_dotenv()
    model_name = os.getenv("WHISPER_MODEL")
    if not model_name:
        # Previously this fell through to a TypeError on string concatenation.
        logger.error("WHISPER_MODEL is not set. Aborting.")
        sys.exit(1)
    logger.info("Whisper model used: %s", model_name)
    model = whisper.load_model(model_name)

    podcast = _fetch_podcast(feed_url, logger)
    if podcast is None:
        # The error has been logged; continuing would hit an undefined name.
        sys.exit(1)

    podcast_dir = sanitize_filename(podcast.title)
    tmp_dir = os.path.join("tmp", podcast_dir)
    out_dir = os.path.join("out", podcast_dir)
    os.makedirs(tmp_dir, exist_ok=True)
    os.makedirs(out_dir, exist_ok=True)

    timings = []
    total = len(podcast.items)
    for i, x in enumerate(podcast.items, start=1):
        start_time = time.perf_counter()
        logger.info(f"Processing item {i}/{total}: {x.title}")
        if timings:
            # Remaining items (including this one) times the mean duration so far.
            remaining = (total - i + 1) * statistics.mean(timings)
            logger.info(f"ETA: {timestampstr_from_seconds(remaining)}")

        audio_url = x.enclosure_url
        if not audio_url:
            logger.warning("Item has no audio enclosure. Skipping...")
            continue

        # Paths for intermediary and output files.
        # `hash` is the project-local helper imported at the top of the file
        # (it shadows the builtin); its result is concatenated with the
        # extension, so it is expected to return a string.
        audio_path = os.path.join(
            tmp_dir, hash(x.guid + x.title) + _audio_extension(audio_url)
        )
        text_path = os.path.join(out_dir, sanitize_filename(x.title + ".txt"))
        segment_path = os.path.join(out_dir, sanitize_filename(x.title + ".json"))
        timestamped_path = os.path.join(out_dir, sanitize_filename(x.title + ".md"))

        already_transcribed = (
            os.path.exists(text_path)
            and os.path.exists(segment_path)
            and os.path.exists(timestamped_path)
        )

        if already_transcribed:
            logger.info("Item already transcribed. Skipping...")
        else:
            # Download if necessary.
            if os.path.exists(audio_path):
                logger.info("Audio exists. Skipping download...")
            else:
                logger.info("Downloading...")
                if not _download_audio(audio_url, audio_path, logger):
                    logger.error("Download failed. Skipping item.")
                    continue

            # Transcribe.
            logger.info("Transcribing...")
            result = model.transcribe(audio_path)
            _write_transcripts(
                result,
                f"# {podcast.title} - {x.title}",
                text_path,
                segment_path,
                timestamped_path,
            )
            logger.info("Finishing item.")

        elapsed = time.perf_counter() - start_time
        logger.info(f"Episode processed in {timestampstr_from_seconds(elapsed)}.")
        timings.append(elapsed)

    logger.info("Done!")
# Script entry point: start the transcription run only when this file is
# executed directly, never as a side effect of importing it.
if __name__ == "__main__":
    main()