XXXBot消息中转分发平台的设计与实现
概述
IMMP平台(以下简称平台,访问地址:https://immp.idbhost.com ,点击阅读原文直达)的设计初衷是数据采集与消费分离,各类Bot采集数据后,下游消费程序可多端消费,并且Bot可统一提供对外API服务(按需,如IMBot会有一系列的API服务需对外提供),主要解决以下应用场景:
?注册账号后,各类Bot程序接入,包括不限于各类IMBot,WebBot等,标准化形式进行数据上报(需加密)?为各类Bot的API发布提供统一标准,如消息接收(支持Pull和Push两种方式),发送消息,获取联系人等?平台提供统一的Bot监控提醒,支持消息是否云端存储,敏感消息可不存储只作为消息统一中转。
接入平台的Bot通过实时上报数据(默认消息不落地,请及时消费,否则可能造成消息丢失),供下游程序消费并处理,下游程序可通过https或wss消费掉消息,我们设计这个平台的初衷是将各类IMBot消息汇总上报并对外提供统一接口,关于IM的接口可参考:(API文档点击这里[1])。
关于数据消费支持一处生产(XXBot生产上报的数据),多端消费(https接口支持多端消费,每个消费者各自维护消息顺序号)。
编写XXXBot
如果需要编写Bot,可按以下步骤进行:
1.从github上获取每种聊天工具的破解协议和破解方法,开发聊天采集工具(包括收发,以下简称采集工具),此块业务请自行解决(我们推荐采用官方API收发消息)。2.将第一步的采集工具接入平台非常简单,仅需以下几步:
?处理登录?处理心跳?处理加解密:主要是ws通信需对请求体加密,对回调内容解密,采用AES算法?处理数据上报?提供平台对外接口:如发送消息,获取联系人等
我们从以下Python示例程序,可完整了解整个过程,代码如下:
## 安装依赖:python-socketio==5.0.1 python-engineio==4.3.4 pycryptodome import time import uuid import datetime import json from apscheduler.schedulers.blocking import BlockingScheduler import socketio import socket,requests from Crypto.Cipher import AES import hashlib UID = '平台提供的机器码' URL_WS = 'https://immp.idbhost.com' URL_PROD = 'https://immp.idbhost.com/prod' sio = socketio.Client(logger=False) VERSION = '20221102' scheduler = BlockingScheduler() secret_key = '平台公钥' machineCode = str(uuid.uuid4()).replace('-','') robotId = 'demo1' # 可以使用当前登录的qq号或微信号 class AesEcbCrypt: def __init__(self, key): self.key = key self.BS = AES.block_size def padding_pkcs5(self, value): return str.encode(value + (self.BS - len(value) % self.BS) * chr(self.BS - len(value) % self.BS)) def get_sha1prng_key(self,mykey:str): signature = hashlib.sha1(mykey.encode()).digest() signature = hashlib.sha1(signature).digest() return ''.join(['%02x' % i for i in signature]).upper()[:32] def encrypt(self, value: str) -> str: _key = self.get_sha1prng_key(self.key) cryptor = AES.new(bytes.fromhex(_key), AES.MODE_ECB) padding_value = self.padding_pkcs5(value) # padding content with pkcs5 ciphertext = cryptor.encrypt(padding_value) return ''.join(['%02x' % i for i in ciphertext]).upper() def decrypt(self, value: str) -> str: ''' AES/ECB/NoPadding decrypt ''' key = bytes.fromhex(self.get_sha1prng_key(self.key)) cryptor = AES.new(key, AES.MODE_ECB) ciphertext = cryptor.decrypt(bytes.fromhex(value)) return self.padding_zero(str(ciphertext, "utf-8")) def padding_zero(self, value): list = [] for c in value: # ascii码范围获取 if ord(c) > 31 & ord(c) < 127: list.append(c) return ''.join(list) @scheduler.scheduled_job("cron", day_of_week='*', hour='*', minute='*', second='*/10') def heartbeat(): global machineCode ip = [(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1] data = json.dumps(dict(uid=UID, version=VERSION,machinecode=machineCode,terminaltype="0")) AES_SIO = AesEcbCrypt(secret_key) data = AES_SIO.encrypt(data) sio.emit('heartbeat', data=data, namespace='/heartbeat') def post(url,data = {},format='str'): if format == 'json': resp = requests.post(url = url,json = data) else: resp = requests.post(url = url,data = json.dumps(data)) return resp.json() def task(): scheduler.start() def mockResvMsg(): # 登录后,模拟向服务端发送一条聊天消息 postUrl = URL_PROD + '/ss/wechat/user/collectQuotationMessage/' + UID curDateStr = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') params = { 'robotId': robotId, 'source': 'wechatAgent', 'receiveDate': curDateStr, 'content': '消息内容', 'groupName': '群聊名', 'groupId': '1111', 'senderName': '发送人名称', 'senderId': '22222', 'receiverId':'33333', 'receiverName':'接收人名称', 'msgId':str(uuid.uuid4()).replace(('-','')) } res = post(postUrl, params,format='json') print('post iqb return: %s' % json.dumps(res)) @sio.on('login') def on_connect(*args): global SELF_USER_INFO,machineCode,secret_key,robotId aes = AesEcbCrypt('平台公钥') mystr = aes.decrypt(*args) msg = json.loads(mystr) # 用户私钥 secret_key = msg.get("secretKey") # 登录信息发往immp selfInfos = [] SELF_USER_INFO = dict(wxid=robotId, wxNickName='测试账号1', wxSex='', wxSignature='', wxBigAvatar='', wxNation='CN', wxProvince='Zhejiang', wxCity='Ningbo', wxSmallAvatar='', wxPhoneNumber='', pid=machineCode) selfInfos.append(SELF_USER_INFO) # 处理ws登录 AES = AesEcbCrypt(secret_key) selfInfo = AES.encrypt(json.dumps(dict(contactLimit=9, list=selfInfos))) sio.emit(event='login', data=selfInfo) # 可打开注释,模拟采集工具收到消息后,发往平台 # time.sleep(3) # mockResvMsg() @sio.on('sendMessage') def on_sendMessage(*args): try: aes = AesEcbCrypt(secret_key) mystr = aes.decrypt(*args) except: print('on sendmessage failed.') return msg = json.loads(mystr) receiverId = msg.get('receiverId') msgText = msg.get('content') # 这里根据收到的解密报文处理您的发送消息事件 print('收到发送消息请求,请自行处理:', msg) # 这里处理对外发布API的响应消息,必须5秒内返回接口,否则消费端无法使用 @sio.on('dynamicApi') def on_dynamicApi(*args): try: aes = AesEcbCrypt(secret_key) mystr = aes.decrypt(*args) except: print('on sendmessage failed.') return msg = json.loads(mystr) # 这里根据收到的解密报文处理您的发送消息事件,param对象里附带了事件请求参数列表 postUrl = URL_PROD + '/ss/wechat/user/notify' _paramTest = msg.get('param').get('title') if _paramTest is None: _paramTest = '' res = post(postUrl, { # traceId必须传入 'traceId': msg.get('traceId'), # 返回对外提供API的响应结果 'result': 'hello,你好!' + _paramTest }, format='json') print('发送响应:',res) if __name__ == "__main__": headers = dict(uid=UID, version=VERSION,machinecode=machineCode,terminaltype="0") sio.connect(url=URL_WS, headers=headers, namespaces=['/', '/heartbeat']) sio.start_background_task(task()) sio.wait()
数据加密
平台ws通信数据都需要加密,采用AES加密算法,详见以上代码。
处理登录
首先,通过以上代码可见,需要处理聊天工具与IMMP平台的登录,建立长连接,以便将采集工具采集到聊天消息及时发送到IMMP平台供下游消费,并且处理下游程序与聊天采集工具的统一通信。
?连接地址:https://immp.idbhost.com?对接协议:websocket/socket.io?headers:连接ws时附带的header头信息必须包含以下信息
{ uid:平台分配的机器码, version:'1.0.0', machinecode:终端码, terminaltype:"0" }
注:headers信息无需加密,终端码可随机生成一个uuid,terminaltype区分聊天工具:0(微信),1(QQ),2(企业微信)
处理心跳
成功登录后,就需要维持采集工具与平台之间的心跳,定时通过websocket向IMMP平台发送心跳,假设sio是socketio客户端对象
?协议:websocket?请求方向:发送消息?sio.emit('heartbeat', data=data, namespace='/heartbeat') data对象加密前的数据结构:data = json.dumps(dict(ip='xx.xx.xx.xx'))
聊天消息上报
采集工具可实时接收聊天消息,聊天消息通过websocket上报到平台即可,详见以上代码。
?对接协议:http?接口地址:https://immp.idbhost.com/ss/wechat/user/collectQuotationMessage/平台机器码
文件上传与下载
如果要发送文件或图片,需要将文件先上传后才能发送,消息体内容形如:fileid://文件ID,文件上传和下载请调用我们的网关服务,具体请咨询我们。
发送消息
?连接地址:https://immp.idbhost.com?对接协议:websocket/socket.io?侦听事件名:sendMessage?侦听事件参数:*args,解密后可得:
{ "content":"消息内容", "loginId":"登录QQ或微信号", "msgType":"0", // 0:文本,1:文件,其中图片类型会自动按图片发送 "receiverId":"接收者微信或qq号或群号", "type":"1" // 1(对聊),2(群聊) }
其他接口对接
其他接口用户可根据业务需要自行扩展,主要使用dynamicApi接收来自平台下发的指令,在侦听函数里通过调用Rest接口将响应结果告知平台,平台再返回给客户,其中traceId必须传入,响应结果必须在5秒内完成,否则平台接口会返回超时。详见以上代码,其中客户端调用接口如下:
?接口地址:https://immp.idbhost.com/api/open/api/invokeAPI?协议:http?请求参数:eventName:事件类型,loginId:平台里看到登录QQ号或微信号,param事件名对应的请求参数列表
References
[1] API文档点击这里: http://doc.sumslack.com/web/#/p/8a1f3f85b13cc50a53b560bfa647de28
作者:空山雪林
来源:微信公众号:Sumslack团队
出处:https://mp.weixin.qq.com/s/bgkdJ2j4KWQN7QQxv3Au0Q
版权声明:
作者: freeclashnode
链接: https://www.freeclashnode.com/news/article-3300.htm
来源: FreeClashNode
文章版权归作者所有,未经允许请勿转载。
热门文章
- 1月10日|22M/S,Shadowrocket/Clash/SSR/V2ray免费节点订阅链接每天更新
- 1月2日|22.9M/S,SSR/Shadowrocket/Clash/V2ray免费节点订阅链接每天更新
- 12月25日|18.4M/S,Shadowrocket/SSR/Clash/V2ray免费节点订阅链接每天更新
- 1月1日|19.9M/S,SSR/Shadowrocket/V2ray/Clash免费节点订阅链接每天更新
- 12月19日|22M/S,V2ray/Shadowrocket/SSR/Clash免费节点订阅链接每天更新
- 12月20日|19.6M/S,V2ray/Clash/Shadowrocket/SSR免费节点订阅链接每天更新
- 12月31日|18.9M/S,Shadowrocket/SSR/Clash/V2ray免费节点订阅链接每天更新
- 12月28日|18.5M/S,V2ray/SSR/Shadowrocket/Clash免费节点订阅链接每天更新
- 1月9日|21.6M/S,Clash/SSR/Shadowrocket/V2ray免费节点订阅链接每天更新
- 12月26日|20M/S,V2ray/SSR/Shadowrocket/Clash免费节点订阅链接每天更新
最新文章
- 1月17日|19.7M/S,V2ray/SSR/Clash(小猫咪)免费节点订阅链接每天更新
- 1月16日|22.6M/S,SSR/V2ray/Clash(小猫咪)免费节点订阅链接每天更新
- 1月15日|22.7M/S,V2ray/Clash(小猫咪)/SSR免费节点订阅链接每天更新
- 1月14日|21.2M/S,SSR/Shadowrocket/V2ray/Clash免费节点订阅链接每天更新
- 1月13日|18M/S,Shadowrocket/SSR/V2ray/Clash免费节点订阅链接每天更新
- 1月12日|19.6M/S,SSR/Shadowrocket/Clash/V2ray免费节点订阅链接每天更新
- 1月11日|18.5M/S,SSR/V2ray/Clash/Shadowrocket免费节点订阅链接每天更新
- 1月10日|22M/S,Shadowrocket/Clash/SSR/V2ray免费节点订阅链接每天更新
- 1月9日|21.6M/S,Clash/SSR/Shadowrocket/V2ray免费节点订阅链接每天更新
- 1月8日|18.7M/S,V2ray/SSR/Shadowrocket/Clash免费节点订阅链接每天更新