Skip to main content

词云统计

2024-6-14


功能设计

直接简单说明个人想法

该功能存放于scripts/WordCloud/下,其中有以下内容:

wordclouds.py-->用于处理词云

init_wcdb.py-->建立sqlite3数据库存储内容

groups/-->groups文件夹,存放各群的消息记录(group_id.txt)和生成的词云图片(group_id.png)

然后具体功能设计想法是这样的:

  1. 管理员或者超级管理员(admin或root)可以打开与关闭词云功能,具体命令是/wc on和/wc off(管理员相关代码和数据库在上传的/app/admin.py和/app/admin.db,db的结构是 user_id TEXT PRIMARY KEY, permissions TEXT)
  2. 生成一个数据库,包含开启该功能的QQ群、开启人账号、开启状态 即group_id,user_id,IsOpen,当用户B(管理员用户)在A群输入/wc on的时候,数据库该条内容为 A,B,1 输入/wc off的时候,为: A,B,0
  3. 当每次输入/wc on的时候,判断该群聊是否已经开启,如已经开启则提示:该群已经打开词云统计!,如果为首次开启或者从关闭到开启,则提示:词云统计开启成功。 同时,如果为首次开启或者从关闭到开启,则在groups文件夹下创建一个txt文件来存放消息记录,文件名为group_id.txt,例如QQ群号1234567(后面都以1234567为例),则生成/scripts/WordCloud/groups/1234567.txt,如果已经开启则不进行任何操作。 如果管理员或超级管理员输入/wc off的时候,则删除/scripts/WordCloud/groups/1234567.txt文件
  4. 如果该群开启了词云功能,则在每天的23:59统计该群的词云并生成图片存放在/groups/1234567.png中,然后使用go-cqhttp的[CQ:image,file=<filename>]进行图片的发送,例如:send_group_message(group_id, "本群今日聊天记录词云统计如下[CQ:image,file=./groups/1234567.png]")。发送后删除1234567.png和1234567.txt,然后创建新的1234567.txt统计新一天的内容
  5. 如果有用户(所有用户均可)想提前查询当日的词云,那么他可以输入/wc show命令提前生成词云,存放依旧为/groups/1234567.png,发送之后不会删除1234567.txt,但是会删除1234567.png图片

然后需要安装所需要的库

pip install matplotlib wordcloud

wordclouds.py

import os
import sqlite3
from wordcloud import WordCloud
import logging

DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'wordcloud.db')
GROUPS_DIR = os.path.join(os.path.dirname(__file__), 'groups')
FONT_PATH = '/usr/share/fonts/truetype/simhei.ttf' # 使用SimHei字体


def toggle_wordcloud(group_id, user_id, is_open, send_group_message):
    """Switch word-cloud collection on or off for a group.

    The state is stored in the ``wordcloud_groups`` table together with the
    account that issued the command. Turning collection on creates an empty
    ``<group_id>.txt`` under ``GROUPS_DIR``; turning it off deletes that file.

    Args:
        group_id: Group whose setting is being changed.
        user_id: Account issuing the command; stored in the row.
        is_open: 1 to enable collection, 0 to disable it.
        send_group_message: Callable ``(group_id, text)`` used for the reply.
    """
    log_path = os.path.join(GROUPS_DIR, f"{group_id}.txt")

    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(
            'SELECT is_open FROM wordcloud_groups WHERE group_id = ?',
            (group_id,),
        )
        row = cursor.fetchone()

        # Requested state already in effect: report it and change nothing.
        if row is not None and row[0] == is_open:
            if is_open:
                send_group_message(group_id, "该群已经打开词云统计!")
            else:
                send_group_message(group_id, "该群已经关闭词云统计!")
            return

        if row is None:
            cursor.execute(
                'INSERT INTO wordcloud_groups (group_id, user_id, is_open) VALUES (?, ?, ?)',
                (group_id, user_id, is_open),
            )
        else:
            cursor.execute(
                'UPDATE wordcloud_groups SET is_open = ?, user_id = ? WHERE group_id = ?',
                (is_open, user_id, group_id),
            )

        if is_open:
            os.makedirs(GROUPS_DIR, exist_ok=True)
            # Start (or restart) with an empty message log.
            with open(log_path, 'w', encoding='utf-8'):
                pass
            reply = "词云统计开启成功。"
        else:
            # The log may already be gone; a missing file must not abort
            # the state change.
            try:
                os.remove(log_path)
            except FileNotFoundError:
                pass
            # A group that was never enabled is reported as already closed
            # instead of receiving no reply at all.
            reply = "词云统计关闭成功。" if row is not None else "该群已经关闭词云统计!"

        # Persist before replying so a failing send cannot lose the change.
        conn.commit()
        send_group_message(group_id, reply)
    finally:
        conn.close()


def generate_wordcloud(group_id, send_group_message):
    """Render a group's recorded messages into a word-cloud PNG.

    Reads ``<group_id>.txt`` from ``GROUPS_DIR`` and writes
    ``<group_id>.png`` next to it.

    Args:
        group_id: Group whose message log is rendered.
        send_group_message: Callable ``(group_id, text)`` used to report
            "not enough data" or a rendering failure to the group.

    Returns:
        The path of the generated PNG, or ``None`` when the log is missing,
        empty or whitespace-only, or when rendering fails.
    """
    txt_path = os.path.join(GROUPS_DIR, f"{group_id}.txt")
    png_path = os.path.join(GROUPS_DIR, f"{group_id}.png")

    logging.info("Generating wordcloud for group %s", group_id)
    logging.info("Text file path: %s", txt_path)
    logging.info("PNG file path: %s", png_path)

    # Open directly instead of checking existence first: the file can be
    # removed (e.g. by "/wc off") between a check and the read.
    try:
        with open(txt_path, 'r', encoding='utf-8') as file:
            text = file.read()
    except FileNotFoundError:
        text = ""

    # Log only the size; the text is a whole day of group chat.
    logging.info("Text length: %d characters", len(text))

    if not text.strip():
        send_group_message(group_id, "没有足够的数据生成词云。")
        return None

    try:
        cloud = WordCloud(
            width=800,
            height=400,
            background_color='white',
            font_path=FONT_PATH,
            colormap='viridis',
            max_font_size=200,
            random_state=42,
        ).generate(text)
        cloud.to_file(png_path)
    except Exception:
        # Boundary handler: keep the bot running, but keep the traceback.
        logging.exception("Failed to generate wordcloud")
        send_group_message(group_id, "生成词云时发生错误。")
        return None
    return png_path


