package com.geoway.cloudquery.activemq;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/geoway/cloudquery/activemq/AmqConsumerBase.class */
public class AmqConsumerBase implements IAmqConsumer, Runnable {
    public Connection _connection;
    public Session _session;
    public Destination _destination;
    public MessageConsumer _consumer;
    public AmqListener _listener;
    public AmqParam _param;
    protected Logger _logger = LoggerFactory.getLogger(getClass());
    private ExecutorService _threadPool = null;
    LinkedBlockingQueue<Runnable> queue = null;
    ThreadPoolExecutor executor = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/geoway/cloudquery/activemq/AmqConsumerBase$AmqMessageListener.class */
    public class AmqMessageListener implements MessageListener {
        AmqConsumerBase _consumerBase;

        public AmqMessageListener(AmqConsumerBase amqConsumerBase) {
            this._consumerBase = amqConsumerBase;
        }

        public void onMessage(Message message) {
            try {
                HandleMessageBase handleMessageBase = new HandleMessageBase(new AmqMessageObject(message), this._consumerBase);
                if (AmqConsumerBase.this._param.is_singleThread()) {
                    handleMessageBase.run();
                } else {
                    AmqConsumerBase.this._threadPool.execute(handleMessageBase);
                }
            } catch (Exception e) {
                AmqConsumerBase.this._logger.error(e.getMessage(), e);
                e.printStackTrace();
            }
        }
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public Connection getConnection() {
        return this._connection;
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public Destination getDestination() {
        return this._destination;
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public MessageConsumer getConsumer() {
        return this._consumer;
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public Session getSession() {
        return this._session;
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public AmqParam getParam() {
        return this._param;
    }

    public AmqListener get_listener() {
        return this._listener;
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public boolean init() {
        return true;
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public void addListener(AmqListener amqListener) {
        this._listener = amqListener;
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public boolean startReceiveMsg() {
        try {
            switch (this._param.get_pushPull()) {
                case Push:
                    this._threadPool = Executors.newFixedThreadPool(this._param.get_threadCount());
                    push();
                    return true;
                case Pull:
                    pull();
                    return true;
                default:
                    return true;
            }
        } catch (Exception e) {
            this._logger.error(e.getMessage(), e);
            e.printStackTrace();
            return false;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        startReceiveMsg();
    }

    private void push() throws JMSException {
        this._consumer.setMessageListener(new AmqMessageListener(this));
    }

    private void pull() {
        if (!this._param.is_singleThread() && this.queue == null) {
            this.queue = new LinkedBlockingQueue<>(this._param.get_threadCount());
            this.executor = new ThreadPoolExecutor(this._param.get_threadCount(), this._param.get_threadCount(), 0L, TimeUnit.SECONDS, this.queue);
        }
        while (true) {
            try {
                if (this._param.is_singleThread()) {
                    new HandleMessageBase(new AmqMessageObject(this._consumer.receive()), this).run();
                } else {
                    int activeCount = this.executor.getActiveCount();
                    if (activeCount < this._param.get_threadCount()) {
                        this._logger.info("total thread count:" + this._param.get_threadCount() + " running thread count:" + activeCount);
                        try {
                            long currentTimeMillis = System.currentTimeMillis();
                            Message receive = this._consumer.receive();
                            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                            if (currentTimeMillis2 > 300) {
                                this._logger.warn("msg recv " + currentTimeMillis2 + " ms");
                            }
                            this.executor.execute(new HandleMessageBase(new AmqMessageObject(receive), this));
                        } catch (JMSException e) {
                            this._logger.error("", e);
                        }
                    } else {
                        this._logger.warn("total thread count:" + this._param.get_threadCount() + " running thread count:" + activeCount + ",当前消费者已满负荷运行");
                        Thread.sleep(500L);
                    }
                }
            } catch (Exception e2) {
                this._logger.error("", e2);
            }
        }
    }

    @Override // com.geoway.cloudquery.activemq.IAmqConsumer
    public boolean close() {
        try {
            if (this._consumer != null) {
                this._consumer.close();
            }
            if (this._session != null) {
                this._session.close();
            }
            if (this._connection == null) {
                return true;
            }
            this._connection.close();
            return true;
        } catch (Exception e) {
            this._logger.error(e.getMessage(), e);
            e.printStackTrace();
            return false;
        }
    }
}
