From 47a371cbd75f9dcf14688772d692950a5e361ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E7=AB=B9=E7=BA=A2?= Date: Thu, 16 Oct 2025 12:05:09 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=E9=80=9A=E7=9F=A5=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=8F=91=E5=B8=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/cool/store/enums/ErrorCodeEnum.java | 2 +- .../cool/store/executor/MdcTaskExecutor.java | 80 +++++++++ .../cool/store/executor/ThreadPoolTask.java | 42 +++++ .../java/com/cool/store/utils/MDCUtils.java | 37 +++++ .../com/cool/store/utils/ThreadMdcUtil.java | 55 +++++++ .../impl/MessageTemplateServiceImpl.java | 153 ++++++++++++++---- .../webb/MessageTemplateController.java | 3 +- 7 files changed, 337 insertions(+), 35 deletions(-) create mode 100644 coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java create mode 100644 coolstore-partner-common/src/main/java/com/cool/store/executor/ThreadPoolTask.java create mode 100644 coolstore-partner-common/src/main/java/com/cool/store/utils/MDCUtils.java create mode 100644 coolstore-partner-common/src/main/java/com/cool/store/utils/ThreadMdcUtil.java diff --git a/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java b/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java index cfc810c3a..1941b2721 100644 --- a/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java +++ b/coolstore-partner-common/src/main/java/com/cool/store/enums/ErrorCodeEnum.java @@ -302,7 +302,7 @@ public enum ErrorCodeEnum { CONFIG_NOT_EXIST(1610006,"配置不存在或被禁用,请确认!",null), MESSAGE_NOT_EXIST(1610007,"消息模板不存在或已被删除",null), MESSAGE_NOT_HANDLED(1610008,"当前消息无需处理,请确认消息处理类型!",null), - + MESSAGE_PUBLISH(1610009,"您选择通知任务正在发布中,请稍后重试!",null), NOT_FLAGSHIP_STORE(16100005,"非直营店,无法跳过缴费阶段!",null), NOT_FLAGSHIP_STORE_NOT_EXIST(16100006,"当前阶段加盟类型不能变更!",null), diff --git a/coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java b/coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java new file mode 100644 index 000000000..403e20fd0 --- /dev/null +++ b/coolstore-partner-common/src/main/java/com/cool/store/executor/MdcTaskExecutor.java @@ -0,0 +1,80 @@ +package com.cool.store.executor; + + +import com.cool.store.utils.ThreadMdcUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + + +/** + * @author zhangchenbiao + * @FileName: MdcTaskExecutor + * @Description: + * @date 2021-11-02 21:00 + */ +public class MdcTaskExecutor extends ThreadPoolTaskExecutor { + private Logger log = LoggerFactory.getLogger(MdcTaskExecutor.class); + + @Override + public Future submit(Callable task) { + Map context = MDC.getCopyOfContextMap(); + return super.submit(() -> { + T result; + if (context != null) { + //将父线程的MDC内容传给子线程 + MDC.setContextMap(context); + } + //直接给子线程设置MDC + ThreadMdcUtil.setTraceIdIfAbsent(); + try { + //执行任务 + result = task.call(); + } finally { + log.info("ThreadMonitor:{}info:ExecutedTasks->{},totalTask->{}, RunningTasks->{}, PendingTasks->{},corePoolSize-{},currentPoolSize->{},LargestPoolSize->{}", + this.getThreadNamePrefix(),this.getThreadPoolExecutor().getCompletedTaskCount(),this.getThreadPoolExecutor().getTaskCount(), + this.getActiveCount(),this.getThreadPoolExecutor().getQueue().size(),this.getCorePoolSize(), + this.getPoolSize(),this.getThreadPoolExecutor().getLargestPoolSize()); + try { + MDC.clear(); + } catch (Exception e) { + log.warn("MDC clear exception", e); + } + } + return result; + }); + } + + @Override + public void execute(Runnable task) { + log.info("mdc thread pool task executor execute"); + Map context = MDC.getCopyOfContextMap(); + super.execute(() -> { + if (context != null) { + //将父线程的MDC内容传给子线程 + MDC.setContextMap(context); + } + //直接给子线程设置MDC + ThreadMdcUtil.setTraceIdIfAbsent(); + try { + //执行任务 + task.run(); + } finally { + log.info("ThreadMonitor:{}info:ExecutedTasks->{},totalTask->{}, RunningTasks->{}, PendingTasks->{},corePoolSize-{},currentPoolSize->{},LargestPoolSize->{}", + this.getThreadNamePrefix(),this.getThreadPoolExecutor().getCompletedTaskCount(),this.getThreadPoolExecutor().getTaskCount(), + this.getActiveCount(),this.getThreadPoolExecutor().getQueue().size(),this.getCorePoolSize(), + this.getPoolSize(),this.getThreadPoolExecutor().getLargestPoolSize()); + try { + MDC.clear(); + } catch (Exception e) { + log.warn("MDC clear exception", e); + } + } + }); + } +} diff --git a/coolstore-partner-common/src/main/java/com/cool/store/executor/ThreadPoolTask.java b/coolstore-partner-common/src/main/java/com/cool/store/executor/ThreadPoolTask.java new file mode 100644 index 000000000..e060e32a6 --- /dev/null +++ b/coolstore-partner-common/src/main/java/com/cool/store/executor/ThreadPoolTask.java @@ -0,0 +1,42 @@ +package com.cool.store.executor; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @Author suzhuhong + * @Date 2025/10/15 17:29 + * @Version 1.0 + */ +@Configuration +public class ThreadPoolTask { + + @Bean + public TaskExecutor noticeThreadPool() { + int cores = Runtime.getRuntime().availableProcessors(); + + ThreadPoolTaskExecutor executor = new MdcTaskExecutor(); + // 核心线程数目 + executor.setCorePoolSize(cores*2); + // 指定最大线程数 + executor.setMaxPoolSize(100); + // 队列中最大的数目 + executor.setQueueCapacity(10000); + // 线程名称前缀 + executor.setThreadNamePrefix("noticeThreadPool_"); + // 对拒绝task的处理策略 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 线程空闲后的最大存活时间 + executor.setKeepAliveSeconds(60); + // 加载 + executor.initialize(); + return executor; + } + + + +} diff --git a/coolstore-partner-common/src/main/java/com/cool/store/utils/MDCUtils.java b/coolstore-partner-common/src/main/java/com/cool/store/utils/MDCUtils.java new file mode 100644 index 000000000..0a38c2c56 --- /dev/null +++ b/coolstore-partner-common/src/main/java/com/cool/store/utils/MDCUtils.java @@ -0,0 +1,37 @@ +package com.cool.store.utils; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.MDC; + +import java.util.UUID; + +/** + * @Author suzhuhong + * @Date 2025/10/15 17:32 + * @Version 1.0 + */ +public class MDCUtils { + public MDCUtils() { + } + + public static void putIfAbsent(String key, String value) { + String k = MDC.get(key); + if (StringUtils.isBlank(value)) { + value = UUID.randomUUID().toString().replace("-", ""); + } + + if (StringUtils.isBlank(k)) { + MDC.put(key, value); + } + + } + + public static void put(String key, String value) { + MDC.put(key, value); + } + + public static void put(String key) { + MDC.put(key, UUID.randomUUID().toString().replace("-", "")); + } + +} diff --git a/coolstore-partner-common/src/main/java/com/cool/store/utils/ThreadMdcUtil.java b/coolstore-partner-common/src/main/java/com/cool/store/utils/ThreadMdcUtil.java new file mode 100644 index 000000000..b90fe93c1 --- /dev/null +++ b/coolstore-partner-common/src/main/java/com/cool/store/utils/ThreadMdcUtil.java @@ -0,0 +1,55 @@ +package com.cool.store.utils; + +import com.cool.store.constants.CommonConstants; +import org.slf4j.MDC; + +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * 线程MDC包装类 + * + * @author hetiantian + * @version 1.0 + * @Date 2020/03/18 15:18 + */ +public class ThreadMdcUtil { + public static void setTraceIdIfAbsent() { + MDCUtils.putIfAbsent(CommonConstants.REQUEST_ID, UUIDUtils.get32UUID()); + } + + public static Callable wrap(final Callable callable, final Map context) { + return () -> { + if (context == null) { + MDC.clear(); + } else { + MDC.setContextMap(context); + } + setTraceIdIfAbsent(); + try { + return callable.call(); + } finally { + MDC.clear(); + } + }; + } + + public static Runnable wrap(final Runnable runnable, final Map context) { + return new Runnable() { + @Override + public void run() { + if (context == null) { + MDC.clear(); + } else { + MDC.setContextMap(context); + } + setTraceIdIfAbsent(); + try { + runnable.run(); + } finally { + MDC.clear(); + } + } + }; + } +} diff --git a/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java b/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java index cd6227e88..257e8e74d 100644 --- a/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java +++ b/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java @@ -21,6 +21,7 @@ import com.cool.store.response.bigdata.ApiResponse; import com.cool.store.service.MessageTemplateService; import com.cool.store.service.StoreService; import com.cool.store.utils.CoolDateUtils; +import com.cool.store.utils.RedisUtilPool; import com.cool.store.vo.PartnerUserInfoVO; import com.cool.store.vo.notice.*; import com.github.pagehelper.PageHelper; @@ -31,10 +32,13 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.annotations.Param; import org.springframework.beans.BeanUtils; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; +import java.time.Duration; import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -65,6 +69,10 @@ public class MessageTemplateServiceImpl implements MessageTemplateService { EnterpriseUserDAO enterpriseUserDAO; @Resource MatterConfigDAO matterConfigDAO; + @Resource + RedisUtilPool redisUtilPool; + @Resource + TaskExecutor noticeThreadPool; @@ -136,54 +144,133 @@ public class MessageTemplateServiceImpl implements MessageTemplateService { @Override public Boolean batchPublishMessageTemplate(BatchPublishRequest request, String userId) { + log.info("batchPublishMessageTemplate:{}",JSONObject.toJSONString(request)); if (CollectionUtils.isEmpty(request.getIds())||CollectionUtils.isEmpty(request.getStoreInfoList())||CollectionUtils.isEmpty(request.getUserInfoList())){ throw new ServiceException(ErrorCodeEnum.PARAMS_REQUIRED); } - - List storeAreaDTOS = getStoreRange(request.getStoreInfoList()); - List storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList()); - - Map> authUser = getAuthUser(request.getUserInfoList(), storeIds); - List list = messageTemplateDAO.getByIds(request.getIds()); //过滤 只保留未发布的 list = list.stream().filter(x -> PublishStatusEnum.UNPUBLISHED.getCode().equals(x.getPublishStatus())).collect(Collectors.toList()); if (CollUtil.isEmpty(list)){ log.info("未找到待发布消息模板"); + return Boolean.TRUE; } - list.stream().forEach(x -> { - List result = new ArrayList<>(); - storeAreaDTOS.forEach(y->{ - if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){ - log.info("当前门店没有人员 门店名称:{}",y.getStoreName()); - return; - } - StoreMessageDO storeMessageDO = new StoreMessageDO(); - storeMessageDO.setStoreId(y.getStoreId()); - storeMessageDO.setStoreName(y.getStoreName()); - storeMessageDO.setStoreCode(y.getStoreCode()); - storeMessageDO.setMessageTemplateId(x.getId()); - storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode()); - storeMessageDO.setReadTime(new Date()); - storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode()); - storeMessageDO.setProcessTime(new Date()); - String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(",")); - storeMessageDO.setOperatorList(userIdStr); - result.add(storeMessageDO); - }); - storeMessageDAO.batchInsert(result); - }); + List lockKeys = list.stream() + .map(x -> "msg_template_publish:" + x.getId()) + .collect(Collectors.toList()); - List updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList()); - messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds, - JSONObject.toJSONString(request.getStoreInfoList()), - JSONObject.toJSONString(request.getUserInfoList()), - userId); + boolean allLocksAcquired = tryAcquireLocks(lockKeys); + if (!allLocksAcquired) { + log.warn("存在正在发布的模板,本次操作取消。模板IDs: {}", JSONObject.toJSONString(lockKeys)); + throw new ServiceException(ErrorCodeEnum.MESSAGE_PUBLISH); + } + List finalList = list; + noticeThreadPool.execute(() -> { + syncPublish(request, userId, finalList, lockKeys); + }); return Boolean.TRUE; } + public Boolean syncPublish(BatchPublishRequest request, String userId, List list,List lockKeys){ + try { + List storeAreaDTOS = getStoreRange(request.getStoreInfoList()); + List storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList()); + Map> authUser = getAuthUser(request.getUserInfoList(), storeIds); + Thread.sleep(1000*20); + list.stream().forEach(x -> { + List result = new ArrayList<>(); + storeAreaDTOS.forEach(y->{ + if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){ + log.info("当前门店没有人员 门店名称:{}",y.getStoreName()); + return; + } + StoreMessageDO storeMessageDO = new StoreMessageDO(); + storeMessageDO.setStoreId(y.getStoreId()); + storeMessageDO.setStoreName(y.getStoreName()); + storeMessageDO.setStoreCode(y.getStoreCode()); + storeMessageDO.setMessageTemplateId(x.getId()); + storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode()); + storeMessageDO.setReadTime(new Date()); + storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode()); + storeMessageDO.setProcessTime(new Date()); + String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(",")); + storeMessageDO.setOperatorList(userIdStr); + result.add(storeMessageDO); + }); + storeMessageDAO.batchInsert(result); + }); + + //存在第一个成功 第二个失败 会有问题 todo + List updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList()); + messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds, + JSONObject.toJSONString(request.getStoreInfoList()), + JSONObject.toJSONString(request.getUserInfoList()), + userId); + } catch (Exception e) { + log.info("发布流程异常,已取消发布"); + } finally { + releaseLocks(lockKeys); + log.info("发布流程结束,已释放Redis锁"); + } + return Boolean.TRUE; + } + + + + + /** + * 尝试获取所有Redis锁 + */ + private boolean tryAcquireLocks(List lockKeys) { + boolean allSuccess = true; + List acquiredLocks = new ArrayList<>(); + + for (String lockKey : lockKeys) { + try { + // 设置锁,过期时间设为5分钟,防止死锁 + Boolean success = redisUtilPool.setNxExpire(lockKey, "locking",1000*60*5); + if (success != null && success) { + acquiredLocks.add(lockKey); + } else { + log.warn("获取Redis锁失败: {}", lockKey); + allSuccess = false; + break; + } + } catch (Exception e) { + log.error("获取Redis锁异常: {}", lockKey, e); + allSuccess = false; + break; + } + } + + // 如果任何一个锁获取失败,释放之前已经获取的锁 + if (!allSuccess) { + releaseLocks(acquiredLocks); + } + + return allSuccess; + } + + /** + * 释放Redis锁 + */ + private void releaseLocks(List lockKeys) { + if (CollUtil.isEmpty(lockKeys)) { + return; + } + for (String lockKey : lockKeys) { + try { + redisUtilPool.delKey(lockKey); + log.debug("释放Redis锁成功: {}", lockKey); + } catch (Exception e) { + log.error("释放Redis锁异常: {}", lockKey, e); + } + } + } + + @Override @Transactional(rollbackFor = Exception.class) diff --git a/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java b/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java index b9952bc74..e9a3e66f4 100644 --- a/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java +++ b/coolstore-partner-web/src/main/java/com/cool/store/controller/webb/MessageTemplateController.java @@ -59,7 +59,8 @@ public class MessageTemplateController { @PostMapping("/batchPublish") @ApiOperation("批量发布") public ResponseResult batchPublishMessageTemplate(@RequestBody BatchPublishRequest request) { - return ResponseResult.success(messageTemplateService.batchPublishMessageTemplate(request, CurrentUserHolder.getUser().getUserId())); + messageTemplateService.batchPublishMessageTemplate(request, CurrentUserHolder.getUser().getUserId()); + return ResponseResult.success(Boolean.TRUE); } @PostMapping("/getMessageTemplateList") From f112df17b07c0384b72822c1bf7bb72bf61f362d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E7=AB=B9=E7=BA=A2?= Date: Thu, 16 Oct 2025 13:14:43 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat:=E9=80=9A=E7=9F=A5=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=8F=91=E5=B8=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/cool/store/service/impl/MessageTemplateServiceImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java b/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java index 257e8e74d..8ed0160bd 100644 --- a/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java +++ b/coolstore-partner-service/src/main/java/com/cool/store/service/impl/MessageTemplateServiceImpl.java @@ -178,7 +178,6 @@ public class MessageTemplateServiceImpl implements MessageTemplateService { List storeAreaDTOS = getStoreRange(request.getStoreInfoList()); List storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList()); Map> authUser = getAuthUser(request.getUserInfoList(), storeIds); - Thread.sleep(1000*20); list.stream().forEach(x -> { List result = new ArrayList<>(); storeAreaDTOS.forEach(y->{