github监控
2024-6-13 ~ 2024-6-14
功能设计
首先还是初步设计一下框架
|-- main.py
|-- scripts/
|   |-- github/
|   |   |-- githubs.py
|   |   |-- update.py
|   |   |-- github.db
其中githubs.py主要功能是用户添加与删除监控,由于github这个东西吧自己关注了不代表别人想关注,因此就不打算群发了,并且只能添加一个用户,即
用户添加一个账号A,脚本读取其following列表,然后分别读取这些following用户的repo,对于在指定时间内有更新的repo,就通过私聊的方式发送给该用户。同时为了限制一个人只能添加一个账号,也需要进行判断。这里不打算支持单独添加要监控的github账号,原因是:用户逐个添加很麻烦,并且自己没following还悄悄关注有点怪。
仅可添加一个用户名(以additionalName为准,即github.com/XXX中的XXX),如果该用户关注的其他用户的github有更新,会以私信的方式发送。同时一个人只能添加一个用户。
/github add [username] 添加github用户
/github del [username] 删除github用户
/github info 查看当前监控用户
首先是安装一些要用到的库
pip install schedule
然后就是开始写脚本
修改main.py
由于没有令牌,拿github api的时候会出现限制问题,因此需要去申请一下github API令牌,权限给repo和user就可以了
然后main.py修改这样:
from scripts.bilibili.bilibili import handle_bili_command
from scripts.bilibili.update import main as bili_update_main
from scripts.github.githubs import handle_git_command
from scripts.github.update import main as github_update_main
由于更新是通过私聊发送的,机器人需要先和用户成为好友,所以还要添加自动同意好友请求的处理:
def accept_friend_request(flag):
    """Approve the friend request identified by ``flag``.

    Sends a GET to ``{base_url}/set_friend_add_request`` with the request
    flag, ``approve='true'`` and the module-level ``access_token``. The HTTP
    response is not inspected.
    """
    endpoint = f"{base_url}/set_friend_add_request"
    query = dict(flag=flag, approve='true', access_token=access_token)
    requests.get(endpoint, params=query)
…………
def receive_event():
elif data['post_type'] == 'request' and data['request_type'] == 'friend':
flag = data['flag']
accept_friend_request(flag)
githubs.py
import requests
import sqlite3
import os
# The SQLite file is looked up in the same directory as this module.
_MODULE_DIR = os.path.dirname(__file__)
DATABASE_PATH = os.path.join(_MODULE_DIR, 'github.db')
async def handle_git_command(message, user_id, group_id, send_group_message, send_private_message, github_token):
    """Dispatch a ``/github`` chat command.

    ``message`` is split on whitespace; the second token selects the action:

    * ``add <username>`` - verify the account via ``check_github_user_exists``
      and store it with ``add_user``.
    * ``del <username>`` - remove it with ``delete_user``.
    * ``info`` - report the stored account with ``info_user``.

    Anything else (including ``add``/``del`` without exactly one argument)
    answers with a format-error message in the group. Replies go through
    ``send_group_message(group_id, text)``, which is called without ``await``.
    ``send_private_message`` is accepted but not used here.
    """
    tokens = message.split()
    if len(tokens) < 2:
        send_group_message(group_id, "命令格式错误。")
        return

    action = tokens[1].lower()
    has_single_argument = len(tokens) == 3

    if action == 'add' and has_single_argument:
        target = tokens[2]
        if check_github_user_exists(target, github_token):
            add_user(user_id, target, group_id, send_group_message)
        else:
            send_group_message(group_id, f"GitHub 用户 {target} 不存在。")
        return

    if action == 'del' and has_single_argument:
        delete_user(user_id, tokens[2], group_id, send_group_message)
        return

    if action == 'info':
        info_user(user_id, group_id, send_group_message)
        return

    send_group_message(group_id, "命令格式错误。")
def check_github_user_exists(username, github_token):
    """Return True when ``username`` resolves to a GitHub account.

    The name is first checked locally: it must be non-empty ASCII made only of
    letters, digits and hyphens (at least one letter or digit). Names failing
    that check return False without any network call.

    Otherwise ``GET https://api.github.com/users/{username}`` is issued with
    the token in the ``Authorization`` header, and the result is True only for
    HTTP 200.

    Raises:
        requests.RequestException: on connection failure or when the request
            exceeds the 10 second timeout.
    """
    # The name comes straight from a chat command and is spliced into the URL
    # path. Values such as ".." or "name/repos" would address a different
    # endpoint that can answer 200 and be mistaken for an existing user.
    if not username or not username.isascii() or not username.replace('-', '').isalnum():
        return False

    url = f'https://api.github.com/users/{username}'
    headers = {'Authorization': f'token {github_token}'}
    # Without a timeout a stalled connection would block the command handler forever.
    response = requests.get(url, headers=headers, timeout=10)
    return response.status_code == 200
