Skip to content

Commit

Permalink
✨新增文字转图片
Browse files Browse the repository at this point in the history
  • Loading branch information
Reversedeer committed Aug 9, 2024
1 parent 259779d commit f78784c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 40 deletions.
11 changes: 7 additions & 4 deletions nonebot_plugin_eventmonitor/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""消息处理文本"""
import random

from datetime import datetime
Expand All @@ -9,8 +10,8 @@
class Config:
@staticmethod
async def admin_changer(sub_type, user_id, bot_qq) -> str:
"""发送管理员变动消息"""
admin_msg = ""

# 根据管理员变动类型选择不同的消息
if sub_type == "set":
# 如果用户ID等于机器人的QQ号,返回特定消息
Expand All @@ -31,10 +32,9 @@ async def admin_changer(sub_type, user_id, bot_qq) -> str:

@staticmethod
async def del_user_bye(add_time, user_id):
# sourcery skip: inline-immediately-returned-variable, use-fstring-for-concatenation
"""发送退群消息"""
rely = ""
del_time = datetime.fromtimestamp(add_time)

# 检查用户ID是否在超级用户列表superusers中
if user_id in utils.superusers:
# 如果是超级用户,生成特定的离开消息
Expand All @@ -48,7 +48,7 @@ async def del_user_bye(add_time, user_id):

@staticmethod
async def add_user_wecome(add_time, user_id, bot_qq):

"""发送入群消息"""
# 将时间戳转换为datetime类型的时间add_time
add_time = datetime.fromtimestamp(add_time)
rely = ""
Expand All @@ -69,6 +69,7 @@ async def add_user_wecome(add_time, user_id, bot_qq):

@staticmethod
async def monitor_rongyu(honor_type, user_id, bot_qq) -> str:
"""发送群荣誉变化消息"""
rely = ""
# 根据honor_type选择不同的消息
if honor_type == "emotion":
Expand Down Expand Up @@ -108,6 +109,7 @@ async def monitor_rongyu(honor_type, user_id, bot_qq) -> str:

@staticmethod
async def rad_package_change(target_id, bot_qq) -> str:
"""发送运气王变化消息"""
rely = ""
if target_id == bot_qq:
rely = "你们又不行了,本喵喜提运气王🧧"
Expand All @@ -126,6 +128,7 @@ async def chuo_send_msg() -> str:

@staticmethod
async def upload_files(user_id) -> Message:
"""发送上传群文件消息"""
return (
MessageSegment.image(
f'https://q4.qlogo.cn/headimg_dl?dst_uin={user_id}&spec=640'
Expand Down
111 changes: 80 additions & 31 deletions nonebot_plugin_eventmonitor/handle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""事件处理"""
import os
import json
import shutil
Expand All @@ -10,6 +11,7 @@
from nonebot.matcher import Matcher
from nonebot.adapters.onebot.v11 import (
Bot, Message,
MessageSegment,
PokeNotifyEvent,
HonorNotifyEvent,
GroupUploadNoticeEvent,
Expand All @@ -23,10 +25,12 @@
from .utils import utils
from .config import config
from .update import update
from .txt2img import txt_to_img

class Eventmonitor:
@staticmethod
async def chuo(matcher: Matcher, event: PokeNotifyEvent) -> NoReturn:
async def chuo(matcher: Matcher, event: PokeNotifyEvent) -> None:
"""戳一戳"""
if not (await utils.check_chuo(utils.g_temp, str(event.group_id))):
await matcher.finish(utils.notAllow)
# 获取用户id
Expand All @@ -39,63 +43,85 @@ async def chuo(matcher: Matcher, event: PokeNotifyEvent) -> NoReturn:
if cd > utils.chuo_cd or event.get_user_id() in nonebot.get_driver().config.superusers:
utils.chuo_CD_dir.update({uid: event.time})
rely_msg: str = await config.chuo_send_msg()
await matcher.finish(message=Message(rely_msg))
#群荣誉事件
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)))

@staticmethod
async def qrongyu(matcher: Matcher, event: HonorNotifyEvent, bot: Bot) -> None:
"""群荣誉事件"""
if not (await utils.check_honor(utils.g_temp, str(event.group_id))):
return
bot_qq = int(bot.self_id)
rely_msg: str = await config.monitor_rongyu(event.honor_type, event.user_id, bot_qq)
await matcher.finish(message=Message(rely_msg), at_sender=True)
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)), at_sender=True)

#群文件事件
@staticmethod
async def files(matcher: Matcher, event: GroupUploadNoticeEvent) -> None:
"""群文件事件"""
if not (await utils.check_file(utils.g_temp, str(event.group_id))):
return
rely_msg: Message= await config.upload_files(event.user_id)
await matcher.finish(message=rely_msg, at_sender=True)
rely_msg= await config.upload_files(event.user_id)
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(str(rely_msg))), at_sender=True)

