This repository has been archived by the owner on May 6, 2021. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 20
/
bot.py
382 lines (296 loc) · 15.7 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
#!/usr/bin/env python3.6
# This is DuckHunt V3
# You have to use it with the rewrite version of discord.py
# You can install it using
# pip install -U git+https://github.com/Rapptz/discord.py@rewrite#egg=discord.py[voice]
# You also have to use python 3.6 to run this
# Have fun !
# The doc for d.py rewrite is here : http://discordpy.readthedocs.io/en/rewrite/index.html
# Installation tutorial : https://kb.api-d.com/en/code/duckhunt-how-to/
# Support server : https://discord.gg/G4skWae
# Importing and creating a logger
import logging
import traceback
import time
from typing import Union
import cogs.helpers.aux_inits as inits
from cogs.helpers import checks
from cogs.helpers.context import CustomContext
# Create the base logger through the project's init helper, then wrap it in a
# LoggerAdapter carrying default "channelid"/"userid" values (0 for both) for
# records emitted from module level, where no channel or user is involved.
base_logger = inits.init_logger()
extra = {"channelid": 0, "userid": 0}
logger = logging.LoggerAdapter(base_logger, extra)
logger.info("Starting the bot")
# Setting up asyncio to use uvloop if possible, a faster implementation of the event loop
import asyncio

try:
    import uvloop
except ImportError:
    # uvloop is optional: without it the stock asyncio event loop policy stays in place.
    logger.warning("Using the not-so-fast default asyncio event loop. Consider installing uvloop.")
else:
    logger.info("Using the fast uvloop asyncio event loop")
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Importing the discord API wrapper
import discord
import discord.ext.commands as commands
# Prepare the bot object
async def get_prefix(bot, message):
    """Return the prefixes accepted for ``message``.

    The channel's configured "prefix" preference comes first, followed by the
    built-in DuckHunt prefixes; mentioning the bot is accepted as well.
    """
    channel_prefix = await bot.db.get_pref(message.channel, "prefix")
    builtin_prefixes = ("duckhunt", "dh!", "dh", "Dh", "Dh!", "dH!", "dH", "DH!", "DH")
    resolver = commands.when_mentioned_or(channel_prefix, *builtin_prefixes)
    return resolver(bot, message)
