优惠券模块核心逻辑

1. 优惠券活动查询缓存预热

1. 定时程序预热

通过定时任务,查询数据库中一个月内即将开始或者已开始的优惠券活动。
同时将活动的优惠券库存写入缓存,已开始的活动库存不更新,未开始的活动库存更新。
定时程序代码

java
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 活动预热,整点预热
*
*/
@XxlJob("activityPreheat")
public void activityPreHeat() {
log.info("优惠券活动定时预热...");
try {
activityService.preHeat();
} catch (Exception e) {
e.printStackTrace();
}
}

service代码

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void preHeat() {
//当前时间
LocalDateTime now = DateUtils.now();

//查询进行中还未到结束的优惠券活动, 1个月内待开始的优惠券活动
/**
select *
from activity t
where t.distribute_start_time < date_add(now(), INTERVAL 30 day)) and t.status in(1,2)
order by t.distribute_start_time
*/
List<Activity> list = lambdaQuery()
.le(Activity::getDistributeStartTime, now.plusDays(30))//1个月内即将开始的
.in(Activity::getStatus, Arrays.asList(NO_DISTRIBUTE.getStatus(), DISTRIBUTING.getStatus()))//查询待开始和进行中的
.orderByAsc(Activity::getDistributeStartTime)
.list();
if (CollUtils.isEmpty(list)) {
//防止缓存穿透
list = new ArrayList<>();
}
// 2.数据转换
List<SeizeCouponInfoResDTO> seizeCouponInfoResDTOS = BeanUtils.copyToList(list, SeizeCouponInfoResDTO.class);
String seizeCouponInfoStr = JsonUtils.toJsonStr(seizeCouponInfoResDTOS);

// 3.活动列表写入缓存
redisTemplate.opsForValue().set(ACTIVITY_CACHE_LIST, seizeCouponInfoStr);

// 库存写入缓存,已开始的活动不能更新,由于库存在减少,数据库库存根据缓存更新
// 将待生效的活动库存写入redis
list.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==1).forEach(v->{
redisTemplate.opsForHash().put(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10), v.getId(), v.getTotalNum());
});
// 对于已生效的活动库存没有同步时再进行同步
list.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==2).forEach(v->{
redisTemplate.opsForHash().putIfAbsent(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10), v.getId(), v.getTotalNum());
});
}

2. 定时程序更新活动状态

定时任务将数据库中活动的状态根据时间进行更新,已开始的活动状态更新为进行中,已结束的活动状态更新为已失效。

定时程序代码

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 活动状态修改,
* 1.活动进行中状态修改
* 2.活动已失效状态修改
* 每分钟执行一次
*/
@XxlJob("updateActivityStatus")
public void updateActivitySatus(){
log.info("定时修改活动状态...");
try {
activityService.updateStatus();
} catch (Exception e) {
e.printStackTrace();
}
}

service代码

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void updateStatus() {
LocalDateTime now = DateUtils.now();
// 1.更新已经进行中的状态
lambdaUpdate()
.set(Activity::getStatus, ActivityStatusEnum.DISTRIBUTING.getStatus())//更新活动状态为进行中
.eq(Activity::getStatus, NO_DISTRIBUTE)//检索待生效的活动
.le(Activity::getDistributeStartTime, now)//活动开始时间小于等于当前时间
.gt(Activity::getDistributeEndTime,now)//活动结束时间大于当前时间
.update();
// 2.更新已经结束的
lambdaUpdate()
.set(Activity::getStatus, LOSE_EFFICACY.getStatus())//更新活动状态为已失效
.in(Activity::getStatus, Arrays.asList(DISTRIBUTING.getStatus(), NO_DISTRIBUTE.getStatus()))//检索待生效及进行中的活动
.lt(Activity::getDistributeEndTime, now)//活动结束时间小于当前时间
.update();
}

3. 活动查询

从缓存中查询活动信息,同时调用getStatus方法,根据活动的开始时间、结束时间、状态,确定活动的实际状态。
(由于定时任务更新数据库活动状态为1分钟一次,缓存中的活动状态存在不一致,所以需要调用getStatus方法返回实际状态)

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
public List<SeizeCouponInfoResDTO> queryForListFromCache(Integer tabType) {
//从redis查询活动信息
Object seizeCouponInfoStr = redisTemplate.opsForValue().get(ACTIVITY_CACHE_LIST);
if (ObjectUtils.isNull(seizeCouponInfoStr)) {
return CollUtils.emptyList();
}
//将json转为List
List<SeizeCouponInfoResDTO> seizeCouponInfoResDTOS = JsonUtils.toList(seizeCouponInfoStr.toString(), SeizeCouponInfoResDTO.class);
//根据tabType确定要查询的状态
int queryStatus = tabType == TabTypeConstants.SEIZING ? DISTRIBUTING.getStatus() : NO_DISTRIBUTE.getStatus();
//过滤数据,并设置剩余数量、实际状态
List<SeizeCouponInfoResDTO> collect = seizeCouponInfoResDTOS.stream().filter(item -> queryStatus == getStatus(item.getDistributeStartTime(), item.getDistributeEndTime(), item.getStatus()))
.peek(item -> {
//剩余数量
item.setRemainNum(item.getStockNum());
//状态
item.setStatus(queryStatus);
}).collect(Collectors.toList());


return collect;
}

