diff --git a/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java b/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java index fabe4ec20..d88e4dfca 100644 --- a/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java +++ b/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java @@ -308,7 +308,7 @@ public enum ErrorCodeEnum { CONFIG_NOT_EXIST(1610006,"配置不存在或被禁用,请确认!",null), MESSAGE_NOT_EXIST(1610007,"消息模板不存在或已被删除",null), MESSAGE_NOT_HANDLED(1610008,"当前消息无需处理,请确认消息处理类型!",null), - + MESSAGE_PUBLISH(1610009,"您选择通知任务正在发布中,请稍后重试!",null), NOT_FLAGSHIP_STORE(16100005,"非直营店,无法跳过缴费阶段!",null), NOT_FLAGSHIP_STORE_NOT_EXIST(16100006,"当前阶段加盟类型不能变更!",null), diff --git a/coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java b/coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java index 3e6083657..403e20fd0 100644 --- a/coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java +++ b/coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java @@ -1,16 +1,13 @@ package com.cool.store.executor; -import com.cool.store.constants.CommonConstants; -import com.cool.store.utils.UUIDUtils; -import org.apache.commons.lang3.StringUtils; +import com.cool.store.utils.ThreadMdcUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.Map; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -34,7 +31,7 @@ public class MdcTaskExecutor extends ThreadPoolTaskExecutor { MDC.setContextMap(context); } //直接给子线程设置MDC - setTraceIdIfAbsent(); + ThreadMdcUtil.setTraceIdIfAbsent(); try { //执行任务 result = task.call(); @@ -63,7 +60,7 @@ public class MdcTaskExecutor extends ThreadPoolTaskExecutor { MDC.setContextMap(context); } //直接给子线程设置MDC - setTraceIdIfAbsent(); + ThreadMdcUtil.setTraceIdIfAbsent(); try { //执行任务 task.run(); @@ -80,17 +77,4 @@ public class MdcTaskExecutor extends ThreadPoolTaskExecutor { } }); } - - public static void setTraceIdIfAbsent() { - String key = CommonConstants.REQUEST_ID; - String value = UUIDUtils.get32UUID(); - String k = MDC.get(key); - if (StringUtils.isBlank(value)) { - value = UUID.randomUUID().toString().replace("-", ""); - } - - if (StringUtils.isBlank(k)) { - MDC.put(key, value); - } - } } diff --git a/coolstore-partner-common/src/main/java/com/cool/store/executor/ThreadPoolTask.java b/coolstore-partner-common/src/main/java/com/cool/store/executor/ThreadPoolTask.java new file mode 100644 index 000000000..e060e32a6 --- /dev/null +++ b/coolstore-partner-common/src/main/java/com/cool/store/executor/ThreadPoolTask.java @@ -0,0 +1,42 @@ +package com.cool.store.executor; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @Author suzhuhong + * @Date 2025/10/15 17:29 + * @Version 1.0 + */ +@Configuration +public class ThreadPoolTask { + + @Bean + public TaskExecutor noticeThreadPool() { + int cores = Runtime.getRuntime().availableProcessors(); + + ThreadPoolTaskExecutor executor = new MdcTaskExecutor(); + // 核心线程数目 + executor.setCorePoolSize(cores*2); + // 指定最大线程数 + executor.setMaxPoolSize(100); + // 队列中最大的数目 + executor.setQueueCapacity(10000); + // 线程名称前缀 + executor.setThreadNamePrefix("noticeThreadPool_"); + // 对拒绝task的处理策略 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 线程空闲后的最大存活时间 + executor.setKeepAliveSeconds(60); + // 加载 + executor.initialize(); + return executor; + } + + + +} diff --git a/coolstore-partner-common/src/main/java/com/cool/store/utils/MDCUtils.java b/coolstore-partner-common/src/main/java/com/cool/store/utils/MDCUtils.java new file mode 100644 index 000000000..0a38c2c56 --- /dev/null +++ b/coolstore-partner-common/src/main/java/com/cool/store/utils/MDCUtils.java @@ -0,0 +1,37 @@ +package com.cool.store.utils; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.MDC; + +import java.util.UUID; + +/** + * @Author suzhuhong + * @Date 2025/10/15 17:32 + * @Version 1.0 + */ +public class MDCUtils { + public MDCUtils() { + } + + public static void putIfAbsent(String key, String value) { + String k = MDC.get(key); + if (StringUtils.isBlank(value)) { + value = UUID.randomUUID().toString().replace("-", ""); + } + + if (StringUtils.isBlank(k)) { + MDC.put(key, value); + } + + } + + public static void put(String key, String value) { + MDC.put(key, value); + } + + public static void put(String key) { + MDC.put(key, UUID.randomUUID().toString().replace("-", "")); + } + +} diff --git a/coolstore-partner-common/src/main/java/com/cool/store/utils/ThreadMdcUtil.java b/coolstore-partner-common/src/main/java/com/cool/store/utils/ThreadMdcUtil.java new file mode 100644 index 000000000..b90fe93c1 --- /dev/null +++ b/coolstore-partner-common/src/main/java/com/cool/store/utils/ThreadMdcUtil.java @@ -0,0 +1,55 @@ +package com.cool.store.utils; + +import com.cool.store.constants.CommonConstants; +import org.slf4j.MDC; + +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * 线程MDC包装类 + * + * @author hetiantian + * @version 1.0 + * @Date 2020/03/18 15:18 + */ +public class ThreadMdcUtil { + public static void setTraceIdIfAbsent() { + MDCUtils.putIfAbsent(CommonConstants.REQUEST_ID, UUIDUtils.get32UUID()); + } + + public static Callable wrap(final Callable callable, final Map context) { + return () -> { + if (context == null) { + MDC.clear(); + } else { + MDC.setContextMap(context); + } + setTraceIdIfAbsent(); + try { + return callable.call(); + } finally { + MDC.clear(); + } + }; + } + + public static Runnable wrap(final Runnable runnable, final Map context) { + return new Runnable() { + @Override + public void run() { + if (context == null) { + MDC.clear(); + } else { + MDC.setContextMap(context); + } + setTraceIdIfAbsent(); + try { + runnable.run(); + } finally { + MDC.clear(); + } + } + }; + } +} diff --git a/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java b/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java index 8904afa7f..76c985caa 100644 --- a/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java +++ b/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java @@ -20,6 +20,7 @@ import com.cool.store.response.bigdata.ApiResponse; import com.cool.store.service.MessageTemplateService; import com.cool.store.service.StoreService; import com.cool.store.utils.CoolDateUtils; +import com.cool.store.utils.RedisUtilPool; import com.cool.store.vo.PartnerUserInfoVO; import com.cool.store.vo.notice.*; import com.github.pagehelper.PageHelper; @@ -28,10 +29,13 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; +import java.time.Duration; import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -63,6 +67,10 @@ public class MessageTemplateServiceImpl implements MessageTemplateService { @Resource MatterConfigDAO matterConfigDAO; @Resource + RedisUtilPool redisUtilPool; + @Resource + TaskExecutor noticeThreadPool; + @Resource MessageIssueService messageIssueService; @@ -135,60 +143,138 @@ public class MessageTemplateServiceImpl implements MessageTemplateService { @Override public Boolean batchPublishMessageTemplate(BatchPublishRequest request, String userId) { + log.info("batchPublishMessageTemplate:{}",JSONObject.toJSONString(request)); if (CollectionUtils.isEmpty(request.getIds())||CollectionUtils.isEmpty(request.getStoreInfoList())||CollectionUtils.isEmpty(request.getUserInfoList())){ throw new ServiceException(ErrorCodeEnum.PARAMS_REQUIRED); } - - List storeAreaDTOS = getStoreRange(request.getStoreInfoList()); - List storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList()); - - Map> authUser = getAuthUser(request.getUserInfoList(), storeIds); - List list = messageTemplateDAO.getByIds(request.getIds()); //过滤 只保留未发布的 list = list.stream().filter(x -> PublishStatusEnum.UNPUBLISHED.getCode().equals(x.getPublishStatus())).collect(Collectors.toList()); if (CollUtil.isEmpty(list)){ log.info("未找到待发布消息模板"); + return Boolean.TRUE; } - List realtimeMessageList = new ArrayList<>(); - list.stream().forEach(x -> { - List result = new ArrayList<>(); - storeAreaDTOS.forEach(y->{ - if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){ - log.info("当前门店没有人员 门店名称:{}",y.getStoreName()); - return; - } - StoreMessageDO storeMessageDO = new StoreMessageDO(); - storeMessageDO.setStoreId(y.getStoreId()); - storeMessageDO.setStoreName(y.getStoreName()); - storeMessageDO.setStoreCode(y.getStoreCode()); - storeMessageDO.setMessageTemplateId(x.getId()); - storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode()); - storeMessageDO.setReadTime(new Date()); - storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode()); - storeMessageDO.setProcessTime(new Date()); - String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(",")); - storeMessageDO.setOperatorList(userIdStr); - result.add(storeMessageDO); - }); - storeMessageDAO.batchInsert(result); - if (MatterTypeEnum.REALTIME.getCode().equals(request.getMatterType())) { - realtimeMessageList.addAll(result); - } - }); + List lockKeys = list.stream() + .map(x -> "msg_template_publish:" + x.getId()) + .collect(Collectors.toList()); - List updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList()); - messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds, - JSONObject.toJSONString(request.getStoreInfoList()), - JSONObject.toJSONString(request.getUserInfoList()), - userId); - // 即时消息下发 - messageIssueService.issueMessage(realtimeMessageList); + boolean allLocksAcquired = tryAcquireLocks(lockKeys); + if (!allLocksAcquired) { + log.warn("存在正在发布的模板,本次操作取消。模板IDs: {}", JSONObject.toJSONString(lockKeys)); + throw new ServiceException(ErrorCodeEnum.MESSAGE_PUBLISH); + } + List finalList = list; + noticeThreadPool.execute(() -> { + syncPublish(request, userId, finalList, lockKeys); + }); return Boolean.TRUE; } + public Boolean syncPublish(BatchPublishRequest request, String userId, List list,List lockKeys){ + try { + List storeAreaDTOS = getStoreRange(request.getStoreInfoList()); + List storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList()); + Map> authUser = getAuthUser(request.getUserInfoList(), storeIds); + List realtimeMessageList = new ArrayList<>(); + list.stream().forEach(x -> { + List result = new ArrayList<>(); + storeAreaDTOS.forEach(y->{ + if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){ + log.info("当前门店没有人员 门店名称:{}",y.getStoreName()); + return; + } + StoreMessageDO storeMessageDO = new StoreMessageDO(); + storeMessageDO.setStoreId(y.getStoreId()); + storeMessageDO.setStoreName(y.getStoreName()); + storeMessageDO.setStoreCode(y.getStoreCode()); + storeMessageDO.setMessageTemplateId(x.getId()); + storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode()); + storeMessageDO.setReadTime(new Date()); + storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode()); + storeMessageDO.setProcessTime(new Date()); + String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(",")); + storeMessageDO.setOperatorList(userIdStr); + result.add(storeMessageDO); + }); + storeMessageDAO.batchInsert(result); + if (MatterTypeEnum.REALTIME.getCode().equals(request.getMatterType())) { + realtimeMessageList.addAll(result); + } + }); + + //存在第一个成功 第二个失败 会有问题 todo + List updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList()); + messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds, + JSONObject.toJSONString(request.getStoreInfoList()), + JSONObject.toJSONString(request.getUserInfoList()), + userId); + // 即时消息下发 + messageIssueService.issueMessage(realtimeMessageList); + } catch (Exception e) { + log.info("发布流程异常,已取消发布"); + } finally { + releaseLocks(lockKeys); + log.info("发布流程结束,已释放Redis锁"); + } + return Boolean.TRUE; + } + + + + + /** + * 尝试获取所有Redis锁 + */ + private boolean tryAcquireLocks(List lockKeys) { + boolean allSuccess = true; + List acquiredLocks = new ArrayList<>(); + + for (String lockKey : lockKeys) { + try { + // 设置锁,过期时间设为5分钟,防止死锁 + Boolean success = redisUtilPool.setNxExpire(lockKey, "locking",1000*60*5); + if (success != null && success) { + acquiredLocks.add(lockKey); + } else { + log.warn("获取Redis锁失败: {}", lockKey); + allSuccess = false; + break; + } + } catch (Exception e) { + log.error("获取Redis锁异常: {}", lockKey, e); + allSuccess = false; + break; + } + } + + // 如果任何一个锁获取失败,释放之前已经获取的锁 + if (!allSuccess) { + releaseLocks(acquiredLocks); + } + + return allSuccess; + } + + /** + * 释放Redis锁 + */ + private void releaseLocks(List lockKeys) { + if (CollUtil.isEmpty(lockKeys)) { + return; + } + for (String lockKey : lockKeys) { + try { + redisUtilPool.delKey(lockKey); + log.debug("释放Redis锁成功: {}", lockKey); + } catch (Exception e) { + log.error("释放Redis锁异常: {}", lockKey, e); + } + } + } + + @Override @Transactional(rollbackFor = Exception.class) diff --git a/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java b/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java index b9952bc74..e9a3e66f4 100644 --- a/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java +++ b/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java @@ -59,7 +59,8 @@ public class MessageTemplateController { @PostMapping("/batchPublish") @ApiOperation("批量发布") public ResponseResult batchPublishMessageTemplate(@RequestBody BatchPublishRequest request) { - return ResponseResult.success(messageTemplateService.batchPublishMessageTemplate(request, CurrentUserHolder.getUser().getUserId())); + messageTemplateService.batchPublishMessageTemplate(request, CurrentUserHolder.getUser().getUserId()); + return ResponseResult.success(Boolean.TRUE); } @PostMapping("/getMessageTemplateList")