群友语录
2024-7-9~2024-7-11
有群友说想看代码,因此就放出来了。
设计的是类似于关键词回复的,但是能主要是存放图片。类型相当于是大伙可以给图片存放在服务器并且分类到文件夹。比如上传2张图片都命名为小白,那么小白这个文件夹下就会存放两张图片,在获取的时候可以通过随机获取,也可以通过查询图片对应的ID来查看指定图片。
首先是修改主函数的调用,很简单的修改,添加这样两句
from scripts.citiao.citiao import handle_yulu_command, process_message, save_image_to_db, download_image, is_admin,add_image_from_reply,get_all_aliases
if message.startswith('/yulu'):
asyncio.run(handle_yulu_command(message, raw_message, user_id, group_id, send_group_message))
else:
asyncio.run(process_message(data, send_group_message))
然后是编写数据库,之前想的是存base64数据到数据库里,然后又想的直接存字节数据到数据库,最后觉得还是直接存成图片发图片比较好。
首先是初始化
init_citiao_db.py
import sqlite3
import os
DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'citiao.db')
def create_db():
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS yulu (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qq_id TEXT NOT NULL,
alias TEXT NOT NULL
)
''')
conn.commit()
conn.close()
if __name__ == "__main__":
create_db()
print("Database created successfully.")
首先是存的ID 添加人的QQ号 别名
不过后面需要设置是否在这个群使用yulu功能,因此又新增了一个
update_db.py
import sqlite3
import os
DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'citiao.db')
def update_db():
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS group_settings (
group_id TEXT PRIMARY KEY,
yulu_enabled INTEGER NOT NULL DEFAULT 1
)
''')
conn.commit()
conn.close()
if __name__ == "__main__":
update_db()
print("Database updated successfully.")
那么现在就是对citiao.py(语录)进行编写
我的功能是这样设计:(/help yulu)
功能设计
/yulu add (别名):别名为可选字段,发送后需要发送一张图片
/yulu info: 查询你本人添加的图片
/yulu del [序号]:删除自己添加的序号图片,无法删除其他人添加的图片
/yulu dels [别名]:(管理员可用)删除某个别名下的所有图片
/yulu search (别名):不使用别名参数时,将展示所有别名。查询别名时将展示别名包含的所有序号
/yulu show (序号|别名):不使用参数时,将随机展示一张已添加的内容,使用参数时会精准展示(参数为别名时会随机展示一张别名的内容)
/yulu re [序号] [新的别名]:修改序号的别名,无法修改其他人添加的图片
/yulu [enable|disable]:开/关本群语录功能(BOT管理员可用)
一眼丁真,可以开始写代码了,解释我写在代码中了,同时需要在同文件夹下新建img文件夹,最后会展示树结构
citiao.py
import sqlite3
import os
import asyncio
import requests
import logging
import random
import datetime
DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'citiao.db') #当前文件夹下创建,在/root/NapCat/app/scripts/citiao
ADMIN_DATABASE_PATH = os.path.join('/root/NapCat/app/','admin.db') #admin.db位置
IMG_DIR = os.path.join(os.path.dirname(__file__), 'img') #记得提前创建
waiting_users = {}
def connect_db():
return sqlite3.connect(DATABASE_PATH, check_same_thread=False)
def connect_admin_db():
return sqlite3.connect(ADMIN_DATABASE_PATH, check_same_thread=False)
#保存图片,默认别名为未设置
def save_image_to_db(qq_id, alias='未设置'):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('INSERT INTO yulu (qq_id, alias) VALUES (?, ?)', (qq_id, alias))
image_id = cursor.lastrowid
conn.commit()
conn.close()
return image_id
#普通用户删除图片还有数据库中对应内容
def delete_image(qq_id, image_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('DELETE FROM yulu WHERE id = ? AND qq_id = ?', (image_id, qq_id))
deleted = cursor.rowcount
conn.commit()
conn.close()
if deleted:
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
if os.path.exists(image_path):
os.remove(image_path)
return deleted
#管理员用户删除图片与数据库对应内容
def delete_image_admin(image_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('DELETE FROM yulu WHERE id = ?', (image_id,))
deleted = cursor.rowcount
conn.commit()
conn.close()
if deleted:
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
if os.path.exists(image_path):
os.remove(image_path)
return deleted
#/yulu info
def get_user_images(qq_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id, alias FROM yulu WHERE qq_id = ?', (qq_id,))
rows = cursor.fetchall()
conn.close()
return rows
def get_image_path(image_id):
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
return image_path if os.path.exists(image_path) else None
#获取图片对应别名
def get_images_by_alias(alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id, alias FROM yulu WHERE alias LIKE ?', ('%' + alias + '%',))
rows = cursor.fetchall()
conn.close()
return rows
#获取所有别名
def get_all_aliases():
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT DISTINCT alias FROM yulu')
rows = cursor.fetchall()
conn.close()
return [row[0] for row in rows]
#获取所有图片
def get_all_images():
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id FROM yulu')
rows = cursor.fetchall()
conn.close()
return rows
#判断是否是管理员
def is_admin(qq_id):
conn = connect_admin_db()
cursor = conn.cursor()
cursor.execute('SELECT permissions FROM admin WHERE user_id = ?', (qq_id,))
result = cursor.fetchone()
conn.close()
return result is not None and result[0] in ('root', 'admin')
#下载图片,最多尝试3次
async def download_image(image_url):
retries = 3
for attempt in range(retries):
try:
response = requests.get(image_url)
response.raise_for_status()
return response.content
except requests.RequestException as e:
logging.error(f"Error downloading image: {e}")
if attempt < retries - 1:
logging.info(f"Retrying... ({attempt + 1}/{retries})")
await asyncio.sleep(2)
else:
logging.error("Max retries reached, giving up.")
return None
#用户添加图片响应
async def add_image_from_reply(user_id, group_id, alias, image_url, send_group_message):
send_group_message(group_id, '正在添加,请稍后……')
try:
image_data = await download_image(image_url)
if image_data:
image_id = save_image_to_db(user_id, alias)
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
with open(image_path, 'wb') as image_file:
image_file.write(image_data)
response_message = f"图片已保存,ID为 {image_id},别名为 {alias}。"
send_group_message(group_id, response_message)
else:
response_message = "下载图片时出错,请重试。"
send_group_message(group_id, response_message)
except Exception as e:
logging.error(f"Error saving image from reply: {e}")
response_message = "保存图片时出错,请重试。"
send_group_message(group_id, response_message)
#rename
def update_image_alias(image_id, qq_id, new_alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('UPDATE yulu SET alias = ? WHERE id = ? AND qq_id = ?', (new_alias, image_id, qq_id))
updated = cursor.rowcount
conn.commit()
conn.close()
return updated
#admin rename
def update_image_alias_admin(image_id, new_alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('UPDATE yulu SET alias = ? WHERE id = ?', (new_alias, image_id))
updated = cursor.rowcount
conn.commit()
conn.close()
return updated
#本群是否开启yulu功能
def is_yulu_enabled(group_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT yulu_enabled FROM group_settings WHERE group_id = ?', (group_id,))
result = cursor.fetchone()
conn.close()
return result is None or result[0] == 1
#设置本群是否开启yulu功能
def set_yulu_enabled(group_id, enabled):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('INSERT OR REPLACE INTO group_settings (group_id, yulu_enabled) VALUES (?, ?)', (group_id, 1 if enabled else 0))
conn.commit()
conn.close()
#删除某一别名下的所有图片
def delete_images_by_alias(alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id FROM yulu WHERE alias = ?', (alias,))
rows = cursor.fetchall()
deleted_count = 0
for row in rows:
image_id = row[0]
cursor.execute('DELETE FROM yulu WHERE id = ?', (image_id,))
deleted_count += cursor.rowcount
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
if os.path.exists(image_path):
os.remove(image_path)
conn.commit()
conn.close()
return deleted_count
#主函数
async def handle_yulu_command(message, raw_message, user_id, group_id, send_group_message):
if not is_yulu_enabled(group_id) and not message.startswith('/yulu enable') and not message.startswith('/yulu disable'):
send_group_message(group_id, "本群的 'yulu' 功能已被禁用。")
return
parts = message.split(maxsplit=2)
if len(parts) >= 2:
subcommand = parts[1]
if subcommand == 'add':
alias = parts[2] if len(parts) == 3 else '未设置'
await handle_yulu_add(group_id, user_id, alias, send_group_message)
elif subcommand == 'del' and len(parts) == 3:
image_id = int(parts[2])
if delete_image(user_id, image_id) or (is_admin(user_id) and delete_image_admin(image_id)):
send_group_message(group_id, f"图片ID {image_id} 已删除。")
else:
send_group_message(group_id, "你没有权限进行此操作。")
elif subcommand == 'dels' and len(parts) == 3:
if is_admin(user_id):
alias = parts[2]
deleted_count = delete_images_by_alias(alias)
send_group_message(group_id, f"别名 {alias} 的 {deleted_count} 张图片已删除。")
else:
send_group_message(group_id, "你没有权限进行此操作。")
elif subcommand == 'info':
images = get_user_images(user_id)
if images:
response_message = "你添加的词条如下:\n" + "\n".join([f"ID:{img[0]} 别名:{img[1]}" for img in images])
else:
response_message = "你没有添加任何词条。"
send_group_message(group_id, response_message)
elif subcommand == 're' and len(parts) == 3:
id_and_alias = parts[2].split(maxsplit=1)
if len(id_and_alias) == 2:
image_id = int(id_and_alias[0])
new_alias = id_and_alias[1]
if update_image_alias(image_id, user_id, new_alias) or (
is_admin(user_id) and update_image_alias_admin(image_id, new_alias)):
send_group_message(group_id, f"图片ID {image_id} 的别名已修改为 {new_alias}。")
else:
send_group_message(group_id, "你没有权限进行此操作。")
else:
send_group_message(group_id, "命令格式错误,请使用 /help 查看帮助信息。")
elif subcommand == 'show':
if len(parts) == 3:
try:
image_id = int(parts[2])
image_path = get_image_path(image_id)
if image_path:
logging.info(f"Sending image with ID {image_id}")
send_group_message(group_id, f"[CQ:image,file=file://{image_path}]")
else:
send_group_message(group_id, f"未找到ID为{image_id}的图片。")
except ValueError:
alias = parts[2]
image_ids = get_images_by_alias(alias)
if image_ids:
image_id = random.choice([img[0] for img in image_ids])
image_path = get_image_path(image_id)
logging.info(f"Sending image with alias {alias}")
send_group_message(group_id, f"[CQ:image,file=file://{image_path}]")
else:
send_group_message(group_id, f"未找到别名为{alias}的图片。")
else:
image_ids = get_all_images()
if image_ids:
image_id = random.choice([img[0] for img in image_ids])
image_path = get_image_path(image_id)
logging.info(f"Sending random image with ID {image_id}")
send_group_message(group_id, f"[CQ:image,file=file://{image_path}]")
else:
send_group_message(group_id, "未找到任何图片。")
elif subcommand == 'search':
if len(parts) == 3:
alias = parts[2]
images = get_images_by_alias(alias)
if images:
response_message = f"别名包含 {alias} 的图片ID:\n" + "\n".join([f"ID:{img[0]} 别名 :{img[1]}" for img in images])
else:
response_message = f"未找到别名包含 {alias} 的图片。"
send_group_message(group_id, response_message)
else:
aliases = get_all_aliases()
if aliases:
response_message = "所有别名如下:\n" + "\t".join(aliases)
else:
response_message = "未找到任何别名。"
send_group_message(group_id, response_message)
elif subcommand == 'status':
if is_admin(user_id):
await handle_yulu_status(group_id, send_group_message)
else:
send_group_message(group_id, "你没有权限执行此命令。")
elif subcommand == 'enable' and is_admin(user_id):
set_yulu_enabled(group_id, True)
send_group_message(group_id, "本群的 'yulu' 功能已开启。")
elif subcommand == 'disable' and is_admin(user_id):
set_yulu_enabled(group_id, False)
send_group_message(group_id, "本群的 'yulu' 功能已禁用。")
else:
send_group_message(group_id, "命令格式错误,请使用 /help 查看帮助信息。")
#等待用户添加,用的是指令版不是/add版本
async def process_message(data, send_group_message):
user_id = str(data['user_id'])
if user_id in waiting_users:
group_id = waiting_users[user_id]["group_id"]
alias = waiting_users[user_id]["alias"]
if waiting_users[user_id]["waiting"] and data['post_type'] == 'message' and data['message_type'] == 'group' and str(data['user_id']) == user_id:
for item in data['message']:
if item['type'] == 'image':
waiting_users[user_id]["waiting"] = False
image_url = item['data']['url']
send_group_message(group_id, "