毕尔巴鄂对阵皇家社会:两支近邻球队将于西甲联赛 “巴斯克德比”中为捍卫荣耀而战贝壳第三季度营收226亿元 经调净利润17.8 亿元 同比下降17.46%AI营销,让科技巨头尝到了大模型商业化的甜头安恒信息范渊在乌镇峰会谈AI:以工具视之、以工具用之、以工具治理之诺基亚与微软再合作,为 Azure 数据中心供货延长五年天岳先进发布业界首款 300mm(12 英寸)N 型碳化硅衬底三星介绍内部安全团队 Project Infinity 攻防演练项目,高效修复 Galaxy 手机平板漏洞上海市将推进低空飞行服务管理能力建设,2027 年底前累计划设相应航线不少于 400 条岁末,海尔给您备好一套“小红花”为什么说Q10K Pro是今年最值得入手的电视?看完这几点就明白了!“小墨方·大不凡”!Brother“小墨方”系列彩喷一体机全新上市黄仁勋:AI智能需求强劲,“物理定律”限制英伟达芯片增长诺基亚与微软再合作,为Azure数据中心供货延长五年国家数据局:到2029年基本建成国家数据基础设施主体结构中国已发展成为全球最大的互联网市场,拥有全球最多的网民和移动互联网用户中国铁塔:计划按照10:1的比例合股美国FCC正式划定5.9GHz频段用于C-V2X技术在AI领域奋起直追!苹果要对Siri大革新 2026年正式发布日本机构公布量子专利榜单:本源量子、国盾量子位居全球第1中国联通:拟向华为、中兴展开5G网络设备竞争性谈判采购
  • 首页 > 数据存储频道 > 数据库频道 > 操作系统与开源

    开源项目:用环信MQTT实现"世界频道"只需5分钟【附源码】

    2022年04月27日 16:09:14   来源:中文科技资讯

      说到“世界频道”想必大家都不陌生,常见的如王者荣耀的世界广播摇人组队以及最近兴起的Discord社区交友等等。究其目的就是在应用内让海量用户可以实时互动。有些开发者为了实现这种场景会选择聊天室方案来实现,但是这种方式存在一定的局限性,比如聊天室人数上限、海量消息处理等各种情况。

      当然如果有钱有颜,可以直接选择云厂商产品(比如环信的聊天室方案和超级社区),如果有才有time,也可以选择平替版MQTT实现方案。今天小猿将介绍用环信MQTT消息云实现应用内的世界频道,满满干货,不要错过~~

      使用MQTT实现世界频道-Demo效果演示

      协议优势:

      在介绍具体方案之前,我们先唠一唠为啥选择MQTT协议。

      轻量级:MQTT本身是物联网的连接协议,专为受限设备和低带宽场景使用。所以其代码占用空间较小,同样适用于注重SDK大小的移动应用领域(比如:游戏领域)。

      易集成:MQTT作为标准开放的消息协议,经过多年演进,已支持30多种开发语言,10余种SDK,无论何种开发环境,都可以快速找到开源SDK。

      高并发:MQTT是轻量级的消息传输协议,2字节心跳报文,最小化传输和连接成本,云厂商broker产品都可支持千万级并发接入,适用于高并发连接场景。

      低成本:MQTT是基于客户端-服务器的订阅/发布模型,通过服务器中间件实现消息分发,减少消息复制成本,快速实现一对多在线推送。

      灵活性:MQTT协议支持多种消息特性,包括:topic主题层级、消息分级(QoS0,1,2)、遗嘱消息、保留消息等,可以灵活实现多种业务场景。

      衍生功能:随着MQTT云服务的发展,部分服务器厂商已支持消息存储、获取在线设备列表、查看历史消息等衍生功能,降低开发工作量与消息存储成本。

      实现方案:

      言归正传,上干货。本次技术实现方案包含:移动客户端(Android)、后端服务(Java)以及MQTT服务器。这里提一下,MQTT服务器使用环信MQTT消息云,使用三方云服务比较省心,既节省开发时间,产品性能也不需要担心,现在注册可以直接使用环信MQTT消息云超高额度的免费版:每月100并发连接、300万消息,完全满足功能开发使用。

      客户端实现:

      客户端实现主要包含以下两部分:

      底层MQTT业务集成:包含引入SDK、MQTT方法封装、业务交互(消息收发)。

      APP上层交互:在APP首页提供世界频道入口,实现心情弹幕飘窗(接收)和发送。

      接下来上底层MQTT业务集成代码。

      引入SDK:

      这一步环信官方文档比较明确,就是根据自己的平台引入相应的mqtt客户端sdk,这里简单贴一下AndroidStudio的引入配置

      1// 在根目录 build.gradle repositories 下加入配置

      2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

      3...

      4// 然后加入 MQTT 依赖

      5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk

      6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'

      7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

      方法封装

      这里贴一下对mqtt相关方法的简单封装,代码在vmmqtt模块儿的MQTTHelper类下:

      1 /**

      2 * Create by lzan13 on 2022/3/22

      3 * 描述:MQTT 帮助类

      4 */

      5 object MQTTHelper {

      6

      7    private var mqttClient: MqttAndroidClient? = null

      8

      9    // 缓存主题集合

      10    private val topicList = mutableListOf()

      11

      12    /**

      13     * 链接MQTT

      14     * @param id 用户 Id

      15     * @param token 用户链接 MQTT 的 Token

      16     * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅

      17     */

      18    fun connect(id: String, token: String, topic: String = "") {

      19        // 处理订阅主题

      20        if (topic.isNotEmpty()) topicList.add(topic)

      21

      22        // 拼接链接地址

      23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

      24        // 拼接 clientId

      25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"

      26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

      27

      28        //连接参数

      29        val options = MqttConnectOptions()

      30        options.isAutomaticReconnect = true //设置自动重连

      31        options.isCleanSession = true // 缓存

      32        options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒

      33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒

      34        options.userName = id // 用户名

      35        options.password = token.toCharArray() // 密码

      36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

      37        // 设置MQTT监听

      38        mqttClient?.setCallback(object : MqttCallback {

      39            override fun connectionLost(t: Throwable) {

      40                // 通知链接断开

      41                VMLog.d("MQTT 链接断开 $t")

      42            }

      43

      44            @Throws(Exception::class)

      45            override fun messageArrived(topic: String, message: MqttMessage) {

      46                // 通知收到消息

      47                VMLog.d("MQTT 收到消息:$message")

      48                // 如果未订阅则直接丢弃

      49                if (!topicList.contains(topic)) return

      50                notifyEvent(topic, String(message.payload))

      51            }

      52

      53            override fun deliveryComplete(token: IMqttDeliveryToken) {}

      54        })

      55        //进行连接

      56        mqttClient?.connect(options, null, object : IMqttActionListener {

      57            override fun onSuccess(token: IMqttToken) {

      58                VMLog.d("MQTT 链接成功")

      59                // 链接成功,循环订阅缓存的主题

      60                topicList.forEach { subscribe(it) }

      61            }

      62

      63            override fun onFailure(token: IMqttToken, t: Throwable) {

      64                VMLog.d("MQTT 链接失败 $t")

      65            }

      66        })

      67    }

      68

      69    /**

      70     * 订阅主题

      71     * @param topic 主题

      72     */

      73    fun subscribe(topic: String) {

      74        if (!topicList.contains(topic)) {

      75            topicList.add(topic)

      76        }

      77        try {

      78            //连接成功后订阅主题

      79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

      80                override fun onSuccess(token: IMqttToken) {

      81                    VMLog.d("MQTT 订阅成功 $topic")

      82                }

      83

      84                override fun onFailure(token: IMqttToken, t: Throwable) {

      85                    VMLog.d("MQTT 订阅失败 $topic $t")

      86                }

      87            })

      88        } catch (e: MqttException) {

      89            e.printStackTrace()

      90        }

      91    }

      92

      93    /**

      94     * 取消订阅

      95     * @param topic 主题

      96     */

      97    fun unsubscribe(topic: String) {

      98        if (topicList.contains(topic)) {

      99            topicList.remove(topic)

      100        }

      101        try {

      102            mqttClient?.unsubscribe(topic)

      103        } catch (e: MqttException) {

      104            e.printStackTrace()

      105        }

      106    }

      107

      108    /**

      109     * 发送 MQTT 消息

      110     * @param topic 主题

      111     * @param content 内容

      112     */

      113    fun sendMsg(topic: String, content: String) {

      114        val msg = MqttMessage()

      115        msg.payload = content.encodeToByteArray() // 设置消息内容

      116        msg.qos = 0 //设置消息发送质量,可为0,1,2.

      117        // 设置消息的topic,并发送。

      118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

      119            override fun onSuccess(asyncActionToken: IMqttToken) {

      120                VMLog.d("MQTT 消息发送成功")

      121            }

      122

      123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

      124                VMLog.d("MQTT 消息发送失败 ${exception.message}")

      125            }

      126        })

      127    }

      128

      129    /**

      130     * 通知 MQTT 事件

      131     */

      132    private fun notifyEvent(topic: String, data: String) {

      133        LDEventBus.post(topic, data)

      134    }

      135 }

      业务交互

      和业务相关的就是在启动APP后,使用后端服务器返回的鉴权token信息及连接封装接口登录环信通MQTT服务器,登录成功后订阅主题并监听消息。

      1// 请求 token 成功后,调用MQTTHelper.connect()链接 MQTT 服务器,这里会同时传递监听的主题

      2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)

      3

      4/**

      5 * 发送匹配信息

      6 */

      7private fun sendMatchInfo() {

      8    if (selfMatch.user.nickname.isEmpty()) return

      9    // 提交自己的匹配信息到服务器

      10    mViewModel.submitMatch(selfMatch)

      11    val json = JSONObject()

      12    json.put("content", selfMatch.content)

      13    json.put("emotion", selfMatch.emotion)

      14    json.put("gender", selfMatch.gender)

      15    json.put("type", selfMatch.type)

      16    val jsonUser = JSONObject()

      17    jsonUser.put("avatar", mUser.avatar)

      18    jsonUser.put("id", mUser.id)

      19    jsonUser.put("nickname", mUser.nickname)

      20    jsonUser.put("username", mUser.username)

      21    json.put("user", jsonUser)

      22    MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())

      23}

      24

      25// 监听消息这里使用了一个事件总线进行通知,在上边封装 MQTTHelper 发送消息也使用了这个,

      26// 订阅 MQTT 事件

      27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {

      28        val match = JsonUtils.fromJson(it, Match::class.java)

      29        // 这里收到匹配信息之后就增加一条弹幕

      30    addBarrage(match)

      31}

      后端服务实现

      接下来介绍后端服务实现,主要包含以下两部分:

      配置连接信息:配置环信MQTT消息云连接信息。

      获取鉴权信息:获取客户端连接需要的鉴权信息。

      配置连接信息

      配置部分只需要按照环信后台配置信息进行替换就好,配置在config目录下的config.xxx.json文件内

      1/**

      2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService

      3 */

      4config.mqtt = {

      5    host: 'mqtt host', // MQTT 链接地址

      6  appId: 'appId', // MQTT AppId

      7  port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)

      8  restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服务 API 地址

      9  clientId: 'client id', // 替换环信后台 clientId

      10  clientSecret: 'client secret', // 替换环信后台 clientSecret

      11};

      获取鉴权信息

      这里主要是获取客户端连接所需要的鉴权信息token,为了安全token肯定是要放在服务器端生成的,废话不多说,上代码:

      1/**

      2 * Create by lzan13 on 2022/3/22

      3 * 描述:MQTT 帮助类

      4 */

      5object MQTTHelper {

      6

      7    private var mqttClient: MqttAndroidClient? = null

      8

      9    // 缓存主题集合

      10    private val topicList = mutableListOf()

      11

      12    /**

      13     * 链接MQTT

      14     * @param id 用户 Id

      15     * @param token 用户链接 MQTT 的 Token

      16     * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅

      17     */

      18    fun connect(id: String, token: String, topic: String = "") {

      19        // 处理订阅主题

      20        if (topic.isNotEmpty()) topicList.add(topic)

      21

      22        // 拼接链接地址

      23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

      24        // 拼接 clientId

      25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"

      26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

      27

      28        //连接参数

      29        val options = MqttConnectOptions()

      30        options.isAutomaticReconnect = true //设置自动重连

      31        options.isCleanSession = true // 缓存

      32        options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒

      33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒

      34        options.userName = id // 用户名

      35        options.password = token.toCharArray() // 密码

      36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

      37        // 设置MQTT监听

      38        mqttClient?.setCallback(object : MqttCallback {

      39            override fun connectionLost(t: Throwable) {

      40                // 通知链接断开

      41                VMLog.d("MQTT 链接断开 $t")

      42            }

      43

      44            @Throws(Exception::class)

      45            override fun messageArrived(topic: String, message: MqttMessage) {

      46                // 通知收到消息

      47                VMLog.d("MQTT 收到消息:$message")

      48                // 如果未订阅则直接丢弃

      49                if (!topicList.contains(topic)) return

      50                notifyEvent(topic, String(message.payload))

      51            }

      52

      53            override fun deliveryComplete(token: IMqttDeliveryToken) {}

      54        })

      55        //进行连接

      56        mqttClient?.connect(options, null, object : IMqttActionListener {

      57            override fun onSuccess(token: IMqttToken) {

      58                VMLog.d("MQTT 链接成功")

      59                // 链接成功,循环订阅缓存的主题

      60                topicList.forEach { subscribe(it) }

      61            }

      62

      63            override fun onFailure(token: IMqttToken, t: Throwable) {

      64                VMLog.d("MQTT 链接失败 $t")

      65            }

      66        })

      67    }

      68

      69    /**

      70     * 订阅主题

      71     * @param topic 主题

      72     */

      73    fun subscribe(topic: String) {

      74        if (!topicList.contains(topic)) {

      75            topicList.add(topic)

      76        }

      77        try {

      78            //连接成功后订阅主题

      79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

      80                override fun onSuccess(token: IMqttToken) {

      81                    VMLog.d("MQTT 订阅成功 $topic")

      82                }

      83

      84                override fun onFailure(token: IMqttToken, t: Throwable) {

      85                    VMLog.d("MQTT 订阅失败 $topic $t")

      86                }

      87            })

      88        } catch (e: MqttException) {

      89            e.printStackTrace()

      90        }

      91    }

      92

      93    /**

      94     * 取消订阅

      95     * @param topic 主题

      96     */

      97    fun unsubscribe(topic: String) {

      98        if (topicList.contains(topic)) {

      99            topicList.remove(topic)

      100        }

      101        try {

      102            mqttClient?.unsubscribe(topic)

      103        } catch (e: MqttException) {

      104            e.printStackTrace()

      105        }

      106    }

      107

      108    /**

      109     * 发送 MQTT 消息

      110     * @param topic 主题

      111     * @param content 内容

      112     */

      113    fun sendMsg(topic: String, content: String) {

      114        val msg = MqttMessage()

      115        msg.payload = content.encodeToByteArray() // 设置消息内容

      116        msg.qos = 0 //设置消息发送质量,可为0,1,2.

      117        // 设置消息的topic,并发送。

      118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

      119            override fun onSuccess(asyncActionToken: IMqttToken) {

      120                VMLog.d("MQTT 消息发送成功")

      121            }

      122

      123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

      124                VMLog.d("MQTT 消息发送失败 ${exception.message}")

      125            }

      126        })

      127    }

      128

      129    /**

      130     * 通知 MQTT 事件

      131     */

      132    private fun notifyEvent(topic: String, data: String) {

      133        LDEventBus.post(topic, data)

      134    }

      135}

      源码地址

      核心代码就这么多,不超过500行,这里没有直接调用环信历史消息接口获取消息存储记录,后续可以在进行改良,简化实现流程。源码链接附上,配合使用效果更佳。

      服务端github源码:

      https://github.com/lzan13/vmtemplateserver

      客户端github源码:

      https://gitee.com/lzan13/VMTemplateAndroid

      写在最后

      MQTT协议资源占用小,并发连接高,集成简单,特别适用于高频数据交互场景,比如:游戏的世界广场、视频平台弹幕等等等等,欢迎各位小伙伴集思广益,基于MQTT服务实现更多的业务场景,享受技术带来的便利与快乐。

      文章内容仅供阅读,不构成投资建议,请谨慎对待。投资者据此操作,风险自担。

    [No. X016-1]
    分享到微信

    即时

    新闻

    明火炊具市场:三季度健康属性贯穿全类目

    奥维云网(AVC)推总数据显示,2024年1-9月明火炊具线上零售额94.2亿元,同比增加3.1%,其中抖音渠道表现优异,同比有14%的涨幅,传统电商略有下滑,同比降低2.3%。

    企业IT

    重庆创新公积金应用,“区块链+政务服务”显成效

    “以前都要去窗口办,一套流程下来都要半个月了,现在方便多了!”打开“重庆公积金”微信小程序,按照提示流程提交相关材料,仅几秒钟,重庆市民曾某的账户就打进了21600元。

    3C消费

    华硕ProArt创艺27 Pro PA279CRV显示器,高能实力,创

    华硕ProArt创艺27 Pro PA279CRV显示器,凭借其优秀的性能配置和精准的色彩呈现能力,为您的创作工作带来实质性的帮助,双十一期间低至2799元,性价比很高,简直是创作者们的首选。

    研究

    中国信通院罗松:深度解读《工业互联网标识解析体系

    9月14日,2024全球工业互联网大会——工业互联网标识解析专题论坛在沈阳成功举办。