技术 Java 项目 优惠券模块核心逻辑 penjc 2025-02-26 2025-03-10 1. 优惠券活动查询缓存预热
1. 定时程序预热 通过定时任务,查询数据库中一个月内即将开始或者已开始的优惠券活动。 同时将活动的优惠券库存写入缓存,已开始的活动库存不更新,未开始的活动库存更新。 定时程序代码
1 2 3 4 5 6 7 8 9 10 11 12 13 @XxlJob("activityPreheat") public void activityPreHeat () { log.info("优惠券活动定时预热..." ); try { activityService.preHeat(); } catch (Exception e) { e.printStackTrace(); } }
service代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Override public void preHeat () { LocalDateTime now = DateUtils.now(); List<Activity> list = lambdaQuery() .le(Activity::getDistributeStartTime, now.plusDays(30 )) .in(Activity::getStatus, Arrays.asList(NO_DISTRIBUTE.getStatus(), DISTRIBUTING.getStatus())) .orderByAsc(Activity::getDistributeStartTime) .list(); if (CollUtils.isEmpty(list)) { list = new ArrayList <>(); } List<SeizeCouponInfoResDTO> seizeCouponInfoResDTOS = BeanUtils.copyToList(list, SeizeCouponInfoResDTO.class); String seizeCouponInfoStr = JsonUtils.toJsonStr(seizeCouponInfoResDTOS); redisTemplate.opsForValue().set(ACTIVITY_CACHE_LIST, seizeCouponInfoStr); list.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==1 ).forEach(v->{ redisTemplate.opsForHash().put(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10 ), v.getId(), v.getTotalNum()); }); list.stream().filter(v->getStatus(v.getDistributeStartTime(),v.getDistributeEndTime(),v.getStatus())==2 ).forEach(v->{ redisTemplate.opsForHash().putIfAbsent(String.format(COUPON_RESOURCE_STOCK, v.getId() % 10 ), v.getId(), v.getTotalNum()); }); }
2. 定时程序更新活动状态 定时任务将数据库中活动的状态根据时间进行更新,已开始的活动状态更新为进行中,已结束的活动状态更新为已失效。
定时程序代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @XxlJob("updateActivityStatus") public void updateActivitySatus () { log.info("定时修改活动状态..." ); try { activityService.updateStatus(); } catch (Exception e) { e.printStackTrace(); } }
service代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void updateStatus () { LocalDateTime now = DateUtils.now(); lambdaUpdate() .set(Activity::getStatus, ActivityStatusEnum.DISTRIBUTING.getStatus()) .eq(Activity::getStatus, NO_DISTRIBUTE) .le(Activity::getDistributeStartTime, now) .gt(Activity::getDistributeEndTime,now) .update(); lambdaUpdate() .set(Activity::getStatus, LOSE_EFFICACY.getStatus()) .in(Activity::getStatus, Arrays.asList(DISTRIBUTING.getStatus(), NO_DISTRIBUTE.getStatus())) .lt(Activity::getDistributeEndTime, now) .update(); }
3. 活动查询 从缓存中查询活动信息,同时调用getStatus方法,根据活动的开始时间、结束时间、状态,确定活动的实际状态。 (由于定时任务更新数据库活动状态为1分钟一次,缓存中的活动状态存在不一致,所以需要调用getStatus方法返回实际状态)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Override public List<SeizeCouponInfoResDTO> queryForListFromCache (Integer tabType) { Object seizeCouponInfoStr = redisTemplate.opsForValue().get(ACTIVITY_CACHE_LIST); if (ObjectUtils.isNull(seizeCouponInfoStr)) { return CollUtils.emptyList(); } List<SeizeCouponInfoResDTO> seizeCouponInfoResDTOS = JsonUtils.toList(seizeCouponInfoStr.toString(), SeizeCouponInfoResDTO.class); int queryStatus = tabType == TabTypeConstants.SEIZING ? DISTRIBUTING.getStatus() : NO_DISTRIBUTE.getStatus(); List<SeizeCouponInfoResDTO> collect = seizeCouponInfoResDTOS.stream().filter(item -> queryStatus == getStatus(item.getDistributeStartTime(), item.getDistributeEndTime(), item.getStatus())) .peek(item -> { item.setRemainNum(item.getStockNum()); item.setStatus(queryStatus); }).collect(Collectors.toList()); return collect; } private int getStatus (LocalDateTime distributeStartTime, LocalDateTime distributeEndTime, Integer status) { if (NO_DISTRIBUTE.equals(status) && distributeStartTime.isBefore(DateUtils.now()) && distributeEndTime.isAfter(DateUtils.now())) { return DISTRIBUTING.getStatus(); }else if (NO_DISTRIBUTE.equals(status) && distributeEndTime.isBefore(DateUtils.now())){ return LOSE_EFFICACY.getStatus(); }else if (DISTRIBUTING.equals(status) && distributeEndTime.isBefore(DateUtils.now())) { return LOSE_EFFICACY.getStatus(); } return status; }
2. 抢券 高并发场景下直接去查询数据库会对数据库造成压力,所以从 Redis 缓存操作库存。 采用实现 Redis 的原子性而不是使用分布式锁。
扣减库存逻辑如下: 1、首先查询库存 2、判断库存大小,如果大于0则扣减库存,否则 直接返回 3、记录抢券成功的记录,用于判断用户不能重复抢券的依据。 4、记录抢券同步的记录,用于后续的异步处理,将抢券结果保存到数据库。
非原子性操作,通过 Redis + Lua 脚本实现原子性操作。
抢券完整流程:
数据流:
1. Redis Key 设计 在集群环境下,一个 Lua 脚本中的所有 Redis Key 必须在同一个节点上,否则会出现错误。 一次执行Lua脚本的所有key中使用大括号‘{}’且保证大括号中的内容相同,此时会根据大括号中的内容求哈希,因为内容相同所以求得的哈希数据相同所以就落在了同一个Redis节点。 通过{活动id%10}保证同一个活动的所有key都在同一个Redis节点上。
活动信息 缓存结构:String类型: key: “ACTIVITY:LIST” value: 符合条件的优惠券活动列表JSON数据。 过期时间:永不过期 缓存一致性方案:通过预热程序保证缓存一致性
优惠券活动库存 缓存结构:Hash RedisKey:COUPON:RESOURCE:STOCK:{活动id%10} {活动id%10}表示根据活动id除以10求余,通过这种方法将key分散到不同的redis服务器上,通过“活动id%10”表达式可知优惠券活动库存hash最多有10个。 HashKey:活动id HashValue: 库存 过期时间:永不过期 缓存一致性方案:通过预热程序保证缓存一致性
抢券成功队列 缓存结构:Hash RedisKey:COUPON:SEIZE:LIST:活动id_{活动id%10} HashKey:用户id HashValue:1 过期时间:永不过期
抢券同步队列 缓存结构:Hash RedisKey:QUEUE:COUPON:SEIZE:SYNC:{活动id%10} HashKey:用户id HashValue:活动id 过期时间:永不过期
2. 抢券 Lua 脚本: 判断用户是否在该活动抢过券。 判断库存是否充足 写入抢券成功列表 扣减库存 写入抢券同步列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 ```lua local couponNum = redis.call("HGET" , KEYS[3 ], ARGV[2 ])if couponNum ~= false and tonumber (couponNum) >= 1 then return "-1" ; end local stockNum = redis.call("HGET" ,KEYS[2 ], ARGV[1 ])if stockNum == false or tonumber (stockNum) < 1 then return "-2" ; end local listNum = redis.call("HSET" ,KEYS[3 ], ARGV[2 ], 1 )if listNum == false or tonumber (listNum) < 1 then return "-3" ; end stockNum = redis.call("HINCRBY" ,KEYS[2 ], ARGV[1 ], -1 ) if tonumber (stockNum) < 0 then return "-4" end local result = redis.call("HSETNX" , KEYS[1 ], ARGV[2 ],ARGV[1 ])if result > 0 then return ARGV[1 ] .."" end return "-5"
错误代码: -1: 限领一张 -2: 已抢光 -3: 写入抢券成功队列失败,返回给用户为:抢券失败 -4: 已抢光 -5: 写入抢券同步队列失败,返回给用户为:抢券失败
3. 抢券 Service 代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public void seizeCoupon (SeizeCouponReqDTO seizeCouponReqDTO) { ActivityInfoResDTO activity = activityService.getActivityInfoByIdFromCache(seizeCouponReqDTO.getId()); LocalDateTime now = DateUtils.now(); if (activity == null || activity.getDistributeStartTime().isAfter(now)) { throw new CommonException (SEIZE_COUPON_FAILD, "活动未开始" ); } if (activity.getDistributeEndTime().isBefore(now)) { throw new CommonException (SEIZE_COUPON_FAILD, "活动已结束" ); } int index = (int ) (seizeCouponReqDTO.getId() % 10 ); String couponSeizeSyncRedisKey = RedisSyncQueueUtils.getQueueRedisKey(COUPON_SEIZE_SYNC_QUEUE_NAME, index); String resourceStockRedisKey = String.format(COUPON_RESOURCE_STOCK, index); String couponSeizeListRedisKey = String.format(COUPON_SEIZE_LIST, activity.getId(), index); log.debug("seize coupon keys -> couponSeizeListRedisKey->{},resourceStockRedisKey->{},couponSeizeListRedisKey->{},seizeCouponReqDTO.getId()->{},UserContext.currentUserId():{}" , couponSeizeListRedisKey, resourceStockRedisKey, couponSeizeListRedisKey, seizeCouponReqDTO.getId(), UserContext.currentUserId()); Object execute = redisTemplate.execute(seizeCouponScript, Arrays.asList(couponSeizeSyncRedisKey, resourceStockRedisKey, couponSeizeListRedisKey), seizeCouponReqDTO.getId(), UserContext.currentUserId()); log.debug("seize coupon result : {}" , execute); if (execute == null ) { throw new CommonException (SEIZE_COUPON_FAILD, "抢券失败" ); } long result = NumberUtils.parseLong(execute.toString()); if (result > 0 ) { return ; } if (result == -1 ) { throw new CommonException (SEIZE_COUPON_FAILD, "限领一张" ); } if (result == -2 || result == -4 ) { throw new CommonException (SEIZE_COUPON_FAILD, "已抢光!" ); } throw new CommonException (SEIZE_COUPON_FAILD, "抢券失败" ); }
3. 抢券同步处理 将 Redis 中的抢券结果同步到MySQL的优惠券表。 基本思路:遍历Redis中的抢券结果同步队列,拿到一个元素就向数据库的优惠券表插入记录,插入完成后删除Redis中的这条记录。
同步队列的key为:QUEUE:COUPON:SEIZE:SYNC:{活动id % 10},最多有10个同步列表。 可以用多线程,每个线程处理一个同步队列。 由定时任务去调度,每隔1分钟由多线程对同步队列中的数据进行处理。
1. 线程池设计 同步队列个数为10,我们需要定义一个最多有10个活跃线程的线程池,满负荷工作下每个线程处理一个同步队列,当满负荷工作时如果再有新的任务线程池拒绝任务。 针对这个任务,采用了使用SynchronousQueue阻塞队列,容量为1,在没有线程去消费时不会保存任务。并且指定拒绝策略为 DiscardPolicy,当线程池满了,直接丢弃任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public class ThreadPoolConfiguration { @Bean("syncThreadPool") public ThreadPoolExecutor synchronizeThreadPool (RedisSyncProperties redisSyncProperties) { int corePoolSize = 1 ; int maxPoolSize = redisSyncProperties.getQueueNum(); long keepAliveTime = 120 ; TimeUnit unit = TimeUnit.SECONDS; RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor .DiscardPolicy(); ThreadPoolExecutor executor = new ThreadPoolExecutor (corePoolSize, maxPoolSize, keepAliveTime, unit, new SynchronousQueue <>(),rejectedHandler); return executor; } }
线程池调用
1 2 3 4 5 6 7 8 9 10 11 for (int index = 0 ; index < this .redisSyncProperties.getQueueNum(); ++index) { try { if (dataSyncExecutor == null ) { DEFAULT_SYNC_EXECUTOR.execute(this .getSyncThread(queueName, index, storageType, mode)); } else { dataSyncExecutor.execute(this .getSyncThread(queueName, index, storageType, mode)); } } catch (Exception e) { log.error("同步数据处理异常,e:" , e); } }
2. 获取同步队列中的数据 redisTemplate.opsForHash().scan(H key, ScanOptions options)方法,scan方法通过游标的方式实现从hash中批量获取数据。 每次从同步队列中获取100条数据,获取完数据后需要手动关闭游标。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void getData (String queue) { Cursor<Map.Entry<String, Object>> cursor = null ; ScanOptions scanOptions = ScanOptions.scanOptions() .count(100 ) .build(); try { cursor = redisTemplate.opsForHash().scan(queue, scanOptions); List<SyncMessage<Object>> collect = cursor.stream() .map(entry -> SyncMessage .builder() .key(entry.getKey().toString()) .value(entry.getValue()) .build()) .collect(Collectors.toList()); log.info("{}获取{}数据{}条" , Thread.currentThread().getId(),queue,collect.size()); collect.stream().forEach(System.out::println); }catch (Exception e){ log.error("同步处理异常,e:" , e); throw new RuntimeException (e); } finally { if (cursor != null ) { cursor.close(); } } }
3. 同步函数 同步函数,将获取到的数据同步到数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Transactional(rollbackFor = Exception.class) public void singleProcess (SyncMessage<Object> singleData) { log.info("获取要同步抢券结果数据: {}" ,singleData); long userId = NumberUtils.parseLong(singleData.getKey()); long activityId = NumberUtils.parseLong(singleData.getValue().toString()); log.info("userId={},activity={}" ,userId,activityId); Activity activity = activityService.getById(activityId); if (activity == null ) { return ; } CommonUserResDTO commonUserResDTO = commonUserApi.findById(userId); if (commonUserResDTO == null ){ return ; } Coupon coupon = new Coupon (); coupon.setId(IdUtils.getSnowflakeNextId()); coupon.setActivityId(activityId); coupon.setUserId(userId); coupon.setUserName(commonUserResDTO.getNickname()); coupon.setUserPhone(commonUserResDTO.getPhone()); coupon.setName(activity.getName()); coupon.setType(activity.getType()); coupon.setDiscountAmount(activity.getDiscountAmount()); coupon.setDiscountRate(activity.getDiscountRate()); coupon.setAmountCondition(activity.getAmountCondition()); coupon.setValidityTime(DateUtils.now().plusDays(activity.getValidityDays())); coupon.setStatus(CouponStatusEnum.NO_USE.getStatus()); couponService.save(coupon); activityService.deductStock(activityId); }
4. 线程安全问题 多线程在执行任务时,如果多个线程从同一个Redis Hash中获取数据就会出现重复处理数据的问题。 下边的情况出现这个问题: 定时任务每隔1分钟调用线程池处理一个数据同步任务,假如同步队列1的数据非常多,一轮结束后线程1还在处理同步队列1的数据,此时当第二轮开始又会从同步队列1开始分配线程去处理,将会找一个空闲线程去处理同步队列1的数据,此时将会有多个线程处理一个同步队列的数据。
使用分布式锁去控制:每个同步队列使用一把锁进行控制,当线程1还没有处理完时锁不进行释放,这样线程2就无法获取同步队列1的锁,解决了多个线程处理同一个队列的问题。
使用tryLock方法获取锁时传3个参数:
waitTime:尝试获取锁的最大等待时间,在这个时间范围内会不断地尝试获取锁,如果在 waitTime 时间内未能获取到锁,则返回 false。waitTime默认为-1,表示获取锁失败后立刻返回不重试。
leaseTime:表示持锁的时间,即锁的自动释放时间。在获取锁成功后,锁会在 leaseTime 时间后自动释放。如果在持锁的时间内未手动释放锁,锁也会在 leaseTime 时间后自动释放。
TimeUnit:表示时间单位,可以是秒、毫秒等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 try { if (lock.tryLock(0L , -1L , TimeUnit.SECONDS)) { List<SyncMessage<T>> data = null ; while (CollUtils.isNotEmpty(data = this .getData())) { this .process(data); try { Thread.sleep(500L ); } catch (InterruptedException e) { throw new RuntimeException (e); } } return ; } } catch (Exception var10) { return ; } finally { if (lock != null && lock.isLocked()) { lock.unlock(); } }
5. 完整流程