first commit

This commit is contained in:
2025-07-05 17:26:12 +08:00
commit cef359da64
39 changed files with 820 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
napcat/
logs/
.idea/
.venv/
data/

22
config.toml Normal file
View File

@@ -0,0 +1,22 @@
#napcat配置
bot_qq = 123456 #机器人q号
root_qq = 1234567 # 管理员q号
ws_uri = "ws://localhost:3001" # ws 地址, 可自定义端口, 默认 3001
webui_uri = "http://localhost:6099" # webui 地址, 可自定义端口, 默认 6099
webui_token = "napcat" # webui 令牌, 默认 napcat
ws_token = "" # ws_uri 令牌, 默认留空
ws_listen_ip = "localhost" # ws_uri 监听 ip, 默认 localhost 监听本机,监听全部则配置 0.0.0.0
remote_mode = false # 是否远程模式, 即 NapCat 服务不在本机运行 psncatbot官方已废弃该参数
# 功能配置
allowed_groups = "all" # 授权群聊all为全部eg[123456789, 987654321]
allowed_users = "all" # 授权用户all为全部eg[123456789, 987654321]
velocity_ip = "127.0.0.1" # velocity ip
velocity_port = 8080 # velocity port
velocity_token = "your velocity_token" #`velocity token
friend_auto = true # 好友自动同意
group_auto = true # 无作用
group_welcome = true # 入群欢迎
group_welcome_message = "!at 欢迎加入本群,使用@bot /help获取此bot帮助" # 入群消息 !at 为@加群用户
group_leave = true # 退群提醒
group_leave_message = "用户{userid}退群了" # 退群消息 {userid}为退群用户id

0
control/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

150
control/group.py Normal file
View File