#退群事件
@staticmethod
async def del_user(matcher: Matcher, event: GroupDecreaseNoticeEvent) -> None:
"""退群事件"""
if not (await utils.check_del_user(utils.g_temp, str(event.group_id))):
return
rely_msg= await config.del_user_bye(event.time, event.user_id)
await matcher.finish(message=Message(rely_msg))
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(str(rely_msg))), at_sender=True)

#入群事件
@staticmethod
async def add_user(matcher: Matcher, event: GroupIncreaseNoticeEvent, bot: Bot) -> None:
"""入群事件"""
await utils.config_check()
if not (await utils.check_add_user(utils.g_temp, str(event.group_id))):
return
bot_qq = int(bot.self_id)
rely_msg = await config.add_user_wecome(event.time, event.user_id, bot_qq)
await matcher.finish(message=Message(rely_msg), at_sender=True)
rely_msg = await config.add_user_wecome(event.time, event.user_id, bot_qq)
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(str(rely_msg))), at_sender=True)

#管理员变动
@staticmethod
async def admin_chance(matcher: Matcher, event: GroupAdminNoticeEvent, bot: Bot) -> None:
"""管理员变动事件"""
if not (await utils.check_admin(utils.g_temp, str(event.group_id))):
return
bot_qq = int(bot.self_id)
rely_msg: str = await config.admin_changer(event.sub_type, event.user_id, bot_qq)
await matcher.finish(message=Message(rely_msg), at_sender=True)
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)), at_sender=True)

#红包运气王
@staticmethod
async def hongbao(matcher: Matcher, event: LuckyKingNotifyEvent, bot: Bot) -> None:
"""红包运气王事件"""
if not (await utils.check_red_package(utils.g_temp, str(event.group_id))):
return
bot_qq = int(bot.self_id)
rely_msg = await config.rad_package_change(event.target_id, bot_qq)
await matcher.finish(message=Message(rely_msg), at_sender=True)
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)), at_sender=True)

@staticmethod
async def switch(matcher: Matcher, event: GroupMessageEvent) -> None:
# 获取开关指令的参数,即用户输入的指令内容
"""获取开关指令的参数,即用户输入的指令内容"""
command = str(event.get_message()).strip()
# 获取群组ID
gid = str(event.group_id)
Expand All @@ -105,47 +131,68 @@ async def switch(matcher: Matcher, event: GroupMessageEvent) -> None:
utils.g_temp[gid][key] = True
utils.write_group_data(utils.g_temp)
name = utils.get_function_name(key)
await matcher.finish(f"{name}功能已开启喵")
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(f"{name}功能已开启喵")
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(f"{name}功能已开启喵")))
elif "禁止" in command or "关闭" in command:
if key := utils.get_command_type(command):
utils.g_temp[gid][key] = False
utils.write_group_data(utils.g_temp)
name = utils.get_function_name(key)
await matcher.finish(f"{name}功能已禁用喵")
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(f"{name}功能已关闭喵")
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(f"{name}功能已关闭喵")))

@staticmethod
async def usage(matcher: Matcher) -> NoReturn:
"""获取指令帮助"""
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(utils.usage)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(utils.usage)))

@staticmethod
async def state(matcher: Matcher, event:GroupMessageEvent) -> None:
"""指令开关"""
gid = str(event.group_id)
with open(utils.address, "r", encoding="utf-8") as f:
group_status = json.load(f)

