package com.broker.utils.events.support.rocketmq;

import com.broker.utils.events.TopicTag;
import com.broker.utils.events.support.IMQConsumer;
import com.broker.utils.events.support.MQConfig;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/broker/utils/events/support/rocketmq/RocketMQConsumer.class */
/**
 * {@link IMQConsumer} implementation backed by a RocketMQ {@link DefaultMQPushConsumer}.
 *
 * <p>Handlers are registered per (topic, tag) pair through
 * {@link #appendSubscribe(String, String, BiConsumer)}. Every received message is looked up
 * by its topic and tags, and each handler registered for that pair is invoked with the
 * message tags and the message body decoded as a string.
 *
 * <p>The listener is registered as a {@link MessageListenerConcurrently}, so dispatch may run
 * on several RocketMQ consumer threads at once. The handler registry therefore uses
 * concurrent collections so that registration and dispatch can overlap safely.
 */
public class RocketMQConsumer implements IMQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQConsumer.class);

    /** Consumer group name handed to the underlying push consumer. */
    private static final String CONSUMER_GROUP = "CID_LRW_DEV_SUBS";

    private final MQConfig rocketMQConfig;
    private final DefaultMQPushConsumer consumer;

    /** Registered handlers, keyed by the (topic, tag) pair they were subscribed with. */
    Map<TopicTag, List<BiConsumer<String, String>>> topicConsumers = new ConcurrentHashMap<>();

    /**
     * Creates and configures the underlying push consumer. The consumer is not started
     * until {@link #start(String)} is called.
     *
     * @param mQConfig configuration supplying the name server address
     */
    public RocketMQConsumer(MQConfig mQConfig) {
        this.rocketMQConfig = mQConfig;
        this.consumer = createConsumer();
    }

    /** Builds the push consumer and wires {@link #dispatch} in as its message listener. */
    private DefaultMQPushConsumer createConsumer() {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        pushConsumer.setVipChannelEnabled(false);
        pushConsumer.setNamesrvAddr(this.rocketMQConfig.getMqServerUrl());
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // The cast selects the concurrent-listener overload of registerMessageListener.
        pushConsumer.registerMessageListener((MessageListenerConcurrently) this::dispatch);
        return pushConsumer;
    }

    /**
     * Delivers each message in the batch to the handlers registered for its topic and tags.
     * Messages without a registered handler are skipped.
     *
     * @param messages batch of messages delivered by RocketMQ
     * @param context  consume context supplied by RocketMQ (unused)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    private ConsumeConcurrentlyStatus dispatch(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
        for (MessageExt message : messages) {
            String topic = message.getTopic();
            String tags = message.getTags();
            List<BiConsumer<String, String>> handlers =
                    this.topicConsumers.get(new TopicTag().setTopic(topic).setTag(tags));
            if (handlers == null || handlers.isEmpty()) {
                continue;
            }
            // Decode with an explicit charset so the result does not depend on the platform
            // default; assumes producers encode bodies as UTF-8.
            String payload = new String(message.getBody(), StandardCharsets.UTF_8);
            log.debug("RocketMQ: 消费消息ID:{}   TOPIC:{}   OBJECT:{}", message.getMsgId(), topic, tags);
            for (BiConsumer<String, String> handler : handlers) {
                handler.accept(tags, payload);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Registers a handler for messages whose topic and tags match the given pair.
     * Several handlers may be registered for the same pair; they are invoked in
     * registration order.
     *
     * @param str        topic the handler is interested in
     * @param str2       tag the handler is interested in
     * @param biConsumer handler invoked with (tags, body) for each matching message
     */
    @Override
    public void appendSubscribe(String str, String str2, BiConsumer<String, String> biConsumer) {
        TopicTag key = new TopicTag().setTopic(str).setTag(str2);
        // computeIfAbsent makes "create the list if missing, then add" atomic per key.
        this.topicConsumers.computeIfAbsent(key, ignored -> new CopyOnWriteArrayList<>()).add(biConsumer);
    }

    /**
     * Subscribes the underlying consumer to the given topic with the sub-expression
     * {@code "*"} and starts it. A {@link MQClientException} is logged and not rethrown.
     *
     * @param str topic to subscribe to
     */
    @Override
    public void start(String str) {
        try {
            this.consumer.subscribe(str, "*");
            this.consumer.start();
        } catch (MQClientException e) {
            log.error("RocketMQ: failed to subscribe to topic {} and start the consumer", str, e);
        }
    }

    /** Shuts down the underlying push consumer. */
    @Override
    public void stop() {
        this.consumer.shutdown();
    }
}
