feat:通知异步发布

This commit is contained in:
苏竹红
2025-10-16 12:05:09 +08:00
parent b4e88f5424
commit 47a371cbd7
7 changed files with 337 additions and 35 deletions

View File

@@ -21,6 +21,7 @@ import com.cool.store.response.bigdata.ApiResponse;
import com.cool.store.service.MessageTemplateService;
import com.cool.store.service.StoreService;
import com.cool.store.utils.CoolDateUtils;
import com.cool.store.utils.RedisUtilPool;
import com.cool.store.vo.PartnerUserInfoVO;
import com.cool.store.vo.notice.*;
import com.github.pagehelper.PageHelper;
@@ -31,10 +32,13 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.annotations.Param;
import org.springframework.beans.BeanUtils;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -65,6 +69,10 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
EnterpriseUserDAO enterpriseUserDAO;
@Resource
MatterConfigDAO matterConfigDAO;
@Resource
RedisUtilPool redisUtilPool;
@Resource
TaskExecutor noticeThreadPool;
@@ -136,54 +144,133 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
@Override
public Boolean batchPublishMessageTemplate(BatchPublishRequest request, String userId) {
log.info("batchPublishMessageTemplate:{}",JSONObject.toJSONString(request));
if (CollectionUtils.isEmpty(request.getIds())||CollectionUtils.isEmpty(request.getStoreInfoList())||CollectionUtils.isEmpty(request.getUserInfoList())){
throw new ServiceException(ErrorCodeEnum.PARAMS_REQUIRED);
}
List<StoreAreaDTO> storeAreaDTOS = getStoreRange(request.getStoreInfoList());
List<String> storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList());
Map<String, List<String>> authUser = getAuthUser(request.getUserInfoList(), storeIds);
List<MessageTemplateDO> list = messageTemplateDAO.getByIds(request.getIds());
//过滤 只保留未发布的
list = list.stream().filter(x -> PublishStatusEnum.UNPUBLISHED.getCode().equals(x.getPublishStatus())).collect(Collectors.toList());
if (CollUtil.isEmpty(list)){
log.info("未找到待发布消息模板");
return Boolean.TRUE;
}
list.stream().forEach(x -> {
List<StoreMessageDO> result = new ArrayList<>();
storeAreaDTOS.forEach(y->{
if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){
log.info("当前门店没有人员 门店名称:{}",y.getStoreName());
return;
}
StoreMessageDO storeMessageDO = new StoreMessageDO();
storeMessageDO.setStoreId(y.getStoreId());
storeMessageDO.setStoreName(y.getStoreName());
storeMessageDO.setStoreCode(y.getStoreCode());
storeMessageDO.setMessageTemplateId(x.getId());
storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode());
storeMessageDO.setReadTime(new Date());
storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode());
storeMessageDO.setProcessTime(new Date());
String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(","));
storeMessageDO.setOperatorList(userIdStr);
result.add(storeMessageDO);
});
storeMessageDAO.batchInsert(result);
});
List<String> lockKeys = list.stream()
.map(x -> "msg_template_publish:" + x.getId())
.collect(Collectors.toList());
List<Long> updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList());
messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds,
JSONObject.toJSONString(request.getStoreInfoList()),
JSONObject.toJSONString(request.getUserInfoList()),
userId);
boolean allLocksAcquired = tryAcquireLocks(lockKeys);
if (!allLocksAcquired) {
log.warn("存在正在发布的模板本次操作取消。模板IDs: {}", JSONObject.toJSONString(lockKeys));
throw new ServiceException(ErrorCodeEnum.MESSAGE_PUBLISH);
}
List<MessageTemplateDO> finalList = list;
noticeThreadPool.execute(() -> {
syncPublish(request, userId, finalList, lockKeys);
});
return Boolean.TRUE;
}
public Boolean syncPublish(BatchPublishRequest request, String userId, List<MessageTemplateDO> list,List<String> lockKeys){
try {
List<StoreAreaDTO> storeAreaDTOS = getStoreRange(request.getStoreInfoList());
List<String> storeIds = storeAreaDTOS.stream().map(StoreAreaDTO::getStoreId).collect(Collectors.toList());
Map<String, List<String>> authUser = getAuthUser(request.getUserInfoList(), storeIds);
Thread.sleep(1000*20);
list.stream().forEach(x -> {
List<StoreMessageDO> result = new ArrayList<>();
storeAreaDTOS.forEach(y->{
if (CollectionUtils.isEmpty(authUser.get(y.getStoreId()))){
log.info("当前门店没有人员 门店名称:{}",y.getStoreName());
return;
}
StoreMessageDO storeMessageDO = new StoreMessageDO();
storeMessageDO.setStoreId(y.getStoreId());
storeMessageDO.setStoreName(y.getStoreName());
storeMessageDO.setStoreCode(y.getStoreCode());
storeMessageDO.setMessageTemplateId(x.getId());
storeMessageDO.setReadStatus(ReadStatusEnum.UNREAD.getCode());
storeMessageDO.setReadTime(new Date());
storeMessageDO.setProcessStatus(ProcessStatusEnum.UNTREATED.getCode());
storeMessageDO.setProcessTime(new Date());
String userIdStr = authUser.get(y.getStoreId()).stream().collect(Collectors.joining(","));
storeMessageDO.setOperatorList(userIdStr);
result.add(storeMessageDO);
});
storeMessageDAO.batchInsert(result);
});
//存在第一个成功 第二个失败 会有问题 todo
List<Long> updateIds = list.stream().map(MessageTemplateDO::getId).collect(Collectors.toList());
messageTemplateDAO.batchUpdateStoreInfoAndUserInfo(updateIds,
JSONObject.toJSONString(request.getStoreInfoList()),
JSONObject.toJSONString(request.getUserInfoList()),
userId);
} catch (Exception e) {
log.info("发布流程异常,已取消发布");
} finally {
releaseLocks(lockKeys);
log.info("发布流程结束已释放Redis锁");
}
return Boolean.TRUE;
}
/**
* 尝试获取所有Redis锁
*/
private boolean tryAcquireLocks(List<String> lockKeys) {
boolean allSuccess = true;
List<String> acquiredLocks = new ArrayList<>();
for (String lockKey : lockKeys) {
try {
// 设置锁过期时间设为5分钟防止死锁
Boolean success = redisUtilPool.setNxExpire(lockKey, "locking",1000*60*5);
if (success != null && success) {
acquiredLocks.add(lockKey);
} else {
log.warn("获取Redis锁失败: {}", lockKey);
allSuccess = false;
break;
}
} catch (Exception e) {
log.error("获取Redis锁异常: {}", lockKey, e);
allSuccess = false;
break;
}
}
// 如果任何一个锁获取失败,释放之前已经获取的锁
if (!allSuccess) {
releaseLocks(acquiredLocks);
}
return allSuccess;
}
/**
* 释放Redis锁
*/
private void releaseLocks(List<String> lockKeys) {
if (CollUtil.isEmpty(lockKeys)) {
return;
}
for (String lockKey : lockKeys) {
try {
redisUtilPool.delKey(lockKey);
log.debug("释放Redis锁成功: {}", lockKey);
} catch (Exception e) {
log.error("释放Redis锁异常: {}", lockKey, e);
}
}
}
@Override
@Transactional(rollbackFor = Exception.class)