异步业务系统的设计

乐云一
  • 业务设计
  • 业务设计
About 4032 wordsAbout 13 min

异步业务系统的设计

在我们的系统达到一个量级时,频繁的请求互动很容易出现IO资源的匮乏与不稳定,所以 削峰聚合 是不可避免的设计思路;

这时候为了后续更好的系统扩展性,会将请求一个链路的同步系统,慢慢的改造为微服务+异步业务的架构。

本篇将以简单的订单形的业务系统作为例子,探讨一下系统在异步事件下的最佳设计。

需求

刚进公司的你,接手的项目中恰好有一个这样的新需求:

接收来自另一个系统的数据信息,将其解析后入库,并且根据时间,按时、日、月...对应的数据绘画成可视化图表。

数据信息为:

{
    "messageId":"uuid",
    "signCode":"数据标识",
    "value":1,
    "timestamp":1728658311693
    //.....
}

来源方式可以理解为接口/队列/...没有限制的直接投入到我方中。

从1开始

作为一个新手,在接手理解后,设计了如下数据库结构:

image-20241011225653143

而图表化的数据则是:

image-20241011231518700

一张表是保存每条数据的入库记录,另一张则是按照时、日、月区分类型,通过计算累加/减...更新到对应的时间片数据中;

比如数据来了两条:

{
    "messageId":"uuid",
    "signCode":"数据标识",
    "value":1,
    "timestamp":1728658311693
},
{
    "messageId":"uuid",
    "signCode":"数据标识",
    "value":5,
    "timestamp":1728658312693
}

图表化后,图表数据库数据则为:

value:6,
type:,
date:2024-10-11 23:00:00
//...
value:6,
type:日
date:2024-10-11

整个入库流程为:

好了,这样你就可以非常简单的完成这个需求,在流量接入小的初期,非常的直接高效。

而查询图表的操作,同样的非常直接:

image-20241011233459411

流量激增

某一天,发现机器人预警CPU80%、90%、91%.....,你慌乱的进行简单的排查后,很快发现是数据库中你设计的表出现了非常多的请求。

哦豁,原来是查询的时候你并未设计缓存层,所以的查询请求都直接打到了db层,造成数据库所在的服务器资源无底线的被无状态连接消耗。

但是面对生产事故你只能使用紧急手段解决:扩容服务器

最终在当天的晚上,你着手将读与写的接口进行了简单重构:

image-20241011234615433

使用了经典模式Cache Aside方式,先写数据库再更新缓存。于是由于缓存层的介入,用户的查询请求并未全部打到数据库上,基本解决了目前数据库过大的压力...

热点爆破

又在某一天,突发流量爆破事件,发现请求接口大量报错,数据出现丢失或者重复消费的情况。

随着时间的推移,你发现系统逐渐的变卡,CPU在短短的时间内飙升,完蛋。

在追踪双方的后台日志后,发现无非是常见的系统并发下的因素导致我们的入口方法非常的不稳定,而造成这种现象的原因有很多:

  1. 数据库写锁竞争,导致写请求存在等待时效

    基于数据来源的不限制性,同一类型的数据信息很频繁的修改同一行数据;由此存在一个隐形的队列存储同一个signCode的更新操作,等待同一把数据行锁,强行进行等待阻塞。极大的影响了数据库的性能,且当阻塞时间大于接口调用时间时触发 RPC 接口的重试机制

  2. 大量报错,写请求链路强依赖;

    我们系统暴露的入口,从来源到入库整个链路是强依赖性的,其一一环故障就会出现服务断开现象。

  3. 入口未进行幂等

    当发生阻塞而出发请求方数据重发时,数据也就必然出现重复消费现象

  4. 数据库与缓存层的连接数

    由于全部的写请求与部分查询请求会直接与数据库进行连接,当连接数达到了我们设置的安全上限时,新请求会强行等待新连接直到超时

为了解决上述常规问题,你想着今晚要大干一场,将整体模块重构。

于是乎,你紧急修改了入口的令牌桶速率,将流量控制在目前可接收的范围内,然后重启服务,并且广播:系统维护结束...

在这个夜晚,你采用了一贯解决高并发业务的方案:异步架构+CQRS

image-20241015173735431