@@ -0,0 +1,150 @@
from model.AiCat import AiCat
from model.McList import McList
from model.McFind import McFind
from model.McHh import McHh
from model.McBind import McBind
import toml
import logging
class group:
def __init__(self, msg):
self.user_id = msg.user_id
self.group_id = msg.group_id
self.message_id = msg.message_id
self.message_type = msg.message_type
self.raw_message = msg.raw_message
self.sender = msg.sender
self.message = msg.message
self.self_id = msg.self_id
self.time = msg.time
def main(self):
is_at = self.is_at()
if is_at is None:
return None
else:
permission = self.check_permission()
if permission is None:
return "服务器繁忙,请稍后再逝"
elif permission:
return self.menu(is_at)
else:
return "此bot未在该群启用"
def is_at(self):
for seg in self.message:
if seg['type'] == 'at' and seg['data'].get('qq') == str(self.self_id):
texts = [s['data']['text'].strip() for s in self.message if s['type'] == 'text']
full_text = ' '.join(texts).strip()
return full_text
return None
def check_permission(self):
try:
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
allowed_groups = config.get("allowed_groups", [])
except Exception as e:
logging.error(str(e))
return None
# 检查当前群是否在允许列表中
if allowed_groups == "all":
return True
elif self.group_id in allowed_groups:
return True
else:
return False
def menu(self,command):
if command.startswith("/help"):
return " \n /help -- 获取帮助 \n /list [服务器名/all] [页数] -- 列出服务器在线人数 \n /find <玩家名>-- 查找指定玩家是否在线 \n /bind <授权码> -- 绑定UNC游戏账户 \n /hh <喊话内容> -- 全服喊话 \n /cat <聊天内容> -- 与猫娘对话 \n /chat <提问内容> -- 智能问答(开发中,暂不可用) \n /status -- 查看bot状态 \n <必要参数> [可选参数]"
elif command.startswith("/list"):
# 去除前导空格并分割命令
parts = command.strip().split()
if len(parts) == 1:
# 只有 "/list" 命令
server = 'all'
page = '1'
elif len(parts) == 2:
# "/list server"
server = parts[1]
page = '1'
elif len(parts) == 3:
# "/list server page"
server = parts[1]
page = parts[2]
else:
return "参数错误 \n 格式:/list [服务器名/all] [页数]"
try:
page = int(page)
if page < 1:
return "请输入大于1的页数 \n 格式:/list [服务器名/all] [页数]"
except ValueError:
return "输入的页数需为整数 \n 格式 /list [服务器名/all] [页数]"
except Exception as e:
return "服务器繁忙,请稍后再逝"
page -= 1
list_player = McList(server, page)
answer = McList.main(list_player)
return answer
elif command.startswith("/find"):
# 排除 " /find "" /find"未传参情况
parts = command.split("/find ", 1)
if len(parts) > 1:
chat_content = parts[1].strip()
if chat_content:
find = McFind(chat_content)
answer = McFind.main(find)
return answer
else:
return "请提供查找的玩家名 \n 格式:/find <玩家名>"
else:
return "请提供查找的玩家名 \n 格式:/find <玩家名>"
elif command.startswith("/hh"):
# 排除“ /hh "” /hh“未传参情况
parts = command.split("/hh ", 1)
if len(parts) > 1:
chat_content = parts[1].strip()
if chat_content:
hh = McHh(self.user_id,chat_content)
answer = McHh.main(hh)
return answer
else:
return "请提供喊话内容 \n 格式:/hh <喊话内容>"
else:
return "请提供喊话内容 \n 格式:/hh <喊话内容>"
elif command.startswith("/chat"):
return "智能问答功能正在开发中,尚不可用!\n开发进度5%"
elif command.startswith("/cat"):
# # 排除 " /cat "" /cat"未传参情况
# parts = self.content.split(" /cat ", 1)
# if len(parts) > 1:
# chat_content = parts[1].strip()
# if chat_content:
# cat = AiCat(chat_content,self.id)
# answer = AiCat.main(cat)
# return answer
# else:
# return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>"
# else:
# return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>"
return "该内容已移动至猫娘qbot"
elif command.startswith("/bind"):
# 排除“ /bind "” /bind“未传参情况
parts = command.split("/bind ", 1)
if len(parts) > 1:
chat_content = parts[1].strip()
if chat_content:
bind = McBind(self.user_id, chat_content)
answer = McBind.main(bind)
return answer
else:
return "授权码不可为空 \n 格式:/bind <授权码> \n 在服务器内输入/bind指令获取授权码"
else:
return "授权码不可为空 \n 格式:/bind <授权码>"
elif command.startswith("/status"):
return "---MCUNC QBOT---\n Q bot 运行正常 \n 版本: 2.0 pre \n © 融玩文化 | 无尽创意MCUNC"
# return "\n Q Bot运行正常 \n Version: 0.2.1 pre-release \n © 融玩文化 | 无尽创意mcunc"
else:
return "功能不存在 \n 输入/help 查看帮助"

62
control/notice.py Normal file
View File

@@ -0,0 +1,62 @@
import toml
import logging
class notice:
def __init__(self,msg):
if msg["notice_type"] == "group_increase" or msg["notice_type"] == "group_decrease":
self.time = msg["time"]
self.self_id = msg["self_id"]
self.post_type = msg["post_type"]
self.notice_type = msg["notice_type"]
self.sub_type = msg["sub_type"]
self.group_id = msg["group_id"]
self.operator_id = msg["operator_id"]
self.user_id = msg["user_id"]
else:
pass
def main(self):
if self.notice_type == "group_increase":
return self.group_increase()
elif self.notice_type == "group_decrease":
return self.group_decrease()
else:
return None
def group_increase(self):
print(1)
try:
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
group_welcome = config.get("group_welcome")
print(group_welcome)
group_welcome_message = config.get("group_welcome_message")
print(group_welcome_message)
if group_welcome:
if "!at" in group_welcome_message:
return group_welcome_message.replace("!at", f"[CQ:at,qq={self.user_id}]")
else:
return group_welcome_message
else:
return None
except Exception as e:
logging.error(f"读取配置文件错误:{e}")
return None
def group_decrease(self):
try:
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
group_leave = config.get("group_leave")
group_leave_message = config.get("group_leave_message")
print(group_leave_message)
if group_leave:
if "{userid}" in group_leave_message:
return group_leave_message.replace("{userid}", str(self.user_id))
else:
return group_leave_message
else:
return None
except Exception as e:
logging.error(f"读取配置文件错误:{e}")
return None

