1. Redis Stream 消费者初始化
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component @RequiredArgsConstructor public class ShortLinkStatsStreamInitializeTask implements InitializingBean { private final StringRedisTemplate stringRedisTemplate;
@Override public void afterPropertiesSet() throws Exception { Boolean hasKey = stringRedisTemplate.hasKey(SHORT_LINK_STATS_STREAM_TOPIC_KEY); if (hasKey == null || !hasKey) { stringRedisTemplate.opsForStream().createGroup(SHORT_LINK_STATS_STREAM_TOPIC_KEY, SHORT_LINK_STATS_STREAM_GROUP_KEY); } } }
|
详细解释:
Stream Group: Redis Stream 中的消费者组(Consumer Group)允许多个消费者共享工作负载。每个消息被分配给消费者组中的一个消费者进行处理,避免了单个消费者的负担过重。
创建消费者组: 这段代码检查是否已经存在一个消费者组,如果不存在,则通过 createGroup
方法创建一个新的消费者组。创建消费者组时,我们指定了流的名字(SHORT_LINK_STATS_STREAM_TOPIC_KEY
)和消费者组的名字(SHORT_LINK_STATS_STREAM_GROUP_KEY
)。如果该消费者组已经存在,则不会重复创建。
应用场景: 这种机制适用于需要多个消费者平衡处理任务的场景,例如在短链接统计或日志处理中,不同的服务或应用可以并行地消费消息进行处理。
2. Redis Stream 消费者配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Configuration @RequiredArgsConstructor public class RedisStreamConfiguration { private final RedisConnectionFactory redisConnectionFactory; private final ShortLinkStatsSaveConsumer shortLinkStatsSaveConsumer;
@Bean public ExecutorService asyncStreamConsumer() { AtomicInteger index = new AtomicInteger(); return new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), runnable -> { Thread thread = new Thread(runnable); thread.setName("stream_consumer_short-link_stats_" + index.incrementAndGet()); thread.setDaemon(true); return thread; }, new ThreadPoolExecutor.DiscardOldestPolicy() ); }
@Bean public Subscription shortLinkStatsSaveConsumerSubscription(ExecutorService asyncStreamConsumer) { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() .batchSize(10) .executor(asyncStreamConsumer) .pollTimeout(Duration.ofSeconds(3)) .build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(SHORT_LINK_STATS_STREAM_TOPIC_KEY, ReadOffset.lastConsumed())) .consumer(Consumer.from(SHORT_LINK_STATS_STREAM_GROUP_KEY, "stats-consumer")) .autoAcknowledge(true) .build();
Subscription subscription = listenerContainer.register(streamReadRequest, shortLinkStatsSaveConsumer); listenerContainer.start(); return subscription; } }
|
详细解释:
线程池与消费者: 通过 ExecutorService
使用线程池处理消费任务。消费者通过独立线程异步拉取消息并处理,确保系统的高并发处理能力,避免阻塞其他任务。
批量消息处理: 配置了 batchSize(10)
,意味着消费者每次从 Redis Stream 拉取最多 10 条消息,适应高吞吐量的需求。
轮询超时设置: 设置了 pollTimeout
为 3 秒,表示消费者如果在 3 秒内没有拉取到消息,将会继续等待新消息到来。这样设计确保了消费者在没有消息时不浪费计算资源,只有当有新消息时才开始处理。
消费者组与偏移量: 使用 StreamOffset.lastConsumed()
确保消费者从上次消费的偏移量开始消费,避免重复消费消息。
自动确认: 配置了 autoAcknowledge(true)
,意味着一旦消息被成功消费,Redis 将自动确认该消息,确保消息不被重复消费。
3. 消息幂等性处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Component @RequiredArgsConstructor public class MessageQueueIdempotentHandler { private final StringRedisTemplate stringRedisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "short-link:idempotent:";
public boolean isMessageBeingConsumed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; return Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES)); }
public boolean isAccomplish(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1"); }
public void setAccomplish(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES); }
public void delMessageProcessed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; stringRedisTemplate.delete(key); } }
|
详细解释:
幂等性处理: 这段代码通过 Redis 实现消息的幂等性,确保每条消息只会被处理一次,即使在消息消费失败后重试,也不会出现重复处理的情况。
setIfAbsent
: 使用 Redis 的 setIfAbsent
命令,如果指定的键不存在,才会设置值为 “0”(表示消息正在处理中)。如果键已经存在,表示该消息正在被处理或已处理过,因此直接返回 false
,防止重复处理。
setAccomplish
: 当消息处理完成后,通过 setAccomplish
方法将消息状态标记为完成(1
),并设置过期时间为 2 分钟。
delMessageProcessed
: 如果在处理过程中出现异常,幂等性标志将被删除,允许系统重试该消息。
4. 生产者与消费者
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Slf4j @Component @RequiredArgsConstructor public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> { private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;
@Override public void onMessage(MapRecord<String, String, String> message) { String messageId = message.getId().toString(); if (messageQueueIdempotentHandler.isMessageBeingConsumed(messageId)) { if (messageQueueIdempotentHandler.isAccomplish(messageId)) { return; } throw new ServiceException("消息需要重试"); } messageQueueIdempotentHandler.setAccomplish(messageId); } }
|
解释:
- 消费者会从 Redis 流中获取消息,然后通过
isMessageBeingConsumed
方法检查该消息是否正在处理中。如果是,则跳过;如果该消息已经处理完,则直接返回;如果是第一次处理该消息,则开始消费并标记消息为已处理。
工作流程总结:
- 生产者: 生产者通过
stringRedisTemplate.opsForStream().add()
将消息推送到 Redis Stream。
- 消费者: 消费者从 Redis Stream 中拉取消息,使用幂等性处理确保每条消息只会被处理一次。消费者处理完成后,标记消息已处理。
- 如果在处理过程中发生异常,幂等性标记会被清除,允许系统重试该消息。
通过这种模式,消息的处理确保了至少一次处理,并且在出现异常时,能够重试消息而不会重复处理。
5. 总结
通过 Redis Stream 和幂等性处理,可以实现高效、可靠的消息队列系统,适合需要高吞吐量、低延迟且能处理失败重试的场景。Redis 提供的 Stream 特性和幂等性机制确保了系统的健壮性和稳定性。