新版本重构了ai模型提供商逻辑,新增MCUNC,阿里云百炼ai提供商,支持语言可配置,优化日志输出格式

This commit is contained in:
2025-09-24 17:55:33 +08:00
parent 0ddede55b5
commit 4d9e1bf355
15 changed files with 563 additions and 207 deletions

View File

@@ -1,144 +0,0 @@
import requests
import json
import logging
import toml
import sqlite3
class AiCat:
def __init__(self,message,qid,group_openid):
self.message = message
self.qid = qid
self.group_openid = group_openid
self.query = f"<qid>{self.qid}</qid>{self.message}"
def main(self):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
# 获取所需字段
ai_service = config.get("ai_service")
# dify
dify_ip = config.get("dify_ip")
dify_token = config.get("dify_token")
# xyit
xyit_ip = config.get("xyit_ip")
xyit_appID = config.get("xyit_appID")
xyit_appKEY = config.get("xyit_appKEY")
xyit_model = config.get("xyit_model")
self.init_db()
uuid = self.get_uuid()
if uuid == "":
logging.info("未找到 UUID")
else:
logging.info(f"找到 UUID: {uuid}")
if ai_service == "dify":
# API URL
url = f"https://{dify_ip}/v1/chat-messages" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": dify_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"query": self.query, # 用户输入/提问内容
"inputs": {}, # App 定义的变量值(默认为空)
"response_mode": "blocking", # 流式模式或阻塞模式
"user": "QBotAPI", # 用户标识,需保证唯一性
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
"files": [], # 文件列表(选填),适用于文件结合文本理解
"auto_generate_name": True # (选填)自动生成标题,默认为 True
}
logging.info("请求平台dify")
elif ai_service == "xyit":
url = f"https://{xyit_ip}/models/{xyit_model}/" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"appID": xyit_appID , # 替换为你的 API 密钥
"appKEY": xyit_appKEY
}
# 请求体
payload = {
"query": self.query, # 用户输入/提问内容
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
}
logging.info("请求平台xyit")
else:
logging.error("未配置ai平台")
return "服务器繁忙,请稍后再逝"
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logging.info("请求成功!返回结果:")
response_data = response.json()
print(response_data) # test
# 存储uuid
if uuid == "":
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO groups (group_openid, uuid) VALUES (?, ?)",
(self.group_openid, response_data.get("conversation_id")))
except sqlite3.Error as e:
logging.error(f"数据库插入错误: {e}")
# 提取 answer 值
answer = response_data.get("answer","服务器繁忙,请稍后再逝")
return answer
else:
logging.info(f"请求失败!状态码: {response.status_code}")
logging.info(f"错误信息: {response.text}") # 打印错误信息
return "服务器繁忙,请稍后再逝"
except Exception as e:
logging.info(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"
def get_uuid(self):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("SELECT uuid FROM groups WHERE group_openid = ?", (self.group_openid,))
result = cursor.fetchone()
return result[0] if result else ""
except sqlite3.Error as e:
logging.error(f"数据库查询错误: {e}")
return ""
def init_db(self):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS groups (
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_openid TEXT UNIQUE NOT NULL,
uuid TEXT NOT NULL
)
''')
conn.commit()
except sqlite3.Error as e:
logging.error(f"数据库初始化失败: {e}")

View File

@@ -1,14 +1,29 @@
import logging
from model.logger import setup_logger
import toml
import sqlite3
logger = setup_logger()
class Clear:
def __init__(self, user_id,group_id):
self.user_id = user_id
self.group_id = group_id
def main(self):
with open("config.toml", "r", encoding="utf-8"):
config = toml.load("config.toml")
clean_all_success_message = config.get("clean_all_success_message")
clean_one_success_message = config.get("clean_one_success_message")
clean_one_notfound_message = config.get("clean_one_notfound_message")
clean_fail_message = config.get("clean_fail_message")
clean_nopermissoin_message = config.get("clean_nopermissoin_message")
if "{group_id}" in clean_one_success_message:
clean_one_success_message.replace("{group_id}", self.group_id)
if "{group_id}" in clean_one_notfound_message:
clean_one_notfound_message.replace("{group_id}", self.group_id)
if self.is_root():
if self.group_id == "all":
try:
@@ -17,13 +32,13 @@ class Clear:
# 清空表中所有数据
cursor.execute("DELETE FROM groups;") # 假设表名为 uuid_table请根据实际表名修改
conn.commit()
logging.info("✅ 数据库表已成功清空")
return "✅ 已清空所有群组数据"
logger.info("已清空所有群组数据")
return clean_all_success_message
except sqlite3.Error as e:
if 'conn' in locals():
conn.close()
logging.error(f"数据库操作失败: {e}")
return "❌ 数据库操作失败"
logger.error(f"数据库操作失败: {e}")
return clean_fail_message
else:
try:
with sqlite3.connect("uuid.db") as conn:
@@ -32,19 +47,19 @@ class Clear:
cursor.execute("DELETE FROM groups WHERE group_openid = ?", (self.group_id,))
conn.commit()
if cursor.rowcount > 0:
logging.info(f"✅ 已成功删除 group_id = {self.group_id} 的数据")
return f"✅ 已成功删除 group_id = {self.group_id} 的数据"
logger.info(f"✅ 已成功删除 group_id = {self.group_id} 的数据")
return clean_one_success_message
else:
logging.info(f"⚠️ 没有找到 group_id = {self.group_id} 的数据")
return f"⚠️ 没有找到 group_id = {self.group_id} 的数据"
logger.info(f"⚠️ 没有找到 group_id = {self.group_id} 的数据")
return clean_one_notfound_message
except sqlite3.Error as e:
if 'conn' in locals():
conn.close()
logging.error(f"❌ 数据库操作失败: {e}")
return "❌ 数据库操作失败"
logger.error(f"❌ 数据库操作失败: {e}")
return clean_fail_message
else:
return "你不是管理员哦喵~"
return clean_nopermissoin_message
def is_root(self):
with open("./config.toml", "r", encoding="utf-8") as f:

35
model/ai_model.py Normal file
View File

@@ -0,0 +1,35 @@
import toml
from model.logger import setup_logger
logger = setup_logger()
def main(message,qid,group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
ai_service = config.get("ai_service")
enable_qid = config.get("enable_qid")
qid_prefix = config.get("qid_prefix")
qid_suffix = config.get("qid_suffix")
error_message = config.get("error_message")
if enable_qid:
message = f"{qid_prefix}{qid}{qid_suffix}{message}"
else:
pass
if ai_service == "dify":
from model.ai_models import dify
return dify.main(message, group_openid)
elif ai_service == "mcunc":
from model.ai_models import mcunc
return mcunc.main(message, group_openid)
elif ai_service == "xyit":
from model.ai_models import xyit
return xyit.main(message, group_openid)
else:
logger.error("未配置ai_service")
return error_message

57
model/ai_models/aliyun.py Normal file
View File

@@ -0,0 +1,57 @@
import toml
from model.logger import setup_logger
from http import HTTPStatus
from dashscope import Application
from model.sql_tools import init_db
from model.sql_tools import get_uuid
from model.sql_tools import add_uuid
logger = setup_logger()
def main(message, group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
error_message = config.get("error_message")
# aliyun
aliyun_app_id = config.get("aliyun_app_id")
aliyun_api_key = config.get("aliyun_api_key")
init_db()
uuid = get_uuid(group_openid)
if uuid == "":
logger.info("未找到 UUID")
else:
logger.info(f"找到 UUID: {uuid}")
# 发送请求
response = Application.call(
# 若没有配置环境变量可用百炼API Key将下行替换为api_key="sk-xxx"。但不建议在生产环境中直接将API Key硬编码到代码中以减少API Key泄露风险。
api_key=aliyun_api_key,
app_id=aliyun_app_id,
prompt= message,
session_id = group_openid
)
# 检查响应状态码
if response.status_code != HTTPStatus.OK:
logger.info(f"请求失败!状态码: {response.status_code}")
logger.info(f"错误信息: {response.message}") # 打印错误信息
return error_message
else:
logger.info("请求成功!返回结果:")
print(f"message: {response.output.text}, session_id: {response.output.session_id}") # test
# 存储uuid
if uuid == "":
uuid = response.output.session_id
add_uuid(group_openid, uuid)
# 提取 answer 值
answer = response.output.text
return answer

77
model/ai_models/dify.py Normal file
View File

@@ -0,0 +1,77 @@
import requests
import json
from model.logger import setup_logger
import toml
from model.sql_tools import init_db
from model.sql_tools import get_uuid
from model.sql_tools import add_uuid
logger = setup_logger()
def main(message, group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
error_message = config.get("error_message")
# dify
dify_ip = config.get("dify_ip")
dify_token = config.get("dify_token")
init_db()
uuid = get_uuid(group_openid)
if uuid == "":
logging.info("未找到 UUID")
else:
logging.info(f"找到 UUID: {uuid}")
url = f"{dify_ip}/v1/chat-messages" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": dify_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"query": message, # 用户输入/提问内容
"inputs": {}, # App 定义的变量值(默认为空)
"response_mode": "blocking", # 流式模式或阻塞模式
"user": "chatbot_napcat", # 用户标识,需保证唯一性
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
"files": [], # 文件列表(选填),适用于文件结合文本理解
"auto_generate_name": True # (选填)自动生成标题,默认为 True
}
logging.info("请求平台dify")
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logging.info("请求成功!返回结果:")
response_data = response.json()
print(response_data) # test
# 存储uuid
if uuid == "":
uuid = response_data.get("conversation_id")
add_uuid(group_openid, uuid)
# 提取 answer 值
answer = response_data.get("answer", error_message)
return answer
else:
logging.info(f"请求失败!状态码: {response.status_code}")
logging.info(f"错误信息: {response.text}") # 打印错误信息
return error_message
except Exception as e:
logging.info(f"请求过程中出现异常: {e}")
return error_message

75
model/ai_models/mcunc.py Normal file
View File

@@ -0,0 +1,75 @@
import requests
import json
from model.logger import setup_logger
import toml
from model.sql_tools import init_db
from model.sql_tools import get_uuid
from model.sql_tools import add_uuid
logger = setup_logger()
def main(message, group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
error_message = config.get("error_message")
# mcunc
mcunc_ip = config.get("mcunc_ip")
mcunc_appID = config.get("mcunc_appID")
mcunc_appKEY = config.get("mcunc_appKEY")
mcunc_model = config.get("mcunc_model")
init_db()
uuid = get_uuid(group_openid)
if uuid == "":
logger.info("未找到 UUID")
else:
logger.info(f"找到 UUID: {uuid}")
url = f"{mcunc_ip}/models/{mcunc_model}/" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"appID": mcunc_appID, # 替换为你的 API 密钥
"appKEY": mcunc_appKEY
}
# 请求体
payload = {
"query": message, # 用户输入/提问内容
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
}
logger.info("请求平台mcunc")
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logger.info("请求成功!返回结果:")
response_data = response.json()
print(response_data) # test
# 存储uuid
if uuid == "":
uuid = response_data.get("conversation_id")
add_uuid(group_openid, uuid)
# 提取 answer 值
answer = response_data.get("answer", error_message)
return answer
else:
logger.info(f"请求失败!状态码: {response.status_code}")
logger.info(f"错误信息: {response.text}") # 打印错误信息
return error_message
except Exception as e:
logger.info(f"请求过程中出现异常: {e}")
return error_message

76
model/ai_models/xyit.py Normal file
View File

@@ -0,0 +1,76 @@
import requests
import json
from model.logger import setup_logger
import toml
from model.sql_tools import init_db
from model.sql_tools import get_uuid
from model.sql_tools import add_uuid
logger = setup_logger()
def main(message, group_openid):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
error_message = config.get("error_message")
# xyit
xyit_ip = config.get("xyit_ip")
xyit_appID = config.get("xyit_appID")
xyit_appKEY = config.get("xyit_appKEY")
xyit_model = config.get("xyit_model")
init_db()
uuid = get_uuid(group_openid)
if uuid == "":
logger.info("未找到 UUID")
else:
logger.info(f"找到 UUID: {uuid}")
url = f"{xyit_ip}/models/{xyit_model}/" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"appID": xyit_appID, # 替换为你的 API 密钥
"appKEY": xyit_appKEY
}
# 请求体
payload = {
"query": message, # 用户输入/提问内容
"conversation_id": uuid, # (选填)会话 ID继续对话时需要传入
}
logger.info("请求平台xyit")
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logger.info("请求成功!返回结果:")
response_data = response.json()
print(response_data) # test
# 存储uuid
if uuid == "":
uuid = response_data.get("conversation_id")
add_uuid(group_openid, uuid)
# 提取 answer 值
answer = response_data.get("answer", error_message)
return answer
else:
logger.info(f"请求失败!状态码: {response.status_code}")
logger.info(f"错误信息: {response.text}") # 打印错误信息
return error_message
except Exception as e:
logger.info(f"请求过程中出现异常: {e}")
return error_message

60
model/logger.py Normal file
View File

@@ -0,0 +1,60 @@
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
from pathlib import Path
def setup_logger(log_file=None, log_level=logging.INFO, max_bytes=10485760, backup_count=5):
"""
初始化日志记录器,将日志按指定格式输出并记录到文件
Args:
log_file (str): 日志文件路径默认为None会自动生成以当天日期命名的日志文件
log_level: 日志级别
max_bytes (int): 单个日志文件最大字节数
backup_count (int): 保留的备份日志文件数量
Returns:
logging.Logger: 配置好的日志记录器
"""
# 创建logger
logger = logging.getLogger('mcsmapi')
logger.setLevel(log_level)
# 避免重复添加handler
if logger.handlers:
return logger
# 创建logs目录如果不存在
log_dir = Path('logs')
log_dir.mkdir(exist_ok=True)
# 如果未指定日志文件名,则使用当天日期命名
if log_file is None:
today = datetime.now().strftime('%Y-%m-%d')
log_file = log_dir / f'{today}.log'
# 创建格式化器
formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] [%(filename)s:%(lineno)d] - %(message)s'
)
# 创建文件处理器(带轮转)
file_handler = RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8'
)
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
# 添加处理器到logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger

42
model/sql_tools.py Normal file
View File

@@ -0,0 +1,42 @@
import sqlite3
from model.logger import setup_logger
logger = setup_logger()
def init_db():
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS groups
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
group_openid TEXT UNIQUE NOT NULL,
uuid TEXT NOT NULL
)
''')
conn.commit()
except sqlite3.Error as e:
logger.error(f"数据库初始化失败: {e}")
def get_uuid(group_openid):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("SELECT uuid FROM groups WHERE group_openid = ?", (group_openid,))
result = cursor.fetchone()
return result[0] if result else ""
except sqlite3.Error as e:
logger.error(f"数据库查询错误: {e}")
return ""
def add_uuid(group_openid, uuid):
try:
with sqlite3.connect("uuid.db") as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO groups (group_openid, uuid) VALUES (?, ?)",
(group_openid, uuid))
except sqlite3.Error as e:
logger.error(f"数据库插入错误: {e}")