XXXBot消息中转分发平台的设计与实现

概述

IMMP平台(以下简称平台,访问地址:https://immp.idbhost.com ,点击阅读原文直达)的设计初衷是数据采集与消费分离,各类Bot采集数据后,下游消费程序可多端消费,并且Bot可统一提供对外API服务(按需,如IMBot会有一系列的API服务需对外提供),主要解决以下应用场景:

?注册账号后,各类Bot程序接入,包括不限于各类IMBot,WebBot等,标准化形式进行数据上报(需加密)?为各类Bot的API发布提供统一标准,如消息接收(支持Pull和Push两种方式),发送消息,获取联系人等?平台提供统一的Bot监控提醒,支持消息是否云端存储,敏感消息可不存储只作为消息统一中转。

接入平台的Bot通过实时上报数据(默认消息不落地,请及时消费,否则可能造成消息丢失),供下游程序消费并处理,下游程序可通过httpswss消费掉消息,我们设计这个平台的初衷是将各类IMBot消息汇总上报并对外提供统一接口,关于IM的接口可参考:(API文档点击这里[1])。

关于数据消费支持一处生产(XXBot生产上报的数据),多端消费(https接口支持多端消费,每个消费者各自维护消息顺序号)。

编写XXXBot

如果需要编写Bot,可按以下步骤进行:

1.从github上获取每种聊天工具的破解协议和破解方法,开发聊天采集工具(包括收发,以下简称采集工具),此块业务请自行解决(我们推荐采用官方API收发消息)。2.将第一步的采集工具接入平台非常简单,仅需以下几步:

?处理登录?处理心跳?处理加解密:主要是ws通信需对请求体加密,对回调内容解密,采用AES算法?处理数据上报?提供平台对外接口:如发送消息,获取联系人等

我们从以下Python示例程序,可完整了解整个过程,代码如下:

## 安装依赖:python-socketio==5.0.1 python-engineio==4.3.4 pycryptodome import time import uuid import datetime import json from apscheduler.schedulers.blocking import BlockingScheduler import socketio import socket,requests from Crypto.Cipher import AES import hashlib   UID = '平台提供的机器码' URL_WS = 'https://immp.idbhost.com' URL_PROD = 'https://immp.idbhost.com/prod' sio = socketio.Client(logger=False) VERSION = '20221102' scheduler = BlockingScheduler() secret_key = '平台公钥' machineCode = str(uuid.uuid4()).replace('-','') robotId = 'demo1' # 可以使用当前登录的qq号或微信号   class AesEcbCrypt:     def __init__(self, key):         self.key = key         self.BS = AES.block_size     def padding_pkcs5(self, value):         return str.encode(value + (self.BS - len(value) % self.BS) * chr(self.BS - len(value) % self.BS))     def get_sha1prng_key(self,mykey:str):         signature = hashlib.sha1(mykey.encode()).digest()         signature = hashlib.sha1(signature).digest()         return ''.join(['%02x' % i for i in signature]).upper()[:32]     def encrypt(self, value: str) -> str:         _key = self.get_sha1prng_key(self.key)         cryptor = AES.new(bytes.fromhex(_key), AES.MODE_ECB)         padding_value = self.padding_pkcs5(value)  # padding content with pkcs5         ciphertext = cryptor.encrypt(padding_value)         return ''.join(['%02x' % i for i in ciphertext]).upper()     def decrypt(self, value: str) -> str:         ''' AES/ECB/NoPadding decrypt '''         key = bytes.fromhex(self.get_sha1prng_key(self.key))         cryptor = AES.new(key, AES.MODE_ECB)         ciphertext = cryptor.decrypt(bytes.fromhex(value))         return self.padding_zero(str(ciphertext, "utf-8"))     def padding_zero(self, value):         list = []         for c in value:             # ascii码范围获取             if ord(c) > 31 & ord(c) < 127:                 list.append(c)         return ''.join(list)     @scheduler.scheduled_job("cron", day_of_week='*', hour='*', minute='*', second='*/10') def heartbeat():     global machineCode     ip = [(s.connect(('8.8.8.8', 53)),            s.getsockname()[0], s.close())           for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]     data = json.dumps(dict(uid=UID, version=VERSION,machinecode=machineCode,terminaltype="0"))     AES_SIO = AesEcbCrypt(secret_key)     data = AES_SIO.encrypt(data)     sio.emit('heartbeat', data=data, namespace='/heartbeat')   def post(url,data = {},format='str'):     if format == 'json':         resp = requests.post(url = url,json = data)     else:         resp = requests.post(url = url,data = json.dumps(data))     return resp.json()   def task():     scheduler.start()   def mockResvMsg():     # 登录后,模拟向服务端发送一条聊天消息     postUrl = URL_PROD + '/ss/wechat/user/collectQuotationMessage/' + UID     curDateStr = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')     params = {         'robotId': robotId,         'source': 'wechatAgent',         'receiveDate': curDateStr,         'content': '消息内容',         'groupName': '群聊名',         'groupId': '1111',         'senderName': '发送人名称',         'senderId': '22222',         'receiverId':'33333',         'receiverName':'接收人名称',         'msgId':str(uuid.uuid4()).replace(('-',''))     }     res = post(postUrl, params,format='json')     print('post iqb return: %s' % json.dumps(res))   @sio.on('login') def on_connect(*args):     global SELF_USER_INFO,machineCode,secret_key,robotId     aes = AesEcbCrypt('平台公钥')     mystr = aes.decrypt(*args)     msg = json.loads(mystr)     # 用户私钥     secret_key = msg.get("secretKey")     # 登录信息发往immp     selfInfos = []     SELF_USER_INFO = dict(wxid=robotId, wxNickName='测试账号1', wxSex='',                           wxSignature='', wxBigAvatar='',                           wxNation='CN', wxProvince='Zhejiang',                           wxCity='Ningbo',                           wxSmallAvatar='', wxPhoneNumber='', pid=machineCode)     selfInfos.append(SELF_USER_INFO)     # 处理ws登录     AES = AesEcbCrypt(secret_key)     selfInfo = AES.encrypt(json.dumps(dict(contactLimit=9, list=selfInfos)))     sio.emit(event='login', data=selfInfo)       # 可打开注释,模拟采集工具收到消息后,发往平台     # time.sleep(3)     # mockResvMsg()   @sio.on('sendMessage') def on_sendMessage(*args):     try:         aes = AesEcbCrypt(secret_key)         mystr = aes.decrypt(*args)     except:         print('on sendmessage failed.')         return     msg = json.loads(mystr)     receiverId = msg.get('receiverId')     msgText = msg.get('content')     # 这里根据收到的解密报文处理您的发送消息事件     print('收到发送消息请求,请自行处理:', msg)   # 这里处理对外发布API的响应消息,必须5秒内返回接口,否则消费端无法使用 @sio.on('dynamicApi') def on_dynamicApi(*args):     try:         aes = AesEcbCrypt(secret_key)         mystr = aes.decrypt(*args)     except:         print('on sendmessage failed.')         return     msg = json.loads(mystr)     # 这里根据收到的解密报文处理您的发送消息事件,param对象里附带了事件请求参数列表     postUrl = URL_PROD + '/ss/wechat/user/notify'     _paramTest = msg.get('param').get('title')     if _paramTest is None:         _paramTest = ''     res = post(postUrl, {         # traceId必须传入         'traceId': msg.get('traceId'),         # 返回对外提供API的响应结果         'result': 'hello,你好!' + _paramTest     }, format='json')     print('发送响应:',res)   if __name__ == "__main__":     headers = dict(uid=UID, version=VERSION,machinecode=machineCode,terminaltype="0")     sio.connect(url=URL_WS, headers=headers, namespaces=['/', '/heartbeat'])     sio.start_background_task(task())     sio.wait()

数据加密

平台ws通信数据都需要加密,采用AES加密算法,详见以上代码。

处理登录

首先,通过以上代码可见,需要处理聊天工具与IMMP平台的登录,建立长连接,以便将采集工具采集到聊天消息及时发送到IMMP平台供下游消费,并且处理下游程序与聊天采集工具的统一通信。

?连接地址:https://immp.idbhost.com?对接协议:websocket/socket.io?headers:连接ws时附带的header头信息必须包含以下信息

{   uid:平台分配的机器码,    version:'1.0.0',   machinecode:终端码,   terminaltype:"0" }


注:headers信息无需加密,终端码可随机生成一个uuid,terminaltype区分聊天工具:0(微信),1(QQ),2(企业微信)

处理心跳

成功登录后,就需要维持采集工具与平台之间的心跳,定时通过websocket向IMMP平台发送心跳,假设siosocketio客户端对象

?协议:websocket?请求方向:发送消息?sio.emit('heartbeat', data=data, namespace='/heartbeat') data对象加密前的数据结构:data = json.dumps(dict(ip='xx.xx.xx.xx'))

聊天消息上报

采集工具可实时接收聊天消息,聊天消息通过websocket上报到平台即可,详见以上代码。

?对接协议:http?接口地址:https://immp.idbhost.com/ss/wechat/user/collectQuotationMessage/平台机器码

文件上传与下载

如果要发送文件或图片,需要将文件先上传后才能发送,消息体内容形如:fileid://文件ID,文件上传和下载请调用我们的网关服务,具体请咨询我们。

发送消息

?连接地址:https://immp.idbhost.com?对接协议:websocket/socket.io?侦听事件名:sendMessage?侦听事件参数:*args,解密后可得:

{   "content":"消息内容",   "loginId":"登录QQ或微信号",   "msgType":"0", // 0:文本,1:文件,其中图片类型会自动按图片发送   "receiverId":"接收者微信或qq号或群号",   "type":"1" // 1(对聊),2(群聊) }


其他接口对接

其他接口用户可根据业务需要自行扩展,主要使用dynamicApi接收来自平台下发的指令,在侦听函数里通过调用Rest接口将响应结果告知平台,平台再返回给客户,其中traceId必须传入,响应结果必须在5秒内完成,否则平台接口会返回超时。详见以上代码,其中客户端调用接口如下:

?接口地址:https://immp.idbhost.com/api/open/api/invokeAPI?协议:http?请求参数:eventName:事件类型,loginId:平台里看到登录QQ号或微信号,param事件名对应的请求参数列表

References

[1] API文档点击这里: http://doc.sumslack.com/web/#/p/8a1f3f85b13cc50a53b560bfa647de28



作者:空山雪林

来源:微信公众号:Sumslack团队

出处:https://mp.weixin.qq.com/s/bgkdJ2j4KWQN7QQxv3Au0Q

版权声明:

作者: freeclashnode

链接: https://www.freeclashnode.com/news/article-3300.htm

来源: FreeClashNode

文章版权归作者所有,未经允许请勿转载。

免费节点实时更新

热门文章

最新文章

归档