短链接核心 6:短链接监控

当有短链接被访问时,系统会收集本次访问的相关数据,并将其发送到消息队列进行异步处理,确保在高并发情况下,统计数据的处理不会影响短链接跳转的实时性。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Publishes short-link access statistics to RocketMQ so they can be persisted asynchronously.
 *
 * <p>NOTE(review): {@code log} is referenced below but no logger declaration is visible in this
 * class; presumably a Lombok {@code @Slf4j} annotation was dropped from the listing — confirm.
 */
@Component
@RequiredArgsConstructor
public class ShortLinkStatsSaveProducer {

    /** Timeout, in milliseconds, passed to the synchronous send call. */
    private static final long SEND_TIMEOUT_MILLIS = 2000L;

    private final RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.topic}")
    private String statsSaveTopic;

    /**
     * Sends one statistics record to the configured topic using a synchronous send.
     *
     * <p>A random UUID is generated per call. It is stored in the payload under {@code "keys"}
     * (the given map is modified in place) and also set as the message's
     * {@code MessageConst.PROPERTY_KEYS} header. Any failure raised by the send is logged
     * together with the JSON form of the payload and is not propagated to the caller.
     *
     * @param producerMap mutable payload of the statistics message; receives the {@code "keys"} entry
     */
    public void send(Map<String, String> producerMap) {
        final String messageKey = UUID.randomUUID().toString();
        // The same key travels in the payload and in the message header.
        producerMap.put("keys", messageKey);
        Message<Map<String, String>> message = MessageBuilder.withPayload(producerMap)
                .setHeader(MessageConst.PROPERTY_KEYS, messageKey)
                .build();
        try {
            SendResult result = rocketMQTemplate.syncSend(statsSaveTopic, message, SEND_TIMEOUT_MILLIS);
            log.info("[消息访问统计监控] 消息发送结果:{},消息ID:{},消息Keys:{}", result.getSendStatus(), result.getMsgId(), messageKey);
        } catch (Throwable ex) {
            log.error("[消息访问统计监控] 消息发送失败,消息体:{}", JSON.toJSONString(producerMap), ex);
            // Custom failure handling (e.g. compensation) can be added here.
        }
    }
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/**
 * RocketMQ listener that consumes short-link access events and persists the monitoring
 * statistics (access, locale, OS, browser, device, network, access log and daily totals).
 *
 * <p>Duplicate deliveries are filtered through {@link MessageQueueIdempotentHandler}, keyed by
 * the {@code "keys"} entry of the message payload.
 */
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = "${rocketmq.producer.topic}",
        consumerGroup = "${rocketmq.consumer.group}"
)
public class ShortLinkStatsSaveConsumer implements RocketMQListener<Map<String, String>> {

    private final ShortLinkMapper shortLinkMapper;
    private final ShortLinkGotoMapper shortLinkGotoMapper;
    private final RedissonClient redissonClient;
    private final LinkAccessStatsMapper linkAccessStatsMapper;
    private final LinkLocaleStatsMapper linkLocaleStatsMapper;
    private final LinkOsStatsMapper linkOsStatsMapper;
    private final LinkBrowserStatsMapper linkBrowserStatsMapper;
    private final LinkAccessLogsMapper linkAccessLogsMapper;
    private final LinkDeviceStatsMapper linkDeviceStatsMapper;
    private final LinkNetworkStatsMapper linkNetworkStatsMapper;
    private final LinkStatsTodayMapper linkStatsTodayMapper;
    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;

    @Value("${short-link.stats.locale.amap-key}")
    private String statsLocaleAmapKey;

