当有短链接访问时,收集短链接的访问数据,并发送到消息队列进行异步处理,确保高并发情况下的数据处理不会影响短链接访问的实时性。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Component @RequiredArgsConstructor public class ShortLinkStatsSaveProducer {
private final RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.topic}") private String statsSaveTopic;
public void send(Map<String, String> producerMap) { String keys = UUID.randomUUID().toString(); producerMap.put("keys", keys); Message<Map<String, String>> build = MessageBuilder .withPayload(producerMap) .setHeader(MessageConst.PROPERTY_KEYS, keys) .build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(statsSaveTopic, build, 2000L); log.info("[消息访问统计监控] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys); } catch (Throwable ex) { log.error("[消息访问统计监控] 消息发送失败,消息体:{}", JSON.toJSONString(producerMap), ex); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| @Component @RequiredArgsConstructor @RocketMQMessageListener( topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.consumer.group}" ) public class ShortLinkStatsSaveConsumer implements RocketMQListener<Map<String, String>> {
private final ShortLinkMapper shortLinkMapper; private final ShortLinkGotoMapper shortLinkGotoMapper; private final RedissonClient redissonClient; private final LinkAccessStatsMapper linkAccessStatsMapper; private final LinkLocaleStatsMapper linkLocaleStatsMapper; private final LinkOsStatsMapper linkOsStatsMapper; private final LinkBrowserStatsMapper linkBrowserStatsMapper; private final LinkAccessLogsMapper linkAccessLogsMapper; private final LinkDeviceStatsMapper linkDeviceStatsMapper; private final LinkNetworkStatsMapper linkNetworkStatsMapper; private final LinkStatsTodayMapper linkStatsTodayMapper; private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;
@Value("${short-link.stats.locale.amap-key}") private String statsLocaleAmapKey;
@Override public void onMessage(Map<String, String> producerMap) { String keys = producerMap.get("keys"); if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) { if (messageQueueIdempotentHandler.isAccomplish(keys)) { return; } throw new ServiceException("消息未完成流程,需要消息队列重试"); } try { String fullShortUrl = producerMap.get("fullShortUrl"); if (StrUtil.isNotBlank(fullShortUrl)) { String gid = producerMap.get("gid"); ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class); actualSaveShortLinkStats(fullShortUrl, gid, statsRecord); } } catch (Throwable ex) { log.error("记录短链接监控消费异常", ex); try { messageQueueIdempotentHandler.delMessageProcessed(keys); } catch (Throwable remoteEx) { log.error("删除幂等标识错误", remoteEx); } throw ex; } messageQueueIdempotentHandler.setAccomplish(keys); }
public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) { fullShortUrl = Optional.ofNullable(fullShortUrl).orElse(statsRecord.getFullShortUrl()); RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl)); RLock rLock = readWriteLock.readLock(); rLock.lock(); try { if (StrUtil.isBlank(gid)) { LambdaQueryWrapper<ShortLinkGotoDO> queryWrapper = Wrappers.lambdaQuery(ShortLinkGotoDO.class) .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl); ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(queryWrapper); gid = shortLinkGotoDO.getGid(); } int hour = DateUtil.hour(new Date(), true); Week week = DateUtil.dayOfWeekEnum(new Date()); int weekValue = week.getIso8601Value(); LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder() .pv(1) .uv(statsRecord.getUvFirstFlag() ? 1 : 0) .uip(statsRecord.getUipFirstFlag() ? 1 : 0) .hour(hour) .weekday(weekValue) .fullShortUrl(fullShortUrl) .gid(gid) .date(new Date()) .build(); linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO); Map<String, Object> localeParamMap = new HashMap<>(); localeParamMap.put("key", statsLocaleAmapKey); localeParamMap.put("ip", statsRecord.getRemoteAddr()); String localeResultStr = HttpUtil.get(AMAP_REMOTE_URL, localeParamMap); JSONObject localeResultObj = JSON.parseObject(localeResultStr); String infoCode = localeResultObj.getString("infocode"); String actualProvince = "未知"; String actualCity = "未知"; if (StrUtil.isNotBlank(infoCode) && StrUtil.equals(infoCode, "10000")) { String province = localeResultObj.getString("province"); boolean unknownFlag = StrUtil.equals(province, "[]"); LinkLocaleStatsDO linkLocaleStatsDO = LinkLocaleStatsDO.builder() .province(actualProvince = unknownFlag ? actualProvince : province) .city(actualCity = unknownFlag ? actualCity : localeResultObj.getString("city")) .adcode(unknownFlag ? 
"未知" : localeResultObj.getString("adcode")) .cnt(1) .fullShortUrl(fullShortUrl) .country("中国") .gid(gid) .date(new Date()) .build(); linkLocaleStatsMapper.shortLinkLocaleState(linkLocaleStatsDO); } LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder() .os(statsRecord.getOs()) .cnt(1) .gid(gid) .fullShortUrl(fullShortUrl) .date(new Date()) .build(); linkOsStatsMapper.shortLinkOsState(linkOsStatsDO); LinkBrowserStatsDO linkBrowserStatsDO = LinkBrowserStatsDO.builder() .browser(statsRecord.getBrowser()) .cnt(1) .gid(gid) .fullShortUrl(fullShortUrl) .date(new Date()) .build(); linkBrowserStatsMapper.shortLinkBrowserState(linkBrowserStatsDO); LinkDeviceStatsDO linkDeviceStatsDO = LinkDeviceStatsDO.builder() .device(statsRecord.getDevice()) .cnt(1) .gid(gid) .fullShortUrl(fullShortUrl) .date(new Date()) .build(); linkDeviceStatsMapper.shortLinkDeviceState(linkDeviceStatsDO); LinkNetworkStatsDO linkNetworkStatsDO = LinkNetworkStatsDO.builder() .network(statsRecord.getNetwork()) .cnt(1) .gid(gid) .fullShortUrl(fullShortUrl) .date(new Date()) .build(); linkNetworkStatsMapper.shortLinkNetworkState(linkNetworkStatsDO); LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder() .user(statsRecord.getUv()) .ip(statsRecord.getRemoteAddr()) .browser(statsRecord.getBrowser()) .os(statsRecord.getOs()) .network(statsRecord.getNetwork()) .device(statsRecord.getDevice()) .locale(StrUtil.join("-", "中国", actualProvince, actualCity)) .gid(gid) .fullShortUrl(fullShortUrl) .build(); linkAccessLogsMapper.insert(linkAccessLogsDO); shortLinkMapper.incrementStats(gid, fullShortUrl, 1, statsRecord.getUvFirstFlag() ? 1 : 0, statsRecord.getUipFirstFlag() ? 1 : 0); LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder() .todayPv(1) .todayUv(statsRecord.getUvFirstFlag() ? 1 : 0) .todayUip(statsRecord.getUipFirstFlag() ? 
1 : 0) .gid(gid) .fullShortUrl(fullShortUrl) .date(new Date()) .build(); linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO); } catch (Throwable ex) { log.error("短链接访问量统计异常", ex); } finally { rLock.unlock(); } } }
|
幂等处理
通过 Redis 的 SET NX(带过期时间,对应 `setIfAbsent`)实现消息幂等处理,保证消息不会重复消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Component @RequiredArgsConstructor public class MessageQueueIdempotentHandler {
private final StringRedisTemplate stringRedisTemplate;
private static final String IDEMPOTENT_KEY_PREFIX = "short-link:idempotent:";
public boolean isMessageProcessed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES)); }
public boolean isAccomplish(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1"); }
public void setAccomplish(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES); }
public void delMessageProcessed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; stringRedisTemplate.delete(key); } }
|