Merge branch 'cc_20251016_async' into 'master'

Cc 20251016 async

See merge request hangzhou/java/custom_zxjp!172
This commit is contained in:
苏竹红
2025-10-16 09:29:43 +00:00
7 changed files with 265 additions and 60 deletions

View File

@@ -308,7 +308,7 @@ public enum ErrorCodeEnum {
CONFIG_NOT_EXIST(1610006,"配置不存在或被禁用,请确认!",null),
MESSAGE_NOT_EXIST(1610007,"消息模板不存在或已被删除",null),
MESSAGE_NOT_HANDLED(1610008,"当前消息无需处理,请确认消息处理类型!",null),
MESSAGE_PUBLISH(1610009,"您选择通知任务正在发布中,请稍后重试!",null),
NOT_FLAGSHIP_STORE(16100005,"非直营店,无法跳过缴费阶段!",null),
NOT_FLAGSHIP_STORE_NOT_EXIST(16100006,"当前阶段加盟类型不能变更!",null),

View File

@@ -1,16 +1,13 @@
package com.cool.store.executor;
import com.cool.store.constants.CommonConstants;
import com.cool.store.utils.UUIDUtils;
import org.apache.commons.lang3.StringUtils;
import com.cool.store.utils.ThreadMdcUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@@ -34,7 +31,7 @@ public class MdcTaskExecutor extends ThreadPoolTaskExecutor {
MDC.setContextMap(context);
}
//直接给子线程设置MDC
setTraceIdIfAbsent();
ThreadMdcUtil.setTraceIdIfAbsent();
try {
//执行任务
result = task.call();
@@ -63,7 +60,7 @@ public class MdcTaskExecutor extends ThreadPoolTaskExecutor {
MDC.setContextMap(context);
}
//直接给子线程设置MDC
setTraceIdIfAbsent();
ThreadMdcUtil.setTraceIdIfAbsent();
try {
//执行任务
task.run();
@@ -80,17 +77,4 @@ public class MdcTaskExecutor extends ThreadPoolTaskExecutor {
}
});
}
public static void setTraceIdIfAbsent() {
String key = CommonConstants.REQUEST_ID;
String value = UUIDUtils.get32UUID();
String k = MDC.get(key);
if (StringUtils.isBlank(value)) {
value = UUID.randomUUID().toString().replace("-", "");
}
if (StringUtils.isBlank(k)) {
MDC.put(key, value);
}
}
}

View File

@@ -0,0 +1,42 @@
package com.cool.store.executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Author suzhuhong
* @Date 2025/10/15 17:29
* @Version 1.0
*/
@Configuration
public class ThreadPoolTask {
@Bean
public TaskExecutor noticeThreadPool() {
int cores = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new MdcTaskExecutor();
// 核心线程数目
executor.setCorePoolSize(cores*2);
// 指定最大线程数
executor.setMaxPoolSize(100);
// 队列中最大的数目
executor.setQueueCapacity(10000);
// 线程名称前缀
executor.setThreadNamePrefix("noticeThreadPool_");
// 对拒绝task的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程空闲后的最大存活时间
executor.setKeepAliveSeconds(60);
// 加载
executor.initialize();
return executor;
}
}

View File

@@ -0,0 +1,37 @@
package com.cool.store.utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import java.util.UUID;
/**
* @Author suzhuhong
* @Date 2025/10/15 17:32
* @Version 1.0
*/
public class MDCUtils {
public MDCUtils() {
}
public static void putIfAbsent(String key, String value) {
String k = MDC.get(key);
if (StringUtils.isBlank(value)) {
value = UUID.randomUUID().toString().replace("-", "");
}
if (StringUtils.isBlank(k)) {
MDC.put(key, value);
}
}
public static void put(String key, String value) {
MDC.put(key, value);
}
public static void put(String key) {
MDC.put(key, UUID.randomUUID().toString().replace("-", ""));
}
}

View File

