Skip to main content

群友语录

2024-7-9~2024-7-11


有群友说想看代码,因此就放出来了。

设计的是类似于关键词回复的,但是能主要是存放图片。类型相当于是大伙可以给图片存放在服务器并且分类到文件夹。比如上传2张图片都命名为小白,那么小白这个文件夹下就会存放两张图片,在获取的时候可以通过随机获取,也可以通过查询图片对应的ID来查看指定图片。

首先是修改主函数的调用,很简单的修改,添加这样两句

from scripts.citiao.citiao import handle_yulu_command, process_message, save_image_to_db, download_image, is_admin,add_image_from_reply,get_all_aliases

if message.startswith('/yulu'):
asyncio.run(handle_yulu_command(message, raw_message, user_id, group_id, send_group_message))
else:
asyncio.run(process_message(data, send_group_message))

然后是编写数据库,之前想的是存base64数据到数据库里,然后又想的直接存字节数据到数据库,最后觉得还是直接存成图片发图片比较好。

首先是初始化

init_citiao_db.py

import sqlite3
import os

DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'citiao.db')

def create_db():
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE IF NOT EXISTS yulu (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qq_id TEXT NOT NULL,
alias TEXT NOT NULL
)
''')

conn.commit()
conn.close()

if __name__ == "__main__":
create_db()
print("Database created successfully.")

首先是存的ID 添加人的QQ号 别名

不过后面需要设置是否在这个群使用yulu功能,因此又新增了一个

update_db.py

import sqlite3
import os

DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'citiao.db')

def update_db():
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE IF NOT EXISTS group_settings (
group_id TEXT PRIMARY KEY,
yulu_enabled INTEGER NOT NULL DEFAULT 1
)
''')

conn.commit()
conn.close()

if __name__ == "__main__":
update_db()
print("Database updated successfully.")

那么现在就是对citiao.py(语录)进行编写

我的功能是这样设计:(/help yulu)

功能设计

/yulu add (别名):别名为可选字段,发送后需要发送一张图片

/yulu info:查询你本人添加的图片

/yulu del [序号]:删除自己添加的序号图片,无法删除其他人添加的图片

/yulu dels [别名]:(管理员可用)删除某个别名下的所有图片

/yulu search (别名):不使用别名参数时,将展示所有别名。查询别名时将展示别名包含的所有序号

/yulu show (序号|别名):不使用参数时,将随机展示一张已添加的内容,使用参数时会精准展示(参数为别名时会随机展示一张别名的内容)

/yulu re [序号] [新的别名]:修改序号的别名,无法修改其他人添加的图片

/yulu [enable|disable]:开/关本群语录功能(BOT管理员可用)

一眼丁真,可以开始写代码了,解释我写在代码中了,同时需要在同文件夹下新建img文件夹,最后会展示树结构

citiao.py

import sqlite3
import os
import asyncio
import requests
import logging
import random
import datetime

DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'citiao.db') #当前文件夹下创建,在/root/NapCat/app/scripts/citiao
ADMIN_DATABASE_PATH = os.path.join('/root/NapCat/app/','admin.db') #admin.db位置
IMG_DIR = os.path.join(os.path.dirname(__file__), 'img') #记得提前创建
waiting_users = {}

def connect_db():
return sqlite3.connect(DATABASE_PATH, check_same_thread=False)

def connect_admin_db():
return sqlite3.connect(ADMIN_DATABASE_PATH, check_same_thread=False)

#保存图片,默认别名为未设置
def save_image_to_db(qq_id, alias='未设置'):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('INSERT INTO yulu (qq_id, alias) VALUES (?, ?)', (qq_id, alias))
image_id = cursor.lastrowid
conn.commit()
conn.close()
return image_id

