之前写爬虫的时候想找B站的直播流接口抓源,结果发现了个人主页有个监听直播间状态的XHR,就心血来潮想写个开播提醒的QQ机器人,顺便研究了下现有的机器人框架。
go-cqhttp协议层和nonebot2框架层搭建
查了下才发现之前大名鼎鼎的酷Q已经是昨日黄花了,不过现在也有较为完整的QQbot协议onebot了,综合考虑了下决定用比较简单的GO-CQhttp搭建,具体的搭建很容易,查文档就行,具体就是找相应的下下来再改个yml配置文件,为了方便下面api文件的编写,最好用websocket反向代理。
Websocket是与http不同的另一种协议,优势是可以做到双向的通信,在推送上比用HTTP更高效,不过websocket不能用一般的Flask编写后端,再加上直接写后端api可扩展性不好,因此就用nonebot2的框架好了,具体的搭建参考文档即可,其实就是pip下依赖和修改配置文件,之后的插件调用在bot.py修改。
简单解释下cqhttp与nonebot2的关系,cqhttp是onebotQQ机器人协议的具体Go实现,运行在服务器的端口,将QQ服务器发送至机器人QQ号的内容进行符合onebot协议的封装处理,处理后的数据经反向代理发送至后端,nonebot2是后端的机器人框架,可以利用cqhttp协议中的对象完成数据处理和实现具体的api操作,经过处理后再将内容返回CQhttp并发送给QQ服务器。
基于python的具体api插件编写
如前所述,QQ机器人的协议层和框架层已经搭建好了,之后才是真正的api插件编写。为了性能更好,nonebot2的api编写用的python异步编程,我对异步不了解,但并不影响具体的api实现,只是性能差点。方便起见,先写个自动同意加好友和加群请求的插件:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | from nonebot import on_requestfrom nonebot.adapters.cqhttp import Bot, FriendRequestEvent, GroupRequestEvent
 
 from nonebot.typing import T_State
 
 
 friend_req = on_request(priority=5)
 
 @friend_req.handle()
 async def friend_agree(bot: Bot, event: FriendRequestEvent, state: T_State):
 
 await bot.set_friend_add_request(flag=event.flag, approve=True)
 
 group_invite = on_request(priority=5)
 @group_invite.handle()
 async def group_agree(bot: Bot, event: GroupRequestEvent, state: T_State):
 await bot.set_group_add_request(flag=event.flag, sub_type='invite', approve=True)
 
 | 
async def定义异步函数,await异步执行操作,具体的框架编写标准写在注释里了。事件响应器,响应规则,时间处理器之类的概念查文档即可,框架官方文档永远是最重要的参考资料。
之后就开播信息推送api的编写了,先想思路: 首先得找到B站推送开播信息的api接口,然后每隔一段时间对这个接口发起请求,判断是否开播,开播即推送开播信息,未开播则结束程序等待下一次请求。
思路很清晰,先找api接口,直播时会在个人主页页面将置顶视频换为直播间链接,找了下果然在一个XHR里有个json是直播间数据,liveStatus为0时未开播,为1时在直播中。这个接口直链为https://api.bilibili.com/x/space/acc/info?mid=1265680561&jsonp=jsonp 其中mid的参数是uid。
然后是定时执行任务的插件,用nonebot_plugin_apscheduler即可,跨插件访问可以参考nonebot2文档的教程。然后就是requests和json库请求和处理json数据,并返回相应的结果。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 
 | from nonebot import on_command, require, get_botimport requests
 import json
 import sqlite3
 import _thread
 scheduler = require('nonebot_plugin_apscheduler').scheduler
 
 @scheduler.scheduled_job('interval', minutes=2, id='live')
 
 async def lives_pusher():
 bot = get_bot()
 
 v_id = 1111111111
 group_id = 659332197
 
 url0 = 'https://api.bilibili.com/x/space/acc/info?mid='
 url = url0 + str(v_id) + '&jsonp=jsonp'
 r = requests.get(url)
 r_json = json.loads(r.text)
 
 live_status = r_json['data']['live_room']['liveStatus']
 if live_status == 1:
 title = r_json['data']['live_room']['title']
 live_url = r_json['data']['live_room']['url']
 img_url = r_json['data']['live_room']['cover']
 cq = "[CQ:image,file=" + img_url + ",id=40000]"
 name = r_json['data']['name']
 await bot.send_group_msg(group_id=group_id, message=cq)
 await bot.send_group_msg(group_id=group_id, message=(f"你推的{name}开播啦!\n直播标题:{title}\n链接:{live_url}"))
 else:
 continue
 
 | 