def add_user(user_id, username, group_id, send_group_message):
    """Store ``username`` as the GitHub account monitored for ``user_id``.

    Uses ``INSERT OR REPLACE`` on ``github_users``, so a second call for the
    same ``user_id`` overwrites the earlier account. A confirmation is sent via
    ``send_group_message(group_id, text)`` after the commit.

    Raises:
        sqlite3.Error: if the statement fails (for example when the table has
            not been created). No confirmation is sent in that case.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        conn.execute(
            'INSERT OR REPLACE INTO github_users (user_id, github_username) VALUES (?, ?)',
            (user_id, username),
        )
        conn.commit()
    finally:
        # Close even when the statement raises, so a failure does not leak the handle.
        conn.close()
    send_group_message(group_id, f"已添加监控 GitHub 用户: {username}")
def delete_user(user_id, username, group_id, send_group_message):
    """Remove the ``(user_id, username)`` row from ``github_users``.

    A deletion confirmation is sent only when a row was actually removed;
    otherwise the group is told that no such monitored account was found.

    Raises:
        sqlite3.Error: if the statement fails. No message is sent in that case.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.execute(
            'DELETE FROM github_users WHERE user_id = ? AND github_username = ?',
            (user_id, username),
        )
        removed = cursor.rowcount
        conn.commit()
    finally:
        # Close even when the statement raises, so a failure does not leak the handle.
        conn.close()

    if removed:
        send_group_message(group_id, f"已删除监控 GitHub 用户: {username}")
    else:
        # Previously a success message was sent even when nothing matched.
        send_group_message(group_id, f"未找到监控的 GitHub 用户: {username}")
