package com.broker.utils.events.support.kafka;

import com.broker.utils.events.TopicTag;
import com.broker.utils.events.support.IMQConsumer;
import com.broker.utils.events.support.MQConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/broker/utils/events/support/kafka/BrokerKafkaConsumer.class */
public class BrokerKafkaConsumer implements IMQConsumer {
    private static final Logger log = LoggerFactory.getLogger(BrokerKafkaConsumer.class);
    private final MQConfig kafkaConfig;
    private ExecutorService singleThreadExecutor;
    private Runnable task;
    private KafkaConsumer<String, String> consumer = null;
    private Map<TopicTag, List<BiConsumer<String, String>>> topicConsumers = new HashMap();
    private boolean taskExit = false;

    public BrokerKafkaConsumer(MQConfig mQConfig) {
        this.kafkaConfig = mQConfig;
        init();
    }

    private void init() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaConfig.getMqServerUrl());
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
        this.singleThreadExecutor = Executors.newSingleThreadExecutor();
        this.task = () -> {
            while (!this.taskExit) {
                Iterator it = this.consumer.poll(0L).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    String str = consumerRecord.topic();
                    String str2 = (String) consumerRecord.key();
                    String str3 = (String) consumerRecord.value();
                    List<BiConsumer<String, String>> orDefault = this.topicConsumers.getOrDefault(new TopicTag().setTopic(str).setTag(str2), new ArrayList());
                    if (!orDefault.isEmpty()) {
                        log.debug("Kafka: consume message offset:" + consumerRecord.offset() + "   TOPIC:" + str + "   OBJECT:" + str2);
                        orDefault.forEach(biConsumer -> {
                            biConsumer.accept(str2, str3);
                        });
                    }
                }
            }
        };
    }

    @Override // com.broker.utils.events.support.IMQConsumer
    public void appendSubscribe(String str, String str2, BiConsumer<String, String> biConsumer) {
        List<BiConsumer<String, String>> orDefault = this.topicConsumers.getOrDefault(new TopicTag().setTopic(str).setTag(str2), new ArrayList());
        orDefault.add(biConsumer);
        this.topicConsumers.put(new TopicTag().setTopic(str).setTag(str2), orDefault);
    }

    @Override // com.broker.utils.events.support.IMQConsumer
    public void start(String str) {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.consumer.subscribe(Arrays.asList(str), new ConsumerRebalanceListener() { // from class: com.broker.utils.events.support.kafka.BrokerKafkaConsumer.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                BrokerKafkaConsumer.this.consumer.seekToEnd(collection);
                atomicBoolean.set(true);
            }
        });
        if (this.singleThreadExecutor != null) {
            this.singleThreadExecutor.execute(this.task);
        }
        do {
        } while (!atomicBoolean.get());
    }

    @Override // com.broker.utils.events.support.IMQConsumer
    public void stop() {
        this.taskExit = true;
        if (this.singleThreadExecutor != null) {
            this.singleThreadExecutor.shutdown();
        }
        this.consumer.close();
    }
}