if gid not in group_status:
await utils.config_check()
else:
await matcher.finish(f"群{gid}的Event配置状态:\n" + "\n".join
(
[f"{utils.path[func][0]}: {'开启' if group_status[gid][func] else '关闭'}"
for func in utils.path]
with open(utils.address, "r", encoding="utf-8") as f:
group_status = json.load(f)

rely_msg = (f"群{gid}的Event配置状态:\n" + "\n".join(
[f"{utils.path[func][0]}: {'开启' if group_status[gid][func] else '关闭'}"
for func in utils.path]
)
)
)
if not (await utils.check_txt_to_img(utils.check_txt_img)):
await matcher.finish(rely_msg)
else:
await matcher.send(MessageSegment.image(await txt_to_img.txt_to_img(rely_msg)))

@staticmethod
async def check_bot(matcher: Matcher):
async def check_bot(matcher: Matcher) -> None:
"""检测插件更新"""
try:
code, error = await eventmonitor.check_update(matcher)
if error:
logger.error(f"错误: {error}", "插件检查更新")
await matcher.send(f"更新插件nonebot-plugin-evenrmonitor时发生错误:\n{error}")
await matcher.send(f"下载更新插件nonebot-plugin-evenrmonitor时发生网络错误:\n{error}")
except Exception as e:
logger.error("更新插件nonebot-plugin-evenrmonitor时发生错误", "检查更新", e=e)
await matcher.send(f"更新插件nonebot-plugin-evenrmonitor时发生错误 \n{type(e)}: {e}")
logger.error("备份更新插件nonebot-plugin-evenrmonitor时发生错误", e=e)
await matcher.send(f"备份更新插件nonebot-plugin-evenrmonitor时发生错误 \n{type(e)}: {e}")
else:
if code == 200:
await matcher.finish("更新完毕,请重启bot....")



@staticmethod
async def check_update(matcher: Matcher):
"""检查插件更新"""
logger.info("开始检查插件更新...")
data = await update.get_latest_version_data()
if data:
Expand Down Expand Up @@ -191,6 +238,7 @@ async def check_update(matcher: Matcher):

@staticmethod
async def _file_handle():
"""更新插件"""
# 接收最新版本号作为参数,并返回处理结果字符串

if not update.temp_dir.exists():
Expand Down Expand Up @@ -246,6 +294,7 @@ async def _file_handle():
except Exception as e:
raise e
os.system(f"poetry run pip install -r {(update_info_file / 'requirements.txt').absolute()}")
# 使用os.system命令执行shell命令,安装更新后的依赖包
if tf:
tf.close()
# 关闭tarfile对象,释放资源
Expand Down
64 changes: 64 additions & 0 deletions nonebot_plugin_eventmonitor/txt2img.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""一个工具类, 将文本转换为图片, 根据文本长度自动换行"""
from io import BytesIO
from pathlib import Path

from PIL import Image, ImageDraw, ImageFont


class TxtToImg:
def __init__(self) -> None:
self.LINE_CHAR_COUNT = 30 * 2
self.CHAR_SIZE = 30
self.TABLE_WIDTH = 4
self.module_path: Path = Path(__file__).parent
self.font = str(self.module_path / "fonts" / "SIMYOU.TTF")

async def line_break(self, line: str) -> str:
"""将一行文本按照指定宽度进行换行"""
ret = ""
width = 0
for c in line:
if len(c.encode("utf8")) == 3: # 中文
if self.LINE_CHAR_COUNT == width + 1: # 剩余位置不够一个汉字
width = 2
ret += "\n" + c
else: # 中文宽度加2,注意换行边界
width += 2
ret += c
elif c == "\n":
width = 0
ret += c
elif c == "\t":
space_c = (
self.TABLE_WIDTH - width % self.TABLE_WIDTH
) # 已有长度对TABLE_WIDTH取余
ret += " " * space_c
width += space_c
else:
width += 1
ret += c
if width >= self.LINE_CHAR_COUNT:
ret += "\n"
width = 0
return ret if ret.endswith("\n") else ret + "\n"

async def txt_to_img(self, text: str, font_size: int = 30) -> bytes:
"""将文本转换为图片"""
text = await self.line_break(text)
d_font = ImageFont.truetype(self.font, font_size)
lines = text.count("\n")
image = Image.new(
"L",
(self.LINE_CHAR_COUNT * font_size // 2 + 50, font_size * lines + 50),
"white",
)
draw_table = ImageDraw.Draw(im=image)
draw_table.text(xy=(25, 25), text=text, fill="#000000", font=d_font, spacing=4)
new_img = image.convert("RGB")
img_byte = BytesIO()
new_img.save(img_byte, format="PNG")
return img_byte.getvalue()


# 创建一个实例
txt_to_img = TxtToImg()
19 changes: 14 additions & 5 deletions nonebot_plugin_eventmonitor/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""依赖文件"""
import os
import json
import nonebot
Expand Down Expand Up @@ -26,8 +27,9 @@ def __init__(self) -> None:
指令5:群成员增加检测(当有人入群时,发送入群欢迎,当bot首次入群会乞讨管理,当主人/superuser入群会有特殊回复)
指令6:管理员变动检测(当新增管理员或取消管理员时发送消息提示,当bot自身被上/下管理时有特殊回复)
指令7:运气王检测(检测抢红包检测后的运气王并发送提示消息)
指令8:检查event更新
指令9:重启"""
指令8:更新插件eventmonitor
指令9:重启
指令10:event配置"""
self.notAllow = '功能未开启'
self.path = {
'chuo': ['戳一戳'],
Expand All @@ -43,11 +45,12 @@ def __init__(self) -> None:
self.config_path = Path() / 'data/eventmonitor'
self.address = self.config_path / 'config.json'
config = nonebot.get_driver().config
self.superusers = {int(uid) for uid in config.superusers}
self.nickname = next(iter(config.nickname))
self.superusers: set[int] = {int(uid) for uid in config.superusers}
self.nickname: str = next(iter(config.nickname), "Bot")
self.chuo_cd: int = getattr(config, "chuo_cd", 10)
self.check_bot_update: bool = getattr(config, "isalive", True)
self.current_version = '0.3.1'
self.check_txt_img:bool = getattr(config, "event_img", False)
self.current_version = '0.3.2'
#戳一戳文案
self.chuo_msg = [
f"别戳了,{self.nickname}怕疼QwQ",
Expand Down Expand Up @@ -212,6 +215,12 @@ async def check_red_package(g_temp, gid: str) -> bool:
if gid in g_temp and not g_temp[gid]["red_package"]:
return False
return g_temp[gid]["red_package"]

@staticmethod
async def check_txt_to_img(check_txt_img):
if not utils.check_txt_img:
return False
return check_txt_img

def get_function_name(self, key: str) -> str:
"""根据关键词获取对应功能名称"""
Expand Down

0 comments on commit f78784c

Please sign in to comment.