#普通用户删除图片还有数据库中对应内容
def delete_image(qq_id, image_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('DELETE FROM yulu WHERE id = ? AND qq_id = ?', (image_id, qq_id))
deleted = cursor.rowcount
conn.commit()
conn.close()
if deleted:
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
if os.path.exists(image_path):
os.remove(image_path)
return deleted

#管理员用户删除图片与数据库对应内容
def delete_image_admin(image_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('DELETE FROM yulu WHERE id = ?', (image_id,))
deleted = cursor.rowcount
conn.commit()
conn.close()
if deleted:
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
if os.path.exists(image_path):
os.remove(image_path)
return deleted

#/yulu info
def get_user_images(qq_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id, alias FROM yulu WHERE qq_id = ?', (qq_id,))
rows = cursor.fetchall()
conn.close()
return rows

def get_image_path(image_id):
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
return image_path if os.path.exists(image_path) else None

#获取图片对应别名
def get_images_by_alias(alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id, alias FROM yulu WHERE alias LIKE ?', ('%' + alias + '%',))
rows = cursor.fetchall()
conn.close()
return rows

#获取所有别名
def get_all_aliases():
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT DISTINCT alias FROM yulu')
rows = cursor.fetchall()
conn.close()
return [row[0] for row in rows]

#获取所有图片
def get_all_images():
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id FROM yulu')
rows = cursor.fetchall()
conn.close()
return rows

#判断是否是管理员
def is_admin(qq_id):
conn = connect_admin_db()
cursor = conn.cursor()
cursor.execute('SELECT permissions FROM admin WHERE user_id = ?', (qq_id,))
result = cursor.fetchone()
conn.close()
return result is not None and result[0] in ('root', 'admin')

#下载图片,最多尝试3次
async def download_image(image_url):
retries = 3
for attempt in range(retries):
try:
response = requests.get(image_url)
response.raise_for_status()
return response.content
except requests.RequestException as e:
logging.error(f"Error downloading image: {e}")
if attempt < retries - 1:
logging.info(f"Retrying... ({attempt + 1}/{retries})")
await asyncio.sleep(2)
else:
logging.error("Max retries reached, giving up.")
return None

#用户添加图片响应
async def add_image_from_reply(user_id, group_id, alias, image_url, send_group_message):
send_group_message(group_id, '正在添加,请稍后……')
try:
image_data = await download_image(image_url)
if image_data:
image_id = save_image_to_db(user_id, alias)
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
with open(image_path, 'wb') as image_file:
image_file.write(image_data)
response_message = f"图片已保存,ID为 {image_id},别名为 {alias}。"
send_group_message(group_id, response_message)
else:
response_message = "下载图片时出错,请重试。"
send_group_message(group_id, response_message)
except Exception as e:
logging.error(f"Error saving image from reply: {e}")
response_message = "保存图片时出错,请重试。"
send_group_message(group_id, response_message)

#rename
def update_image_alias(image_id, qq_id, new_alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('UPDATE yulu SET alias = ? WHERE id = ? AND qq_id = ?', (new_alias, image_id, qq_id))
updated = cursor.rowcount
conn.commit()
conn.close()
return updated
#admin rename
def update_image_alias_admin(image_id, new_alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('UPDATE yulu SET alias = ? WHERE id = ?', (new_alias, image_id))
updated = cursor.rowcount
conn.commit()
conn.close()
return updated
#本群是否开启yulu功能
def is_yulu_enabled(group_id):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT yulu_enabled FROM group_settings WHERE group_id = ?', (group_id,))
result = cursor.fetchone()
conn.close()
return result is None or result[0] == 1
#设置本群是否开启yulu功能
def set_yulu_enabled(group_id, enabled):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('INSERT OR REPLACE INTO group_settings (group_id, yulu_enabled) VALUES (?, ?)', (group_id, 1 if enabled else 0))
conn.commit()
conn.close()
#删除某一别名下的所有图片
def delete_images_by_alias(alias):
conn = connect_db()
cursor = conn.cursor()
cursor.execute('SELECT id FROM yulu WHERE alias = ?', (alias,))
rows = cursor.fetchall()
deleted_count = 0

for row in rows:
image_id = row[0]
cursor.execute('DELETE FROM yulu WHERE id = ?', (image_id,))
deleted_count += cursor.rowcount
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
if os.path.exists(image_path):
os.remove(image_path)

conn.commit()
conn.close()
return deleted_count

#主函数
async def handle_yulu_command(message, raw_message, user_id, group_id, send_group_message):
if not is_yulu_enabled(group_id) and not message.startswith('/yulu enable') and not message.startswith('/yulu disable'):
send_group_message(group_id, "本群的 'yulu' 功能已被禁用。")
return
parts = message.split(maxsplit=2)
if len(parts) >= 2:
subcommand = parts[1]
if subcommand == 'add':
alias = parts[2] if len(parts) == 3 else '未设置'
await handle_yulu_add(group_id, user_id, alias, send_group_message)
elif subcommand == 'del' and len(parts) == 3:
image_id = int(parts[2])
if delete_image(user_id, image_id) or (is_admin(user_id) and delete_image_admin(image_id)):
send_group_message(group_id, f"图片ID {image_id} 已删除。")
else:
send_group_message(group_id, "你没有权限进行此操作。")
elif subcommand == 'dels' and len(parts) == 3:
if is_admin(user_id):
alias = parts[2]
deleted_count = delete_images_by_alias(alias)
send_group_message(group_id, f"别名 {alias}{deleted_count} 张图片已删除。")
else:
send_group_message(group_id, "你没有权限进行此操作。")
elif subcommand == 'info':
images = get_user_images(user_id)
if images:
response_message = "你添加的词条如下:\n" + "\n".join([f"ID:{img[0]} 别名:{img[1]}" for img in images])
else:
response_message = "你没有添加任何词条。"
send_group_message(group_id, response_message)
elif subcommand == 're' and len(parts) == 3:
id_and_alias = parts[2].split(maxsplit=1)
if len(id_and_alias) == 2:
image_id = int(id_and_alias[0])
new_alias = id_and_alias[1]
if update_image_alias(image_id, user_id, new_alias) or (
is_admin(user_id) and update_image_alias_admin(image_id, new_alias)):
send_group_message(group_id, f"图片ID {image_id} 的别名已修改为 {new_alias}。")
else:
send_group_message(group_id, "你没有权限进行此操作。")
else:
send_group_message(group_id, "命令格式错误,请使用 /help 查看帮助信息。")
elif subcommand == 'show':
if len(parts) == 3:
try:
image_id = int(parts[2])
image_path = get_image_path(image_id)
if image_path:
logging.info(f"Sending image with ID {image_id}")
send_group_message(group_id, f"[CQ:image,file=file://{image_path}]")
else:
send_group_message(group_id, f"未找到ID为{image_id}的图片。")
except ValueError:
alias = parts[2]
image_ids = get_images_by_alias(alias)
if image_ids:
image_id = random.choice([img[0] for img in image_ids])
image_path = get_image_path(image_id)
logging.info(f"Sending image with alias {alias}")
send_group_message(group_id, f"[CQ:image,file=file://{image_path}]")
else:
send_group_message(group_id, f"未找到别名为{alias}的图片。")
else:
image_ids = get_all_images()
if image_ids:
image_id = random.choice([img[0] for img in image_ids])
image_path = get_image_path(image_id)
logging.info(f"Sending random image with ID {image_id}")
send_group_message(group_id, f"[CQ:image,file=file://{image_path}]")
else:
send_group_message(group_id, "未找到任何图片。")
elif subcommand == 'search':
if len(parts) == 3:
alias = parts[2]
images = get_images_by_alias(alias)
if images:
response_message = f"别名包含 {alias} 的图片ID:\n" + "\n".join([f"ID:{img[0]} 别名:{img[1]}" for img in images])
else:
response_message = f"未找到别名包含 {alias} 的图片。"
send_group_message(group_id, response_message)
else:
aliases = get_all_aliases()
if aliases:
response_message = "所有别名如下:\n" + "\t".join(aliases)
else:
response_message = "未找到任何别名。"
send_group_message(group_id, response_message)
elif subcommand == 'status':
if is_admin(user_id):
await handle_yulu_status(group_id, send_group_message)
else:
send_group_message(group_id, "你没有权限执行此命令。")
elif subcommand == 'enable' and is_admin(user_id):
set_yulu_enabled(group_id, True)
send_group_message(group_id, "本群的 'yulu' 功能已开启。")
elif subcommand == 'disable' and is_admin(user_id):
set_yulu_enabled(group_id, False)
send_group_message(group_id, "本群的 'yulu' 功能已禁用。")
else:
send_group_message(group_id, "命令格式错误,请使用 /help 查看帮助信息。")
#等待用户添加,用的是指令版不是/add版本
async def process_message(data, send_group_message):
user_id = str(data['user_id'])
if user_id in waiting_users:
group_id = waiting_users[user_id]["group_id"]
alias = waiting_users[user_id]["alias"]
if waiting_users[user_id]["waiting"] and data['post_type'] == 'message' and data['message_type'] == 'group' and str(data['user_id']) == user_id:
for item in data['message']:
if item['type'] == 'image':
waiting_users[user_id]["waiting"] = False
image_url = item['data']['url']
send_group_message(group_id, "正在添加,请稍后。")
try:
image_data = await download_image(image_url)
if image_data:
image_id = save_image_to_db(user_id, alias)
image_path = os.path.join(IMG_DIR, f"{image_id}.jpg")
with open(image_path, 'wb') as image_file:
image_file.write(image_data)
response_message = f"图片已保存,ID为 {image_id}。"
send_group_message(group_id, response_message)
else:
response_message = "下载图片时出错,请重试。"
send_group_message(group_id, response_message)
except Exception as e:
logging.error(f"Error saving image: {e}")
response_message = "保存图片时出错,请重试。"
send_group_message(group_id, response_message)
finally:
del waiting_users[user_id]
return
response_message = "您发送的不是图片,请重新发送。"
send_group_message(group_id, response_message)
del waiting_users[user_id]

async def handle_yulu_add(group_id, qq_id, alias, send_group_message):
response_message = "请在30秒内发送一张图片。"
send_group_message(group_id, response_message)

waiting_users[qq_id] = {"group_id": group_id, "alias": alias, "waiting": True}
await asyncio.sleep(30)
if qq_id in waiting_users and waiting_users[qq_id]["waiting"]:
del waiting_users[qq_id]
response_message = "添加语录超时,请重新添加。"
send_group_message(group_id, response_message)
#查询状态
async def handle_yulu_status(group_id, send_group_message):
conn = connect_db()
cursor = conn.cursor()

cursor.execute('SELECT id, qq_id FROM yulu ORDER BY id DESC LIMIT 1')
last_record = cursor.fetchone()
last_id, last_user = last_record if last_record else (None, None)

last_add_time = None
if last_id:
image_path = get_image_path(last_id)
if image_path:
last_add_time = datetime.datetime.fromtimestamp(os.path.getmtime(image_path)).strftime('%Y-%m-%d %H:%M:%S')

cursor.execute('SELECT COUNT(DISTINCT alias) FROM yulu')
total_aliases = cursor.fetchone()[0]

cursor.execute('SELECT COUNT(*) FROM yulu')
total_images = cursor.fetchone()[0]

cursor.execute('SELECT alias, COUNT(*) as cnt FROM yulu WHERE alias != "未设置" GROUP BY alias ORDER BY cnt DESC LIMIT 1')
most_common_alias_record = cursor.fetchone()
most_common_alias, most_common_count = most_common_alias_record if most_common_alias_record else (None, 0)

cursor.execute('SELECT id FROM yulu WHERE alias = ?', (most_common_alias,))
most_common_ids = cursor.fetchall()

cursor.execute('SELECT qq_id, COUNT(*) as cnt FROM yulu GROUP BY qq_id ORDER BY cnt DESC LIMIT 1')
most_active_user_record = cursor.fetchone()
max_user, total_user_images = most_active_user_record if most_active_user_record else (None, 0)

conn.close()

response_message = (
f"最近一条语录添加人:{last_user},添加时间:{last_add_time},ID为{last_id}\n"
f"总的别名数量:{total_aliases}\n"
f"总的语录数量:{total_images}\n"
f"(除去未设置后)其中最多的语录是 {most_common_alias},ID分别是:{', '.join(str(id[0]) for id in most_common_ids)}\n"
f"其中添加语录最多的人是 {max_user},TA一共添加了 {total_user_images} 条"
)

send_group_message(group_id, response_message)

├── citiao.db ├── citiao.py ├── img/ ├── init_db.py ├── update_db.py