From f78784cac415570a92ec82ed537a77e7dee991bb Mon Sep 17 00:00:00 2001 From: Reversedeer Date: Fri, 9 Aug 2024 23:49:25 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=E6=96=B0=E5=A2=9E=E6=96=87=E5=AD=97?= =?UTF-8?q?=E8=BD=AC=E5=9B=BE=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nonebot_plugin_eventmonitor/config.py | 11 ++- nonebot_plugin_eventmonitor/handle.py | 111 ++++++++++++++++++------- nonebot_plugin_eventmonitor/txt2img.py | 64 ++++++++++++++ nonebot_plugin_eventmonitor/utils.py | 19 +++-- 4 files changed, 165 insertions(+), 40 deletions(-) create mode 100644 nonebot_plugin_eventmonitor/txt2img.py diff --git a/nonebot_plugin_eventmonitor/config.py b/nonebot_plugin_eventmonitor/config.py index e073472..d9f7807 100644 --- a/nonebot_plugin_eventmonitor/config.py +++ b/nonebot_plugin_eventmonitor/config.py @@ -1,3 +1,4 @@ +"""消息处理文本""" import random from datetime import datetime @@ -9,8 +10,8 @@ class Config: @staticmethod async def admin_changer(sub_type, user_id, bot_qq) -> str: + """发送管理员变动消息""" admin_msg = "" - # 根据管理员变动类型选择不同的消息 if sub_type == "set": # 如果用户ID等于机器人的QQ号,返回特定消息 @@ -31,10 +32,9 @@ async def admin_changer(sub_type, user_id, bot_qq) -> str: @staticmethod async def del_user_bye(add_time, user_id): - # sourcery skip: inline-immediately-returned-variable, use-fstring-for-concatenation + """发送退群消息""" rely = "" del_time = datetime.fromtimestamp(add_time) - # 检查用户ID是否在超级用户列表superusers中 if user_id in utils.superusers: # 如果是超级用户,生成特定的离开消息 @@ -48,7 +48,7 @@ async def del_user_bye(add_time, user_id): @staticmethod async def add_user_wecome(add_time, user_id, bot_qq): - + """发送入群消息""" # 将时间戳转换为datetime类型的时间add_time add_time = datetime.fromtimestamp(add_time) rely = "" @@ -69,6 +69,7 @@ async def add_user_wecome(add_time, user_id, bot_qq): @staticmethod async def monitor_rongyu(honor_type, user_id, bot_qq) -> str: + """发送群荣誉变化消息""" rely = "" # 根据honor_type选择不同的消息 if honor_type == "emotion": @@ -108,6 +109,7 @@ async def monitor_rongyu(honor_type, user_id, bot_qq) -> str: @staticmethod async def rad_package_change(target_id, bot_qq) -> str: + """发送运气王变化消息""" rely = "" if target_id == bot_qq: rely = "你们又不行了,本喵喜提运气王🧧" @@ -126,6 +128,7 @@ async def chuo_send_msg() -> str: @staticmethod async def upload_files(user_id) -> Message: + """发送上传群文件消息""" return ( MessageSegment.image( f'https://q4.qlogo.cn/headimg_dl?dst_uin={user_id}&spec=640' diff --git a/nonebot_plugin_eventmonitor/handle.py b/nonebot_plugin_eventmonitor/handle.py index f059cd1..4bfb2b7 100644 --- a/nonebot_plugin_eventmonitor/handle.py +++ b/nonebot_plugin_eventmonitor/handle.py @@ -1,3 +1,4 @@ +"""事件处理""" import os import json import shutil @@ -10,6 +11,7 @@ from nonebot.matcher import Matcher from nonebot.adapters.onebot.v11 import ( Bot, Message, + MessageSegment, PokeNotifyEvent, HonorNotifyEvent, GroupUploadNoticeEvent, @@ -23,10 +25,12 @@ from .utils import utils from .config import config from .update import update +from .txt2img import txt_to_img class Eventmonitor: @staticmethod - async def chuo(matcher: Matcher, event: PokeNotifyEvent) -> NoReturn: + async def chuo(matcher: Matcher, event: PokeNotifyEvent) -> None: + """戳一戳""" if not (await utils.check_chuo(utils.g_temp, str(event.group_id))): await matcher.finish(utils.notAllow) # 获取用户id @@ -39,63 +43,85 @@ async def chuo(matcher: Matcher, event: PokeNotifyEvent) -> NoReturn: if cd > utils.chuo_cd or event.get_user_id() in nonebot.get_driver().config.superusers: utils.chuo_CD_dir.update({uid: event.time}) rely_msg: str = await config.chuo_send_msg() - await matcher.finish(message=Message(rely_msg)) - #群荣誉事件 + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg))) + @staticmethod async def qrongyu(matcher: Matcher, event: HonorNotifyEvent, bot: Bot) -> None: + """群荣誉事件""" if not (await utils.check_honor(utils.g_temp, str(event.group_id))): return bot_qq = int(bot.self_id) rely_msg: str = await config.monitor_rongyu(event.honor_type, event.user_id, bot_qq) - await matcher.finish(message=Message(rely_msg), at_sender=True) + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)), at_sender=True) - #群文件事件 @staticmethod async def files(matcher: Matcher, event: GroupUploadNoticeEvent) -> None: + """群文件事件""" if not (await utils.check_file(utils.g_temp, str(event.group_id))): return - rely_msg: Message= await config.upload_files(event.user_id) - await matcher.finish(message=rely_msg, at_sender=True) + rely_msg= await config.upload_files(event.user_id) + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(str(rely_msg))), at_sender=True) - #退群事件 @staticmethod async def del_user(matcher: Matcher, event: GroupDecreaseNoticeEvent) -> None: + """退群事件""" if not (await utils.check_del_user(utils.g_temp, str(event.group_id))): return rely_msg= await config.del_user_bye(event.time, event.user_id) - await matcher.finish(message=Message(rely_msg)) + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(str(rely_msg))), at_sender=True) - #入群事件 @staticmethod async def add_user(matcher: Matcher, event: GroupIncreaseNoticeEvent, bot: Bot) -> None: + """入群事件""" await utils.config_check() if not (await utils.check_add_user(utils.g_temp, str(event.group_id))): return bot_qq = int(bot.self_id) - rely_msg = await config.add_user_wecome(event.time, event.user_id, bot_qq) - await matcher.finish(message=Message(rely_msg), at_sender=True) + rely_msg = await config.add_user_wecome(event.time, event.user_id, bot_qq) + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(str(rely_msg))), at_sender=True) - #管理员变动 @staticmethod async def admin_chance(matcher: Matcher, event: GroupAdminNoticeEvent, bot: Bot) -> None: + """管理员变动事件""" if not (await utils.check_admin(utils.g_temp, str(event.group_id))): return bot_qq = int(bot.self_id) rely_msg: str = await config.admin_changer(event.sub_type, event.user_id, bot_qq) - await matcher.finish(message=Message(rely_msg), at_sender=True) + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)), at_sender=True) - #红包运气王 @staticmethod async def hongbao(matcher: Matcher, event: LuckyKingNotifyEvent, bot: Bot) -> None: + """红包运气王事件""" if not (await utils.check_red_package(utils.g_temp, str(event.group_id))): return bot_qq = int(bot.self_id) rely_msg = await config.rad_package_change(event.target_id, bot_qq) - await matcher.finish(message=Message(rely_msg), at_sender=True) + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)), at_sender=True) @staticmethod async def switch(matcher: Matcher, event: GroupMessageEvent) -> None: - # 获取开关指令的参数,即用户输入的指令内容 + """获取开关指令的参数,即用户输入的指令内容""" command = str(event.get_message()).strip() # 获取群组ID gid = str(event.group_id) @@ -105,47 +131,68 @@ async def switch(matcher: Matcher, event: GroupMessageEvent) -> None: utils.g_temp[gid][key] = True utils.write_group_data(utils.g_temp) name = utils.get_function_name(key) - await matcher.finish(f"{name}功能已开启喵") + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(f"{name}功能已开启喵") + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(f"{name}功能已开启喵"))) elif "禁止" in command or "关闭" in command: if key := utils.get_command_type(command): utils.g_temp[gid][key] = False utils.write_group_data(utils.g_temp) name = utils.get_function_name(key) - await matcher.finish(f"{name}功能已禁用喵") + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(f"{name}功能已关闭喵") + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(f"{name}功能已关闭喵"))) + + @staticmethod + async def usage(matcher: Matcher) -> NoReturn: + """获取指令帮助""" + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(utils.usage) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(utils.usage))) @staticmethod async def state(matcher: Matcher, event:GroupMessageEvent) -> None: + """指令开关""" gid = str(event.group_id) with open(utils.address, "r", encoding="utf-8") as f: group_status = json.load(f) + if gid not in group_status: await utils.config_check() - else: - await matcher.finish(f"群{gid}的Event配置状态:\n" + "\n".join - ( - [f"{utils.path[func][0]}: {'开启' if group_status[gid][func] else '关闭'}" - for func in utils.path] + with open(utils.address, "r", encoding="utf-8") as f: + group_status = json.load(f) + + rely_msg = (f"群{gid}的Event配置状态:\n" + "\n".join( + [f"{utils.path[func][0]}: {'开启' if group_status[gid][func] else '关闭'}" + for func in utils.path] + ) ) - ) + if not (await utils.check_txt_to_img(utils.check_txt_img)): + await matcher.finish(rely_msg) + else: + await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg))) @staticmethod - async def check_bot(matcher: Matcher): + async def check_bot(matcher: Matcher) -> None: + """检测插件更新""" try: code, error = await eventmonitor.check_update(matcher) if error: logger.error(f"错误: {error}", "插件检查更新") - await matcher.send(f"更新插件nonebot-plugin-evenrmonitor时发生错误:\n{error}") + await matcher.send(f"下载更新插件nonebot-plugin-evenrmonitor时发生网络错误:\n{error}") except Exception as e: - logger.error("更新插件nonebot-plugin-evenrmonitor时发生错误", "检查更新", e=e) - await matcher.send(f"更新插件nonebot-plugin-evenrmonitor时发生错误 \n{type(e)}: {e}") + logger.error("备份更新插件nonebot-plugin-evenrmonitor时发生错误", e=e) + await matcher.send(f"备份更新插件nonebot-plugin-evenrmonitor时发生错误 \n{type(e)}: {e}") else: if code == 200: await matcher.finish("更新完毕,请重启bot....") - - @staticmethod async def check_update(matcher: Matcher): + """检查插件更新""" logger.info("开始检查插件更新...") data = await update.get_latest_version_data() if data: @@ -191,6 +238,7 @@ async def check_update(matcher: Matcher): @staticmethod async def _file_handle(): + """更新插件""" # 接收最新版本号作为参数,并返回处理结果字符串 if not update.temp_dir.exists(): @@ -246,6 +294,7 @@ async def _file_handle(): except Exception as e: raise e os.system(f"poetry run pip install -r {(update_info_file / 'requirements.txt').absolute()}") + # 使用os.system命令执行shell命令,安装更新后的依赖包 if tf: tf.close() # 关闭tarfile对象,释放资源 diff --git a/nonebot_plugin_eventmonitor/txt2img.py b/nonebot_plugin_eventmonitor/txt2img.py new file mode 100644 index 0000000..824560f --- /dev/null +++ b/nonebot_plugin_eventmonitor/txt2img.py @@ -0,0 +1,64 @@ +"""一个工具类, 将文本转换为图片, 根据文本长度自动换行""" +from io import BytesIO +from pathlib import Path + +from PIL import Image, ImageDraw, ImageFont + + +class TxtToImg: + def __init__(self) -> None: + self.LINE_CHAR_COUNT = 30 * 2 + self.CHAR_SIZE = 30 + self.TABLE_WIDTH = 4 + self.module_path: Path = Path(__file__).parent + self.font = str(self.module_path / "fonts" / "SIMYOU.TTF") + + async def line_break(self, line: str) -> str: + """将一行文本按照指定宽度进行换行""" + ret = "" + width = 0 + for c in line: + if len(c.encode("utf8")) == 3: # 中文 + if self.LINE_CHAR_COUNT == width + 1: # 剩余位置不够一个汉字 + width = 2 + ret += "\n" + c + else: # 中文宽度加2,注意换行边界 + width += 2 + ret += c + elif c == "\n": + width = 0 + ret += c + elif c == "\t": + space_c = ( + self.TABLE_WIDTH - width % self.TABLE_WIDTH + ) # 已有长度对TABLE_WIDTH取余 + ret += " " * space_c + width += space_c + else: + width += 1 + ret += c + if width >= self.LINE_CHAR_COUNT: + ret += "\n" + width = 0 + return ret if ret.endswith("\n") else ret + "\n" + + async def txt_to_img(self, text: str, font_size: int = 30) -> bytes: + """将文本转换为图片""" + text = await self.line_break(text) + d_font = ImageFont.truetype(self.font, font_size) + lines = text.count("\n") + image = Image.new( + "L", + (self.LINE_CHAR_COUNT * font_size // 2 + 50, font_size * lines + 50), + "white", + ) + draw_table = ImageDraw.Draw(im=image) + draw_table.text(xy=(25, 25), text=text, fill="#000000", font=d_font, spacing=4) + new_img = image.convert("RGB") + img_byte = BytesIO() + new_img.save(img_byte, format="PNG") + return img_byte.getvalue() + + +# 创建一个实例 +txt_to_img = TxtToImg() diff --git a/nonebot_plugin_eventmonitor/utils.py b/nonebot_plugin_eventmonitor/utils.py index c96a9c4..c9141e1 100644 --- a/nonebot_plugin_eventmonitor/utils.py +++ b/nonebot_plugin_eventmonitor/utils.py @@ -1,3 +1,4 @@ +"""依赖文件""" import os import json import nonebot @@ -26,8 +27,9 @@ def __init__(self) -> None: 指令5:群成员增加检测(当有人入群时,发送入群欢迎,当bot首次入群会乞讨管理,当主人/superuser入群会有特殊回复) 指令6:管理员变动检测(当新增管理员或取消管理员时发送消息提示,当bot自身被上/下管理时有特殊回复) 指令7:运气王检测(检测抢红包检测后的运气王并发送提示消息) - 指令8:检查event更新 - 指令9:重启""" + 指令8:更新插件eventmonitor + 指令9:重启 + 指令10:event配置""" self.notAllow = '功能未开启' self.path = { 'chuo': ['戳一戳'], @@ -43,11 +45,12 @@ def __init__(self) -> None: self.config_path = Path() / 'data/eventmonitor' self.address = self.config_path / 'config.json' config = nonebot.get_driver().config - self.superusers = {int(uid) for uid in config.superusers} - self.nickname = next(iter(config.nickname)) + self.superusers: set[int] = {int(uid) for uid in config.superusers} + self.nickname: str = next(iter(config.nickname), "Bot") self.chuo_cd: int = getattr(config, "chuo_cd", 10) self.check_bot_update: bool = getattr(config, "isalive", True) - self.current_version = '0.3.1' + self.check_txt_img:bool = getattr(config, "event_img", False) + self.current_version = '0.3.2' #戳一戳文案 self.chuo_msg = [ f"别戳了,{self.nickname}怕疼QwQ", @@ -212,6 +215,12 @@ async def check_red_package(g_temp, gid: str) -> bool: if gid in g_temp and not g_temp[gid]["red_package"]: return False return g_temp[gid]["red_package"] + + @staticmethod + async def check_txt_to_img(check_txt_img): + if not utils.check_txt_img: + return False + return check_txt_img def get_function_name(self, key: str) -> str: """根据关键词获取对应功能名称"""