按秒触发消息收集器

乐云一
  • 设计
  • 设计
About 1190 wordsAbout 4 min

开发时遇到一个需求场景:

需要将接收到的消息进行累计,然后每过X秒之后,统一消费;以此循环,只要有消息接收,就进行累计-统一消费的逻辑

场景

每5秒消费一次收集的消息

数据收集

方案一:

定时任务+一个map

实现:

Task/任何框架或While死循环线程 的定时任务,每5秒执行一次。

使用一个全局的ConcurrentHashMap;

定时任务中 直接对该map进行 每5秒一次的消费 - 清空

缺点: 能想到的关于性能、安全等问题全没有保证

优点: 简单

方案二:

延时队列 **DelayQueue **

实现:

定义一个上一个5秒的时间戳,比如现在的上一个时间戳是8:00:35
那么35秒 - 40秒 ,
在36秒收集一条消息,存入DelayQueue中时间为4秒
在37秒收集一条消息,---------------时间为3秒
......

当到40秒时,存入的消息释放,给予一个100ms的处理【取出】时间放入一个map中。

缺点:很跳脱的想法;如果5秒内,不管哪个时间点上存入一笔多消息,那么队列移动节点的开销以及时间消耗无法估量。

优点: 不需要关注安全问题,因为每个时间点存入的消息在统一消费时都是隔离着的。

方案三:

ScheduledThreadPoolExecutor+状态阻塞

实现:

创建一个5S延时执行的线程池。
当一条消息进入时,创建一个线程加入线程池;
并且将该消息放至唯一的一个消息收集容器中
后续判断线程池是否有线程,如果有则填入容器中

5秒收集结束后,改变status为阻塞【CountDownLatch】,消息处理完以后放开

缺点: 消息处理速度无法估量,导致前一个线程已经过了5秒但是无法立刻放开,主线程阻塞消息堆积,导致ack重发、SQL超时等等问题

优点: 引入线程池+延时的概念,创建收集桶【消息收集容器】,保证了消息的统一收集,统一消费

方案四:

ScheduledThreadPoolExecutor+2个map+轮询

实现:

创建一个5S延时执行的线程池。
当一条消息进入时,创建一个线程加入线程池;
然后判断当前status【默认为1】,
如果为1,则将消息放至2中。
如果为2,则将消息放至1中。

5秒收集结束,
如果当前status为1,则接受容器2中所有对象,然后将status改变为2。
如果当前status为2,则接受容器1中所有对象,然后将status改变为1。
处理消息
清空拿到的容器。

缺点: 如果处理消息时间过长,则会将 下下个5秒的消息累计到下个5秒上。并且由于当前累积,导致下次累积,又会导致下次累积++++。

优点: 理论上根据业务的并发强度以及消息的处理速度,创建2个甚至2个以上map容器进行存储,是可以保证没有缺点的【统一收集,统一消费,5秒】的方案

方案五:

ScheduledThreadPoolExecutor + 线程队列

本方案是根据业务设定的最终方案,也是我觉得最适合“消息收集”场景使用的模板

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "XXX", durable = "true"),
        exchange = @Exchange(value = "XXX", type = ExchangeTypes.TOPIC),
        key = {"XXX"}
))
public class CollectosMessageConsumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);

    private final BlockingQueue<ProductionThread> blockingQueue = new LinkedBlockingDeque<>(3);
    
    @Autowired
    private XXXService xxxService;

    @RabbitHandler
    public void processMessage(@Payload String msg, Channel channel, Message message) throws IOException {
        logger.info("RECEIVE MESSAGE = === " + msg);
        Message message = JSON.parseObject(msg, Message.class);
        if (ObjectUtil.isNotNull(message)) {
            ProductionThread lastThread = blockingQueue.peek();
            if (ObjectUtil.isNotNull(lastThread) && lastThread.status == 0) {
                //上次开始的5秒区间收集器
                this.setProductionData(message,lastThread);
            } else {
                //创建新的收集器
                ProductionThread productionThread = new ProductionThread();
                blockingQueue.add(productionThread);
                this.setProductionData(message,productionThread);
                this.startProductionThread(productionThread);
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    private void setProductionData(Message message, ProductionThread thread) {
        ConcurrentHashMap<String, message> productionPlan = thread.getProductionPlan();
        productionPlan.put(message.getId(),message);
    }

    private void startProductionThread(ProductionThread thread) {
        threadPool.schedule(thread, 5, TimeUnit.SECONDS);
    }

    public class ProductionThread extends Thread {

        private final ConcurrentHashMap<String, message> productionPlan = new ConcurrentHashMap<>();

        public volatile int status = 0;

        @SneakyThrows
        @Override
        public void run() {
            status = 1;
            blockingQueue.remove(this);
            ArrayList<Message> messages = CollectionUtil.newArrayList(productionPlan.values());
            int i = xxxService.consumer(messages);
            logger.info("ADD DATA ======>>>>>> CLEAN productionPLan");
            logger.info("ADD DATA ======>>>>>> " + messages.size()+"=>"+i);
            
            //如果消费失败的补偿机制
            xxxService.error();
        }

        public ConcurrentHashMap<String, message> getProductionPlan() {
            return productionPlan;
        }
    }

}

思路、模式等一切都在代码里;

Last update:
Contributors: leyunone
Comments
  • Latest
  • Oldest
  • Hottest
Powered by Waline v2.14.7