之前写爬虫的时候想找B站的直播流接口抓源,结果发现了个人主页有个监听直播间状态的XHR,就心血来潮想写个开播提醒的QQ机器人,顺便研究了下现有的机器人框架。
go-cqhttp协议层和nonebot2框架层搭建
查了下才发现之前大名鼎鼎的酷Q已经是昨日黄花了,不过现在也有较为完整的QQbot协议onebot了,综合考虑了下决定用比较简单的GO-CQhttp搭建,具体的搭建很容易,查文档就行,具体就是找相应的下下来再改个yml配置文件,为了方便下面api文件的编写,最好用websocket反向代理。
Websocket是与http不同的另一种协议,优势是可以做到双向的通信,在推送上比用HTTP更高效,不过websocket不能用一般的Flask编写后端,再加上直接写后端api可扩展性不好,因此就用nonebot2的框架好了,具体的搭建参考文档即可,其实就是pip下依赖和修改配置文件,之后的插件调用在bot.py修改。
简单解释下cqhttp与nonebot2的关系,cqhttp是onebotQQ机器人协议的具体Go实现,运行在服务器的端口,将QQ服务器发送至机器人QQ号的内容进行符合onebot协议的封装处理,处理后的数据经反向代理发送至后端,nonebot2是后端的机器人框架,可以利用cqhttp协议中的对象完成数据处理和实现具体的api操作,经过处理后再将内容返回CQhttp并发送给QQ服务器。
基于python的具体api插件编写
如前所述,QQ机器人的协议层和框架层已经搭建好了,之后才是真正的api插件编写。为了性能更好,nonebot2的api编写用的python异步编程,我对异步不了解,但并不影响具体的api实现,只是性能差点。方便起见,先写个自动同意加好友和加群请求的插件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from nonebot import on_request from nonebot.adapters.cqhttp import Bot, FriendRequestEvent, GroupRequestEvent
from nonebot.typing import T_State
friend_req = on_request(priority=5)
@friend_req.handle() async def friend_agree(bot: Bot, event: FriendRequestEvent, state: T_State): await bot.set_friend_add_request(flag=event.flag, approve=True) group_invite = on_request(priority=5) @group_invite.handle() async def group_agree(bot: Bot, event: GroupRequestEvent, state: T_State): await bot.set_group_add_request(flag=event.flag, sub_type='invite', approve=True)
|
async def定义异步函数,await异步执行操作,具体的框架编写标准写在注释里了。事件响应器,响应规则,时间处理器之类的概念查文档即可,框架官方文档永远是最重要的参考资料。
之后就开播信息推送api的编写了,先想思路: 首先得找到B站推送开播信息的api接口,然后每隔一段时间对这个接口发起请求,判断是否开播,开播即推送开播信息,未开播则结束程序等待下一次请求。
思路很清晰,先找api接口,直播时会在个人主页页面将置顶视频换为直播间链接,找了下果然在一个XHR里有个json是直播间数据,liveStatus为0时未开播,为1时在直播中。这个接口直链为https://api.bilibili.com/x/space/acc/info?mid=1265680561&jsonp=jsonp 其中mid的参数是uid。
然后是定时执行任务的插件,用nonebot_plugin_apscheduler即可,跨插件访问可以参考nonebot2文档的教程。然后就是requests和json库请求和处理json数据,并返回相应的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| from nonebot import on_command, require, get_bot import requests import json import sqlite3 import _thread scheduler = require('nonebot_plugin_apscheduler').scheduler @scheduler.scheduled_job('interval', minutes=2, id='live')
async def lives_pusher(): bot = get_bot() v_id = 1111111111 group_id = 659332197 url0 = 'https://api.bilibili.com/x/space/acc/info?mid=' url = url0 + str(v_id) + '&jsonp=jsonp' r = requests.get(url) r_json = json.loads(r.text) live_status = r_json['data']['live_room']['liveStatus'] if live_status == 1: title = r_json['data']['live_room']['title'] live_url = r_json['data']['live_room']['url'] img_url = r_json['data']['live_room']['cover'] cq = "[CQ:image,file=" + img_url + ",id=40000]" name = r_json['data']['name'] await bot.send_group_msg(group_id=group_id, message=cq) await bot.send_group_msg(group_id=group_id, message=(f"你推的{name}开播啦!\n直播标题:{title}\n链接:{live_url}")) else: continue
|
思路很清晰,写起来也容易,但测试了下发现明显有个麻烦的问题,推送后无法终止,只会在下一次启动时再推送,倒是可以用sleep(6000),不过就无法在终止时间内监听直播状态了。而且这样写也只能推送一个V的直播。
想了想,因为在这个python文件内的变量在程序终止后都会清除,应该只能用数据库了,方便起见用的sqlite3,sqlite3增删改查操作。
既然用了数据库,干脆也加个前端加数据的api插件好了,顺便熟悉下python操作sqlite3数据库:
先在shell用python3
进入命令行操作python,在目标目录用sql新建个库和表,再插入一条简单事例记录
1 2 3 4 5 6 7 8 9 10
| import sqlite3 con = sqlite3.connect('vtb.db')
cur = con.cursor() sql = "CREATE TABLE IF NOT EXISTS uid(id INTEGER PRIMARY KEY,uid TEXT,push_times INTEGER)" cur.execute(sql)
cur.execute('INSERT INTO uid VALUES(?,?,?)', (128,11111111,0)) con.commit()
|
然后就是编写具体api插件
1 2 3 4 5 6 7 8
| import sqlite3 add = on_command("add", aliases={"add","添加推送"}, priority=5, rule=to_me())
async def add_pusher(bot: Bot, event: Event, state: T_State): con = sqlite3.connect('/root/nonebot/Seren/seren/plugins/vtb.db') cur = con.cursor()
|
从cqhttp对象内获取发送信息内的uid(查文档得知具体的对象名),并将其插入数据库
1 2 3 4 5 6 7 8 9 10
| uid = str(event.get_message()) cur.execute("SELECT * FROM uid") data = cur.fetchall() num = data[1][0] - 1
cur.execute('INSERT INTO uid VALUES(?,?,?)', (num,uid,0)) con.commit() cur.close() con.close() await add.send(Message(str(uid))+"已添加至推送列表")
|
然后再用try-except加个简单的异常处理,防止前端添加的数据不是int在之后遍历时报错:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| from nonebot import on_command from nonebot.typing import T_State from nonebot.rule import to_me from nonebot.adapters import Bot, Event from nonebot.adapters.cqhttp.message import Message import sqlite3 add = on_command("add", aliases={"add","添加推送"}, priority=5, rule=to_me()) @add.handle() async def add_pusher(bot: Bot, event: Event, state: T_State): con = sqlite3.connect('/root/nonebot/Seren/seren/plugins/vtb.db') cur = con.cursor() try: message = str(event.get_message()) uid = int(message) cur.execute("SELECT * FROM uid") data = cur.fetchall() num = data[1][0] - 1 cur.execute('INSERT INTO uid VALUES(?,?,?)', (num,uid,0)) con.commit() cur.close() con.close() await add.send(Message(str(uid))+"已添加至推送列表") except: cur.close() con.close() await add.send("请输入UID😅")
|
之后就是推送api的编写,其实就是加上数据库操作和循环遍历,完整插件源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| from nonebot import on_command, require, get_bot import requests import json import sqlite3 import _thread scheduler = require('nonebot_plugin_apscheduler').scheduler @scheduler.scheduled_job('interval', minutes=2, id='live') async def lives_pusher(): bot = get_bot() group_id = ************* con = sqlite3.connect(r"/root/nonebot/Seren/seren/plugins/vtb.db") cur = con.cursor() cur.execute("SELECT * FROM uid") obj = cur.fetchall() v_num = len(obj) for i in range(1, v_num): v_id = obj[i][1] url0 = 'https://api.bilibili.com/x/space/acc/info?mid=' url = url0 + str(v_id) + '&jsonp=jsonp' r = requests.get(url) r_json = json.loads(r.text) live_status = r_json['data']['live_room']['liveStatus'] v_id = obj[i][1] push_times = obj[i][2] if live_status == 1: if push_times == 0: cur.execute("UPDATE uid SET pusht=1 WHERE uid='%d'"%v_id) con.commit() title = r_json['data']['live_room']['title'] live_url = r_json['data']['live_room']['url'] img_url = r_json['data']['live_room']['cover'] cq = "[CQ:image,file=" + img_url + ",id=40000]" name = r_json['data']['name'] await bot.send_group_msg(group_id=group_id, message=cq) await bot.send_group_msg(group_id=group_id, message=(f"你推的{name}开播啦!\n直播标题:{title}\n链接:{live_url}")) await bot.send_private_msg(user_id=user_id, message=cq) await bot.send_private_msg(user_id=user_id, message=(f"你推的{name}开播啦!\n直播标题:{title}\n链接:{live_url}")) else: continue else: cur.execute("UPDATE uid SET pusht=0 WHERE uid='%d'"%v_id) con.commit()
|
测试下,效果如下图:
大功告成咯。
本来想自己用,不过前几天看有人说开播提醒群之类的,就想稍微改改能批量发送推从信息:
添加推送的用户:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| from nonebot import on_command from nonebot.typing import T_State from nonebot.rule import to_me from nonebot.adapters import Bot, Event from nonebot.adapters.cqhttp.message import Message import sqlite3 adduser = on_command("add_user", aliases={"adduser","添加用户"}, priority=5, rule=to_me()) @adduser.handle() async def add_user(bot: Bot, event: Event, state: T_State): con = sqlite3.connect('/root/nonebot/Seren/seren/plugins/times.db') cur = con.cursor() try: message = str(event.get_message()) qqid = int(message) cur.execute("SELECT * FROM users") data = cur.fetchall() num = data[0][0] - 1 cur.execute('INSERT INTO users VALUES(?,?)', (num,qqid)) con.commit() cur.close() con.close() await adduser.send(Message(str(qqid))+"已添加至推送列表") except: await adduser.send("格式错误!请输入QQ号捏~") ~
|
推送api:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| from nonebot import on_command, require, get_bot from nonebot.typing import T_State from nonebot.adapters import Bot, Event from nonebot.adapters.cqhttp.message import Message import nonebot.adapters.cqhttp import requests import json import time import sqlite3 import _thread scheduler = require('nonebot_plugin_apscheduler').scheduler @scheduler.scheduled_job('interval', minutes=2, id='Seren') async def live_pusher(): bot = get_bot() con = sqlite3.connect(r"/root/nonebot/Seren/seren/plugins/times.db") cur = con.cursor() cur.execute('SELECT * FROM times') data = cur.fetchone() push_times = data[0] v_id = 1437582453 url0 = 'https://api.bilibili.com/x/space/acc/info?mid=' url = url0 + str(v_id) + '&jsonp=jsonp' r = requests.get(url) r_json = json.loads(r.text) live_status = r_json['data']['live_room']['liveStatus'] if live_status == 1: if push_times == 0: cur.execute("UPDATE times SET id=1 WHERE name='times'") con.commit() cur.execute("SELECT * FROM users") objj = cur.fetchall() u_num = len(objj) title = r_json['data']['live_room']['title'] live_url = r_json['data']['live_room']['url'] img_url = r_json['data']['live_room']['cover'] cq = "[CQ:image,file="+ img_url + ",id=40000]" for i in range(0,u_num): userid = objj[i][1] await bot.send_private_msg(user_id=userid, message=cq) await bot.send_private_msg(user_id=userid, message=(f"莲宝开播啦!\n直播标题:{title}\n链接:{live_url}")) cur.close() con.close() else: cur.close() con.close() else: cur.execute("UPDATE times SET id=0 WHERE name='times'") con.commit() cur.close() con.close()
|
第一次因为没测试出了个麻烦的bug,不过现在已经解决了。
没想到这么快就要更新下了,因为nonebot2在前段时间的更新,所以本博客内的大部分源码都不能跑了,不过修改下几个具体的包名称也不算很麻烦,具体改动参见https://github.com/nonebot/discussions/discussions/74。