141
control/private.py Normal file
View File

@@ -0,0 +1,141 @@
from model.AiCat import AiCat
from model.McList import McList
from model.McFind import McFind
from model.McHh import McHh
from model.McBind import McBind
import toml
import logging
class private:
def __init__(self, msg):
self.user_id = msg.user_id
self.message_id = msg.message_id
self.message_type = msg.message_type
self.raw_message = msg.raw_message
self.sender = msg.sender
self.message = msg.message
self.self_id = msg.self_id
self.time = msg.time
def main(self):
texts = [seg['data']['text'].strip() for seg in self.message if seg['type'] == 'text']
full_text = ' '.join(texts).strip()
permission = self.check_permission()
if permission is None:
return "服务器繁忙,请稍后再逝"
elif permission:
return self.menu(full_text)
else:
return "此bot未在该群启用"
def check_permission(self):
try:
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
allowed_users = config.get("allowed_users", [])
except Exception as e:
logging.error(str(e))
return None
# 检查当前群是否在允许列表中
if allowed_users == "all":
return True
elif self.user_id in allowed_users:
return True
else:
return False
def menu(self,command):
print( command)
if command.startswith("/help"):
return " \n /help -- 获取帮助 \n /list [服务器名/all] [页数] -- 列出服务器在线人数 \n /find <玩家名>-- 查找指定玩家是否在线 \n /bind <授权码> -- 绑定UNC游戏账户 \n /hh <喊话内容> -- 全服喊话 \n /cat <聊天内容> -- 与猫娘对话 \n /chat <提问内容> -- 智能问答(开发中,暂不可用) \n /status -- 查看bot状态 \n <必要参数> [可选参数]"
elif command.startswith("/list"):
# 去除前导空格并分割命令
parts = command.strip().split()
if len(parts) == 1:
# 只有 "/list" 命令
server = 'all'
page = '1'
elif len(parts) == 2:
# "/list server"
server = parts[1]
page = '1'
elif len(parts) == 3:
# "/list server page"
server = parts[1]
page = parts[2]
else:
return "参数错误 \n 格式:/list [服务器名/all] [页数]"
try:
page = int(page)
if page < 1:
return "请输入大于1的页数 \n 格式:/list [服务器名/all] [页数]"
except ValueError:
return "输入的页数需为整数 \n 格式 /list [服务器名/all] [页数]"
except Exception as e:
return "服务器繁忙,请稍后再逝"
page -= 1
list_player = McList(server, page)
answer = McList.main(list_player)
return answer
elif command.startswith("/find"):
# 排除 " /find "" /find"未传参情况
parts = command.split("/find ", 1)
if len(parts) > 1:
chat_content = parts[1].strip()
if chat_content:
find = McFind(chat_content)
answer = McFind.main(find)
return answer
else:
return "请提供查找的玩家名 \n 格式:/find <玩家名>"
else:
return "请提供查找的玩家名 \n 格式:/find <玩家名>"
elif command.startswith("/hh"):
# 排除“ /hh "” /hh“未传参情况
parts = command.split("/hh ", 1)
if len(parts) > 1:
chat_content = parts[1].strip()
if chat_content:
hh = McHh(self.user_id,chat_content)
answer = McHh.main(hh)
return answer
else:
return "请提供喊话内容 \n 格式:/hh <喊话内容>"
else:
return "请提供喊话内容 \n 格式:/hh <喊话内容>"
elif command.startswith("/chat"):
return "智能问答功能正在开发中,尚不可用!\n开发进度5%"
elif command.startswith("/cat"):
# # 排除 " /cat "" /cat"未传参情况
# parts = self.content.split(" /cat ", 1)
# if len(parts) > 1:
# chat_content = parts[1].strip()
# if chat_content:
# cat = AiCat(chat_content,self.id)
# answer = AiCat.main(cat)
# return answer
# else:
# return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>"
# else:
# return "你似乎没有提供想和我聊的内容喵~ \n 格式:/cat <提问内容>"
return "该内容已移动至猫娘qbot"
elif command.startswith("/bind"):
# 排除“ /bind "” /bind“未传参情况
parts = command.split("/bind ", 1)
if len(parts) > 1:
chat_content = parts[1].strip()
if chat_content:
bind = McBind(self.user_id, chat_content)
answer = McBind.main(bind)
return answer
else:
return "授权码不可为空 \n 格式:/bind <授权码> \n 在服务器内输入/bind指令获取授权码"
else:
return "授权码不可为空 \n 格式:/bind <授权码>"
elif command.startswith("/status"):
return "---MCUNC QBOT---\n Q bot 运行正常 \n 版本: 2.0 pre \n © 融玩文化 | 无尽创意MCUNC"
# return "\n Q Bot运行正常 \n Version: 0.2.1 pre-release \n © 融玩文化 | 无尽创意mcunc"
else:
return "功能不存在 \n 输入/help 查看帮助"