在mq的选型上你预估未来的并发量,选择了使用Kafka作为队列,消费端订阅其topic完成数据的消费,同时做好对数据的容错、重试、兼容等处理;

因为引入了异步消费的任务端,原服务对请求的处理变成了非常简单的:

  1. 接收
  2. 埋点(记录行为、鉴权等操作)
  3. 发布消息
  4. 响应

使用了异步系统处理的你,在很大程度上直接杜绝了上述原因中 1和2 的情况;而幂等性问题,基于当前需求数据结构的简单性,你选择在网关处进行幂等判断,而这并不是你考虑的范畴中。不过你还是在数据消费前,通过messageId唯一id进行行为的唯一性判断。但这也不是你考虑的范畴中,因为数据库的主键已经帮你完成了设计;![img](file:///C:\Users\DELL\AppData\Local\Temp\SGPicFaceTpBq\11800\25DBE08B.png)

消费性能

在系统平稳运行了一段时间后,随着接入系统的数量变多,并发量逐渐稳步上升。

你发现图表的数据延时越来越严重,你知道,这是因为为了解决数据库写锁竞争问题,kafka的分区策略是按照指定key的消息保存策略。

因此一个类型signCode的数据会以串行化的队列依次分发给消费服务:

image-20241016111407375

但是由于一个kafka实例的分区数量存在最大数的限制,因此即使我们无线扩容消费服务节点,也无法可观的增加数据消费能力。同时消息队列系统如果只基于提高消费能力而增加服务集群,会增加其集群的复杂度,影响整个集群的稳定性等等...

所以除了有选择性的增加集群外,更需要提高单节点的服务消费能力;

你想了想,目前的消费服务处理数据是这样的:

image-20241016134224731

数据被线性的加工入库,同时针对单条数据的ack操作也是所以事件完成后确认;

我们将其拆分一下,通过数据收集桶+异步处理器的方式分发数据:

image-20241016135950004

在数据进入到消费服务中,直接添加到本地消费队列中,并且将其记录到一张临时的记录表里,随后ack掉这条消息,这样一来消息在kafka侧已经被消费,进行下一条数据的发放,细节在于:

  1. 本地记录表的作用是当服务崩溃时,进行数据恢复将未处理完成的数据回源到数据收集桶中
  2. 异步处理器采用线程池,内部独立,因此只需要修改线程大小就可以控制消息的实际消费速率

数据聚合

经过上述的改造,你发现系统的吞吐量获得了极大的提升,但是当同一个signCode类型数据非常频繁的上报时,消费服务会出现间断性由于本地队列过大,处理线程不能及时解析,导致GC频繁的问题存在;

你想了想,先暂时将这个队列的大小根据JVM大小进行限制,但是你知道,这样会回到最开始的问题:kafka单节点串行消费速率过慢。

这时候你想到了,因为需求的特殊性,完成可以将数据按时间分片进行一次聚合后再累计计算入库;

说干就干:

image-20241016145500496

在数据收集后,将其直接投入到时间循环队列中,根据数据信息中的signCode字段判断采用秒级延时还是分级延时...的时间片;

然后时间片所做的动作也很简单:

  1. 在第2s时,获取上一秒时间片中的数据
  2. 将其聚合计算,入库一次
  3. 将上一秒时间片数据清空

但是也伴随着风险:

  1. 时间片队列上限问题,达到最大值时,这一秒之后的数据一定存在延时
  2. 数据聚合处理一定存在数据延时问题
  3. 数据批计算之后,只入库一次总值,这一批数据的原子性问题
  4. 服务崩溃后,从记录表恢复数据的难度与准确性变高
  5. ...

你将上述风险逐个击破或是采用兜底机制处理完之后,消费服务端越加稳定高效

下流崩溃

某一天,睡梦中的你被一通电话惊醒,电话那头领导催促着你赶紧检查系统出现了什么问题;用户反馈系统中的所有图表都没有更新,查询数据正常但是没有写入;

你连忙打开服务器,看到所有的上游请求都指向了一个异常,因为某种原因kafka挂掉/订阅节点挂点...写请求未进行正常消费

哦豁,下流崩溃了!

这时候你才意识到,你并没有设计消费端的最低兜底机制,也就是当下流崩溃时无法进行人工干预或服务降级的操作;

你摸了摸下巴,虽然现在的mq和服务节点都搭有集群并且保证可用性;但是外部节点考虑成本因素,一是租的,二是节点有限,三是维护问题;

所以当mq不可用时,更希望的是当没有mq时,写请求也可以正常使用;

你汗颜的捂着额头,所以最佳的服务降级就是未引入mq时期的链路状态:

image-20241016162236534

当kafka不可用时,将数据推到另一个服务中进行负载均衡做最低限度的模拟signCode 分片的方式,然后再直接调用写服务暴露出来的接口完成数据接收,进入到写服务的本地队列中等待处理,通过这种方式保障了消息链路在主通道崩溃时的服务可靠性;

但是当写服务所有节点崩溃时,又怎么处理源源不断的写请求呢?

缓存,你想到了缓存;

因为写的目的,只是为了查询,那么为何不能假设写入请求发送成功 = 写入成功

那么是否说明写入请求发送成功 = 缓存可更新?

你想了想,可行,但是操作难度很大很大:

  1. 一是要保证写入请求发送成功 = 缓存可更新 恒等于
  2. 二是查询请求修改幅度过大
  3. 不符合系统的从上至下的表述模式
  4. ...

所以,写服务崩溃的话,你编写了一个系统监控脚本:当服务节点<=2时,自动在一个稳定服务器中启动一个新的服务

超热点数据

你松了口气,直到公司做大做强的那天,某个signCode数据在某时端内出现了巨大的并发上升。于是写服务线程占据了时间片中本地队列的多数,使其他signCode数据无法正常的分配队列而出现阻塞;

当晚你想了两种处理方式:

  1. 优先级分配,将signCode处理线程优先级提高
  2. 隔离队列,每个signCode 拥有自己的队列

你试了试,第二种在现阶段改动少容易实现且效果显著,通过策略+map的模式,有效的隔绝了signCode处理队列的互相影响。

而第一种,通过查阅资料,你学起了美团动态线程池的骚操作,设计一个初始为0的线程池;

当系统检测到有热点signCode数据时,将该线程池动态调配,并且单独服务热点signCode

而这一切都可以基于Prometheus 等监控系统完成,编写脚本或人工干预;

灵活装配

随着你的系统完善,越来越多的奇奇怪怪的系统要求接入到你的系统,其他项目也想要白嫖一个可以根据自己上报的数据生成可视化图表的功能;

于是你将数据结构文档给了他们,也就是你的数据来源体:

{
    "signCode":"xx",
    "value":""
    ....
}

但是你发现了一个问题,并非所有系统提供的value都是整数,而且有的系统中的value还涉及到了单位需要进行装换。

你仔细思索,如果想把系统做大做强,value肯定不能限定在整数中,并且处理逻辑也不能只是简单的累加计算;

所以你扩展了数据结构:

{
    "signCode":"xx",
    "value":"",
    //....
	"transferFunction":"",
    "logicType":""
    //...
}

加入了转换函数和处理逻辑类型,但是输入怎样的值才能灵活的让系统自行解析转换呢?

你想到了让他们将想要进行的数据转换以伪代码的方式写到transferFunction中,但是一定得有规则;

查阅资料,你知道可以使用 Jexl 表达式引擎让系统有规则的识别伪代码,经过精心设计,足以支持让接入者自行做数据计算与单元转换

但是处理逻辑要怎么弄才好?简单的数据有累加、减乘除,但是复杂的可是不可定义的数学公式。

那么是否可以让他们直接把他们希望计算的代码给我呢?

你想到了热加载class文件的办法,动态的注册对应signCode的处理器,于是乎你要求了接入你系统的流程:

  1. 如果不是简单的计算,请引入你定义的sdk依赖,并且继承其中的某个类,然后编写你希望的计算方式
  2. 将这个类的class文件交给你
  3. 同步到系统中,进行热部署
  4. 按照文档完成接入

总结

从头写到尾,说是设计,其实更是是自己对于一个功能从小白到老白的步步理解;

自己做过大大小小的类似的功能同时也是查阅了相关资料和问了有这种系统经验内业人事所得的一些心得

希望能帮助到看到这篇文章的兄弟 :)

Last update:
Contributors: LeYunone
Comments
  • Latest
  • Oldest
  • Hottest
Powered by Waline v2.14.7