package org.apache.seata.core.rpc.processor.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.protocol.AbstractMessage;
import org.apache.seata.core.protocol.AbstractResultMessage;
import org.apache.seata.core.protocol.BatchResultMessage;
import org.apache.seata.core.protocol.MergeResultMessage;
import org.apache.seata.core.protocol.MergedWarpMessage;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.Disposable;
import org.apache.seata.core.rpc.RemotingServer;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.TransactionMessageHandler;
import org.apache.seata.core.rpc.netty.ChannelManager;
import org.apache.seata.core.rpc.netty.NettyServerConfig;
import org.apache.seata.core.rpc.processor.RemotingProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/seata-all-2.1.0.jar:org/apache/seata/core/rpc/processor/server/ServerOnRequestProcessor.class */
/**
 * Processes request messages received by the server from registered channels.
 *
 * <p>Three kinds of bodies are handled:
 * <ul>
 *   <li>a single {@link AbstractMessage}: handled and answered immediately;</li>
 *   <li>a {@link MergedWarpMessage} when batch responses are enabled and the client version passes
 *       {@link Version#isAboveOrEqualVersion150(String)}: every inner request is handled on its own,
 *       its result is queued per channel and a dedicated thread sends the queued results as
 *       {@link BatchResultMessage}s;</li>
 *   <li>any other {@link MergedWarpMessage}: all inner requests are handled and answered with one
 *       {@link MergeResultMessage}.</li>
 * </ul>
 *
 * <p>NOTE(review): entries of the per-channel queue map are never removed in this class; confirm
 * whether closed channels are cleaned up elsewhere.
 */