def info_user(user_id, group_id, send_group_message):
    """Tell the group which GitHub account ``user_id`` is monitoring.

    Reads the first matching row from ``github_users`` and sends either the
    stored username or a "nothing monitored" message via
    ``send_group_message(group_id, text)``.

    Raises:
        sqlite3.Error: if the query fails. No message is sent in that case.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        row = conn.execute(
            'SELECT github_username FROM github_users WHERE user_id = ?',
            (user_id,),
        ).fetchone()
    finally:
        # Close even when the query raises, so a failure does not leak the handle.
        conn.close()

    if row:
        send_group_message(group_id, f"当前监控的 GitHub 用户: {row[0]}")
    else:
        send_group_message(group_id, "当前没有监控的 GitHub 用户。")
update.py
import requests
import sqlite3
import schedule
import time
import os
from datetime import datetime, timedelta
import logging
# The SQLite file is looked up next to this module; logging is configured at
# INFO level as soon as the module is imported.
_MODULE_DIR = os.path.dirname(__file__)
DATABASE_PATH = os.path.join(_MODULE_DIR, 'github.db')
logging.basicConfig(level=logging.INFO)
def get_following(username, github_token):
    """Return the logins of all accounts that ``username`` follows.

    Pages through ``GET /users/{username}/following`` (100 entries per page),
    following the ``next`` link of each response until there is none.

    Returns:
        list[str]: the collected logins, or ``[]`` if any request fails, the
        body is not valid JSON, or a page is not a JSON list. Failures are
        logged, never raised.
    """
    headers = {'Authorization': f'token {github_token}'}
    url = f'https://api.github.com/users/{username}/following'
    params = {'per_page': 100}
    logins = []
    try:
        while url:
            # The request itself belongs inside the try: connection errors and
            # timeouts are RequestExceptions too and must not escape.
            response = requests.get(url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if not isinstance(data, list):
                logging.error(f"Unexpected response format: {data}")
                return []
            logins.extend(user['login'] for user in data)
            # The "next" link already carries its own query string.
            url = response.links.get('next', {}).get('url')
            params = None
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        logging.error(f"Failed to fetch following users: {e}")
        return []
    return logins
def get_recent_repos(username, github_token, window=timedelta(minutes=2)):
    """Return the repositories of ``username`` that changed within ``window``.

    A repository qualifies when its ``updated_at`` or its ``pushed_at``
    timestamp is no older than ``window`` (default: two minutes) relative to
    the current UTC time.

    The listing is requested sorted by push time with 100 entries per page and
    only that first page is inspected, so a change that is not a push may be
    missed for accounts with more than 100 repositories.

    Returns:
        list[dict]: the matching repository objects as returned by the API, or
        ``[]`` on any request/decoding failure (which is logged, not raised).
    """
    url = f'https://api.github.com/users/{username}/repos'
    headers = {'Authorization': f'token {github_token}'}
    # The default listing is alphabetical with 30 entries, which hides recent
    # pushes for anyone with more repositories than that.
    params = {'sort': 'pushed', 'per_page': 100}
    try:
        # The request itself belongs inside the try: connection errors and
        # timeouts are RequestExceptions too and must not escape.
        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()
        repos = response.json()
    except (requests.RequestException, ValueError) as e:
        logging.error(f"Failed to fetch repositories: {e}")
        return []

    if not isinstance(repos, list):
        logging.error(f"Unexpected response format: {repos}")
        return []

    # Take one reading of the clock so every repository is judged against the same instant.
    now = datetime.utcnow()
    recent_repos = []
    for repo in repos:
        # A timestamp may be null (pushed_at presumably is for a repository
        # that was never pushed to); skip such values instead of crashing.
        stamps = (repo.get('updated_at'), repo.get('pushed_at'))
        if any(
            stamp and now - datetime.strptime(stamp, '%Y-%m-%dT%H:%M:%SZ') <= window
            for stamp in stamps
        ):
            recent_repos.append(repo)
    return recent_repos
def _describe_update_age(repo, now):
    """Return the age label for ``repo`` relative to ``now`` (naive UTC)."""
    stamps = [
        datetime.strptime(repo[key], '%Y-%m-%dT%H:%M:%SZ')
        for key in ('updated_at', 'pushed_at')
        if repo.get(key)
    ]
    if stamps:
        # Use whichever event is newer, since a repository counts as recent on either one.
        age = now - max(stamps)
        if age < timedelta(minutes=1):
            return "Updated now"
        if age < timedelta(minutes=2):
            return "Updated 1 minute ago"
    return "Updated 2 minutes ago"


def check_updates(send_private_message, github_token):
    """Notify every registered user about fresh activity of the accounts they follow.

    For each ``(user_id, github_username)`` row in ``github_users`` the
    following list is fetched with ``get_following``; for each followed account
    the recently changed repositories come from ``get_recent_repos``. One
    message per repository is sent via ``send_private_message(user_id, text)``.

    Args:
        send_private_message: callable taking ``(user_id, text)``.
        github_token: token forwarded to the GitHub API helpers.
    """
    # Read the subscriptions up front and release the database before any
    # network traffic; the handle is closed even if the query fails.
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        rows = conn.execute('SELECT user_id, github_username FROM github_users').fetchall()
    finally:
        conn.close()

    # Several subscribers may follow the same account; fetch each one only once per run.
    repos_by_login = {}
    for user_id, github_username in rows:
        for user in get_following(github_username, github_token):
            if user not in repos_by_login:
                repos_by_login[user] = get_recent_repos(user, github_token)
            for repo in repos_by_login[user]:
                # API timestamps are UTC, so they are compared with an unshifted
                # UTC "now". The former +8h offset made every age exceed two minutes.
                time_message = _describe_update_age(repo, datetime.utcnow())
                logging.info(f"Sending update for repo {repo['name']} to user {user_id}")
                send_private_message(user_id, f"{user} updated repository {repo['name']} at {repo['html_url']} ({time_message})")
def start_update_schedule(send_private_message, github_token):
    """Register ``check_updates`` to run every two minutes and poll forever.

    The job is registered with the ``schedule`` library, passing both
    arguments through to ``check_updates``. The function then loops without
    end, running due jobs and sleeping one second per iteration; it never
    returns.
    """
    every_two_minutes = schedule.every(2).minutes
    every_two_minutes.do(check_updates, send_private_message, github_token)

    while True:
        schedule.run_pending()
        time.sleep(1)
def main(send_private_message, github_token):
    """Entry point of the updater: hand both arguments to ``start_update_schedule``."""
    start_update_schedule(
        send_private_message,
        github_token,
    )
if __name__ == '__main__':
    # Stand-alone run: messages are printed instead of being sent to a chat.
    # The GitHub API token is taken from the GITHUB_TOKEN environment variable
    # so it does not have to be written into the source; it defaults to an
    # empty string, as before.
    github_token = os.environ.get("GITHUB_TOKEN", "")
    main(print, github_token)
init_github.py
import sqlite3
import os
# One-off bootstrap script: creates the github_users table in github.db
# (located next to this file) if it does not exist yet.
DATABASE_PATH = os.path.join(os.path.dirname(__file__), 'github.db')

# user_id is the primary key, so each chat user has at most one row.
_CREATE_USERS_TABLE = '''
CREATE TABLE IF NOT EXISTS github_users (
user_id TEXT PRIMARY KEY,
github_username TEXT
)
'''

conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.execute(_CREATE_USERS_TABLE)
conn.commit()
conn.close()
最后先运行init_github.py,再运行main.py即可
├── help.txt
├── main.py
├── scripts
│   ├── bilibili
│   │   ├── __pycache__
│   │   │   ├── bilibili.cpython-310.pyc
│   │   │   └── update.cpython-310.pyc
│   │   ├── bilibili.db
│   │   ├── bilibili.py
│   │   ├── initialize_db.py
│   │   ├── log.txt
│   │   └── update.py
│   └── github
│       ├── __pycache__
│       │   ├── githubs.cpython-310.pyc
│       │   └── update.cpython-310.pyc
│       ├── github.db
│       ├── githubs.py
│       ├── init_github.py
│       └── update.py