42
control/request.py Normal file
View File

@@ -0,0 +1,42 @@
import logging
import toml
class request:
def __init__(self,msg):
self.time = msg.time
self.self_id = msg.self_id
self.request_type = msg.request_type
self.sub_type = msg.sub_type
self.group_id = msg.group_id
self.user_id = msg.user_id
self.comment = msg.comment
self.flag = msg.flag
def main(self):
if self.request_type == "friend":
friend_auto = self.get_info()
if friend_auto:
return True
else:
return False
def get_info(self):
try:
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
friend_auto = config.get("friend_auto")
return friend_auto
except Exception as e:
logging.error(f"读取配置文件错误:{e}")
return False
def get_allow_group(self):
try:
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
allow_group = config.get("allowed_groups")
return allow_group
except Exception as e:
logging.error(f"读取配置文件错误:{e}")
return []

82
main.py Normal file
View File

@@ -0,0 +1,82 @@
from ncatbot.core.notice import NoticeMessage
from ncatbot.core import BotClient, Request
import logging
import asyncio
from control.group import group
from control.private import private
from control.request import request
from control.notice import notice
import toml
with open("./config.toml", "r", encoding="utf-8") as f:
config = toml.load(f)
bt_uin = config.get("bot_qq")
root = config.get("root_qq")
ws_uri = config.get("ws_uri")
web_uri = config.get("web_uri")
webui_token = config.get("webui_token")
ws_token = config.get("ws_token")
ws_listen_ip = config.get("ws_listen_ip")
remote_mode = config.get("remote_mode")
bot = BotClient()
api = bot.run_blocking(bt_uin=bt_uin, root=root, ws_uri=ws_uri, web_uri=web_uri, webui_token=webui_token, ws_token=ws_token, ws_listen_ip=ws_listen_ip, remote_mode=remote_mode)
@bot.group_event()
async def on_group_message(msg):
logging.info(f"收到消息:{msg.raw_message},来自{msg.group_id}群聊{msg.user_id}用户")
if msg.user_id == 2854196310: # qq管家防止刷屏。bot大战请看
pass
else:
ctrl = group(msg)
return_message = ctrl.main()
if return_message is None:
return
else:
logging.info(f"返回消息:{return_message}")
await bot.api.post_group_msg(group_id=msg.group_id, text=return_message, reply=msg.message_id)
@bot.private_event()
async def on_private_message(msg):
logging.info(f"收到消息:{msg.raw_message},来自{msg.user_id}用户")
if msg.user_id == 2854196310: # qq管家防止刷屏。
pass
else:
ctrl = private(msg)
return_message = ctrl.main()
if return_message is None:
return
else:
logging.info(f"返回消息:{return_message}")
await bot.api.post_private_msg(user_id=msg.user_id, text=return_message, reply=msg.message_id)
@bot.request_event()
async def on_request_event(msg: Request):
logging.info(f"收到request事件{msg.request_type},来自{msg.group_id}群聊,{msg.user_id}用户,验证消息:{msg.comment}")
ctrl = request(msg)
accept_friend_application = ctrl.main()
if accept_friend_application is True:
await msg.reply(True, comment="请求已通过")
logging.info("请求已通过")
else:
await msg.reply(False, comment="请求被拒绝")
logging.info("请求被拒绝")
@bot.notice_event()
async def on_notice_event(msg: NoticeMessage):
logging.info(f"收到notice事件{msg['notice_type']},来自{msg['group_id']}群聊,{msg['user_id']}用户")
ctrl = notice(msg)
return_message = ctrl.main()
if return_message is None:
return
else:
logging.info(f"返回消息:{return_message}")
await bot.api.post_group_msg(group_id=msg["group_id"], text=return_message)
asyncio.get_event_loop().run_forever()