public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerOnRequestProcessor.class);

    /** Longest time, in milliseconds, the batch thread sleeps before it flushes the queues again. */
    private static final int MAX_BATCH_RESPONSE_MILLS = 1;

    /** Number of threads (core and maximum) of the batch response executor. */
    private static final int MAX_BATCH_RESPONSE_THREAD = 1;

    /** Keep-alive time, in milliseconds, handed to the batch response executor. */
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;

    private static final String BATCH_RESPONSE_THREAD_PREFIX = "rpcBatchResponse";

    /** Whether the inner requests of a merged message are handled asynchronously (defaults to {@code true}). */
    private static final boolean PARALLEL_REQUEST_HANDLE = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_PARALLEL_REQUEST_HANDLE_KEY, true);

    private final RemotingServer remotingServer;
    private final TransactionMessageHandler transactionMessageHandler;

    /** Only created when batch responses are enabled; {@code null} otherwise. */
    private ExecutorService batchResponseExecutorService;

    /** Results waiting to be sent as batch responses, grouped by the channel they belong to. */
    private final ConcurrentMap<Channel, BlockingQueue<QueueItem>> basketMap = new ConcurrentHashMap<>();

    /** Monitor used to wake the batch thread as soon as a new result has been queued. */
    protected final Object batchResponseLock = new Object();

    /** {@code true} while the batch thread is draining the queues; producers skip the notification then. */
    private volatile boolean isResponding = false;

    /**
     * Long-running task that periodically drains the per-channel result queues and sends the
     * collected results. It runs until its thread is interrupted, which {@link #destroy()} triggers.
     */
    private class BatchResponseRunnable implements Runnable {
        private BatchResponseRunnable() {
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (batchResponseLock) {
                    try {
                        // Wakes up on notifyBatchRespondingThread() or after the timeout, whichever comes first.
                        batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS);
                    } catch (InterruptedException e) {
                        LOGGER.info("BatchResponseRunnable interrupted, stopping after a final flush");
                        // Restore the flag so the loop condition ends the task after the flush below.
                        Thread.currentThread().interrupt();
                    }
                }
                flushPendingResponses();
            }
        }
    }

    /**
     * The attributes of a client request that a response has to carry as well (message id, codec,
     * compressor and head map). Used as the grouping key when queued results are assembled into
     * batch responses, hence the value-based {@link #equals(Object)} and {@link #hashCode()}.
     *
     * <p>The decompiler reports that this class was originally {@code private}; the visibility shown
     * in this source is kept.
     */
    public static class ClientRequestRpcInfo {
        private int rpcMessageId;
        private byte codec;
        private byte compressor;
        private Map<String, String> headMap;

        /**
         * Copies id, codec, compressor and the head map reference from the given request.
         *
         * @param rpcMessage the client request the response belongs to
         */
        public ClientRequestRpcInfo(RpcMessage rpcMessage) {
            this.rpcMessageId = rpcMessage.getId();
            this.codec = rpcMessage.getCodec();
            this.compressor = rpcMessage.getCompressor();
            this.headMap = rpcMessage.getHeadMap();
        }

        public int getRpcMessageId() {
            return rpcMessageId;
        }

        public void setRpcMessageId(int i) {
            this.rpcMessageId = i;
        }

        public byte getCodec() {
            return codec;
        }

        public void setCodec(byte b) {
            this.codec = b;
        }

        public byte getCompressor() {
            return compressor;
        }

        public void setCompressor(byte b) {
            this.compressor = b;
        }

        public Map<String, String> getHeadMap() {
            return headMap;
        }

        public void setHeadMap(Map<String, String> map) {
            this.headMap = map;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ClientRequestRpcInfo other = (ClientRequestRpcInfo) obj;
            // Objects.equals keeps equals() null-safe, in line with hashCode() which accepts a null head map.
            return rpcMessageId == other.rpcMessageId
                && codec == other.codec
                && compressor == other.compressor
                && Objects.equals(headMap, other.headMap);
        }

        @Override
        public int hashCode() {
            return Objects.hash(rpcMessageId, codec, compressor, headMap);
        }
    }

    /**
     * A handled request waiting in a channel queue: the result, the id of the inner message it
     * answers and the client request it arrived in.
     *
     * <p>The decompiler reports that this class was originally {@code private}; the visibility shown
     * in this source is kept.
     */
    public static class QueueItem {
        private AbstractResultMessage resultMessage;
        private Integer msgId;
        private RpcMessage rpcMessage;

        /**
         * @param abstractResultMessage the result produced for the inner request
         * @param i                     the id of the inner request inside the merged message
         * @param rpcMessage            the client request that contained the inner request
         */
        public QueueItem(AbstractResultMessage abstractResultMessage, int i, RpcMessage rpcMessage) {
            this.resultMessage = abstractResultMessage;
            this.msgId = i;
            this.rpcMessage = rpcMessage;
        }

        public AbstractResultMessage getResultMessage() {
            return resultMessage;
        }

        public void setResultMessage(AbstractResultMessage abstractResultMessage) {
            this.resultMessage = abstractResultMessage;
        }

        public Integer getMsgId() {
            return msgId;
        }

        public void setMsgId(Integer num) {
            this.msgId = num;
        }

        public RpcMessage getRpcMessage() {
            return rpcMessage;
        }

        public void setRpcMessage(RpcMessage rpcMessage) {
            this.rpcMessage = rpcMessage;
        }
    }

    /**
     * Creates the processor and, when {@link NettyServerConfig#isEnableTcServerBatchSendResponse()}
     * is {@code true}, starts the single thread that sends batch responses.
     *
     * @param remotingServer            server used to send the responses
     * @param transactionMessageHandler handler that turns a request into its result
     */
    public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {
        this.remotingServer = remotingServer;
        this.transactionMessageHandler = transactionMessageHandler;
        if (NettyServerConfig.isEnableTcServerBatchSendResponse()) {
            this.batchResponseExecutorService = new ThreadPoolExecutor(
                MAX_BATCH_RESPONSE_THREAD,
                MAX_BATCH_RESPONSE_THREAD,
                KEEP_ALIVE_TIME,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(BATCH_RESPONSE_THREAD_PREFIX, MAX_BATCH_RESPONSE_THREAD));
            this.batchResponseExecutorService.submit(new BatchResponseRunnable());
        }
    }

    /**
     * Handles the request when its channel is registered; otherwise disconnects and closes the channel.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param rpcMessage            the received message
     * @throws Exception whatever the request handling throws
     */
    @Override
    public void process(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(channelHandlerContext.channel())) {
            onRequestMessage(channelHandlerContext, rpcMessage);
            return;
        }
        try {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("closeChannelHandlerContext channel:{}", channelHandlerContext.channel());
            }
            channelHandlerContext.disconnect();
            channelHandlerContext.close();
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            LOGGER.error(e.getMessage(), e);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("close a unhandled connection! [{}]", channelHandlerContext.channel());
        }
    }

    /**
     * Stops the batch response thread, if one was started.
     *
     * <p>{@code shutdownNow()} is required here: the batch task never completes on its own, so a plain
     * {@code shutdown()} would leave its thread running. The interrupt makes the task perform one last
     * flush and return.
     */
    @Override
    public void destroy() {
        if (this.batchResponseExecutorService != null) {
            this.batchResponseExecutorService.shutdownNow();
        }
    }

    /**
     * Dispatches the message body to the single, batch-response or merge-response handling.
     * Bodies that are no {@link AbstractMessage} are logged and dropped.
     */
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object body = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (!(body instanceof AbstractMessage)) {
            LOGGER.error("unrecognized message:{}", body);
            return;
        }
        if (!(body instanceof MergedWarpMessage)) {
            handleSingleRequest((AbstractMessage) body, rpcMessage, ctx, rpcContext);
            return;
        }
        MergedWarpMessage mergedMessage = (MergedWarpMessage) body;
        boolean batchResponse = NettyServerConfig.isEnableTcServerBatchSendResponse()
            && StringUtils.isNotBlank(rpcContext.getVersion())
            && Version.isAboveOrEqualVersion150(rpcContext.getVersion());
        if (batchResponse) {
            handleMergedRequestsWithBatchResponse(mergedMessage, rpcMessage, ctx, rpcContext);
        } else {
            handleMergedRequestsWithMergeResponse(mergedMessage, rpcMessage, ctx, rpcContext);
        }
    }

    /** Handles one non-merged request and sends its result right away. */
    private void handleSingleRequest(AbstractMessage request, RpcMessage rpcMessage, ChannelHandlerContext ctx, RpcContext rpcContext) {
        if (LOGGER.isInfoEnabled()) {
            writeMessageLog("receive msg[single]", request, ctx.channel(), rpcContext);
        }
        AbstractResultMessage result = this.transactionMessageHandler.onRequest(request, rpcContext);
        this.remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        if (LOGGER.isInfoEnabled()) {
            writeMessageLog("result msg[single]", result, ctx.channel(), rpcContext);
        }
    }

    /**
     * Handles every inner request independently; each result is queued for the batch response thread.
     * With parallel handling enabled the work is submitted through {@link CompletableFuture#runAsync(Runnable)}.
     */
    private void handleMergedRequestsWithBatchResponse(MergedWarpMessage mergedMessage, RpcMessage rpcMessage, ChannelHandlerContext ctx, RpcContext rpcContext) {
        List<AbstractMessage> requests = mergedMessage.msgs;
        List<Integer> requestIds = mergedMessage.msgIds;
        for (int i = 0; i < requests.size(); i++) {
            AbstractMessage request = requests.get(i);
            int requestId = requestIds.get(i);
            if (PARALLEL_REQUEST_HANDLE) {
                CompletableFuture
                    .runAsync(() -> handleRequestsByMergedWarpMessageBy150(request, requestId, rpcMessage, ctx, rpcContext))
                    .exceptionally(failure -> {
                        // Nobody joins this future, so a failure would otherwise vanish without a trace.
                        LOGGER.error("handle request error: {}", failure.getMessage(), failure);
                        return null;
                    });
            } else {
                handleRequestsByMergedWarpMessageBy150(request, requestId, rpcMessage, ctx, rpcContext);
            }
        }
    }

    /**
     * Handles all inner requests and answers them with one {@link MergeResultMessage}.
     *
     * <p>With parallel handling enabled the results are collected in request order. If waiting for a
     * result fails, the error is logged and the response contains only the results collected so far.
     */
    private void handleMergedRequestsWithMergeResponse(MergedWarpMessage mergedMessage, RpcMessage rpcMessage, ChannelHandlerContext ctx, RpcContext rpcContext) {
        List<AbstractMessage> requests = mergedMessage.msgs;
        List<AbstractResultMessage> results = new ArrayList<>(requests.size());
        if (PARALLEL_REQUEST_HANDLE) {
            List<CompletableFuture<AbstractResultMessage>> futures = new ArrayList<>(requests.size());
            for (AbstractMessage request : requests) {
                futures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(request, rpcContext)));
            }
            try {
                for (CompletableFuture<AbstractResultMessage> future : futures) {
                    results.add(future.get());
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller's thread.
                Thread.currentThread().interrupt();
                LOGGER.error("handle request error: {}", e.getMessage(), e);
            } catch (ExecutionException e) {
                LOGGER.error("handle request error: {}", e.getMessage(), e);
            }
        } else {
            for (AbstractMessage request : requests) {
                results.add(handleRequestsByMergedWarpMessage(request, rpcContext));
            }
        }
        MergeResultMessage mergeResultMessage = new MergeResultMessage();
        mergeResultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
        this.remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), mergeResultMessage);
    }

    /**
     * Drains every channel queue once. {@code isResponding} is reset in a {@code finally} block so that
     * producers are never left believing a flush is still in progress.
     */
    private void flushPendingResponses() {
        this.isResponding = true;
        try {
            this.basketMap.forEach(this::flushChannelResponses);
        } finally {
            this.isResponding = false;
        }
    }

    /**
     * Sends everything currently queued for one channel. A response must carry the same message id,
     * codec, compressor and head map as its request, so the results are grouped by
     * {@link ClientRequestRpcInfo} and one {@link BatchResultMessage} is sent per group.
     */
    private void flushChannelResponses(Channel channel, BlockingQueue<QueueItem> queue) {
        if (queue.isEmpty()) {
            return;
        }
        Map<ClientRequestRpcInfo, BatchResultMessage> batches = new HashMap<>();
        QueueItem item;
        while ((item = queue.poll()) != null) {
            BatchResultMessage batch = CollectionUtils.computeIfAbsent(
                batches, new ClientRequestRpcInfo(item.getRpcMessage()), key -> new BatchResultMessage());
            batch.getResultMessages().add(item.getResultMessage());
            batch.getMsgIds().add(item.getMsgId());
        }
        batches.forEach((requestInfo, batch) -> {
            try {
                this.remotingServer.sendAsyncResponse(buildRpcMessage(requestInfo), channel, batch);
            } catch (RuntimeException e) {
                // A failure for one channel must not terminate the only batch response thread.
                LOGGER.error("send batch response failed, channel:{}", channel, e);
            }
        });
    }

    /**
     * Wakes the batch thread unless it is flushing right now; a result queued during a flush is
     * picked up at the latest after the next wait timeout.
     */
    private void notifyBatchRespondingThread() {
        if (this.isResponding) {
            return;
        }
        synchronized (this.batchResponseLock) {
            this.batchResponseLock.notifyAll();
        }
    }

    /** Returns the result queue of the channel, creating it on first use. */
    private BlockingQueue<QueueItem> computeIfAbsentMsgQueue(Channel channel) {
        return CollectionUtils.computeIfAbsent(this.basketMap, channel, key -> new LinkedBlockingQueue<>());
    }

    /** Queues a result for the batch thread and logs an error if the queue rejects it. */
    private void offerMsg(BlockingQueue<QueueItem> msgQueue, RpcMessage rpcMessage, AbstractResultMessage resultMessage, int msgId, Channel channel) {
        if (!msgQueue.offer(new QueueItem(resultMessage, msgId, rpcMessage))) {
            LOGGER.error("put message into basketMap offer failed, channel:{},rpcMessage:{},resultMessage:{}", channel, rpcMessage, resultMessage);
        }
    }

    /**
     * Handles one inner request of a merged message on the merge-response path.
     *
     * @return the result produced by the transaction message handler
     */
    private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage request, RpcContext rpcContext) {
        if (LOGGER.isInfoEnabled()) {
            writeMessageLog("receive msg[merged]", request, rpcContext.getChannel(), rpcContext);
        }
        AbstractResultMessage result = this.transactionMessageHandler.onRequest(request, rpcContext);
        if (LOGGER.isInfoEnabled()) {
            writeMessageLog("result msg[merged]", result, rpcContext.getChannel(), rpcContext);
        }
        return result;
    }

    /**
     * Handles one inner request of a merged message on the batch-response path: the result is queued
     * for the channel and the batch thread is notified.
     */
    private void handleRequestsByMergedWarpMessageBy150(AbstractMessage request, int msgId, RpcMessage rpcMessage, ChannelHandlerContext ctx, RpcContext rpcContext) {
        if (LOGGER.isInfoEnabled()) {
            writeMessageLog("receive msg[merged]", request, ctx.channel(), rpcContext);
        }
        AbstractResultMessage result = this.transactionMessageHandler.onRequest(request, rpcContext);
        Channel channel = ctx.channel();
        offerMsg(computeIfAbsentMsgQueue(channel), rpcMessage, result, msgId, channel);
        notifyBatchRespondingThread();
        if (LOGGER.isInfoEnabled()) {
            writeMessageLog("result msg[merged]", result, ctx.channel(), rpcContext);
        }
    }

    /**
     * Writes one "&lt;label&gt;: &lt;message&gt;, clientIp: ..., vgroup: ..." line through the batch log handler.
     * Callers check {@code LOGGER.isInfoEnabled()} first, so the arguments are only evaluated when needed.
     */
    private static void writeMessageLog(String label, Object message, Channel channel, RpcContext rpcContext) {
        BatchLogHandler.INSTANCE.writeLog(String.format(
            "%s: %s, clientIp: %s, vgroup: %s",
            label,
            message,
            NetUtil.toIpAddress(channel.remoteAddress()),
            rpcContext.getTransactionServiceGroup()));
    }

    /**
     * Creates the envelope of a batch response from the attributes of the request it answers.
     *
     * @param clientRequestRpcInfo id, codec, compressor and head map of the client request
     * @return a new message carrying exactly these attributes
     */
    public RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo) {
        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setId(clientRequestRpcInfo.getRpcMessageId());
        rpcMessage.setCodec(clientRequestRpcInfo.getCodec());
        rpcMessage.setCompressor(clientRequestRpcInfo.getCompressor());
        rpcMessage.setHeadMap(clientRequestRpcInfo.getHeadMap());
        return rpcMessage;
    }
}