logger.debug("Creating a bot instance of commands.AutoShardedBot")
# The module itself is imported (in addition to CustomContext at the top of the file)
# because the DuckHunt class refers to the context class as `context.CustomContext`.
from cogs.helpers import context
class DuckHunt(commands.AutoShardedBot):
    """Auto-sharded bot carrying DuckHunt's event handlers and its logging / message-sending helpers.

    Several attributes read by the methods below (``db``, ``_``, ``admins``, ``blacklisted_users``,
    ``commands_used``, ``ducks_planning``, ``ducks_spawned``, ``log_channel_id``, ``base_logger``)
    are not created in ``__init__``; they must be attached to the instance before the bot runs.
    """

    def __init__(self, command_prefix, **options):
        """Forward all arguments to ``AutoShardedBot`` and start with no shard marked as ready."""
        super().__init__(command_prefix, **options)
        self.shards_ready = set()

    async def on_shard_ready(self, shard_id):
        """Remember that ``shard_id`` is ready."""
        self.shards_ready.add(shard_id)

    async def on_disconnect(self):
        """Forget every ready shard."""
        self.shards_ready = set()

    async def on_message(self, message):
        """Invoke the command contained in ``message``, if any.

        Messages from bots, from blacklisted users and private messages are ignored.
        """
        if message.author.bot:
            return  # ignore messages from other bots

        if message.author.id in self.blacklisted_users:
            return

        if message.guild is None:  # Private message
            return

        ctx = await self.get_context(message, cls=context.CustomContext)
        if ctx.prefix is not None:
            # ctx.command = self.all_commands.get(ctx.invoked_with.lower())  # This doesn't work for subcommands
            await self.invoke(ctx)

    async def on_command(self, ctx):
        """Count and log the invoked command, then delete the invoking message if the channel asks for it."""
        # Go through `self` rather than the module-level `bot` global, so the class works
        # regardless of the name the instance is bound to.
        self.commands_used[ctx.command.name] += 1
        ctx.logger.info(f"<{ctx.command}> {ctx.message.clean_content}")
        if await self.db.get_pref(ctx.channel, "delete_commands"):
            await ctx.message.delete()

    async def on_ready(self):
        """Log that the bot is ready, both to the console and to the log channel."""
        logger.info("We are all set, on_ready was fired! Yeah!")
        logger.info(f"I see {len(self.guilds)} guilds")
        await self.log(level=30, title="Bot is ready", message=f"The bot has restarted. We can now see {len(self.guilds)} guilds on {self.shard_count} shards, and ducks can now spawn", where=None)

    async def log_guild_stats(self, e, guild):
        """Fill the embed ``e`` with statistics about ``guild`` and send it to the log channel."""
        total = guild.member_count
        bots = sum(m.bot for m in guild.members)
        online = sum(m.status is discord.Status.online for m in guild.members)

        def with_share(count):
            # No percentage when the member count is missing or zero, instead of raising.
            if not total:
                return f'{count} (n/a)'
            return f'{count} ({count/total:.2%})'

        e.add_field(name='Name', value=guild.name)
        e.add_field(name='ID', value=guild.id)
        # `guild.owner` is None when the owner is not in the member cache, so the ID is taken
        # from `guild.owner_id`, which does not need the Member object.
        e.add_field(name='Owner', value=f'{guild.owner} (ID: {guild.owner_id})')
        e.add_field(name='Members', value=str(total))
        e.add_field(name='Bots', value=with_share(bots))
        e.add_field(name='Online', value=with_share(online))

        if guild.icon:
            e.set_thumbnail(url=guild.icon_url)

        if guild.me:
            e.timestamp = guild.me.joined_at

        await self.send_message(where=self.get_channel(self.log_channel_id), embed=e, mention=False, can_pm=False)

    async def on_guild_join(self, guild):
        """Report the newly joined guild to the log channel."""
        e = discord.Embed(colour=0x53dda4, title='New Guild')  # green colour
        await self.log_guild_stats(e, guild)

    async def on_guild_remove(self, guild):
        """Report the guild that was left and drop every planned or spawned duck tied to its channels."""
        e = discord.Embed(colour=0xdd5f53, title='Left Guild')  # red colour
        await self.log_guild_stats(e, guild)
        logger.info(f"Guild removed (from discord) : {guild} :(!")

        # Read the channel list once; the set makes the per-duck membership test cheap.
        removed_channels = set(guild.channels)
        for channel in removed_channels:
            self.ducks_planning.pop(channel, None)

        logger.info(f"Before removing ducks, we had {len(self.ducks_spawned)} ducks")
        self.ducks_spawned = [duck for duck in self.ducks_spawned if duck.channel not in removed_channels]
        logger.info(f"After removing ducks, we have {len(self.ducks_spawned)} ducks")

    async def on_guild_channel_delete(self, channel):
        """Drop the planning entry and the spawned ducks of a deleted channel."""
        logger.info(f"Channel removed (from discord) : {channel} :(!")
        self.ducks_planning.pop(channel, None)
        self.ducks_spawned = [duck for duck in self.ducks_spawned if duck.channel != channel]

    async def on_command_error(self, context, exception):
        """Answer the user for the known command errors; log anything else with its traceback."""
        errors = discord.ext.commands.errors

        # Unknown commands are ignored outright, so skip the language lookup for them.
        if isinstance(exception, errors.CommandNotFound):
            return

        _ = self._
        language = await self.db.get_pref(context.channel, "language")

        if isinstance(exception, errors.MissingRequiredArgument):
            await self.send_message(ctx=context, message=_(":x: A required argument is missing.", language))
            return
        elif isinstance(exception, checks.NotEnoughExp):
            await self.send_message(ctx=context, message=_(":x: You don't have enough exp for this", language))
            return
        elif isinstance(exception, checks.NotServerAdmin):
            await self.send_message(ctx=context, message=_(":x: You are not a server admin", language))
            return
        elif isinstance(exception, checks.NotSuperAdmin):
            await self.send_message(ctx=context, message=_(":x: You are not a super admin.", language))
            await self.hint(ctx=context, message=_("This command is reserved for the bot owners. "
                                                   "If you think this is an error, please contact my owner at the DuckHunt Support Server (see `dh!help`).", language))
            return
        elif isinstance(exception, checks.NoVotesOnDBL):
            await self.send_message(ctx=context, message=_(":x: You haven't voted for DuckHunt on DiscordBotList for a while.", language))
            # NOTE(review): the 2nd and 3rd literals below join without a separator ("...by DuckhuntIf you...").
            # Left untouched because the text is handed to the translation function and is presumably
            # used as the lookup key - confirm before changing it.
            await self.hint(ctx=context, message=_("Support the bot to use this command by voting at <https://discordbots.org/bot/duckhunt>. "
                                                   "Be aware that the votes can take a minute to be registered by Duckhunt"
                                                   "If you think this is an error, please contact my owner at the DuckHunt Support Server (see `dh!help`).", language))
            return
        elif isinstance(exception, errors.CheckFailure):
            # Any other failed check is ignored silently.
            return
        elif isinstance(exception, errors.CommandOnCooldown):
            if context.message.author.id in self.admins:
                # Users listed in `self.admins` get the command re-invoked despite the cooldown.
                await context.reinvoke()
                return
            else:
                await self.send_message(context, message=_("You are on cooldown :(, try again in {seconds} seconds!", language).format(seconds=round(exception.retry_after, 1)))
                return

        logger.error('Ignoring exception in command {}:'.format(context.command))
        logger.error("".join(traceback.format_exception(type(exception), exception, exception.__traceback__)))

    async def hint(self, ctx, message):
        """Send ``message`` prefixed with the translated hint marker."""
        hint_start = ctx.bot._(":bulb: HINT: ")
        await self.send_message(ctx, message=hint_start + message)

    async def log(self, title: str, message: str, where: Union[CustomContext, discord.TextChannel, discord.Guild, None], level: int):
        """Send an embed describing an event to the log channel.

        :param title: Title of the embed.
        :param message: Description of the embed.
        :param where: Origin of the event, used to build the footer. May be None.
        :param level: Severity, mapped to the embed colour: 0-5 green, 6-10 greyple, 11-20 orange,
                      21-30 red, anything else dark red.
        """
        if isinstance(where, CustomContext):
            footer = f"On {where.channel.id} (#{where.channel.name}), by {where.author.id} ({where.author.name}#{where.author.discriminator})"
        elif isinstance(where, discord.TextChannel):
            footer = f"On {where.id} (#{where.name})"
        elif isinstance(where, discord.Guild):
            footer = f"On {where.id} (#{where.name})"
        elif where is None:
            footer = ""
        else:
            footer = ""
            logger.warning("Unknown where on logging to a channel : " + str(where))

        embed = discord.Embed(description=message)
        embed.title = title
        embed.set_footer(text=footer)

        if 0 <= level <= 5:
            embed.colour = discord.Colour.green()
        elif 6 <= level <= 10:
            embed.colour = discord.Colour.greyple()
        elif 11 <= level <= 20:
            embed.colour = discord.Colour.orange()
        elif 21 <= level <= 30:
            embed.colour = discord.Colour.red()
        else:
            embed.colour = discord.Colour.dark_red()

        await self.send_message(where=self.get_channel(self.log_channel_id), embed=embed, mention=False, can_pm=False)

    @staticmethod
    def _split_message(message, limit=1800):
        """Cut ``message`` into chunks of about ``limit`` characters, breaking on line boundaries.

        A chunk that ends inside a ``` code block gets a closing fence appended, and the next chunk
        is prefixed with an opening one; a chunk may therefore exceed ``limit`` by the length of
        those fences. Lines too long to fit in a chunk on their own are hard-wrapped.
        At least one chunk is always returned.
        """
        step = limit - 1  # leave room for the "\n" appended to every piece
        chunks = []
        current = ""
        for raw_line in message.split("\n"):
            pieces = [raw_line[i:i + step] for i in range(0, len(raw_line), step)] or [""]
            for piece in pieces:
                line = piece + "\n"
                if len(current) + len(line) <= limit:
                    current += line
                    continue

                # An odd number of fences means the chunk stops inside a code block.
                if current.count("```") % 2:
                    current += "```"
                    line = "```" + line

                chunks.append(current)
                current = line

        chunks.append(current)
        return chunks

    async def send_message(self, ctx: context.CustomContext = None, from_: discord.Member = None, where: discord.TextChannel = None, message: str = "", embed: discord.Embed = None,
                           can_pm: bool = True, force_pm: bool = False, mention=True, try_: int = 1, return_message: bool = False):
        """Send ``message`` and/or ``embed`` to a channel or to a user's DMs.

        :param ctx: Command context; when given, it overrides ``from_`` and ``where``.
        :param from_: Member the message relates to (mentioned, or PMed).
        :param where: Channel to send to. Without it, the message is PMed to ``from_``.
        :param message: Text to send. Texts between 1900 and 10000 characters are sent in several parts.
        :param embed: Embed to attach.
        :param can_pm: Allow redirecting to DMs when the channel's "pm_most_messages" preference is set.
        :param force_pm: Always send to ``from_``'s DMs (when ``from_`` is known).
        :param mention: Prefix the text with ``from_``'s mention when sending to a channel.
        :param try_: Attempt number; sending is retried until the third attempt fails.
        :param return_message: When False (the default) the message is sent in the background and a
                               Future is returned. When True, the send is awaited and the sent message
                               (or False on failure) is returned.
        :raises TypeError: When neither a destination channel nor a user is known.
        """
        if not return_message:
            # Fire and forget: schedule the real send and hand the future back to the caller.
            return asyncio.ensure_future(
                self.send_message(ctx=ctx, from_=from_, where=where, message=message, embed=embed, can_pm=can_pm, force_pm=force_pm, mention=mention, try_=try_, return_message=True))

        original_message = message

        if 10000 > len(message) > 1900:
            *leading_chunks, last_chunk = self._split_message(message)
            for chunk in leading_chunks:
                # Awaited one by one (return_message=True) to ensure the parts arrive in order.
                await self.send_message(ctx=ctx, from_=from_, where=where, message=chunk, embed=embed, can_pm=can_pm, force_pm=force_pm, mention=False, try_=try_, return_message=True)

            return await self.send_message(ctx=ctx, from_=from_, where=where, message=last_chunk, embed=embed, can_pm=can_pm, force_pm=force_pm, mention=False, try_=try_, return_message=True)

        if ctx:
            from_ = ctx.message.author
            where = ctx.channel
            logger = ctx.logger
        else:
            extra = {"channelid": where.id if where else 0, "userid": from_.id if from_ else 0}
            logger = logging.LoggerAdapter(self.base_logger, extra)

        if where:  # Where is a TextChannel
            if force_pm or (can_pm and await self.db.get_pref(where, "pm_most_messages")):
                if from_:  # If I have someone to PM
                    where = await from_.create_dm()
                    permissions = True
                else:
                    logger.warning(f"I want to PM this message, but I can't since I don't have a from_ User\n"
                                   f"ctx={ctx}, from_={from_}, where={where},\n"
                                   f"message : {message}")
                    permissions = True  # TODO: Check perms
            else:
                permissions = True  # TODO: Check perms
                # NOTE(review): nesting reconstructed from a copy of the file that lost its indentation;
                # the mention is assumed to apply only to messages that stay in the channel - confirm.
                if mention and from_:
                    message = f"{from_.mention} > {message}"
        else:  # Where will be a DMChannel
            if from_:
                where = await from_.create_dm()
                permissions = True
            else:
                logger.error(f"Can't send the message : don't know where to send it")
                raise TypeError(f"Need a `ctx`, `to` or `where`, but ctx={ctx} | from_={from_} | where={where} given")

        if not permissions:
            logger.warning("No permissions to speak here")
            return False

        # Where can be a TextChannel or a DMChannel
        # from_ is a Member or None
        # ctx can be a Context or None
        try:
            m = await where.send(message, embed=embed)
            return m
        except discord.errors.Forbidden:
            logger.warning(f"Could not send {message} to channel : no I/O permissions")
            return False
        except discord.errors.NotFound:
            logger.warning(f"Could not send {message} to channel : channel not found.")
            return False
        except Exception as e:
            if try_ >= 3:
                logger.warning(f"Could not send {message} to channel after 3 tries")
                logger.exception("Exception for not sending is :")
                await self.log(level=15, title="Error when sending message (x3)", message=f"See bot console for exception ({e})", where=ctx)
                return False
            else:
                # Retry with the untouched text: the mention prefix is re-applied by the new call.
                return await self.send_message(ctx=ctx, from_=from_, where=where, message=original_message, embed=embed, can_pm=can_pm, force_pm=force_pm, mention=mention, try_=try_ + 1,
                                               return_message=return_message)
