-
Notifications
You must be signed in to change notification settings - Fork 75
/
reddit.py
173 lines (141 loc) · 5.5 KB
/
reddit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
'''
Uses HPI [[https://github.com/karlicoss/HPI/blob/master/doc/MODULES.org#myreddit][reddit]] module
'''
from __future__ import annotations
import typing
from itertools import chain
from promnesia.common import Loc, Results, Visit, extract_urls, logger
if typing.TYPE_CHECKING:
from my.reddit.common import Comment, RedditBase, Save, Submission, Upvote
def index(*, render_markdown: bool = False, renderer: type[RedditRenderer] | None = None) -> Results:
    '''
    Index reddit activity exposed by the HPI reddit module.

    Walks saves, comments, submissions and upvotes (in that order) and yields
    whatever the renderer produces for each item. An exception raised while
    handling a single item is yielded instead of raised, so the remaining
    items are still processed.

    This is a generator: nothing below runs until it is iterated.

    :param render_markdown: forwarded to the renderer's constructor.
    :param renderer: optional callable used to build the renderer (expected to
        be a subclass of RedditRenderer); RedditRenderer is used when omitted.
    :raises ModuleNotFoundError: if importing from HPI fails for a reason
        other than ``my.reddit.all`` being absent.
    '''
    from . import hpi  # noqa: F401

    try:
        from my.reddit.all import comments, saved, submissions, upvoted
    except ModuleNotFoundError as e:
        if "No module named 'my.reddit.all'" not in str(e):
            # a different module is missing -- not the old-HPI-layout case
            raise
        import warnings

        warnings.warn("DEPRECATED/reddit: Using an old version of HPI, please update")
        from my.reddit import comments, saved, submissions, upvoted

    if renderer is not None:
        assert callable(renderer), f"{renderer} is not a callable (should be a subclass of RedditRenderer)"
        r = renderer(render_markdown=render_markdown)
    else:
        r = RedditRenderer(render_markdown=render_markdown)

    # (log message, HPI provider, name of the renderer hook handling its items)
    sources = (
        ('processing saves', saved, '_from_save'),
        ('processing comments', comments, '_from_comment'),
        ('processing submissions', submissions, '_from_submission'),
        ('processing upvotes', upvoted, '_from_upvote'),
    )
    for message, provider, hook_name in sources:
        logger.info(message)
        # the provider is only called once the previous source is exhausted
        for item in provider():
            try:
                # hook is looked up inside the try so that a renderer lacking
                # it is reported per item rather than aborting the whole run
                yield from getattr(r, hook_name)(item)
            except Exception as e:
                yield e
# mostly here so we can keep track of how the user
# wants to render markdown
class RedditRenderer:
    '''
    Converts reddit items into visits.

    Holds the ``render_markdown`` preference plus the optional helpers from
    the sibling ``markdown`` module; both helpers stay ``None`` when that
    module cannot be imported.
    '''

    def __init__(self, *, render_markdown: bool = False) -> None:
        '''
        :param render_markdown: whether item text should be rendered through
            the markdown parser; forced to ``False`` when the markdown module
            cannot be imported.
        '''
        try:
            from .markdown import TextParser, extract_from_text
        except ImportError as import_err:
            # TODO: add dummy _link_extractor and _parser_cls classes in case
            # these are called by a subclass?
            self._link_extractor = None
            self._parser_cls = None
            # only report the failure if the user actually asked for rendering
            if render_markdown:
                logger.exception(import_err)
                logger.critical("Could not import markdown module to render reddit markdown. Try 'python3 -m pip install mistletoe'")
                # couldn't import, so rendering has to be switched off
                render_markdown = False
        else:
            self._link_extractor = extract_from_text
            self._parser_cls = TextParser
        self.render_markdown = render_markdown

    def _from_comment(self, i: Comment) -> Results:
        '''Yield results for a comment, located at the comment's own url.'''
        where = Loc.make(title='Reddit comment', href=i.url)
        yield from self._from_common(i, locator=where)

    def _from_submission(self, i: Submission) -> Results:
        '''Yield results for a submission; the locator title includes the submission title.'''
        where = Loc.make(title=f'Reddit submission: {i.title}', href=i.url)
        yield from self._from_common(i, locator=where)

    def _from_upvote(self, i: Upvote) -> Results:
        '''Yield results for an upvoted item, located at the item's url.'''
        where = Loc.make(title='Reddit upvote', href=i.url)
        yield from self._from_common(i, locator=where)

    def _from_save(self, i: Save) -> Results:
        '''Yield results for a saved item, located at the item's url.'''
        where = Loc.make(title='Reddit save', href=i.url)
        yield from self._from_common(i, locator=where)

    # to allow for possible subclassing by the user?
    def _render_body(self, text: str) -> str:
        '''
        Return ``text`` rendered by the markdown parser when rendering is
        enabled and a parser class is available; otherwise return it unchanged.
        '''
        if not self.render_markdown or self._parser_cls is None:
            return text
        return self._parser_cls(text)._doc_ashtml()

    def _from_common(self, i: RedditBase, locator: Loc) -> Results:
        '''
        Yield one Visit per distinct url associated with ``i``.

        Urls come, in order, from the item's own url, the ``link_url`` and
        ``url`` entries of its raw data (when present), urls extracted from
        its text, and finally the markdown link extractor if it is available.
        Exceptions produced by the markdown link extractor are yielded as-is.
        '''
        candidates = [i.url]
        # TODO this should belong to HPI.. fix permalink handling I guess
        # ok, it's not present for all of them..
        for key in ('link_url', 'url'):
            extra = i.raw.get(key)
            if extra is not None:
                candidates.append(extra)

        body = self._render_body(i.text)
        seen: set[str] = set()

        def visit_for(url):
            # every visit of this item shares the timestamp, context and locator
            return Visit(
                url=url,
                dt=i.created,
                context=body,
                locator=locator,
            )

        for url in chain(candidates, extract_urls(i.text)):
            if url not in seen:
                yield visit_for(url)
                seen.add(url)

        # Also pick up markdown links like [link text](https://...) in case
        # URLExtract missed any.
        #
        # This is attempted even if the user didn't enable the render_markdown
        # flag, as it may catch extra links that URLExtract didn't. It still
        # requires mistletoe to be installed, but the user may already have it
        # for the auto/markdown modules.
        extractor = self._link_extractor
        if extractor is None:
            return
        for res in extractor(i.text):
            if isinstance(res, Exception):
                yield res
                continue
            link = res.url
            if link not in seen:
                yield visit_for(link)
                seen.add(link)