/**
* 获取状态,
* 用于xxl或其他定时任务在高性能要求下无法做到实时状态
*
* @return
*/
private int getStatus(LocalDateTime distributeStartTime, LocalDateTime distributeEndTime, Integer status) {
if (NO_DISTRIBUTE.equals(status) &&
distributeStartTime.isBefore(DateUtils.now()) &&
distributeEndTime.isAfter(DateUtils.now())) {//待生效状态,实际活动已开始
return DISTRIBUTING.getStatus();
}else if(NO_DISTRIBUTE.equals(status) &&
distributeEndTime.isBefore(DateUtils.now())){//待生效状态,实际活动已结束
return LOSE_EFFICACY.getStatus();
}else if (DISTRIBUTING.equals(status) &&
distributeEndTime.isBefore(DateUtils.now())) {//进行中状态,实际活动已结束
return LOSE_EFFICACY.getStatus();
}
return status;
}

2. 抢券

高并发场景下直接去查询数据库会对数据库造成压力,所以从 Redis 缓存操作库存。
采用实现 Redis 的原子性而不是使用分布式锁。

扣减库存逻辑如下:
1、首先查询库存
2、判断库存大小,如果大于0则扣减库存,否则 直接返回
3、记录抢券成功的记录,用于判断用户不能重复抢券的依据。
4、记录抢券同步的记录,用于后续的异步处理,将抢券结果保存到数据库。

抢券流程

非原子性操作,通过 Redis + Lua 脚本实现原子性操作。

抢券完整流程:

数据流:

1. Redis Key 设计

在集群环境下,一个 Lua 脚本中的所有 Redis Key 必须在同一个节点上,否则会出现错误。
一次执行Lua脚本的所有key中使用大括号‘{}’且保证大括号中的内容相同,此时会根据大括号中的内容求哈希,因为内容相同所以求得的哈希数据相同所以就落在了同一个Redis节点。
通过{活动id%10}保证同一个活动的所有key都在同一个Redis节点上。

  1. 活动信息
    缓存结构:String类型:
    key: “ACTIVITY:LIST”
    value: 符合条件的优惠券活动列表JSON数据。
    过期时间:永不过期
    缓存一致性方案:通过预热程序保证缓存一致性

  2. 优惠券活动库存
    缓存结构:Hash
    RedisKey:COUPON:RESOURCE:STOCK:{活动id%10}
    {活动id%10}表示根据活动id除以10求余,通过这种方法将key分散到不同的redis服务器上,通过“活动id%10”表达式可知优惠券活动库存hash最多有10个。
    HashKey:活动id
    HashValue: 库存
    过期时间:永不过期
    缓存一致性方案:通过预热程序保证缓存一致性

  3. 抢券成功队列
    缓存结构:Hash
    RedisKey:COUPON:SEIZE:LIST:活动id_{活动id%10}
    HashKey:用户id
    HashValue:1
    过期时间:永不过期

  4. 抢券同步队列
    缓存结构:Hash
    RedisKey:QUEUE:COUPON:SEIZE:SYNC:{活动id%10}
    HashKey:用户id
    HashValue:活动id
    过期时间:永不过期

2. 抢券 Lua 脚本:

判断用户是否在该活动抢过券。
判断库存是否充足
写入抢券成功列表
扣减库存
写入抢券同步列表

lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
```lua
-- 抢券Lua实现
-- key: 抢券同步队列,资源库存,抢券成功列表
-- argv:活动id,用户id

--优惠券是否已经抢过
local couponNum = redis.call("HGET", KEYS[3], ARGV[2])
-- hget 获取不到数据返回false而不是nil
if couponNum ~= false and tonumber(couponNum) >= 1
then
return "-1";
end
-- --库存是否充足校验
local stockNum = redis.call("HGET",KEYS[2], ARGV[1])
if stockNum == false or tonumber(stockNum) < 1
then
return "-2";
end
--抢券列表
local listNum = redis.call("HSET",KEYS[3], ARGV[2], 1)
if listNum == false or tonumber(listNum) < 1
then
return "-3";
end

--减库存
stockNum = redis.call("HINCRBY",KEYS[2], ARGV[1], -1)
if tonumber(stockNum) < 0
then
return "-4"
end
-- 抢单结果写入同步队列
local result = redis.call("HSETNX", KEYS[1], ARGV[2],ARGV[1])
if result > 0
then
return ARGV[1] ..""
end
return "-5"

错误代码:
-1: 限领一张
-2: 已抢光
-3: 写入抢券成功队列失败,返回给用户为:抢券失败
-4: 已抢光
-5: 写入抢券同步队列失败,返回给用户为:抢券失败

3. 抢券 Service 代码

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public void seizeCoupon(SeizeCouponReqDTO seizeCouponReqDTO) {
// 1.校验活动开始时间或结束
// 抢券时间
ActivityInfoResDTO activity = activityService.getActivityInfoByIdFromCache(seizeCouponReqDTO.getId());
LocalDateTime now = DateUtils.now();
if (activity == null ||
activity.getDistributeStartTime().isAfter(now)) {
throw new CommonException(SEIZE_COUPON_FAILD, "活动未开始");
}
if (activity.getDistributeEndTime().isBefore(now)) {
throw new CommonException(SEIZE_COUPON_FAILD, "活动已结束");
}

// 2.抢券准备
// key: 抢券同步队列,资源库存,抢券列表
// argv:抢券id,用户id
int index = (int) (seizeCouponReqDTO.getId() % 10);
// 同步队列redisKey
String couponSeizeSyncRedisKey = RedisSyncQueueUtils.getQueueRedisKey(COUPON_SEIZE_SYNC_QUEUE_NAME, index);
// 资源库存redisKey
String resourceStockRedisKey = String.format(COUPON_RESOURCE_STOCK, index);
// 抢券列表
String couponSeizeListRedisKey = String.format(COUPON_SEIZE_LIST, activity.getId(), index);

log.debug("seize coupon keys -> couponSeizeListRedisKey->{},resourceStockRedisKey->{},couponSeizeListRedisKey->{},seizeCouponReqDTO.getId()->{},UserContext.currentUserId():{}",
couponSeizeListRedisKey, resourceStockRedisKey, couponSeizeListRedisKey, seizeCouponReqDTO.getId(), UserContext.currentUserId());
// 3.抢券结果
Object execute = redisTemplate.execute(seizeCouponScript, Arrays.asList(couponSeizeSyncRedisKey, resourceStockRedisKey, couponSeizeListRedisKey),
seizeCouponReqDTO.getId(), UserContext.currentUserId());
log.debug("seize coupon result : {}", execute);
// 4.处理lua脚本结果
if (execute == null) {
throw new CommonException(SEIZE_COUPON_FAILD, "抢券失败");
}
long result = NumberUtils.parseLong(execute.toString());
if (result > 0) {
return;
}
if (result == -1) {
throw new CommonException(SEIZE_COUPON_FAILD, "限领一张");
}
if (result == -2 || result == -4) {
throw new CommonException(SEIZE_COUPON_FAILD, "已抢光!");
}
throw new CommonException(SEIZE_COUPON_FAILD, "抢券失败");
}

3. 抢券同步处理

将 Redis 中的抢券结果同步到MySQL的优惠券表。
基本思路:遍历Redis中的抢券结果同步队列,拿到一个元素就向数据库的优惠券表插入记录,插入完成后删除Redis中的这条记录。

同步队列的key为:QUEUE:COUPON:SEIZE:SYNC:{活动id % 10},最多有10个同步列表。
可以用多线程,每个线程处理一个同步队列。
由定时任务去调度,每隔1分钟由多线程对同步队列中的数据进行处理。

1. 线程池设计

同步队列个数为10,我们需要定义一个最多有10个活跃线程的线程池,满负荷工作下每个线程处理一个同步队列,当满负荷工作时如果再有新的任务线程池拒绝任务。
针对这个任务,采用了使用SynchronousQueue阻塞队列,容量为1,在没有线程去消费时不会保存任务。并且指定拒绝策略为 DiscardPolicy,当线程池满了,直接丢弃任务。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class ThreadPoolConfiguration {

@Bean("syncThreadPool")
public ThreadPoolExecutor synchronizeThreadPool(RedisSyncProperties redisSyncProperties) {
// 定义线程池参数
int corePoolSize = 1; // 核心线程数
int maxPoolSize = redisSyncProperties.getQueueNum(); // 最大线程数
long keepAliveTime = 120; // 线程空闲时间
TimeUnit unit = TimeUnit.SECONDS; // 时间单位
// 指定拒绝策略为 DiscardPolicy
RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.DiscardPolicy();
// 任务队列,使用SynchronousQueue阻塞队列容量为1,在没有线程去消费时不会保存任务
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit,
new SynchronousQueue<>(),rejectedHandler);

return executor;
}
}

线程池调用

java
1
2
3
4
5
6
7
8
9
10
11
for(int index = 0; index < this.redisSyncProperties.getQueueNum(); ++index) {
try {
if (dataSyncExecutor == null) {
DEFAULT_SYNC_EXECUTOR.execute(this.getSyncThread(queueName, index, storageType, mode));
} else {
dataSyncExecutor.execute(this.getSyncThread(queueName, index, storageType, mode));
}
} catch (Exception e) {
log.error("同步数据处理异常,e:", e);
}
}

2. 获取同步队列中的数据

redisTemplate.opsForHash().scan(H key, ScanOptions options)方法,scan方法通过游标的方式实现从hash中批量获取数据。
每次从同步队列中获取100条数据,获取完数据后需要手动关闭游标。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void getData(String queue) {     
Cursor<Map.Entry<String, Object>> cursor = null;
// 通过scan从redis hash数据中批量获取数据,获取完数据需要手动关闭游标
ScanOptions scanOptions = ScanOptions.scanOptions()
.count(100)
.build();
try {
// sscan获取数据
cursor = redisTemplate.opsForHash().scan(queue, scanOptions);
// 遍历数据转换成SyncMessage列表
List<SyncMessage<Object>> collect = cursor.stream()
.map(entry -> SyncMessage
.builder()
.key(entry.getKey().toString())
.value(entry.getValue())
.build())
.collect(Collectors.toList());
log.info("{}获取{}数据{}条", Thread.currentThread().getId(),queue,collect.size());
collect.stream().forEach(System.out::println);
}catch (Exception e){
log.error("同步处理异常,e:", e);
throw new RuntimeException(e);
} finally {
// 关闭游标
if (cursor != null) {
cursor.close();
}
}
}

3. 同步函数

同步函数,将获取到的数据同步到数据库。

  • 插入优惠券表
  • 扣减库存
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Transactional(rollbackFor = Exception.class)
public void singleProcess(SyncMessage<Object> singleData) {
log.info("获取要同步抢券结果数据: {}",singleData);
//用户id
long userId = NumberUtils.parseLong(singleData.getKey());
//活动id
long activityId = NumberUtils.parseLong(singleData.getValue().toString());
log.info("userId={},activity={}",userId,activityId);
// 1.获取活动
Activity activity = activityService.getById(activityId);
if (activity == null) {
return;
}
CommonUserResDTO commonUserResDTO = commonUserApi.findById(userId);
if(commonUserResDTO == null){
return;
}
// 2.新增优惠券
Coupon coupon = new Coupon();
coupon.setId(IdUtils.getSnowflakeNextId());
coupon.setActivityId(activityId);
coupon.setUserId(userId);
coupon.setUserName(commonUserResDTO.getNickname());
coupon.setUserPhone(commonUserResDTO.getPhone());
coupon.setName(activity.getName());
coupon.setType(activity.getType());
coupon.setDiscountAmount(activity.getDiscountAmount());
coupon.setDiscountRate(activity.getDiscountRate());
coupon.setAmountCondition(activity.getAmountCondition());
coupon.setValidityTime(DateUtils.now().plusDays(activity.getValidityDays()));
coupon.setStatus(CouponStatusEnum.NO_USE.getStatus());

couponService.save(coupon);
//扣减库存
activityService.deductStock(activityId);
}

4. 线程安全问题

多线程在执行任务时,如果多个线程从同一个Redis Hash中获取数据就会出现重复处理数据的问题。
下边的情况出现这个问题:
定时任务每隔1分钟调用线程池处理一个数据同步任务,假如同步队列1的数据非常多,一轮结束后线程1还在处理同步队列1的数据,此时当第二轮开始又会从同步队列1开始分配线程去处理,将会找一个空闲线程去处理同步队列1的数据,此时将会有多个线程处理一个同步队列的数据。

使用分布式锁去控制:每个同步队列使用一把锁进行控制,当线程1还没有处理完时锁不进行释放,这样线程2就无法获取同步队列1的锁,解决了多个线程处理同一个队列的问题。

使用tryLock方法获取锁时传3个参数:

  • waitTime:尝试获取锁的最大等待时间,在这个时间范围内会不断地尝试获取锁,如果在 waitTime 时间内未能获取到锁,则返回 false。waitTime默认为-1,表示获取锁失败后立刻返回不重试。
  • leaseTime:表示持锁的时间,即锁的自动释放时间。在获取锁成功后,锁会在 leaseTime 时间后自动释放。如果在持锁的时间内未手动释放锁,锁也会在 leaseTime 时间后自动释放。
  • TimeUnit:表示时间单位,可以是秒、毫秒等。
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
try {
if (lock.tryLock(0L, -1L, TimeUnit.SECONDS)) {
List<SyncMessage<T>> data = null;

while(CollUtils.isNotEmpty(data = this.getData())) {
this.process(data);

try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return;
}
} catch (Exception var10) {
return;
} finally {
if (lock != null && lock.isLocked()) {
lock.unlock();
}
}

5. 完整流程