55
model/AiCat.py Normal file
View File

@@ -0,0 +1,55 @@
import requests
import json
import logging
class AiCat:
def __init__(self,message,qid):
self.message = message
self.qid = qid
self.query = f"<qid>{self.qid}</qid>{self.message}"
print(self.query) # test
def main(self):
# API URL
url = "https://ai.mcunc.cn/v1/chat-messages" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer app-pqib8uic8oBP95XmXuBi5ANq" # 替换为你的 API 密钥
}
# 请求体
payload = {
"query": self.query, # 用户输入/提问内容
"inputs": {}, # App 定义的变量值(默认为空)
"response_mode": "blocking", # 流式模式或阻塞模式
"user": "QBotAPI", # 用户标识,需保证唯一性
"conversation_id": "09cc6545-b0e0-4611-aad1-cf9bf51600e1", # (选填)会话 ID继续对话时需要传入
"files": [], # 文件列表(选填),适用于文件结合文本理解
"auto_generate_name": True # (选填)自动生成标题,默认为 True
}
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logging.info("请求成功!返回结果:")
response_data = response.json()
logging.info(json.dumps(response_data, indent=4, ensure_ascii=False)) # 格式化输出 JSON
# 提取 answer 值
answer = response_data.get("answer", "answer 字段不存在")
print(response_data.get("conversation_id")) # test
return answer
else:
logging.info(f"请求失败!状态码: {response.status_code}")
logging.info(f"错误信息: {response.text}") # 打印错误信息
return "诶呀! 服务器好像出现了一点点错误呐~稍等一会再重试哦喵~"
except Exception as e:
logging.info(f"请求过程中出现异常: {e}")
return "诶呀! 服务器好像出现了一点点错误呐~稍等一会再重试哦喵~"

51
model/McBind.py Normal file
View File

