Compare commits

...

5 Commits

17 changed files with 569 additions and 271 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ napcat/
.venv .venv
data/ data/
logs/ logs/
uuid.db

View File

@@ -1,30 +1,63 @@
#napcat配置 # ncatbot 配置
# 此处仅提供ncatbot部分配置项其余配置项请在ncatbot.yaml中修改
# 此处配置优先级高于ncatbot.yaml
# 请务必保证bot_qq及root_qq配置正确
# 在ws监听IP为0.0.0.0时要求ws_token为强密码启用web_uienable_webui_interaction = True检查时要求webui_token为强密码
# 强密码要求: 1至少 12 位字符 2包含数字、字母和特殊符号特殊符号包括!@#$%^&*()_+-=[]{}|;:,.<>?
bot_qq = 123456 #机器人q号 bot_qq = 123456 #机器人q号
root_qq = 1234567 # 管理员q号 root_qq = 1234567 # 管理员q号
ws_uri = "loacalhost:3001" # ws 地址, 可自定义端口, 默认 3001 ws_uri = "localhost:3001" # ws 地址, 可自定义端口, 默认 3001
webui_uri = "loacalhost:6099" # webui 地址, 可自定义端口, 默认 6099 webui_uri = "localhost:6099" # webui 地址, 可自定义端口, 默认 6099
webui_token = "napcat" # webui 令牌, 默认 napcat webui_token = "napcat" # webui 令牌, 默认 napcat
ws_token = "" # ws_uri 令牌, 默认留空 ws_token = "" # ws_uri 令牌, 默认留空
ws_listen_ip = "localhost" # ws_uri 监听 ip, 默认 localhost 监听本机,监听全部则配置 0.0.0.0 ws_listen_ip = "localhost" # ws_uri 监听 ip, 默认 localhost 监听本机,监听全部则配置 0.0.0.0
remote_mode = false # 是否远程模式, 即 NapCat 服务不在本机运行 psncatbot官方已废弃该参数 enable_webui_interaction = true # 是否启用webui交互默认启用
# 功能配置 # 功能配置
allowed_groups = "all" # 授权群聊all为全部eg[123456789, 987654321] allowed_groups = "all" # 授权群聊all为全部eg[123456789, 987654321]
allowed_users = "all" # 授权用户all为全部eg[123456789, 987654321] allowed_users = "all" # 授权用户all为全部eg[123456789, 987654321]
ai_service = "xyit" # ai平台 支持 dify” “xyit" ai_service = "" # ai平台 支持 "dify" "mcunc" "aliyun"
friend_auto = false # 好友自动同意 friend_auto = false # 好友自动同意,实验性功能
group_auto = true group_auto = true
group_welcome = false # 入群欢迎 group_welcome = false # 入群欢迎
group_welcome_message = "!at 欢迎加入本群,使用@bot /help获取此bot帮助" # 入群消息 !at 为@加群用户
group_leave = false # 退群提醒 group_leave = false # 退群提醒
# 语言配置
group_welcome_message = "!at 欢迎加入本群,使用@bot /help获取此bot帮助" # 入群消息 !at 为@加群用户
group_leave_message = "用户{userid}退群了" # 退群消息 {userid}为退群用户id group_leave_message = "用户{userid}退群了" # 退群消息 {userid}为退群用户id
error_message = "服务器繁忙,请稍后再逝"
permission_denied_message = "此bot未在该群启用"
status_message = "---chatbot ncapcat---\n Q bot 运行正常 \n 版本: 2.0 pre \n © 无尽创意MCUNC"
help_message = "直接输入聊天内容即可 \n /help -- 获取帮助 \n /clear [群号 / private:Q号] (all 为全部,不填为本 群/用户) \n /status -- 查看bot状态"
command_isnone_message = "你似乎没有提供想和我聊的内容喵~ \n 直接输入聊天内容即可"
command_notfound_message = "指令不存在,输入/help查看帮助"
chatmessage_isnone_message = "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>"
clean_all_success_message = "✅ 已清空所有群组数据"
clean_one_success_message = "✅ 已成功删除 group_id = {group_id} 的数据" # {group_id}为群组id
clean_one_notfound_message = "⚠️ 没有找到 group_id = {group_id} 的数据" # {group_id}为群组id
clean_nopermissoin_message = "你不是管理员哦喵~"
clean_fail_message = "❌ 数据库操作失败"
# 用户标识配置
# 此项用户在消息开头添加用户qid以便于ai识别用户需在ai模型中添加提示词
# 如使用xyit平台请勿修改
enable_qid = true # 是否启用
qid_prefix = "<qid>" # 前缀
qid_suffix = "</qid>" # 后缀
# dify配置 ai_service选择dify时需配置 # dify配置 ai_service选择dify时需配置
dify_ip= "" # ip:端口 dify_ip= "" # https://ip:port
dify_token = "" # token dify_token = "" # token
# xyit配置 ai_service选择xyit时需配置 # mcunc配置 ai_service选择mcunc时需配置
xyit_ip = "ai.openapi.xyit.net" # ip 此项一般不需用修改 mcunc_ip = "https://ai.openapi.mcunc.cn" # ip 此项一般不需用修改
xyit_appID = "" # appID mcunc_appID = "" # appID
xyit_appKEY = "" # appKEY mcunc_appKEY = "" # appKEY
xyit_model = "maoniang" # 模型名称 mcunc_model = "maoniang" # 模型名称名称
# 阿里云百炼配置 ai_service选择aliyun时需配置
aliyun_app_id = ""
aliyun_api_key = ""