思路很清晰,写起来也容易,但测试了下发现明显有个麻烦的问题,推送后无法终止,只会在下一次启动时再推送,倒是可以用sleep(6000),不过就无法在终止时间内监听直播状态了。而且这样写也只能推送一个V的直播。
想了想,因为在这个python文件内的变量在程序终止后都会清除,应该只能用数据库了,方便起见用的sqlite3,sqlite3增删改查操作。
既然用了数据库,干脆也加个前端加数据的api插件好了,顺便熟悉下python操作sqlite3数据库:
先在shell用python3进入命令行操作python,在目标目录用sql新建个库和表,再插入一条简单事例记录
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | import sqlite3con = sqlite3.connect('vtb.db')
 
 cur = con.cursor()
 sql = "CREATE TABLE IF NOT EXISTS uid(id INTEGER PRIMARY KEY,uid TEXT,push_times INTEGER)"
 cur.execute(sql)
 
 cur.execute('INSERT INTO uid VALUES(?,?,?)', (128,11111111,0))
 con.commit()
 
 
 | 
然后就是编写具体api插件
| 12
 3
 4
 5
 6
 7
 8
 
 | import sqlite3add = on_command("add", aliases={"add","添加推送"}, priority=5, rule=to_me())
 
 async def add_pusher(bot: Bot, event: Event, state: T_State):
 con = sqlite3.connect('/root/nonebot/Seren/seren/plugins/vtb.db')
 
 cur = con.cursor()
 
 
 | 
从cqhttp对象内获取发送信息内的uid(查文档得知具体的对象名),并将其插入数据库
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | uid = str(event.get_message())cur.execute("SELECT * FROM uid")
 data = cur.fetchall()
 num = data[1][0] - 1
 
 cur.execute('INSERT INTO uid VALUES(?,?,?)', (num,uid,0))
 con.commit()
 cur.close()
 con.close()
 await add.send(Message(str(uid))+"已添加至推送列表")
 
 | 
然后再用try-except加个简单的异常处理,防止前端添加的数据不是int在之后遍历时报错:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 
 | from nonebot import on_commandfrom nonebot.typing import T_State
 from nonebot.rule import to_me
 from nonebot.adapters import Bot, Event
 from nonebot.adapters.cqhttp.message import Message
 import sqlite3
 add = on_command("add", aliases={"add","添加推送"}, priority=5, rule=to_me())
 
 @add.handle()
 async def add_pusher(bot: Bot, event: Event, state: T_State):
 con = sqlite3.connect('/root/nonebot/Seren/seren/plugins/vtb.db')
 cur = con.cursor()
 try:
 message = str(event.get_message())
 uid = int(message)
 
 cur.execute("SELECT * FROM uid")
 data = cur.fetchall()
 num = data[1][0] - 1
 cur.execute('INSERT INTO uid VALUES(?,?,?)', (num,uid,0))
 con.commit()
 cur.close()
 con.close()
 await add.send(Message(str(uid))+"已添加至推送列表")
 except:
 cur.close()
 con.close()
 await add.send("请输入UID😅")
 
 | 
之后就是推送api的编写,其实就是加上数据库操作和循环遍历,完整插件源码:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 
 | from nonebot import on_command, require, get_botimport requests
 import json
 import sqlite3
 import _thread
 scheduler = require('nonebot_plugin_apscheduler').scheduler
 
 @scheduler.scheduled_job('interval', minutes=2, id='live')
 async def lives_pusher():
 bot = get_bot()
 
 group_id = *************
 
 con = sqlite3.connect(r"/root/nonebot/Seren/seren/plugins/vtb.db")
 cur = con.cursor()
 cur.execute("SELECT * FROM uid")
 obj = cur.fetchall()
 v_num = len(obj)
 
 for i in range(1, v_num):
 v_id = obj[i][1]
 url0 = 'https://api.bilibili.com/x/space/acc/info?mid='
 url = url0 + str(v_id) + '&jsonp=jsonp'
 r = requests.get(url)
 r_json = json.loads(r.text)
 live_status = r_json['data']['live_room']['liveStatus']
 v_id = obj[i][1]
 push_times = obj[i][2]
 if live_status == 1:
 if push_times == 0:
 cur.execute("UPDATE uid SET pusht=1 WHERE uid='%d'"%v_id)
 con.commit()
 title = r_json['data']['live_room']['title']
 live_url = r_json['data']['live_room']['url']
 img_url = r_json['data']['live_room']['cover']
 cq = "[CQ:image,file=" + img_url + ",id=40000]"
 name = r_json['data']['name']
 await bot.send_group_msg(group_id=group_id, message=cq)
 await bot.send_group_msg(group_id=group_id, message=(f"你推的{name}开播啦!\n直播标题:{title}\n链接:{live_url}"))
 await bot.send_private_msg(user_id=user_id, message=cq)
 await bot.send_private_msg(user_id=user_id, message=(f"你推的{name}开播啦!\n直播标题:{title}\n链接:{live_url}"))
 else:
 continue
 else:
 cur.execute("UPDATE uid SET pusht=0 WHERE uid='%d'"%v_id)
 con.commit()
 
 | 
