package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.lang.Comparable;
import java.lang.Thread;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.class */
public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
    /** Class logger; a blank final that is assigned in this class's static initializer. */
    private static final Logger LOG;
    /** Configuration key the constructor reads to size the worker thread pool. */
    public static final String THREAD_POOL_SIZE_CONF_KEY = "hbase.procedure.remote.dispatcher.threadpool.size";
    /** Pool size used when {@link #THREAD_POOL_SIZE_CONF_KEY} is not set. */
    private static final int DEFAULT_THREAD_POOL_SIZE = 128;
    /** Configuration key the constructor reads for the per-node buffering delay (the key name says msec). */
    public static final String DISPATCH_DELAY_CONF_KEY = "hbase.procedure.remote.dispatcher.delay.msec";
    /** Buffering delay used when {@link #DISPATCH_DELAY_CONF_KEY} is not set. */
    private static final int DEFAULT_DISPATCH_DELAY = 150;
    /** Configuration key the constructor reads for the per-node buffer size that forces an immediate dispatch. */
    public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = "hbase.procedure.remote.dispatcher.max.queue.size";
    /** Buffer size limit used when {@link #DISPATCH_MAX_QUEUE_SIZE_CONF_KEY} is not set. */
    private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
    /** Lifecycle flag: flipped to true by start(), back to false by stop(), and polled by the timeout thread's loop. */
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** One buffer node per registered remote key. */
    private final ConcurrentHashMap<TRemote, RemoteProcedureDispatcher<TEnv, TRemote>.BufferNode> nodeMap = new ConcurrentHashMap<>();
    /** Added to the current time to compute a node's dispatch deadline when it buffers its first operation. */
    private final int operationDelay;
    /** A node dispatches immediately once it holds more than this many buffered operations. */
    private final int queueMaxSize;
    /** Size argument passed to the bounded cached thread pool created in start(). */
    private final int corePoolSize;
    /** Delay-queue thread; created in start() and reset to null by join(), so it is null outside that window. */
    private RemoteProcedureDispatcher<TEnv, TRemote>.TimeoutExecutorThread timeoutExecutor;
    /** Worker pool that runs submitted tasks; created in start(), so it is null before the first start. */
    private ThreadPoolExecutor threadPool;
    /**
     * Decompiler-exposed assertion switch: set in the static initializer to the negation of the
     * class's desired assertion status, and checked by the hand-expanded assertions in this class.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher$BufferNode.class */
    /**
     * Per-remote buffer of operations waiting to be sent, plus the set of operations that were
     * already sent and asked to be tracked until completion.
     *
     * <p>The node is itself a delayed element: when the first operation of a batch arrives it
     * stamps itself with a deadline and registers with the timeout executor, which later calls
     * {@link #dispatch()}. All state-changing methods are {@code synchronized} on the node.
     */
    protected final class BufferNode extends DelayedUtil.DelayedContainerWithTimestamp<TRemote> implements RemoteNode<TEnv, TRemote> {
        /** Current batch being accumulated; {@code null} whenever no batch is open. */
        private Set<RemoteProcedure> operations;
        /** Operations passed to remoteDispatch whose {@code storeInDispatchedQueue()} returned true. */
        private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();

        /**
         * Creates an empty node for the given remote key with an initial timeout of zero.
         *
         * @param key the remote this node buffers operations for
         */
        protected BufferNode(TRemote key) {
            super(key, 0L);
        }

        /** @return the remote key this node was created with */
        @Override
        public TRemote getKey() {
            TRemote key = (TRemote) getObject();
            return key;
        }

        /**
         * Buffers one operation. Opening a new batch schedules the node on the timeout executor;
         * exceeding the configured maximum batch size unschedules it and dispatches right away.
         *
         * @param operation the operation to buffer
         */
        @Override
        public synchronized void add(RemoteProcedure operation) {
            final RemoteProcedureDispatcher<TEnv, TRemote> dispatcher = RemoteProcedureDispatcher.this;
            if (operations == null) {
                // First operation of a batch: open the batch and arm the delayed dispatch.
                operations = new HashSet<>();
                setTimeout(EnvironmentEdgeManager.currentTime() + dispatcher.operationDelay);
                dispatcher.timeoutExecutor.add(this);
            }
            operations.add(operation);
            if (operations.size() <= dispatcher.queueMaxSize) {
                return;
            }
            // Batch is over the limit: do not wait for the deadline.
            dispatcher.timeoutExecutor.remove(this);
            dispatch();
        }

        /**
         * Hands the open batch, if any, to {@code remoteDispatch}, records the operations that
         * want completion tracking, and closes the batch.
         */
        @Override
        public synchronized void dispatch() {
            if (operations == null) {
                return;
            }
            RemoteProcedureDispatcher.this.remoteDispatch(getKey(), operations);
            for (RemoteProcedure operation : operations) {
                if (operation.storeInDispatchedQueue()) {
                    dispatchedOperations.add(operation);
                }
            }
            operations = null;
        }

        /**
         * Aborts everything this node knows about: first the open batch (when there is one), then
         * the tracked already-dispatched operations, which are cleared afterwards.
         */
        public synchronized void abortOperationsInQueue() {
            final Set<RemoteProcedure> pending = operations;
            if (pending != null) {
                RemoteProcedureDispatcher.this.abortPendingOperations(getKey(), pending);
                operations = null;
            }
            RemoteProcedureDispatcher.this.abortPendingOperations(getKey(), dispatchedOperations);
            dispatchedOperations.clear();
        }

        /**
         * Stops tracking a dispatched operation.
         *
         * @param operation the operation to drop from the dispatched set
         */
        public synchronized void operationCompleted(RemoteProcedure operation) {
            dispatchedOperations.remove(operation);
        }

        /** @return the superclass description followed by the currently buffered operations */
        @Override
        public String toString() {
            final String base = super.toString();
            return base + ", operations=" + operations;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher$DelayedTask.class */
    /** A {@link Runnable} wrapped as a delayed element that becomes due a fixed delay after creation. */
    private static final class DelayedTask extends DelayedUtil.DelayedContainerWithTimestamp<Runnable> {
        /**
         * @param task  the work to run once the delay has elapsed
         * @param delay how long to wait, expressed in {@code unit}
         * @param unit  unit of {@code delay}
         */
        public DelayedTask(Runnable task, long delay, TimeUnit unit) {
            super(task, deadlineAfter(delay, unit));
        }

        /** Returns the current time plus the delay converted to milliseconds. */
        private static long deadlineAfter(long delay, TimeUnit unit) {
            return EnvironmentEdgeManager.currentTime() + unit.toMillis(delay);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher$RemoteNode.class */
    /**
     * A per-remote collection point for operations. Within this file the only implementation is
     * {@code BufferNode}.
     *
     * @param <TEnv>    environment type handed to the remote procedures
     * @param <TRemote> type identifying the remote
     */
    public interface RemoteNode<TEnv, TRemote> {
        /** @return the key identifying the remote this node belongs to */
        TRemote getKey();

        /**
         * Adds an operation to this node.
         *
         * @param remoteProcedure the operation to add
         */
        void add(RemoteProcedure<TEnv, TRemote> remoteProcedure);

        /** Sends out whatever this node currently holds. */
        void dispatch();
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher$RemoteOperation.class */
    /**
     * Base class for the request object a {@link RemoteProcedure} builds for a remote; it only
     * remembers which procedure produced it.
     */
    public static abstract class RemoteOperation {
        /** The procedure this operation was built for. */
        private final RemoteProcedure remoteProcedure;

        /**
         * @param procedure the procedure this operation belongs to
         */
        protected RemoteOperation(RemoteProcedure procedure) {
            remoteProcedure = procedure;
        }

        /** @return the procedure supplied at construction time */
        public RemoteProcedure getRemoteProcedure() {
            return remoteProcedure;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher$RemoteProcedure.class */
    /**
     * Contract for a procedure that can be buffered by this dispatcher and turned into a
     * {@link RemoteOperation} for a remote.
     *
     * @param <TEnv>    environment type passed to the callbacks
     * @param <TRemote> type identifying the remote
     */
    public interface RemoteProcedure<TEnv, TRemote> {
        /**
         * Builds the operation to send to the given remote. The dispatcher's
         * {@code buildAndGroupRequestByType} calls this for every buffered procedure and skips
         * procedures that return an empty {@link Optional}.
         *
         * @param tenv    the environment
         * @param tremote the target remote
         * @return the operation to send, or empty when there is nothing to send
         */
        Optional<RemoteOperation> remoteCallBuild(TEnv tenv, TRemote tremote);

        /**
         * Failure callback for the remote call itself. Not invoked anywhere in this file;
         * presumably called by dispatcher subclasses — confirm against the implementations.
         *
         * @param tenv        the environment
         * @param tremote     the remote the call was addressed to
         * @param iOException the failure
         */
        void remoteCallFailed(TEnv tenv, TRemote tremote, IOException iOException);

        /**
         * Success callback for the remote operation. Not invoked anywhere in this file;
         * presumably called once the remote reports completion — confirm against callers.
         *
         * @param tenv the environment
         */
        void remoteOperationCompleted(TEnv tenv);

        /**
         * Failure callback for the remote operation. Not invoked anywhere in this file;
         * presumably called once the remote reports a failure — confirm against callers.
         *
         * @param tenv                     the environment
         * @param remoteProcedureException the reported failure
         */
        void remoteOperationFailed(TEnv tenv, RemoteProcedureException remoteProcedureException);

        /**
         * Whether a buffer node should keep tracking this procedure after dispatching it. When
         * true, {@code BufferNode.dispatch()} adds the procedure to its dispatched set, from which
         * it is removed on completion or aborted when the node is removed.
         *
         * @return {@code true} by default
         */
        default boolean storeInDispatchedQueue() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher$TimeoutExecutorThread.class */
    /**
     * Thread that drains a {@link DelayQueue} of delayed elements. A {@code DelayedTask} is
     * forwarded to the dispatcher's thread pool; any other element is treated as a
     * {@code BufferNode} and dispatched on this thread.
     */
    public final class TimeoutExecutorThread extends Thread {
        /** Pending delayed elements: buffer nodes, delayed tasks and the stop marker. */
        private final DelayQueue<DelayedUtil.DelayedWithTimeout> queue;

        public TimeoutExecutorThread() {
            super("ProcedureDispatcherTimeoutThread");
            this.queue = new DelayQueue<>();
        }

        /**
         * Processes queue elements for as long as the dispatcher's {@code running} flag is set.
         * A {@code null} element or the poison marker is skipped, which simply makes the loop
         * re-check the flag.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (RemoteProcedureDispatcher.this.running.get()) {
                // NOTE(review): presumably blocks until an element's delay expires — confirm in DelayedUtil.
                DelayedUtil.DelayedWithTimeout delayedWithTimeout = (DelayedUtil.DelayedWithTimeout) DelayedUtil.takeWithoutInterrupt(this.queue);
                if (delayedWithTimeout == null || delayedWithTimeout == DelayedUtil.DELAYED_POISON) {
                    continue;
                }
                if (delayedWithTimeout instanceof DelayedTask) {
                    RemoteProcedureDispatcher.this.threadPool.execute(((DelayedTask) delayedWithTimeout).getObject());
                } else {
                    ((BufferNode) delayedWithTimeout).dispatch();
                }
            }
        }

        /**
         * Schedules an element.
         *
         * @param delayedWithTimeout the element to enqueue
         */
        public void add(DelayedUtil.DelayedWithTimeout delayedWithTimeout) {
            // The element is enqueued as-is; the decompiled source wrongly cast it to the queue's own type.
            this.queue.add(delayedWithTimeout);
        }

        /**
         * Unschedules an element if it is still queued.
         *
         * @param delayedWithTimeout the element to remove
         */
        public void remove(DelayedUtil.DelayedWithTimeout delayedWithTimeout) {
            this.queue.remove(delayedWithTimeout);
        }

        /** Enqueues the poison marker so that {@link #run()} wakes up and re-checks the running flag. */
        public void sendStopSignal() {
            this.queue.add(DelayedUtil.DELAYED_POISON);
        }

        /**
         * Waits for this thread to die, re-sending the stop signal before every 250 ms join
         * attempt and logging a warning on every eighth attempt. An interrupt ends the wait and
         * is logged.
         */
        public void awaitTermination() {
            try {
                final long startTime = EnvironmentEdgeManager.currentTime();
                for (int i = 0; isAlive(); i++) {
                    sendStopSignal();
                    join(250L);
                    if (i > 0 && i % 8 == 0) {
                        RemoteProcedureDispatcher.LOG.warn("Waiting termination of thread {}, {}", getName(), StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
                    }
                }
            } catch (InterruptedException e) {
                RemoteProcedureDispatcher.LOG.warn("{} join wait got interrupted", getName(), e);
            }
        }
    }

    /**
     * Reads the dispatcher's tunables from the configuration, falling back to the built-in
     * defaults. No threads are created here; that happens in {@link #start()}.
     *
     * @param conf source of the thread-pool size, dispatch delay and maximum queue size
     */
    protected RemoteProcedureDispatcher(Configuration conf) {
        corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
        operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
        queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
    }

    /**
     * Starts the dispatcher: marks it running, launches the timeout thread and then creates the
     * worker thread pool.
     *
     * @return {@code true} if this call started the dispatcher, {@code false} if it was already running
     */
    public boolean start() {
        if (!running.compareAndSet(false, true)) {
            LOG.warn("Already running");
            return false;
        }
        LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, operationDelay={}", corePoolSize, queueMaxSize, operationDelay);

        // The timeout thread is started before the pool exists, exactly as before.
        timeoutExecutor = new TimeoutExecutorThread();
        timeoutExecutor.start();

        final String threadPrefix = getClass().getSimpleName();
        final Thread.UncaughtExceptionHandler handler = getUncaughtExceptionHandler();
        threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory(threadPrefix, handler));
        return true;
    }

    /**
     * Installs an uncaught-exception handler on the timeout thread. The thread only exists
     * between {@link #start()} and {@link #join()}; outside that window the field is null.
     *
     * @param handler the handler to install
     */
    protected void setTimeoutExecutorUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
        final Thread executor = timeoutExecutor;
        executor.setUncaughtExceptionHandler(handler);
    }

    /**
     * Requests shutdown: clears the running flag, wakes the timeout thread with a stop signal and
     * shuts the worker pool down. Does not wait for either to finish; see {@link #join()}.
     *
     * @return {@code true} if this call stopped a running dispatcher, {@code false} if it was not running
     */
    public boolean stop() {
        final boolean wasRunning = running.compareAndSet(true, false);
        if (wasRunning) {
            LOG.info("Stopping procedure remote dispatcher");
            timeoutExecutor.sendStopSignal();
            threadPool.shutdownNow();
        }
        return wasRunning;
    }

    /**
     * Waits for the dispatcher's threads to finish after {@link #stop()}: first the timeout
     * thread (whose reference is then dropped), then the worker pool, which is shut down again
     * and awaited in 60-second rounds with a warning logged after each round that times out.
     *
     * <p>If the calling thread is interrupted while waiting for the pool, the interruption is
     * logged and the method returns without waiting further.
     */
    public void join() {
        if (!$assertionsDisabled && this.running.get()) {
            throw new AssertionError("expected not running");
        }
        this.timeoutExecutor.awaitTermination();
        this.timeoutExecutor = null;
        this.threadPool.shutdownNow();
        try {
            // awaitTermination throws InterruptedException, so the whole wait loop must sit inside the try.
            while (!this.threadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
                LOG.warn("Waiting for thread-pool to terminate");
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for thread-pool termination", e);
        }
    }

    /**
     * Supplies the uncaught-exception handler that start() passes to the daemon thread factory
     * used for the worker pool.
     *
     * @return the handler for worker-pool threads
     */
    protected abstract Thread.UncaughtExceptionHandler getUncaughtExceptionHandler();

    /**
     * Registers a remote so that operations can be buffered for it. Does nothing when a node for
     * the key already exists.
     *
     * @param key the remote to register; must not be null
     */
    public void addNode(TRemote key) {
        final boolean assertionsEnabled = !$assertionsDisabled;
        if (assertionsEnabled && key == null) {
            throw new AssertionError("Tried to add a node with a null key");
        }
        final BufferNode candidate = new BufferNode(key);
        nodeMap.putIfAbsent(key, candidate);
    }

    /**
     * Buffers an operation on the node registered for the given remote.
     *
     * @param key       the target remote
     * @param operation the operation to buffer
     * @throws NullTargetServerDispatchException if {@code key} is null
     * @throws NoServerDispatchException         if no node is registered for {@code key}
     * @throws NoNodeDispatchException           if, after the operation was added, the map no longer
     *                                           holds a value equal to the node it was added to
     */
    public void addOperationToNode(TRemote key, RemoteProcedure operation) throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
        if (key == null) {
            throw new NullTargetServerDispatchException(operation.toString());
        }
        final BufferNode node = nodeMap.get(key);
        if (node == null) {
            throw new NoServerDispatchException(describeDispatch(key, operation));
        }
        node.add(operation);
        // Re-check after adding: the node may have been removed from the map in the meantime.
        final boolean stillRegistered = nodeMap.containsValue(node);
        if (!stillRegistered) {
            throw new NoNodeDispatchException(describeDispatch(key, operation));
        }
    }

    /** Builds the {@code "<remote>; <operation>"} text used in dispatch failure messages. */
    private static String describeDispatch(Object key, Object operation) {
        return key.toString() + "; " + operation.toString();
    }

    /**
     * Tells the node for the given remote that a dispatched operation has finished, so it stops
     * tracking it. Logs a warning and does nothing else when the remote has no node.
     *
     * @param key       the remote the operation was dispatched to
     * @param operation the finished operation
     */
    public void removeCompletedOperation(TRemote key, RemoteProcedure operation) {
        final BufferNode node = nodeMap.get(key);
        if (node != null) {
            node.operationCompleted(operation);
            return;
        }
        LOG.warn("since no node for this key {}, we can't removed the finished remote procedure", key);
    }

    /**
     * Unregisters a remote and aborts every operation its node still holds, buffered or
     * dispatched.
     *
     * @param key the remote to unregister
     * @return {@code true} if a node existed and was removed, {@code false} otherwise
     */
    public boolean removeNode(TRemote key) {
        final BufferNode removed = nodeMap.remove(key);
        final boolean existed = removed != null;
        if (existed) {
            removed.abortOperationsInQueue();
        }
        return existed;
    }

    /**
     * Runs a task on the worker thread pool without delay.
     *
     * @param task the work to execute
     */
    protected final void submitTask(Runnable task) {
        final ThreadPoolExecutor pool = threadPool;
        pool.execute(task);
    }

    /**
     * Schedules a task on the timeout executor, which forwards it to the worker pool once the
     * delay has elapsed.
     *
     * @param task  the work to execute
     * @param delay how long to wait before execution, expressed in {@code unit}
     * @param unit  unit of {@code delay}
     */
    protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
        final DelayedTask delayed = new DelayedTask(task, delay, unit);
        timeoutExecutor.add(delayed);
    }

    /**
     * Sends a batch of buffered operations to a remote. Called from {@code BufferNode.dispatch()},
     * a synchronized method, so the node's monitor is held for the duration of the call. After
     * this returns, the node reads the set once more to record the operations it must track and
     * then drops its reference to the set.
     *
     * @param tremote the remote to send to
     * @param set     the operations buffered for that remote
     */
    protected abstract void remoteDispatch(TRemote tremote, Set<RemoteProcedure> set);

    /**
     * Aborts operations belonging to a remote whose node is being removed. Called from
     * {@code BufferNode.abortOperationsInQueue()}: once with the still-buffered operations when
     * there are any, and always once with the dispatched-but-not-completed set (which may be
     * empty and is cleared by the node after this call returns).
     *
     * @param tremote the remote whose operations are aborted
     * @param set     the operations to abort
     */
    protected abstract void abortPendingOperations(TRemote tremote, Set<RemoteProcedure> set);

    /**
     * Asks every procedure to build its operation for the given remote and groups the resulting
     * operations by their concrete class. Procedures that return an empty {@link Optional}
     * contribute nothing.
     *
     * @param env              the environment passed to each procedure
     * @param remote           the remote the operations are built for
     * @param remoteProcedures the procedures to build operations from
     * @return a multimap from operation class to the operations of that class; empty when no
     *         procedure produced an operation
     */
    protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(TEnv env, TRemote remote, Set<RemoteProcedure> remoteProcedures) {
        final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
        for (RemoteProcedure procedure : remoteProcedures) {
            // The set holds raw RemoteProcedure, so the call returns a raw Optional; give it its
            // declared element type here so the grouping lambda type-checks.
            @SuppressWarnings("unchecked")
            final Optional<RemoteOperation> operation = procedure.remoteCallBuild(env, remote);
            operation.ifPresent(op -> requestByType.put(op.getClass(), op));
        }
        return requestByType;
    }

    /**
     * Removes and returns all operations stored under the given class key.
     *
     * <p>The cast is unchecked: it is only sound when every value stored under {@code type} is an
     * instance of {@code T}, which holds for maps filled by
     * {@code buildAndGroupRequestByType}, where the key is each operation's own class.
     *
     * @param requestByType the grouped operations; the entries for {@code type} are removed from it
     * @param type          the operation class to extract
     * @param <T>           the operation type
     * @return the operations that were stored under {@code type}
     */
    @SuppressWarnings("unchecked")
    protected <T extends RemoteOperation> List<T> fetchType(ArrayListMultimap<Class<?>, RemoteOperation> requestByType, Class<T> type) {
        return (List<T>) requestByType.removeAll(type);
    }

    static {
        // Hand-expanded form of what the compiler generates for `assert`: remember once, at class
        // initialization, whether assertions are disabled for this class.
        $assertionsDisabled = !RemoteProcedureDispatcher.class.desiredAssertionStatus();
        // The logger is a blank final, so it is assigned here rather than at its declaration.
        LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);
    }
}