View File

@@ -1,10 +1,13 @@
import logging from ncatbot.core.event.message_segment import At, Text
from model.AiCat import AiCat from model import ai_model
from model.Clear import Clear from model.Clear import Clear
import toml import toml
from model.logger import setup_logger
logger = setup_logger()
class group: class group:
def __init__(self, msg): def __init__(self, msg):
self.self_id = msg.self_id
self.user_id = msg.user_id self.user_id = msg.user_id
self.group_id = msg.group_id self.group_id = msg.group_id
self.message_id = msg.message_id self.message_id = msg.message_id
@@ -12,8 +15,18 @@ class group:
self.raw_message = msg.raw_message self.raw_message = msg.raw_message
self.sender = msg.sender self.sender = msg.sender
self.message = msg.message self.message = msg.message
self.self_id = msg.self_id try:
self.time = msg.time with open("config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
self.permission_denied_message = config.get("permission_denied_message")
self.error_message = config.get("error_message")
self.help_message = config.get("help_message")
self.command_isnone_message = config.get("command_none_message")
self.command_notfound_message = config.get("command_notfound_message")
self.status_message = config.get("status_message")
self.chatmessage_isnone_message = config.get("chatmessage_isnone_message")
except Exception as e:
logger.error(f"读取配置文件出现错误{e}")
def main(self): def main(self):
is_at = self.is_at() is_at = self.is_at()
@@ -22,16 +35,16 @@ class group:
else: else:
permission = self.check_permission() permission = self.check_permission()
if permission is None: if permission is None:
return "服务器繁忙,请稍后再逝" return self.error_message
elif permission: elif permission:
return self.menu(is_at) return self.menu(is_at)
else: else:
return "此bot未在该群启用" return self.permission_denied_message
def is_at(self): def is_at(self):
for seg in self.message: for seg in self.message:
if seg['type'] == 'at' and seg['data'].get('qq') == str(self.self_id): if isinstance(seg, At) and seg.qq == str(self.self_id):
texts = [s['data']['text'].strip() for s in self.message if s['type'] == 'text'] texts = [s.text.strip() for s in self.message if isinstance(s, Text)]
full_text = ' '.join(texts).strip() full_text = ' '.join(texts).strip()
return full_text return full_text
@@ -43,7 +56,7 @@ class group:
config = toml.load(f) config = toml.load(f)
allowed_groups = config.get("allowed_groups", []) allowed_groups = config.get("allowed_groups", [])
except Exception as e: except Exception as e:
logging.error(str(e)) logger.error(str(e))
return None return None
# 检查当前群是否在允许列表中 # 检查当前群是否在允许列表中
@@ -54,22 +67,22 @@ class group:
else: else:
return False return False
def menu(self,command): def menu(self,command):
if command.startswith("/help"): if command.startswith("/help"):
return " 直接输入聊天内容即可 \n /help -- 获取帮助 \n /clear [群号 / private:Q号] (all 为全部,不填为本 群/用户) \n /status -- 查看bot状态 " return self.help_message
elif command.startswith("/cat"):# 留此指令接口为了方便通过审核 elif command.startswith("/cat"):# 留此指令接口为了方便qq官bot通过审核
# 排除 " /cat "" /cat"未传参情况 # 排除 " /cat "" /cat"未传参情况
parts = command.split("/cat ", 1) parts = command.split("/cat ", 1)
if len(parts) > 1: if len(parts) > 1:
chat_content = parts[1].strip() chat_content = parts[1].strip()
if chat_content: if chat_content:
cat = AiCat(chat_content, self.user_id,self.group_id) answer = ai_model.main(chat_content, self.user_id,self.group_id)
answer = AiCat.main(cat)
return answer return answer
else: else:
return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>" return self.chatmessage_isnone_message
else: else:
return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>" return self.chatmessage_isnone_message
elif command.startswith("/clear"): elif command.startswith("/clear"):
parts = command.split("/clear ", 1) parts = command.split("/clear ", 1)
if len(parts) > 1: if len(parts) > 1:
@@ -84,14 +97,13 @@ class group:
clear = Clear(self.user_id, self.group_id) clear = Clear(self.user_id, self.group_id)
return clear.main() return clear.main()
elif command.startswith("/status"): elif command.startswith("/status"):
return "---猫娘 QBOT---\n Q bot 运行正常 \n 版本: 2.0 pre \n © 融玩文化 | 无尽创意MCUNC" return self.status_message
elif command.startswith("/"): elif command.startswith("/"):
return "指令不存在,输入/help查看帮助" return self.command_notfound_message
elif command == "": elif command == "":
return "你似乎没有提供想和我聊的内容喵~ \n 直接输入聊天内容即可" return self.command_isnone_message
elif command is None: elif command is None:
return "你似乎没有提供想和我聊的内容喵~ \n 直接输入聊天内容即可" return self.command_isnone_message
else: else:
cat = AiCat(command, self.user_id,self.group_id) answer = ai_model.main(command, self.user_id, self.group_id)
answer = AiCat.main(cat)
return answer return answer

View File

@@ -1,20 +1,15 @@
import toml import toml
import logging import logging
from model.logger import setup_logger
logger = setup_logger()
class notice: class notice:
def __init__(self,msg): def __init__(self,msg):
print(msg) self.time = msg.time
if msg["notice_type"] == "group_increase" or msg["notice_type"] == "group_decrease": self.notice_type = msg.notice_type
self.time = msg["time"] self.group_id = msg.group_id
self.self_id = msg["self_id"] self.operator_id = msg.operator_id
self.post_type = msg["post_type"] self.user_id = msg.user_id
self.notice_type = msg["notice_type"]
self.sub_type = msg["sub_type"]
self.group_id = msg["group_id"]
self.operator_id = msg["operator_id"]
self.user_id = msg["user_id"]
else:
pass
def main(self): def main(self):
if self.notice_type == "group_increase": if self.notice_type == "group_increase":
@@ -25,7 +20,6 @@ class notice:
return None return None
def group_increase(self): def group_increase(self):
print(1)
try: try:
with open("./config.toml", "r", encoding="utf-8") as f: with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f) config = toml.load(f)
@@ -41,7 +35,7 @@ class notice:
else: else:
return None return None
except Exception as e: except Exception as e:
logging.error(f"读取配置文件错误:{e}") logger.error(f"读取配置文件错误:{e}")
return None return None
def group_decrease(self): def group_decrease(self):
@@ -50,7 +44,6 @@ class notice:
config = toml.load(f) config = toml.load(f)
group_leave = config.get("group_leave") group_leave = config.get("group_leave")
group_leave_message = config.get("group_leave_message") group_leave_message = config.get("group_leave_message")
print(group_leave_message)
if group_leave: if group_leave:
if "{userid}" in group_leave_message: if "{userid}" in group_leave_message:
return group_leave_message.replace("{userid}", str(self.user_id)) return group_leave_message.replace("{userid}", str(self.user_id))
@@ -59,5 +52,5 @@ class notice:
else: else:
return None return None
except Exception as e: except Exception as e:
logging.error(f"读取配置文件错误:{e}") logger.error(f"读取配置文件错误:{e}")
return None return None