测试下,效果如下图:


大功告成咯。
本来想自己用,不过前几天看有人说开播提醒群之类的,就想稍微改改能批量发送推从信息:
添加推送的用户:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 
 | from nonebot import on_commandfrom nonebot.typing import T_State
 from nonebot.rule import to_me
 from nonebot.adapters import Bot, Event
 from nonebot.adapters.cqhttp.message import Message
 import sqlite3
 adduser = on_command("add_user", aliases={"adduser","添加用户"}, priority=5, rule=to_me())
 
 @adduser.handle()
 async def add_user(bot: Bot, event: Event, state: T_State):
 con = sqlite3.connect('/root/nonebot/Seren/seren/plugins/times.db')
 cur = con.cursor()
 try:
 message = str(event.get_message())
 qqid = int(message)
 cur.execute("SELECT * FROM users")
 data = cur.fetchall()
 num = data[0][0] - 1
 cur.execute('INSERT INTO users VALUES(?,?)', (num,qqid))
 con.commit()
 cur.close()
 con.close()
 await adduser.send(Message(str(qqid))+"已添加至推送列表")
 except:
 await adduser.send("格式错误!请输入QQ号捏~")
 ~
 
 | 
推送api:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 
 | from nonebot import on_command, require, get_botfrom nonebot.typing import T_State
 from nonebot.adapters import Bot, Event
 from nonebot.adapters.cqhttp.message import Message
 import nonebot.adapters.cqhttp
 import requests
 import json
 import time
 import sqlite3
 import _thread
 
 scheduler = require('nonebot_plugin_apscheduler').scheduler
 
 @scheduler.scheduled_job('interval', minutes=2, id='Seren')
 async def live_pusher():
 bot = get_bot()
 con = sqlite3.connect(r"/root/nonebot/Seren/seren/plugins/times.db")
 cur = con.cursor()
 cur.execute('SELECT * FROM times')
 data = cur.fetchone()
 push_times = data[0]
 
 v_id = 1437582453
 
 url0 = 'https://api.bilibili.com/x/space/acc/info?mid='
 url = url0 + str(v_id) + '&jsonp=jsonp'
 r = requests.get(url)
 r_json = json.loads(r.text)
 live_status = r_json['data']['live_room']['liveStatus']
 if live_status == 1:
 if push_times == 0:
 cur.execute("UPDATE times SET id=1 WHERE name='times'")
 con.commit()
 cur.execute("SELECT * FROM users")
 objj = cur.fetchall()
 u_num = len(objj)
 title = r_json['data']['live_room']['title']
 live_url = r_json['data']['live_room']['url']
 img_url = r_json['data']['live_room']['cover']
 cq = "[CQ:image,file="+ img_url + ",id=40000]"
 for i in range(0,u_num):
 userid = objj[i][1]
 await bot.send_private_msg(user_id=userid, message=cq)
 await bot.send_private_msg(user_id=userid, message=(f"莲宝开播啦!\n直播标题:{title}\n链接:{live_url}"))
 cur.close()
 con.close()
 else:
 cur.close()
 con.close()
 else:
 cur.execute("UPDATE times SET id=0 WHERE name='times'")
 con.commit()
 cur.close()
 con.close()
 
 | 
第一次因为没测试出了个麻烦的bug,不过现在已经解决了。
没想到这么快就要更新下了,因为nonebot2在前段时间的更新,所以本博客内的大部分源码都不能跑了,不过修改下几个具体的包名称也不算很麻烦,具体改动参见https://github.com/nonebot/discussions/discussions/74。