    /**
     * Handles one statistics message.
     *
     * <p>If the idempotency marker for the message key already exists, the message is dropped
     * when its previous run is flagged as accomplished; otherwise a {@code ServiceException} is
     * thrown so that the queue redelivers it later. On a processing failure the marker is
     * removed (best effort) and the failure is rethrown.
     *
     * @param producerMap message payload; reads {@code "keys"}, {@code "fullShortUrl"},
     *                    {@code "gid"} and {@code "statsRecord"} (JSON of the stats record)
     */
    @Override
    public void onMessage(Map<String, String> producerMap) {
        String keys = producerMap.get("keys");
        // The handler returns false when a marker for this key already exists.
        if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) {
            // Check whether the earlier run of this message has finished.
            if (messageQueueIdempotentHandler.isAccomplish(keys)) {
                return;
            }
            throw new ServiceException("消息未完成流程,需要消息队列重试");
        }
        try {
            String fullShortUrl = producerMap.get("fullShortUrl");
            if (StrUtil.isNotBlank(fullShortUrl)) {
                String gid = producerMap.get("gid");
                ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
                actualSaveShortLinkStats(fullShortUrl, gid, statsRecord);
            }
        } catch (Throwable ex) {
            log.error("记录短链接监控消费异常", ex);
            try {
                // Remove the marker so that a redelivery is not treated as a duplicate.
                messageQueueIdempotentHandler.delMessageProcessed(keys);
            } catch (Throwable remoteEx) {
                log.error("删除幂等标识错误", remoteEx);
            }
            throw ex;
        }
        messageQueueIdempotentHandler.setAccomplish(keys);
    }

    /**
     * Persists all statistics rows for a single short-link access.
     *
     * <p>The work runs under the read lock of the Redisson read/write lock derived from
     * {@code LOCK_GID_UPDATE_KEY} and the full short URL. One timestamp is captured up front and
     * shared by every row, so a single access can never be split across two hours or dates.
     *
     * <p>NOTE(review): failures inside the locked section are logged and NOT rethrown, so
     * {@link #onMessage} still marks the message as accomplished when persistence fails.
     * Rethrowing would trigger redelivery, but the writes below are not transactional and
     * would be counted twice — confirm the intended trade-off before changing this.
     *
     * @param fullShortUrl full short URL; when {@code null} the value from {@code statsRecord} is used
     * @param gid          group id; when blank it is looked up from the goto table
     * @param statsRecord  collected access data for this visit
     */
    public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) {
        fullShortUrl = Optional.ofNullable(fullShortUrl).orElse(statsRecord.getFullShortUrl());
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl));
        RLock rLock = readWriteLock.readLock();
        rLock.lock();
        try {
            if (StrUtil.isBlank(gid)) {
                LambdaQueryWrapper<ShortLinkGotoDO> queryWrapper = Wrappers.lambdaQuery(ShortLinkGotoDO.class)
                        .eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl);
                ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(queryWrapper);
                if (shortLinkGotoDO == null) {
                    // Without a routing row there is no gid to attribute the statistics to.
                    log.error("短链接访问量统计异常,未找到短链接路由记录,fullShortUrl:{}", fullShortUrl);
                    return;
                }
                gid = shortLinkGotoDO.getGid();
            }

            // Single timestamp shared by every row written for this access.
            Date now = new Date();
            int hour = DateUtil.hour(now, true);
            int weekValue = DateUtil.dayOfWeekEnum(now).getIso8601Value();
            // Null-safe: a missing flag counts as "not the first visit".
            int uvIncrement = Boolean.TRUE.equals(statsRecord.getUvFirstFlag()) ? 1 : 0;
            int uipIncrement = Boolean.TRUE.equals(statsRecord.getUipFirstFlag()) ? 1 : 0;

            LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()
                    .pv(1)
                    .uv(uvIncrement)
                    .uip(uipIncrement)
                    .hour(hour)
                    .weekday(weekValue)
                    .fullShortUrl(fullShortUrl)
                    .gid(gid)
                    .date(now)
                    .build();
            linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO);

            String actualProvince = "未知";
            String actualCity = "未知";
            JSONObject localeResultObj = queryLocale(statsRecord.getRemoteAddr());
            // Locale statistics are only recorded when the service reports infocode 10000.
            if (localeResultObj != null && StrUtil.equals(localeResultObj.getString("infocode"), "10000")) {
                // The literal "[]" in the province field is treated as "location unknown".
                boolean unknownFlag = StrUtil.equals(localeResultObj.getString("province"), "[]");
                if (!unknownFlag) {
                    actualProvince = localeResultObj.getString("province");
                    actualCity = localeResultObj.getString("city");
                }
                LinkLocaleStatsDO linkLocaleStatsDO = LinkLocaleStatsDO.builder()
                        .province(actualProvince)
                        .city(actualCity)
                        .adcode(unknownFlag ? "未知" : localeResultObj.getString("adcode"))
                        .cnt(1)
                        .fullShortUrl(fullShortUrl)
                        .country("中国")
                        .gid(gid)
                        .date(now)
                        .build();
                linkLocaleStatsMapper.shortLinkLocaleState(linkLocaleStatsDO);
            }

            LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()
                    .os(statsRecord.getOs())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkOsStatsMapper.shortLinkOsState(linkOsStatsDO);

            LinkBrowserStatsDO linkBrowserStatsDO = LinkBrowserStatsDO.builder()
                    .browser(statsRecord.getBrowser())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkBrowserStatsMapper.shortLinkBrowserState(linkBrowserStatsDO);

            LinkDeviceStatsDO linkDeviceStatsDO = LinkDeviceStatsDO.builder()
                    .device(statsRecord.getDevice())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkDeviceStatsMapper.shortLinkDeviceState(linkDeviceStatsDO);

            LinkNetworkStatsDO linkNetworkStatsDO = LinkNetworkStatsDO.builder()
                    .network(statsRecord.getNetwork())
                    .cnt(1)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkNetworkStatsMapper.shortLinkNetworkState(linkNetworkStatsDO);

            LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()
                    .user(statsRecord.getUv())
                    .ip(statsRecord.getRemoteAddr())
                    .browser(statsRecord.getBrowser())
                    .os(statsRecord.getOs())
                    .network(statsRecord.getNetwork())
                    .device(statsRecord.getDevice())
                    .locale(StrUtil.join("-", "中国", actualProvince, actualCity))
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .build();
            linkAccessLogsMapper.insert(linkAccessLogsDO);

            shortLinkMapper.incrementStats(gid, fullShortUrl, 1, uvIncrement, uipIncrement);

            LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()
                    .todayPv(1)
                    .todayUv(uvIncrement)
                    .todayUip(uipIncrement)
                    .gid(gid)
                    .fullShortUrl(fullShortUrl)
                    .date(now)
                    .build();
            linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO);
        } catch (Throwable ex) {
            log.error("短链接访问量统计异常", ex);
        } finally {
            rLock.unlock();
        }
    }

    /**
     * Queries the remote IP-location service at {@code AMAP_REMOTE_URL} for the given address.
     *
     * <p>A lookup failure must not prevent the remaining statistics from being written, so any
     * runtime failure of the HTTP call or of the JSON parsing is logged and reported as
     * {@code null}; the caller then falls back to the "unknown" locale.
     *
     * @param remoteAddr client IP address sent as the {@code ip} query parameter
     * @return the parsed response, or {@code null} when the lookup failed or returned no body
     */
    private JSONObject queryLocale(String remoteAddr) {
        Map<String, Object> localeParamMap = new HashMap<>();
        localeParamMap.put("key", statsLocaleAmapKey);
        localeParamMap.put("ip", remoteAddr);
        try {
            return JSON.parseObject(HttpUtil.get(AMAP_REMOTE_URL, localeParamMap));
        } catch (RuntimeException ex) {
            log.error("短链接访问地区查询异常,ip:{}", remoteAddr, ex);
            return null;
        }
    }
}

