package com.geoway.ns.sys.config.RocketMQ;

import com.geoway.ns.sys.service.impl.RocketMqMessageWrapperImpl;
import org.apache.commons.lang.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
/* loaded from: input_file:com/geoway/ns/sys/config/RocketMQ/ConsumerConfiguration.class */
public class ConsumerConfiguration {
    private final Logger logger = LoggerFactory.getLogger(ConsumerConfiguration.class);

    @Autowired
    private RocketMqConfig rocketMqConfig;

    @Autowired
    private RocketMqMessageWrapperImpl rocketMqMessageWrapperImpl;

    @Value("${rocketmq.enabled}")
    private boolean enabled;

    @Bean
    public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
        if (!this.enabled) {
            return new DefaultMQPushConsumer();
        }
        System.out.println("defaultConsumer 正在创建---------------------------------------");
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(this.rocketMqConfig.getConsumerGroup(), getAclRPCHook(), new AllocateMessageQueueAveragely());
        defaultMQPushConsumer.setNamesrvAddr(this.rocketMqConfig.getNameserverAddr());
        defaultMQPushConsumer.setConsumeThreadMin(this.rocketMqConfig.getConsumerThreadMin().intValue());
        defaultMQPushConsumer.setConsumeThreadMax(this.rocketMqConfig.getConsumerThreadMax().intValue());
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(this.rocketMqConfig.getConsumeMessageBatchMaxSize().intValue());
        defaultMQPushConsumer.registerMessageListener(this.rocketMqMessageWrapperImpl);
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        try {
            for (String str : this.rocketMqConfig.getConsumerTopics().split(";")) {
                String[] split = str.split("~");
                if (split.length == 2) {
                    defaultMQPushConsumer.subscribe(split[0], split[1]);
                } else if (split.length == 1) {
                    defaultMQPushConsumer.subscribe(split[0], "*");
                }
            }
            defaultMQPushConsumer.start();
            System.out.println(String.format("consumer 创建成功 groupName=%s, topics=%s, namesrvAddr=%s", this.rocketMqConfig.getConsumerGroup(), this.rocketMqConfig.getConsumerTopics(), this.rocketMqConfig.getNameserverAddr()));
        } catch (MQClientException e) {
            System.out.println("consumer 创建失败!");
        }
        return defaultMQPushConsumer;
    }

    private RPCHook getAclRPCHook() {
        if (StringUtils.isBlank(this.rocketMqConfig.getAccessKey()) || StringUtils.isBlank(this.rocketMqConfig.getSecretKey())) {
            return null;
        }
        return new AclClientRPCHook(new SessionCredentials(this.rocketMqConfig.getAccessKey(), this.rocketMqConfig.getSecretKey()));
    }
}