@@ -0,0 +1,55 @@
package com.cool.store.utils;
import com.cool.store.constants.CommonConstants;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* 线程MDC包装类
*
* @author hetiantian
* @version 1.0
* @Date 2020/03/18 15:18
*/
public class ThreadMdcUtil {
public static void setTraceIdIfAbsent() {
MDCUtils.putIfAbsent(CommonConstants.REQUEST_ID, UUIDUtils.get32UUID());
}
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
return callable.call();
} finally {
MDC.clear();
}
};
}
public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return new Runnable() {
@Override
public void run() {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
runnable.run();
} finally {
MDC.clear();
}
}
};
}
}

View File

@@ -20,6 +20,7 @@ import com.cool.store.response.bigdata.ApiResponse;
import com.cool.store.service.MessageTemplateService;
import com.cool.store.service.StoreService;
import com.cool.store.utils.CoolDateUtils;
import com.cool.store.utils.RedisUtilPool;
import com.cool.store.vo.PartnerUserInfoVO;
import com.cool.store.vo.notice.*;
import com.github.pagehelper.PageHelper;
@@ -28,10 +29,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -63,6 +67,10 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
@Resource
MatterConfigDAO matterConfigDAO;
@Resource
RedisUtilPool redisUtilPool;
@Resource
TaskExecutor noticeThreadPool;
@Resource
MessageIssueService messageIssueService;
@@ -135,60 +143,138 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
@Override
public Boolean batchPublishMessageTemplate(BatchPublishRequest request, String userId) {
log.info("batchPublishMessageTemplate:{}",JSONObject.toJSONString(request));
if (CollectionUtils.isEmpty(request.getIds())||CollectionUtils.isEmpty(request.getStoreInfoList())||CollectionUtils.isEmpty(request.getUserInfoList())){
throw new ServiceException(ErrorCodeEnum.PARAMS_REQUIRED);
}
List<StoreAreaDTO> storeAreaDTOS = getStoreRange(request.getStoreInfoList());
List<String> storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList());
Map<String, List<String>> authUser = getAuthUser(request.getUserInfoList(), storeIds);
List<MessageTemplateDO> list = messageTemplateDAO.getByIds(request.getIds());
//过滤 只保留未发布的
list = list.stream().filter(x -> PublishStatusEnum.UNPUBLISHED.getCode().equals(x.getPublishStatus())).collect(Collectors.toList());
if (CollUtil.isEmpty(list)){
log.info("未找到待发布消息模板");
return Boolean.TRUE;
}
List<StoreMessageDO> realtimeMessageList = new ArrayList<>();
list.stream().forEach(x -> {
List<StoreMessageDO> result = new ArrayList<>();
storeAreaDTOS.forEach(y->{
if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){
log.info("当前门店没有人员 门店名称:{}",y.getStoreName());
return;
}
StoreMessageDO storeMessageDO = new StoreMessageDO();
storeMessageDO.setStoreId(y.getStoreId());
storeMessageDO.setStoreName(y.getStoreName());
storeMessageDO.setStoreCode(y.getStoreCode());
storeMessageDO.setMessageTemplateId(x.getId());
storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode());
storeMessageDO.setReadTime(new Date());
storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode());
storeMessageDO.setProcessTime(new Date());
String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(","));
storeMessageDO.setOperatorList(userIdStr);
result.add(storeMessageDO);
});
storeMessageDAO.batchInsert(result);
if (MatterTypeEnum.REALTIME.getCode().equals(request.getMatterType())) {
realtimeMessageList.addAll(result);
}
});
List<String> lockKeys = list.stream()
.map(x -> "msg_template_publish:" + x.getId())
.collect(Collectors.toList());
List<Long> updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList());
messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds,
JSONObject.toJSONString(request.getStoreInfoList()),
JSONObject.toJSONString(request.getUserInfoList()),
userId);
// 即时消息下发
messageIssueService.issueMessage(realtimeMessageList);
boolean allLocksAcquired = tryAcquireLocks(lockKeys);
if (!allLocksAcquired) {
log.warn("存在正在发布的模板本次操作取消。模板IDs: {}", JSONObject.toJSONString(lockKeys));
throw new ServiceException(ErrorCodeEnum.MESSAGE_PUBLISH);
}
List<MessageTemplateDO> finalList = list;
noticeThreadPool.execute(() -> {
syncPublish(request, userId, finalList, lockKeys);
});
return Boolean.TRUE;
}
public Boolean syncPublish(BatchPublishRequest request, String userId, List<MessageTemplateDO> list,List<String> lockKeys){
try {
List<StoreAreaDTO> storeAreaDTOS = getStoreRange(request.getStoreInfoList());
List<String> storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList());
Map<String, List<String>> authUser = getAuthUser(request.getUserInfoList(), storeIds);
List<StoreMessageDO> realtimeMessageList = new ArrayList<>();
list.stream().forEach(x -> {
List<StoreMessageDO> result = new ArrayList<>();
storeAreaDTOS.forEach(y->{
if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){
log.info("当前门店没有人员 门店名称:{}",y.getStoreName());
return;
}
StoreMessageDO storeMessageDO = new StoreMessageDO();
storeMessageDO.setStoreId(y.getStoreId());
storeMessageDO.setStoreName(y.getStoreName());
storeMessageDO.setStoreCode(y.getStoreCode());
storeMessageDO.setMessageTemplateId(x.getId());
storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode());
storeMessageDO.setReadTime(new Date());
storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode());
storeMessageDO.setProcessTime(new Date());
String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(","));
storeMessageDO.setOperatorList(userIdStr);
result.add(storeMessageDO);
});
storeMessageDAO.batchInsert(result);
if (MatterTypeEnum.REALTIME.getCode().equals(request.getMatterType())) {
realtimeMessageList.addAll(result);
}
});
//存在第一个成功 第二个失败 会有问题 todo
List<Long> updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList());
messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds,
JSONObject.toJSONString(request.getStoreInfoList()),
JSONObject.toJSONString(request.getUserInfoList()),
userId);
// 即时消息下发
messageIssueService.issueMessage(realtimeMessageList);
} catch (Exception e) {
log.info("发布流程异常,已取消发布");
} finally {
releaseLocks(lockKeys);
log.info("发布流程结束已释放Redis锁");
}
return Boolean.TRUE;
}
/**
* 尝试获取所有Redis锁
*/
private boolean tryAcquireLocks(List<String> lockKeys) {
boolean allSuccess = true;
List<String> acquiredLocks = new ArrayList<>();
for (String lockKey : lockKeys) {
try {
// 设置锁过期时间设为5分钟防止死锁
Boolean success = redisUtilPool.setNxExpire(lockKey, "locking",1000*60*5);
if (success != null && success) {
acquiredLocks.add(lockKey);
} else {
log.warn("获取Redis锁失败: {}", lockKey);
allSuccess = false;
break;
}
} catch (Exception e) {
log.error("获取Redis锁异常: {}", lockKey, e);
allSuccess = false;
break;
}
}
// 如果任何一个锁获取失败,释放之前已经获取的锁
if (!allSuccess) {
releaseLocks(acquiredLocks);
}
return allSuccess;
}
/**
* 释放Redis锁
*/
private void releaseLocks(List<String> lockKeys) {
if (CollUtil.isEmpty(lockKeys)) {
return;
}
for (String lockKey : lockKeys) {
try {
redisUtilPool.delKey(lockKey);
log.debug("释放Redis锁成功: {}", lockKey);
} catch (Exception e) {
log.error("释放Redis锁异常: {}", lockKey, e);
}
}
}
@Override
@Transactional(rollbackFor = Exception.class)

View File

@@ -59,7 +59,8 @@ public class MessageTemplateController {
@PostMapping("/batchPublish")
@ApiOperation("批量发布")
public ResponseResult<Boolean> batchPublishMessageTemplate(@RequestBody BatchPublishRequest request) {
return ResponseResult.success(messageTemplateService.batchPublishMessageTemplate(request, CurrentUserHolder.getUser().getUserId()));
messageTemplateService.batchPublishMessageTemplate(request, CurrentUserHolder.getUser().getUserId());
return ResponseResult.success(Boolean.TRUE);
}
@PostMapping("/getMessageTemplateList")