@@ -0,0 +1,51 @@
import requests
import json
import logging
import toml
class McBind:
def __init__(self,qid,code):
self.qid = qid
self.code = code
def main(self):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
# 获取所需字段
velocity_ip = config.get("velocity_ip")
velocity_port = config.get("velocity_port")
velocity_token = config.get("velocity_token")
# API URL
url = f"http://{velocity_ip}:{velocity_port}/vc/blind" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": velocity_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"qqID": self.qid,
"code": self.code
}
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
return "您已成功绑定MCUNC游戏账户"
elif response.status_code == 403:
logging.info("授权码不存在或过期")
return "授权码不存在或过期"
else:
logging.error("请求失败!状态码: {response.status_code}")
logging.error(f"错误信息: {response.text}") # 打印错误信息
return "服务器繁忙,请稍后再逝"
except Exception as e:
logging.error(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"

55
model/McFind.py Normal file
View File

@@ -0,0 +1,55 @@
import requests
import json
import logging
import toml
class McFind:
def __init__(self,name):
self.name = name
def main(self):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
# 获取所需字段
velocity_ip = config.get("velocity_ip")
velocity_port = config.get("velocity_port")
velocity_token = config.get("velocity_token")
# API URL
url = f"http://{velocity_ip}:{velocity_port}/vc/find_player" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": velocity_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"name": self.name,
}
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logging.info("请求成功!返回结果:")
response_data = response.json()
logging.info(json.dumps(response_data, indent=4, ensure_ascii=False)) # 格式化输出 JSON
# 提取 answer 值
online = response_data.get("online", "online 字段不存在")
server = response_data.get("server", "server 字段不存在")
if online:
return f"{self.name}在线,所在服务器:{server}"
else:
return f"{self.name}不在线"
else:
logging.error(f"请求失败!状态码: {response.status_code}")
logging.error(f"错误信息: {response.text}") # 打印错误信息
return "服务器繁忙,请稍后再逝"
except Exception as e:
logging.error(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"

68
model/McHh.py Normal file
View File

@@ -0,0 +1,68 @@
import requests
import json
import logging
import toml
class McHh:
def __init__(self,qid,message):
self.qid = qid
self.message = message
def main(self):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
# 获取所需字段
velocity_ip = config.get("velocity_ip")
velocity_port = config.get("velocity_port")
velocity_token = config.get("velocity_token")
# API URL
url = f"http://{velocity_ip}:{velocity_port}/vc/hh" # 替换为实际的 API 地址
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": velocity_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"qID": self.qid,
"message": self.message
}
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
time = response.text
try:
time = int(time)
seconds = time / 1000
formatted_time = round(seconds, 1)
except Exception as e :
logging.error(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"
# 检查响应状态码
if response.status_code == 200:
return "喊话成功"
elif response.status_code == 403:
return "您未绑定MCUNC游戏账户请先绑定后重试"
elif response.status_code == 429:
return f"喊话过快,剩余时间:{formatted_time}s"
elif response.status_code == 409:
return "喊话失败:您没有足够的话筒!"
else:
logging.error(f"请求失败!状态码: {response.status_code}")
logging.error(f"错误信息: {response.text}") # 打印错误信息
return "服务器繁忙,请稍后再逝"
except Exception as e:
logging.error(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"

87
model/McList.py Normal file
View File

@@ -0,0 +1,87 @@
import requests
import json
import logging
import toml
class McList:
def __init__(self,server,page):
self.server = server
self.page = page
def main(self):
with open('./config.toml', 'r', encoding='utf-8') as f:
config = toml.load(f)
# 获取所需字段
velocity_ip = config.get("velocity_ip")
velocity_port = config.get("velocity_port")
velocity_token = config.get("velocity_token")
# API URL
url = f"http://{velocity_ip}:{velocity_port}/vc/query"
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": velocity_token # 替换为你的 API 密钥
}
# 请求体
payload = {
"server": self.server,
"page": self.page
}
# 发送 POST 请求
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
logging.info("请求成功!返回结果:")
response_data = response.json()
logging.info(json.dumps(response_data, indent=4, ensure_ascii=False)) # 格式化输出 JSON
# 提取 player_number 值
player_number = response_data.get("player_number", "player_number 字段不存在")
totalpage = response_data.get("totalPage", "totalPage 字段不存在")
players = response_data.get("players", "players 字段不存在")
try:
page = int(self.page)
formatted_page = page + 1
except Exception as e:
logging.error(f"请求过程中出现异常: {e}")
return "服务繁忙,请稍后再逝"
try:
formatted_totalpage = int(totalpage)
if formatted_totalpage == 0:
formatted_totalpage = 1
elif formatted_totalpage == -1:
return f"服务器:{self.server} 不存在"
else:
formatted_totalpage = totalpage
success = True
except Exception as e:
logging.info(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"
if players is None :
logging.info("玩家列表为空")
formatted_players = ""
else:
try:
formatted_players = "\n".join(f"{i+1}{player}" for i, player in enumerate(players))
except Exception as e:
logging.info(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"
answer = f"当前服务器共有{player_number}个玩家,\n{formatted_players} \n [{formatted_page}页/{formatted_totalpage}页]"
return answer
else:
logging.info(f"请求失败!状态码: {response.status_code}")
logging.info(f"错误信息: {response.text}")
logging.info(response.json)
return "服务器繁忙,请稍后再逝"
except Exception as e:
logging.info(f"请求过程中出现异常: {e}")
return "服务器繁忙,请稍后再逝"

0
model/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.