物联网项目总结篇
本篇总结收集以及经历到的JAVA物联网项目的困难点
前言
被人问到现在做的项目的难点是什么地方,仔细的想了想;发现物联网项目与平时容易接触到的WEB开发、app开发等BS模式下的应用,从数据量,并发量,考虑点甚至业务架构,开发思路都存在一定的差异。
这也就导致了某些场景的解决方案,下意识的会去按被物联网熏陶过的思路进行设计。导致对方get不到点,容易出现想得复杂和过度设计。
为避免此情况出现,单独出一篇文章用来隔离大脑,总结记录物联网项目开发时的设计和思路。
想到哪写到哪
难点有什么
万物联网,Java物联网的项目可以定义为设备的云平台。
与我们使用的阿里云,腾讯云的各式各类服务一样,作为云平台的应用,天然就存在几个问题:
- 带宽流量大
- 安全性
- 协议对接
- 性能问题
- 可扩展性
- ....
并且在与设备联网对接中,又会诞生各式各样的限定问题,所以我大体分为三部分:
- 接入层的抗压,流量限制,吞吐量瓶颈...
- 处理层的稳定,高效消费,重试补偿机制...
- 业务层的扩展,有效落库,数据的时效性..
接下来将针对三个架构层,讨论我开发时面临的问题与落地方案。
业务方
接入层
接入层包括:设备连接部分,接受设备上报的报文的服务
安全连接与集群
和用户登录一样,设备同样也需要登录鉴权的过程;
不同的是,用户是通过网站或APP提供的登录入口,通过账号密码登录;
而设备则是需要申请拿到登录地址之后,才能进行一系列的密钥证书,请求应答的过程;
因此设备如何拿到登录地址,也就是连接mqtt(emqx)的路由,是设备首先要做的事;
那么这里就考验一个关于公司成本与设计的问题了:一台emqx的客户端最大连接数是10W,当然我们可以通过集群扩容;
但是 给交给设备连接的地址,如果是一个路由,然后通过Nginx负载打到不同的mqtt上 ;
有的人发现了,首先路由最好由Nginx反向代理,但是Nginx默认的Https最大连接数是1024个;在物联网环境下,是很容易一瞬间有这么多个请求打过来的;
所以Nginx可以设置最大连接数,或者集群部署;
不过,考虑成本,这里推荐设备连接通过业务型的转发,即:通过一个应用,设备请求应用时,拿到属于自己的mqtt地址;这样做的好处有:
- mqtt与nginx都可以不考虑集群的问题
- 可以自定义分配策略,为各客户端找到网络区域最近的服务器地址
- 减轻nginx与服务器的压力
网络带宽问题
定义一个假设:有一台车,每秒将自己的最新状态上报,每条报文有10K,在当地有10W台车;
那么我们的服务器需要每秒在网络上下载 10K*10W的宽带数据,因此关于带宽成本问题是无法逃避的。
作为开发人员我们能做的只有两种办法:
- 建议内网
- 减少报文大小
减少报文大小可以使用各种压缩算法,虽然可以有效的减少一次报文的开销,但是相对的:设备端于云平台都存在压缩于解压的消耗,因此需结合项目成本于数据性能考虑;
一个落地的压缩算法:LZW+RLE
LZW是维护一份字典map,key=a
value=你好
那么 你好世界,可以用a世界
表示
RLE是将重复的连续字符进行计数,但是我们可以稍作修改,用字符+重复次数+所在下标定位:比如 你好,你是谁,我是你爸爸
,可以形容为你039好1,26是48谁5我7爸AB
。
我们可以通过于通讯端协商压缩方式,选择符合各类设备上报习惯的压缩算法。
限流过滤
针对报文限流,由于设备上报的消息基本都是通过各种mq过来,比如真实设备的mqtt,虚拟设备的rabbitmq;
那么不免会出现以下可能:
- 过去的消息延时到现在接收
- 相同的消息
- 无效的消息
并且很多时候,对于一些不需要关注中间值变化的设备,前一秒上报1,后一秒也上报1,这种时间帧的消息对于这种设备属于心跳包而非有效的设备消息;
所以我们需要发挥边缘计算的优势,在有搭建边缘网关的物联网架构里,一定要在边缘网关上进行消息时间戳+版本号+无效报文并且加锁(Lua脚本)的判断;
而不是一股脑的丢到云端服务器。
但是对于直接会连接到云端的设备,这里推荐用一个纯内存的程序,用另一台服务器上CPU的性能分担云端集群的压力。
纯内存指的是,使用Map.List等虚拟机性能,不依赖第三方组件。
处理层
处理层包括:消费设备上报消息,发布给设备指令的服务
规则校验
规则校验指的是,比方说云端有拉黑设备功能,那么设备上报上来肯定是return;
也必须到了处理层才能知道这个设备是否被拉黑;
所以这里的规则就是指需要到处理层才能判断过滤掉的消息组;
那么问题来了,规则1耗时XX时间,规则2耗时XXX时间....
在处理中交叉进行执行校验规则时间不敢...,并且对于某一些规则,某些设备又可以去忽略掉规则校验的结果;
所以第一个方案是:动态线程池+规则引擎+自旋
指的是,开启一个可以动态修改参数的线程池,比如核心线程数,阻塞队列....
规则引擎,指的是可以将if-else的判断公式设计在一个对象和配置中的工具
自旋,指的是主线程等待结果后继续运行;
这样实现的优点与缺点:
优点:
- 动态线程池,可以
debug
上线新规则后的适应期 - 规则引擎,通过线程池下发各个规则,代码简洁且高效,并且可以自由搭配
与
或
等逻辑判断运算 - 隔离各个规则,互不影响,并且各个规则中可以自定义自己的重试机制
- ...
缺点:
- 主线程等待问题,可以换成不等待,优先处理后续逻辑在边界点时回流校验
- 规则不方便扩展数量,因为使用线程池的问题,在规则越来越多后,任务等待的可能性越来越大
- 线程池阻塞时,主线程无法响应
- ...
因此对于规则的数量,线程池的优化,主线程的执行链路都需要综合考量到,此方式可以通过CountDownLatch
非常简单的实现。
但是随着项目体量的增长,奇奇怪怪的需求越来越多,这样简单的 校验-执行 的逻辑流不符合各业务方需要的消息校验;
所以就有了第二个方案:动态线程池+规则引擎+规则目录
少了自旋,即主线程不在等待,而是继续执行后续消费逻辑
多了规则目录,指的是每条消息的唯一业务id,生成一份规则目录比如:
{
"messageId":"111",
"rule":[1,2,3,4,5],
"result":[false,false,false,true,true],
"ignore":[3]
}
对于这条消息的所有判断,都通过后续业务方消费时自行拿到目录中的值确认;
相当于将规则校验的业务与处理层分割,后置到了业务方
鉴权
责任链模式,最适合链路鉴权的设计模式
- 方便扩展
- 隔离处理
- 链路清晰
- 排序
业务分流
TODO
消息下发
处理层在将设备上报的消息下发给实际消费的业务端前,有一个很重要的事:记录本地消息表
这关乎了后续消息丢失,消费失败,未消费等等事故级别问题的修复;
简单的来讲,就是我方在收到消息后,经过过滤,鉴权等处理;将其投至业务方后,进行该消息的记录,也就是直接洛库。
这样的设计是针对以下几点的补偿处理机制:
- 业务方反馈消息异常,请求查看日志
- 业务方消费异常,本表在凌晨时做一次对账操作,进入重试机制
- 人工干预,现场调试失败时,直接通过该表发起重试消息
- ...
有点像交付系统中的对账操作
路由政策
主要针对,业务方调用处理层接口下发指令消息给设备,业务方将设备应答消息下发给业务方这条链路问题;
在物联网中,一次通讯消息的sessionId
会由云端或者设备端生成;
我们debug日志也是通过该sessionId找到消息应答的链路,但是这样在有集群的服务中很不好;
因为这条链路日志可能发起者在A服务,接收到应答消息的却在B服务中;
所以需要重写dubbo的负载算法,当然了这里指的前提是:处理层的消息来自上次应用调用处理层的rpc接口,我们看的日志也指的是处理层。
流程是业务方发起时消息时,往A服务中写入该设备的sessionId缓存,标识为sessionId,指是A服务地址;
业务层
业务层则是实际消费设备上报过来的报文,以及发起指令的应用
落库问题
最后还是来到了问题最大的地方,业务层如何接住一次性的大量插入请求;
先提供技术选型:时序数据库 + 非关系型数据库 + 关系型数据库
时序数据库记录的是大数据量的日志数据,比如设备上报的所有时间帧下的消息
非关系型数据库记录的是经常读且更新的模型数据,比如设备在设备中心的影子模型
关系型数据库记录的是我们需要的数据;
这一块确实很难想到什么优化方案,因为脑袋里面除了顶用商业版数据库外,也就是批处理,优化,参数等等耳熟能详的操作;
但是在时序数据中,需要时效性特别好的场景时,是逃不过每秒高并发插入的实时的;
并发消费
减小锁粒度
指的是两类:一是有锁竞争的业务方,尽可能的以时间作为锁粒度,比如200毫秒收集一批数据,就放开锁等待而非数据量大于n值时才放开;二是指数据库上,关注行锁范围;
巧用异步编排,CompletableFuture类
线程池都是CPU密集型
TODO