View File

@@ -1,8 +1,12 @@
import logging from ncatbot.core.event.message_segment import Text
from model.AiCat import AiCat from model.logger import setup_logger
from model.Clear import Clear from model.Clear import Clear
from model import ai_model
import toml import toml
logger = setup_logger()
class private: class private:
def __init__(self, msg): def __init__(self, msg):
self.user_id = msg.user_id self.user_id = msg.user_id
@@ -13,17 +17,29 @@ class private:
self.message = msg.message self.message = msg.message
self.self_id = msg.self_id self.self_id = msg.self_id
self.time = msg.time self.time = msg.time
try:
with open("config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
self.permission_denied_message = config.get("permission_denied_message")
self.error_message = config.get("error_message")
self.help_message = config.get("help_message")
self.command_isnone_message = config.get("command_none_message")
self.command_notfound_message = config.get("command_notfound_message")
self.status_message = config.get("status_message")
self.chatmessage_isnone_message = config.get("chatmessage_isnone_message")
except Exception as e:
logger.error(f"读取配置文件出现错误{e}")
def main(self): def main(self):
texts = [seg['data']['text'].strip() for seg in self.message if seg['type'] == 'text'] texts = [seg.text.strip() for seg in self.message if isinstance(seg, Text)]
full_text = ' '.join(texts).strip() full_text = ' '.join(texts).strip()
permission = self.check_permission() permission = self.check_permission()
if permission is None: if permission is None:
return "服务器繁忙,请稍后再逝" return self.error_message
elif permission: elif permission:
return self.menu(full_text) return self.menu(full_text)
else: else:
return "此bot未在该群启用" return self.permission_denied_message
def check_permission(self): def check_permission(self):
try: try:
@@ -31,7 +47,7 @@ class private:
config = toml.load(f) config = toml.load(f)
allowed_users = config.get("allowed_users", []) allowed_users = config.get("allowed_users", [])
except Exception as e: except Exception as e:
logging.error(str(e)) logger.error(str(e))
return None return None
# 检查当前群是否在允许列表中 # 检查当前群是否在允许列表中
@@ -44,20 +60,19 @@ class private:
def menu(self,command): def menu(self,command):
if command.startswith("/help"): if command.startswith("/help"):
return " 直接输入聊天内容即可 \n /help -- 获取帮助 \n /clear [群号 / private:Q号] (all 为全部,不填为本 群/用户) \n /status -- 查看bot状态 " return self.help_message
elif command.startswith(" /cat"):# 留此指令接口为了方便通过审核 elif command.startswith(" /cat"):# 留此指令接口为了方便通过审核
# 排除 " /cat "" /cat"未传参情况 # 排除 " /cat "" /cat"未传参情况
parts = command.split(" /cat ", 1) parts = command.split(" /cat ", 1)
if len(parts) > 1: if len(parts) > 1:
chat_content = parts[1].strip() chat_content = parts[1].strip()
if chat_content: if chat_content:
cat = AiCat(chat_content,self.user_id , f"private:{self.user_id}") answer = ai_model.main(chat_content, self.user_id, f"private:{self.user_id}")
answer = AiCat.main(cat)
return answer return answer
else: else:
return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>" return self.chatmessage_isnone_message
else: else:
return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>" return self.chatmessage_isnone_message
elif command.startswith("/clear"): elif command.startswith("/clear"):
parts = command.split("/clear ", 1) parts = command.split("/clear ", 1)
if len(parts) > 1: if len(parts) > 1:
@@ -72,14 +87,13 @@ class private:
clear = Clear(self.user_id, f"private:{self.user_id}") clear = Clear(self.user_id, f"private:{self.user_id}")
return clear.main() return clear.main()
elif command.startswith(" /status"): elif command.startswith(" /status"):
return "---猫娘 QBOT---\n Q bot 运行正常 \n 版本: 2.0 pre \n © 融玩文化 | 无尽创意MCUNC" return self.status_message
elif command.startswith(" /"): elif command.startswith(" /"):
return "指令不存在,输入/help查看帮助" return self.command_notfound_message
elif command == "": elif command == "":
return "你似乎没有提供想和我聊的内容喵~ \n 直接输入聊天内容即可" return self.command_isnone_message
elif command is None: elif command is None:
return "你似乎没有提供想和我聊的内容喵~ \n 直接输入聊天内容即可" return self.command_isnone_message
else: else:
cat = AiCat(command, self.user_id,f"private:{self.user_id}") answer = ai_model.main(command, self.user_id, f"private:{self.user_id}")
answer = AiCat.main(cat)
return answer return answer

View File

@@ -1,14 +1,13 @@
import logging from model.logger import setup_logger
import toml import toml
logger = setup_logger()
class request: class request:
def __init__(self,msg): def __init__(self,msg):
self.time = msg.time
self.self_id = msg.self_id self.self_id = msg.self_id
self.request_type = msg.request_type self.request_type = msg.request_type
self.sub_type = msg.sub_type self.post_type = msg.post_type
self.group_id = msg.group_id
self.user_id = msg.user_id
self.comment = msg.comment self.comment = msg.comment
self.flag = msg.flag self.flag = msg.flag
@@ -28,15 +27,5 @@ class request:
friend_auto = config.get("friend_auto") friend_auto = config.get("friend_auto")
return friend_auto return friend_auto
except Exception as e: except Exception as e:
logging.error(f"读取配置文件错误:{e}") logger.error(f"读取配置文件错误:{e}")
return False return False
def get_allow_group(self):
try:
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
allow_group = config.get("allowed_groups")
return allow_group
except Exception as e:
logging.error(f"读取配置文件错误:{e}")
return []

73
main.py
View File

@@ -1,31 +1,42 @@
from ncatbot.core.notice import NoticeMessage import os
from ncatbot.core import BotClient, Request os.environ['NCATBOT_CONFIG_PATH'] = 'ncatbot.yaml'
import logging import toml
import asyncio from ncatbot.core.event import PrivateMessageEvent, GroupMessageEvent, NoticeEvent, RequestEvent
from ncatbot.utils import ncatbot_config
from ncatbot.core import BotClient
from model.logger import setup_logger
from control.group import group from control.group import group
from control.private import private from control.private import private
from control.request import request from control.request import request
from control.notice import notice from control.notice import notice
import toml
logger = setup_logger()
with open("./config.toml", "r", encoding="utf-8") as f: with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f) config = toml.load(f)
bt_uin = config.get("bot_qq") bt_uin = config.get("bot_qq")
root = config.get("root_qq") root = config.get("root_qq")
ws_uri = config.get("ws_uri") ws_uri = config.get("ws_uri")
web_uri = config.get("web_uri") webui_uri = config.get("webui_uri")
webui_token = config.get("webui_token") webui_token = config.get("webui_token")
ws_token = config.get("ws_token") ws_token = config.get("ws_token")
ws_listen_ip = config.get("ws_listen_ip") ws_listen_ip = config.get("ws_listen_ip")
remote_mode = config.get("remote_mode") enable_webui_interaction = config.get("enable_webui_interaction")
ncatbot_config.set_bot_uin(bt_uin)
ncatbot_config.set_root(root)
ncatbot_config.set_webui_uri(webui_uri)
ncatbot_config.set_webui_token(webui_token)
ncatbot_config.set_ws_uri(ws_uri)
ncatbot_config.set_ws_token(ws_token)
ncatbot_config.set_ws_listen_ip(ws_listen_ip)
ncatbot_config.enable_webui_interaction = enable_webui_interaction
bot = BotClient() bot = BotClient()
api = bot.run_blocking(bt_uin=bt_uin, root=root, ws_uri=ws_uri, web_uri=web_uri, webui_token=webui_token, ws_token=ws_token, ws_listen_ip=ws_listen_ip, remote_mode=remote_mode)
@bot.group_event() @bot.on_group_message()
async def on_group_message(msg): async def on_group_message(msg: GroupMessageEvent):
logging.info(f"收到消息:{msg.raw_message},来自{msg.group_id}群聊{msg.user_id}用户") logger.info(f"收到消息:{msg.raw_message},来自{msg.group_id}群聊{msg.user_id}用户")
if msg.user_id == 2854196310: # qq管家防止刷屏。bot大战请看 if msg.user_id == 2854196310: # qq管家防止刷屏。bot大战请看
pass pass
else: else:
@@ -34,12 +45,12 @@ async def on_group_message(msg):
if return_message is None: if return_message is None:
return return
else: else:
logging.info(f"返回消息:{return_message}") # logger.info(f"返回消息:{return_message}")
await bot.api.post_group_msg(group_id=msg.group_id, text=return_message, reply=msg.message_id) await msg.reply(text=return_message)
@bot.private_event() @bot.on_private_message()
async def on_private_message(msg): async def on_private_message(msg: PrivateMessageEvent):
logging.info(f"收到消息:{msg.raw_message},来自{msg.user_id}用户") logger.info(f"收到消息:{msg.raw_message},来自{msg.user_id}用户")
if msg.user_id == 2854196310: # qq管家防止刷屏。 if msg.user_id == 2854196310: # qq管家防止刷屏。
pass pass
else: else:
@@ -48,36 +59,36 @@ async def on_private_message(msg):
if return_message is None: if return_message is None:
return return
else: else:
logging.info(f"返回消息:{return_message}") # logger.info(f"返回消息:{return_message}")
await bot.api.post_private_msg(user_id=msg.user_id, text=return_message, reply=msg.message_id) await msg.reply(text=return_message)
@bot.request_event() @bot.on_request(filter='friend')
async def on_request_event(msg: Request): async def on_request_event(msg: RequestEvent):
logging.info(f"收到request事件{msg.request_type},来自{msg.group_id}群聊,{msg.user_id}用户,验证消息:{msg.comment}") logger.info(f"收到request事件{msg.request_type},来自{msg.group_id}群聊,{msg.user_id}用户,验证消息:{msg.comment}")
logger.info(f"收到request请求来自{msg.self_id}")
ctrl = request(msg) ctrl = request(msg)
accept_friend_application = ctrl.main() accept_friend_application = ctrl.main()
if accept_friend_application is True: if accept_friend_application is True:
await msg.reply(True, comment="请求已通过") await msg.reply(True)
logging.info("请求已通过") # logger.info("请求已通过")
else: else:
await msg.reply(False, comment="请求被拒绝") await msg.reply(False)
logging.info("请求被拒绝") # logger.info("请求被拒绝")
@bot.notice_event() @bot.on_notice()
async def on_notice_event(msg: NoticeMessage): async def on_notice_event(msg: NoticeEvent):
logging.info(f"收到notice事件{msg['notice_type']},来自{msg['user_id']}用户") logger.info(f"收到notice事件{msg['notice_type']},来自{msg['user_id']}用户")
ctrl = notice(msg) ctrl = notice(msg)
return_message = ctrl.main() return_message = ctrl.main()
if return_message is None: if return_message is None:
return return
else: else:
logging.info(f"返回消息:{return_message}") # logger.info(f"返回消息:{return_message}")
await bot.api.post_group_msg(group_id=msg["group_id"], text=return_message) await bot.api.post_group_msg(group_id=msg["group_id"], text=return_message)
bot.run_frontend()
asyncio.get_event_loop().run_forever()