def handle_wordcloud_command(command, group_id, user_id, send_group_message, is_admin):
    """Dispatch a ``/wc`` sub-command.

    Args:
        command: Sub-command text: ``'on'``, ``'off'`` or ``'show'``.
        group_id: Group the command was sent in.
        user_id: Account that sent the command.
        send_group_message: Callable ``(group_id, text)`` used for replies.
        is_admin: Callable ``(user_id)`` returning a truthy value when the
            account may toggle the feature. Only consulted for on/off.
    """
    if command in ('on', 'off'):
        if not is_admin(user_id):
            send_group_message(group_id, "你没有权限执行此命令。")
            return
        toggle_wordcloud(group_id, user_id, 1 if command == 'on' else 0, send_group_message)
    elif command == 'show':
        png_path = generate_wordcloud(group_id, send_group_message)
        if png_path:
            from pathlib import Path

            # Build the file URI from the path that was actually written,
            # rather than assuming one fixed install directory.
            image_uri = Path(os.path.abspath(png_path)).as_uri()
            try:
                send_group_message(group_id, f"本群今日聊天记录词云统计如下[CQ:image,file={image_uri}]")
            finally:
                # The preview image is temporary; drop it even if sending fails.
                os.remove(png_path)
    else:
        send_group_message(group_id, "命令格式错误。")


def record_message(group_id, message):
    """Append a chat message to the group's log when collection is enabled.

    Nothing is written unless ``wordcloud_groups`` has a row for the group
    with ``is_open`` equal to 1.

    Args:
        group_id: Group the message was received in.
        message: Message text; written as one line followed by a newline.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute(
            'SELECT is_open FROM wordcloud_groups WHERE group_id = ?',
            (group_id,),
        )
        row = cursor.fetchone()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    if not row or row[0] != 1:
        return

    # Append mode creates the file but not its directory.
    os.makedirs(GROUPS_DIR, exist_ok=True)
    txt_path = os.path.join(GROUPS_DIR, f"{group_id}.txt")
    with open(txt_path, 'a', encoding='utf-8') as file:
        logging.info("Recording message to %s: %s", txt_path, message)
        file.write(message + '\n')

init_wcdb.py

import sqlite3
import os

DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'wordcloud.db')

def init_db():
    """Create the ``wordcloud_groups`` table in ``DATABASE_PATH`` if absent.

    The table has one row per group: ``group_id`` (primary key), the
    ``user_id`` stored with the state, and the ``is_open`` flag.
    Safe to call repeatedly.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS wordcloud_groups (
                group_id TEXT PRIMARY KEY,
                user_id TEXT,
                is_open INTEGER
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even when table creation fails.
        conn.close()


if __name__ == "__main__":
    init_db()

main.py

修改添加一下业务功能就行

def receive_event():
elif message.startswith('/wc'):
parts = message.split()
if len(parts) >= 2:
handle_wordcloud_command(parts[1], group_id, user_id, send_group_message, is_admin)
else:
send_group_message(group_id, "命令格式错误。")

def generate_daily_wordcloud():
    """Send the day's word cloud to every group that has collection enabled.

    For each group with ``is_open = 1`` a word cloud is generated and sent.
    After a successful send the PNG is deleted and the group's message log
    is reset to an empty file for the next day. Groups for which no image
    could be generated are left untouched.

    The database is opened relative to the current working directory
    (``scripts/WordCloud/wordcloud.db``).
    """
    conn = sqlite3.connect(os.path.join('scripts', 'WordCloud', 'wordcloud.db'))
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT group_id FROM wordcloud_groups WHERE is_open = 1')
        groups = cursor.fetchall()
    finally:
        conn.close()

    for (group_id,) in groups:
        png_path = generate_wordcloud(group_id, send_group_message)
        if not png_path:
            continue

        from pathlib import Path

        # Build the file URI from the path that was actually written,
        # rather than assuming one fixed install directory.
        image_uri = Path(os.path.abspath(png_path)).as_uri()
        try:
            send_group_message(group_id, f"本群今日聊天记录词云统计如下[CQ:image,file={image_uri}]")
        finally:
            os.remove(png_path)

        # The log sits next to the PNG under the same base name, so derive
        # it from png_path instead of from the working directory.
        txt_path = os.path.splitext(png_path)[0] + '.txt'
        # Opening with 'w' truncates an existing file and creates a missing one.
        with open(txt_path, 'w', encoding='utf-8'):
            pass

def schedule_tasks():
    """Register the daily 23:59 word-cloud job and run the scheduler forever.

    This call never returns: it polls for pending jobs once per second.
    """
    daily_job = schedule.every().day.at("23:59")
    daily_job.do(generate_daily_wordcloud)

    while True:
        schedule.run_pending()
        time.sleep(1)

最后结构

└── scripts
├── WordCloud
│   ├── __pycache__
│   │   └── wordclouds.cpython-310.pyc
│   ├── groups
│   ├── init_wcdb.py
│   ├── wordcloud.db
│   └── wordclouds.py