# Create the single bot instance. Prefixes are resolved per message by get_prefix(),
# and command names are matched case-insensitively.
bot = DuckHunt(command_prefix=get_prefix, case_insensitive=True, fetch_offline_members=False)
logger.debug("Configuring the bot")
from cogs.helpers.config import config
# config() receives the bot instance; presumably it attaches the settings read elsewhere in this
# file (e.g. bot.token) - verify against cogs/helpers/config.py.
config(bot)
# Expose both loggers on the instance.
bot.base_logger = base_logger
bot.logger = logger
logger.debug("Loading the BG loop :")
from cogs import spawning
# Schedule the spawning background coroutine on the bot's event loop; the loop is started further down.
bot.loop.create_task(spawning.background_loop(bot))
logger.debug("> Loading complete")
logger.debug("Loading cogs : ")

######################
#                 |  #
#   ADD COGS HERE |  #
#                 V  #
#################   ##
#################   ##
################   ###
###############   ####
##############   #####

# Extensions are loaded one after the other, in the order listed here.
cogs = [
    'jishaku',
    'cogs.admin_commands',
    'cogs.analytics',
    'cogs.experience_related_commands',
    'cogs.helpers.database',
    'cogs.meta',
    'cogs.scores',
    'cogs.setup_wizzard',
    'cogs.shop',
    'cogs.superadmin_commands',
    'cogs.user_commands',
    'cogs.api',  # This must be the last to run, comment if you don't need it
]

for extension in cogs:
    try:
        bot.load_extension(extension)
        logger.debug(f"> {extension} loaded!")
    except Exception as e:
        # A failing extension is reported with its traceback; the remaining ones are still loaded.
        failure = '> Failed to load extension {}\n{}: {}'.format(extension, type(e).__name__, e)
        logger.exception(failure)
logger.debug("Everything seems fine, we are now connecting to discord and entering the mainloop.")

try:
    # bot.loop.set_debug(True)
    bot.loop.run_until_complete(bot.start(bot.token))
except KeyboardInterrupt:
    pass
except Exception:
    logger.exception("I AM RESTARTING BECAUSE SOMETHING FAILED HERE...")
finally:
    # Stop cleanly : make ducks leave
    game = discord.Game(name="Restarting...")
    bot.loop.run_until_complete(bot.change_presence(status=discord.Status.dnd, activity=game))
    bot.loop.run_until_complete(spawning.make_all_ducks_leave(bot))
    bot.loop.run_until_complete(bot.logout())
    # asyncio.sleep() only builds a coroutine: it has to be run by the loop for the pause to
    # actually happen before the loop is closed.
    bot.loop.run_until_complete(asyncio.sleep(3))
    bot.loop.close()