View File

@@ -1,144 +0,0 @@
import requests
import json
import logging
import toml
import sqlite3
class AiCat:
def __init__(self,message,qid,group_openid):
self.message = message
self.qid = qid
self.group_openid = group_openid
self.query = f"<qid>{self.qid}</qid>{self.message}"
def main(self):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
# 获取所需字段
ai_service = config.get("ai_service")
# dify
dify_ip = config.get("dify_ip")
dify_token = config.get("dify_token")
# xyit
xyit_ip = config.get("xyit_ip")
xyit_appID = config.get("xyit_appID")
xyit_appKEY = config.get("xyit_appKEY")
xyit_model = config.get("xyit_model")
self.init_db()
uuid = self.get_uuid()
if uuid == "":
logging.info("未找到 UUID")
else:
logging.info(f"找到 UUID: {uuid}")
if ai_service == "dify":
# API URL
url = f"https://{dify_ip}/v1/chat-messages" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": dify_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"query": self.query, # 用户输入/提问内容
"inputs": {}, # App 定义的变量值(默认为空)
"response_mode": "blocking", # 流式模式或阻塞模式
"user": "QBotAPI", # 用户标识,需保证唯一性
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
"files": [], # 文件列表(选填),适用于文件结合文本理解
"auto_generate_name": True # (选填)自动生成标题,默认为 True
}
logging.info("请求平台dify")
elif ai_service == "xyit":
url = f"http://{xyit_ip}/{xyit_model}/" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"appID": xyit_appID , # 替换为你的 API 密钥
"appKEY": xyit_appKEY
}
# 请求体
payload = {
"query": self.query, # 用户输入/提问内容
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
}
logging.info("请求平台xyit")
else:
logging.error("未配置ai平台")
return "服务器繁忙,请稍后再逝"
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logging.info("请求成功!返回结果:")
response_data = response.json()
print(response_data) # test
# 存储uuid
if uuid == "":
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO groups (group_openid, uuid) VALUES (?, ?)",
(self.group_openid, response_data.get("conversation_id")))
except sqlite3.Error as e:
logging.error(f"数据库插入错误: {e}")
# 提取 answer 值
answer = response_data.get("answer","服务器繁忙,请稍后再逝")
return answer
else:
logging.info(f"请求失败!状态码: {response.status_code}")
logging.info(f"错误信息: {response.text}") # 打印错误信息
return "服务器繁忙,请稍后再逝"
except Exception as e:
logging.info(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"
def get_uuid(self):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("SELECT uuid FROM groups WHERE group_openid = ?", (self.group_openid,))
result = cursor.fetchone()
return result[0] if result else ""
except sqlite3.Error as e:
logging.error(f"数据库查询错误: {e}")
return ""
def init_db(self):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_openid TEXT UNIQUE NOT NULL,
uuid TEXT NOT NULL
)
''')
conn.commit()
except sqlite3.Error as e:
logging.error(f"数据库初始化失败: {e}")

View File

@@ -1,14 +1,29 @@
import logging from model.logger import setup_logger
import toml import toml
import sqlite3 import sqlite3
logger = setup_logger()
class Clear: class Clear:
def __init__(self, user_id,group_id): def __init__(self, user_id,group_id):
self.user_id = user_id self.user_id = user_id
self.group_id = group_id self.group_id = group_id
def main(self): def main(self):
with open("config.toml", "r", encoding="utf-8"):
config = toml.load("config.toml")
clean_all_success_message = config.get("clean_all_success_message")
clean_one_success_message = config.get("clean_one_success_message")
clean_one_notfound_message = config.get("clean_one_notfound_message")
clean_fail_message = config.get("clean_fail_message")
clean_nopermissoin_message = config.get("clean_nopermissoin_message")
if "{group_id}" in clean_one_success_message:
clean_one_success_message = clean_one_success_message.replace("{group_id}", self.group_id)
if "{group_id}" in clean_one_notfound_message:
clean_one_notfound_message = clean_one_notfound_message.replace("{group_id}", self.group_id)
if self.is_root(): if self.is_root():
if self.group_id == "all": if self.group_id == "all":
try: try:
@@ -17,13 +32,13 @@ class Clear:
# 清空表中所有数据 # 清空表中所有数据
cursor.execute("DELETE FROM groups;") # 假设表名为 uuid_table请根据实际表名修改 cursor.execute("DELETE FROM groups;") # 假设表名为 uuid_table请根据实际表名修改
conn.commit() conn.commit()
logging.info("✅ 数据库表已成功清空") logger.info("已清空所有群组数据")
return "✅ 已清空所有群组数据" return clean_all_success_message
except sqlite3.Error as e: except sqlite3.Error as e:
if 'conn' in locals(): if 'conn' in locals():
conn.close() conn.close()
logging.error(f"数据库操作失败: {e}") logger.error(f"数据库操作失败: {e}")
return "❌ 数据库操作失败" return clean_fail_message
else: else:
try: try:
with sqlite3.connect("uuid.db") as conn: with sqlite3.connect("uuid.db") as conn:
@@ -32,19 +47,19 @@ class Clear:
cursor.execute("DELETE FROM groups WHERE group_openid = ?", (self.group_id,)) cursor.execute("DELETE FROM groups WHERE group_openid = ?", (self.group_id,))
conn.commit() conn.commit()
if cursor.rowcount > 0: if cursor.rowcount > 0:
logging.info(f"✅ 已成功删除 group_id = {self.group_id} 的数据") logger.info(f"✅ 已成功删除 group_id = {self.group_id} 的数据")
return f"✅ 已成功删除 group_id = {self.group_id} 的数据" return clean_one_success_message
else: else:
logging.info(f"⚠️ 没有找到 group_id = {self.group_id} 的数据") logger.info(f"⚠️ 没有找到 group_id = {self.group_id} 的数据")
return f"⚠️ 没有找到 group_id = {self.group_id} 的数据" return clean_one_notfound_message
except sqlite3.Error as e: except sqlite3.Error as e:
if 'conn' in locals(): if 'conn' in locals():
conn.close() conn.close()
logging.error(f"❌ 数据库操作失败: {e}") logger.error(f"❌ 数据库操作失败: {e}")
return "❌ 数据库操作失败" return clean_fail_message
else: else:
return "你不是管理员哦喵~" return clean_nopermissoin_message
def is_root(self): def is_root(self):
with open("./config.toml", "r", encoding="utf-8") as f: with open("./config.toml", "r", encoding="utf-8") as f:

35
model/ai_model.py Normal file
View File

@@ -0,0 +1,35 @@
import toml
from model.logger import setup_logger
logger = setup_logger()
def main(message,qid,group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
ai_service = config.get("ai_service")
enable_qid = config.get("enable_qid")
qid_prefix = config.get("qid_prefix")
qid_suffix = config.get("qid_suffix")
error_message = config.get("error_message")
if enable_qid:
message = f"{qid_prefix}{qid}{qid_suffix}{message}"
else:
pass
if ai_service == "dify":
from model.ai_models import dify
return dify.main(message, group_openid)
elif ai_service == "mcunc":
from model.ai_models import mcunc
return mcunc.main(message, group_openid)
elif ai_service == "aliyun":
from model.ai_models import aliyun
return aliyun.main(message, group_openid)
else:
logger.error("未配置ai_service")
return error_message

59
model/ai_models/aliyun.py Normal file
View File

@@ -0,0 +1,59 @@
import toml
from model.logger import setup_logger
from http import HTTPStatus
from dashscope import Application
from model.sql_tools import init_db
from model.sql_tools import get_uuid
from model.sql_tools import add_uuid
logger = setup_logger()
def main(message, group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
error_message = config.get("error_message")
# aliyun
aliyun_app_id = config.get("aliyun_app_id")
aliyun_api_key = config.get("aliyun_api_key")
init_db()
uuid = get_uuid(group_openid)
if uuid == "":
logger.info("未找到 UUID")
else:
logger.info(f"找到 UUID: {uuid}")
logger.info("请求平台aliyun")
# 发送请求
response = Application.call(
# 若没有配置环境变量可用百炼API Key将下行替换为api_key="sk-xxx"。但不建议在生产环境中直接将API Key硬编码到代码中以减少API Key泄露风险。
api_key=aliyun_api_key,
app_id=aliyun_app_id,
prompt= message,
session_id = group_openid
)
# 检查响应状态码
if response.status_code != HTTPStatus.OK:
logger.info(f"请求失败!状态码: {response.status_code}")
logger.info(f"错误信息: {response.message}") # 打印错误信息
return error_message
else:
logger.info("请求成功!返回结果:")
print(f"message: {response.output.text}, session_id: {response.output.session_id}") # test
# 存储uuid
if uuid == "":
uuid = response.output.session_id
add_uuid(group_openid, uuid)
# 提取 answer 值
answer = response.output.text
return answer

77
model/ai_models/dify.py Normal file
View File

@@ -0,0 +1,77 @@
import requests
import json
from model.logger import setup_logger
import toml
from model.sql_tools import init_db
from model.sql_tools import get_uuid
from model.sql_tools import add_uuid
logger = setup_logger()
def main(message, group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
error_message = config.get("error_message")
# dify
dify_ip = config.get("dify_ip")
dify_token = config.get("dify_token")
init_db()
uuid = get_uuid(group_openid)
if uuid == "":
logger.info("未找到 UUID")
else:
logger.info(f"找到 UUID: {uuid}")
url = f"{dify_ip}/v1/chat-messages" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": dify_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"query": message, # 用户输入/提问内容
"inputs": {}, # App 定义的变量值(默认为空)
"response_mode": "blocking", # 流式模式或阻塞模式
"user": "chatbot_napcat", # 用户标识,需保证唯一性
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
"files": [], # 文件列表(选填),适用于文件结合文本理解
"auto_generate_name": True # (选填)自动生成标题,默认为 True
}
logger.info("请求平台dify")
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logger.info("请求成功!返回结果:")
response_data = response.json()
print(response_data) # test
# 存储uuid
if uuid == "":
uuid = response_data.get("conversation_id")
add_uuid(group_openid, uuid)
# 提取 answer 值
answer = response_data.get("answer", error_message)
return answer
else:
logger.info(f"请求失败!状态码: {response.status_code}")
logger.info(f"错误信息: {response.text}") # 打印错误信息
return error_message
except Exception as e:
logger.info(f"请求过程中出现异常: {e}")
return error_message

75
model/ai_models/mcunc.py Normal file
View File

@@ -0,0 +1,75 @@
import requests
import json
from model.logger import setup_logger
import toml
from model.sql_tools import init_db
from model.sql_tools import get_uuid
from model.sql_tools import add_uuid
logger = setup_logger()
def main(message, group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
error_message = config.get("error_message")
# mcunc
mcunc_ip = config.get("mcunc_ip")
mcunc_appID = config.get("mcunc_appID")
mcunc_appKEY = config.get("mcunc_appKEY")
mcunc_model = config.get("mcunc_model")
init_db()
uuid = get_uuid(group_openid)
if uuid == "":
logger.info("未找到 UUID")
else:
logger.info(f"找到 UUID: {uuid}")
url = f"{mcunc_ip}/models/{mcunc_model}/" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"appID": mcunc_appID, # 替换为你的 API 密钥
"appKEY": mcunc_appKEY
}
# 请求体
payload = {
"query": message, # 用户输入/提问内容
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
}
logger.info("请求平台mcunc")
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logger.info("请求成功!返回结果:")
response_data = response.json()
print(response_data) # test
# 存储uuid
if uuid == "":
uuid = response_data.get("conversation_id")
add_uuid(group_openid, uuid)
# 提取 answer 值
answer = response_data.get("answer", error_message)
return answer
else:
logger.info(f"请求失败!状态码: {response.status_code}")
logger.info(f"错误信息: {response.text}") # 打印错误信息
return error_message
except Exception as e:
logger.info(f"请求过程中出现异常: {e}")
return error_message

61
model/logger.py Normal file
View File

@@ -0,0 +1,61 @@
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
from pathlib import Path
def setup_logger(log_file=None, log_level=logging.INFO, max_bytes=10485760, backup_count=5):
"""
初始化日志记录器,将日志按指定格式输出并记录到文件
Args:
log_file (str): 日志文件路径默认为None会自动生成以当天日期命名的日志文件
log_level: 日志级别
max_bytes (int): 单个日志文件最大字节数
backup_count (int): 保留的备份日志文件数量
Returns:
logging.Logger: 配置好的日志记录器
"""
# 创建logger
logger = logging.getLogger('chatbot-napcat')
logger.setLevel(log_level)
logger.propagate = False
# 避免重复添加handler
if logger.handlers:
return logger
# 创建logs目录如果不存在
log_dir = Path('logs')
log_dir.mkdir(exist_ok=True)
# 如果未指定日志文件名,则使用当天日期命名
if log_file is None:
today = datetime.now().strftime('%Y-%m-%d')
log_file = log_dir / f'{today}.log'
# 创建格式化器
formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s'
)
# 创建文件处理器(带轮转)
file_handler = RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8'
)
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
# 添加处理器到logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger

42
model/sql_tools.py Normal file
View File

@@ -0,0 +1,42 @@
import sqlite3
from model.logger import setup_logger
logger = setup_logger()
def init_db():
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS groups
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_openid TEXT UNIQUE NOT NULL,
uuid TEXT NOT NULL
)
''')
conn.commit()
except sqlite3.Error as e:
logger.error(f"数据库初始化失败: {e}")
def get_uuid(group_openid):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("SELECT uuid FROM groups WHERE group_openid = ?", (group_openid,))
result = cursor.fetchone()
return result[0] if result else ""
except sqlite3.Error as e:
logger.error(f"数据库查询错误: {e}")
return ""
def add_uuid(group_openid, uuid):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO groups (group_openid, uuid) VALUES (?, ?)",
(group_openid, uuid))
except sqlite3.Error as e:
logger.error(f"数据库插入错误: {e}")

25
ncatbot.yaml Normal file
View File

@@ -0,0 +1,25 @@
root: '1453599706'
bt_uin: '3917995084'
enable_webui_interaction: null
debug: false
github_proxy: null
check_ncatbot_update: true
skip_ncatbot_install_check: false
websocket_timeout: 15
napcat:
ws_uri: ws://sz.mcunc.site:13001
ws_token: 79_t&_e@WJ#HgXXp
ws_listen_ip: 0.0.0.0
webui_uri: http://sz.mcunc.site:26099
webui_token: U16z8RnYNls]W[N|
enable_webui: true
check_napcat_update: false
stop_napcat: false
remote_mode: false
report_self_message: false
report_forward_message_detail: true
plugin:
plugins_dir: plugins
plugin_whitelist: []
plugin_blacklist: []
skip_plugin_load: false

BIN
uuid.db

Binary file not shown.