幂等处理

通过 Redis 的 SETNX 语义(代码中使用带过期时间的 `setIfAbsent`)实现消息幂等处理,保证同一条消息不会被重复消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Redis-backed idempotency guard for message-queue consumers.
 *
 * <p>Each message id maps to one Redis key holding {@code "0"} while the message is being
 * processed and {@code "1"} once processing has completed. Every write uses a two-minute
 * expiry.
 */
@Component
@RequiredArgsConstructor
public class MessageQueueIdempotentHandler {

    private static final String IDEMPOTENT_KEY_PREFIX = "short-link:idempotent:";

    /** Marker value written when a message is first claimed. */
    private static final String STATE_PROCESSING = "0";

    /** Marker value written when the consumption flow has finished. */
    private static final String STATE_ACCOMPLISHED = "1";

    /** Expiry, in minutes, applied to every marker write. */
    private static final long MARKER_TTL_MINUTES = 2;

    private final StringRedisTemplate stringRedisTemplate;

    /**
     * Tries to claim the message by creating its marker only if it does not exist yet.
     *
     * <p>NOTE: despite the method name, the result is {@code true} when the marker was newly
     * created by this call (no marker existed before) and {@code false} when a marker was
     * already present.
     *
     * @param messageId unique identifier of the message
     * @return {@code true} if this call created the marker, {@code false} otherwise
     */
    public boolean isMessageProcessed(String messageId) {
        Boolean created = stringRedisTemplate.opsForValue()
                .setIfAbsent(markerKey(messageId), STATE_PROCESSING, MARKER_TTL_MINUTES, TimeUnit.MINUTES);
        return Boolean.TRUE.equals(created);
    }

    /**
     * Tells whether the consumption flow of the message has been flagged as finished.
     *
     * @param messageId unique identifier of the message
     * @return {@code true} if the stored marker value is {@code "1"}
     */
    public boolean isAccomplish(String messageId) {
        String state = stringRedisTemplate.opsForValue().get(markerKey(messageId));
        return STATE_ACCOMPLISHED.equals(state);
    }

    /**
     * Flags the consumption flow of the message as finished and renews the marker's expiry.
     *
     * @param messageId unique identifier of the message
     */
    public void setAccomplish(String messageId) {
        stringRedisTemplate.opsForValue()
                .set(markerKey(messageId), STATE_ACCOMPLISHED, MARKER_TTL_MINUTES, TimeUnit.MINUTES);
    }

    /**
     * Deletes the marker of the message; used when processing ended with an exception.
     *
     * @param messageId unique identifier of the message
     */
    public void delMessageProcessed(String messageId) {
        stringRedisTemplate.delete(markerKey(messageId));
    }

    /** Builds the Redis key under which the marker of the given message is stored. */
    private static String markerKey(String messageId) {
        return IDEMPOTENT_KEY_PREFIX + messageId;
    }
}