This commit is contained in:
zhangchenbiao
2023-06-01 11:37:03 +08:00
parent 0cabc939fc
commit 09f651ed0d
20 changed files with 314 additions and 212 deletions

View File

@@ -0,0 +1,123 @@
package com.cool.store.mq.consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.cool.store.constants.CommonConstants;
import com.cool.store.enums.RocketMqGroupEnum;
import com.cool.store.mq.RocketMqConfig;
import com.cool.store.mq.consumer.listener.EnterpriseInitListener;
import com.cool.store.mq.consumer.listener.EnterpriseScriptListener;
import com.cool.store.mq.consumer.listener.TestListener;
import com.google.common.collect.Maps;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Properties;
/**
* @author zhangchenbiao
* @FileName: OrderlyConsumerClient
* @Description: 普通消息消费client
* @date 2021-12-21 11:35
*/
@Configuration
public class ConsumerClient {
@Resource
private RocketMqConfig rocketMqConfig;
@Resource
private EnterpriseInitListener enterpriseInitListener;
@Resource
private EnterpriseScriptListener enterpriseScriptListener;
@Resource
private TestListener testListener;
/**
* 获取通用配置
* @param groupEnum
* @return
*/
private Properties getCommonProperties(RocketMqGroupEnum groupEnum) {
//配置文件
Properties properties = rocketMqConfig.getMqProperties();
//消费者需指定groupId 根据实际情况指定groupId
properties.setProperty(PropertyKeyConst.GROUP_ID, RocketMqGroupEnum.getGroupId(groupEnum));
//消费模式 集群消费
properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
//消息最大重试次数
properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, CommonConstants.MaxReconsumeTimes);
//开启最大线程数
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, CommonConstants.FIVE_STR);
return properties;
}
/**
* 获取通用订阅关系
* @param groupEnum
* @param listener
* @return
*/
public Map<Subscription, MessageListener> getCommonSubscriptionTable(RocketMqGroupEnum groupEnum, MessageListener listener) {
Map<Subscription, MessageListener> subscriptionTable = Maps.newHashMap();
Subscription subscription = new Subscription();
subscription.setTopic(rocketMqConfig.getTopic());
subscription.setExpression(RocketMqGroupEnum.getTag(groupEnum));
subscriptionTable.put(subscription, listener);
return subscriptionTable;
}
/**
* 测试test
* @return
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean test() {
RocketMqGroupEnum groupEnum = RocketMqGroupEnum.TEST;
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = getCommonProperties(groupEnum);
consumerBean.setProperties(properties);
Map<Subscription, MessageListener> commonSubscriptionTable = getCommonSubscriptionTable(groupEnum, testListener);
//订阅多个topic如上面设置
consumerBean.setSubscriptionTable(commonSubscriptionTable);
return consumerBean;
}
/**
* 企业开通
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean enterpriseOpenBean() {
RocketMqGroupEnum groupEnum = RocketMqGroupEnum.ENTERPRISE_OPEN_DATA_SYNC;
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = getCommonProperties(groupEnum);
consumerBean.setProperties(properties);
Map<Subscription, MessageListener> commonSubscriptionTable = getCommonSubscriptionTable(groupEnum, testListener);
//订阅多个topic如上面设置
consumerBean.setSubscriptionTable(commonSubscriptionTable);
return consumerBean;
}
/**
* 企业库脚本开通
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean enterpriseScriptBean() {
RocketMqGroupEnum groupEnum = RocketMqGroupEnum.ENTERPRISE_OPEN_ENTERPRISE_RUN_SCRIPT;
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = getCommonProperties(groupEnum);
consumerBean.setProperties(properties);
Map<Subscription, MessageListener> commonSubscriptionTable = getCommonSubscriptionTable(groupEnum, enterpriseScriptListener);
//订阅多个topic如上面设置
consumerBean.setSubscriptionTable(commonSubscriptionTable);
return consumerBean;
}
}

View File

@@ -1,4 +1,4 @@
package com.cool.store.consumer;
package com.cool.store.mq.consumer.listener;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;

View File

@@ -1,4 +1,4 @@
package com.cool.store.consumer;
package com.cool.store.mq.consumer.listener;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;

View File

@@ -0,0 +1,58 @@
package com.cool.store.mq.consumer.listener;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.cool.store.constants.CommonConstants;
import com.cool.store.dto.enterprise.EnterpriseInitDTO;
import com.cool.store.enums.AppTypeEnum;
import com.cool.store.enums.EnterpriseStatusEnum;
import com.cool.store.service.EnterpriseInitService;
import com.cool.store.utils.RedisUtilPool;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.MessageFormat;
import java.util.Arrays;
/**
* 企业开通初始化
*
* @author chenyupeng
* @since 2022/1/26
*/
@Slf4j
@Service
public class TestListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
if(message.getReconsumeTimes() + 1 >= Integer.parseInt(CommonConstants.MaxReconsumeTimes)){
//超过最大消费次数
return Action.CommitMessage;
}
String text = new String(message.getBody());
log.info("text:{}", text);
if(StringUtils.isBlank(text)){
return Action.CommitMessage;
}
return Action.CommitMessage;
}
}

View File

@@ -1,4 +1,4 @@
package com.cool.store.mq;
package com.cool.store.mq.producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.cool.store.enums.RocketMqTagEnum;

View File

@@ -1,4 +1,4 @@
package com.cool.store.mq;
package com.cool.store.mq.producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;

View File

@@ -1,11 +1,11 @@
package com.cool.store.mq.impl;
package com.cool.store.mq.producer.impl;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.OrderProducerBean;
import com.cool.store.constants.CommonConstants;
import com.cool.store.enums.RocketMqTagEnum;
import com.cool.store.mq.OrderMessageService;
import com.cool.store.mq.producer.OrderMessageService;
import com.cool.store.mq.RocketMqConfig;
import com.cool.store.utils.UUIDUtils;
import lombok.extern.slf4j.Slf4j;

View File

@@ -1,4 +1,4 @@
package com.cool.store.mq.impl;
package com.cool.store.mq.producer.impl;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Message;
@@ -8,7 +8,7 @@ import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.cool.store.constants.CommonConstants;
import com.cool.store.enums.RocketMqTagEnum;
import com.cool.store.mq.RocketMqConfig;
import com.cool.store.mq.SimpleMessageService;
import com.cool.store.mq.producer.SimpleMessageService;
import com.cool.store.utils.UUIDUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

View File

@@ -11,7 +11,7 @@ import com.cool.store.entity.UserRegionMappingDO;
import com.cool.store.enums.*;
import com.cool.store.exception.ServiceException;
import com.cool.store.http.ISVHttpRequest;
import com.cool.store.mq.SimpleMessageService;
import com.cool.store.mq.producer.SimpleMessageService;
import com.cool.store.request.EnterpriseUserRequest;
import com.cool.store.service.EnterpriseInitService;
import com.cool.store.service.EnterpriseUserService;