package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/XceiverClientRatis.class */
public final class XceiverClientRatis extends XceiverClientSpi {
    public static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
    private final Pipeline pipeline;
    private final RpcType rpcType;
    private final RetryPolicy retryPolicy;
    private final GrpcTlsConfig tlsConfig;
    private final ConfigurationSource ozoneConfiguration;
    private final AtomicReference<RaftClient> client = new AtomicReference<>();
    private final XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics();
    private final ConcurrentHashMap<UUID, Long> commitInfoMap = new ConcurrentHashMap<>();

    public static XceiverClientRatis newXceiverClientRatis(Pipeline pipeline, ConfigurationSource configurationSource) {
        return newXceiverClientRatis(pipeline, configurationSource, null);
    }

    public static XceiverClientRatis newXceiverClientRatis(Pipeline pipeline, ConfigurationSource configurationSource, ClientTrustManager clientTrustManager) {
        String str = configurationSource.get("dfs.container.ratis.rpc.type", "GRPC");
        return new XceiverClientRatis(pipeline, SupportedRpcType.valueOfIgnoreCase(str), RatisHelper.createRetryPolicy(configurationSource), RatisHelper.createTlsClientConfig(new SecurityConfig(configurationSource), clientTrustManager), configurationSource);
    }

    private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, RetryPolicy retryPolicy, GrpcTlsConfig grpcTlsConfig, ConfigurationSource configurationSource) {
        this.pipeline = pipeline;
        this.rpcType = rpcType;
        this.retryPolicy = retryPolicy;
        this.tlsConfig = grpcTlsConfig;
        this.ozoneConfiguration = configurationSource;
        if (LOG.isTraceEnabled()) {
            LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(), new Throwable("TRACE"));
        }
    }

    private long updateCommitInfosMap(RaftClientReply raftClientReply) {
        return ((Long) Optional.ofNullable(raftClientReply).filter((v0) -> {
            return v0.isSuccess();
        }).map((v0) -> {
            return v0.getCommitInfos();
        }).map(this::updateCommitInfosMap).orElse(0L)).longValue();
    }

    public long updateCommitInfosMap(Collection<RaftProtos.CommitInfoProto> collection) {
        return (this.commitInfoMap.isEmpty() ? collection.stream().map(this::putCommitInfo) : collection.stream().map(commitInfoProto -> {
            return this.commitInfoMap.computeIfPresent(RatisHelper.toDatanodeId(commitInfoProto.getServer()), (uuid, l) -> {
                return Long.valueOf(commitInfoProto.getCommitIndex());
            });
        })).mapToLong((v0) -> {
            return v0.longValue();
        }).min().orElse(0L);
    }

    private long putCommitInfo(RaftProtos.CommitInfoProto commitInfoProto) {
        long commitIndex = commitInfoProto.getCommitIndex();
        this.commitInfoMap.put(RatisHelper.toDatanodeId(commitInfoProto.getServer()), Long.valueOf(commitIndex));
        return commitIndex;
    }

    public HddsProtos.ReplicationType getPipelineType() {
        return HddsProtos.ReplicationType.RATIS;
    }

    public Pipeline getPipeline() {
        return this.pipeline;
    }

    public void connect() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connecting to pipeline:{} leaderDatanode:{}, primaryDatanode:{}", new Object[]{getPipeline().getId(), RatisHelper.toRaftPeerId(this.pipeline.getLeaderNode()), RatisHelper.toRaftPeerId(this.pipeline.getClosestNode())});
        }
        if (!this.client.compareAndSet(null, RatisHelper.newRaftClient(this.rpcType, getPipeline(), this.retryPolicy, this.tlsConfig, this.ozoneConfiguration))) {
            throw new IllegalStateException("Client is already connected.");
        }
    }

    public void close() {
        RaftClient andSet = this.client.getAndSet(null);
        if (andSet != null) {
            closeRaftClient(andSet);
        }
    }

    private void closeRaftClient(RaftClient raftClient) {
        try {
            raftClient.close();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    private RaftClient getClient() {
        return (RaftClient) Objects.requireNonNull(this.client.get(), "client is null");
    }

    @VisibleForTesting
    public ConcurrentMap<UUID, Long> getCommitInfoMap() {
        return this.commitInfoMap;
    }

    private CompletableFuture<RaftClientReply> sendRequestAsync(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) {
        return (CompletableFuture) TracingUtil.executeInNewSpan("XceiverClientRatis." + containerCommandRequestProto.getCmdType().name(), () -> {
            ContainerCommandRequestMessage message = ContainerCommandRequestMessage.toMessage(containerCommandRequestProto, TracingUtil.exportCurrentSpan());
            if (HddsUtils.isReadOnly(containerCommandRequestProto)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("sendCommandAsync ReadOnly {}", message);
                }
                return getClient().async().sendReadOnly(message);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("sendCommandAsync {}", message);
            }
            return getClient().async().send(message);
        });
    }

    public long getReplicatedMinCommitIndex() {
        return this.commitInfoMap.values().parallelStream().mapToLong((v0) -> {
            return v0.longValue();
        }).min().orElse(0L);
    }

    private void addDatanodetoReply(UUID uuid, XceiverClientReply xceiverClientReply) {
        DatanodeDetails.Builder newBuilder = DatanodeDetails.newBuilder();
        newBuilder.setUuid(uuid);
        xceiverClientReply.addDatanode(newBuilder.build());
    }

    private XceiverClientReply newWatchReply(long j, Object obj, long j2) {
        LOG.debug("watchForCommit({}) returns {} {}", new Object[]{Long.valueOf(j), obj, Long.valueOf(j2)});
        XceiverClientReply xceiverClientReply = new XceiverClientReply((CompletableFuture) null);
        xceiverClientReply.setLogIndex(j2);
        return xceiverClientReply;
    }

    public XceiverClientReply watchForCommit(long j) throws InterruptedException, ExecutionException, TimeoutException, IOException {
        long replicatedMinCommitIndex = getReplicatedMinCommitIndex();
        if (replicatedMinCommitIndex >= j) {
            return newWatchReply(j, "replicatedMin", replicatedMinCommitIndex);
        }
        try {
            long updateCommitInfosMap = updateCommitInfosMap((RaftClientReply) getClient().async().watch(j, RaftProtos.ReplicationLevel.ALL_COMMITTED).get());
            Preconditions.checkState(updateCommitInfosMap >= j);
            return newWatchReply(j, RaftProtos.ReplicationLevel.ALL_COMMITTED, updateCommitInfosMap);
        } catch (Exception e) {
            LOG.warn("3 way commit failed on pipeline {}", this.pipeline, e);
            if (HddsClientUtils.containsException(e, GroupMismatchException.class) != null) {
                throw e;
            }
            RaftClientReply raftClientReply = (RaftClientReply) getClient().async().watch(j, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED).get();
            XceiverClientReply newWatchReply = newWatchReply(j, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED, j);
            raftClientReply.getCommitInfos().stream().filter(commitInfoProto -> {
                return commitInfoProto.getCommitIndex() < j;
            }).forEach(commitInfoProto2 -> {
                UUID datanodeId = RatisHelper.toDatanodeId(commitInfoProto2.getServer());
                addDatanodetoReply(datanodeId, newWatchReply);
                this.commitInfoMap.remove(datanodeId);
                LOG.info("Could not commit index {} on pipeline {} to all the nodes. Server {} has failed. Committed by majority.", new Object[]{Long.valueOf(j), this.pipeline, datanodeId});
            });
            return newWatchReply;
        }
    }

    public XceiverClientReply sendCommandAsync(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) {
        XceiverClientReply xceiverClientReply = new XceiverClientReply((CompletableFuture) null);
        long currentTimeMillis = System.currentTimeMillis();
        CompletableFuture<RaftClientReply> sendRequestAsync = sendRequestAsync(containerCommandRequestProto);
        this.metrics.incrPendingContainerOpsMetrics(containerCommandRequestProto.getCmdType());
        xceiverClientReply.setResponse(sendRequestAsync.whenComplete((raftClientReply, th) -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("received reply {} for request: cmdType={} containerID={} pipelineID={} traceID={} exception: {}", new Object[]{raftClientReply, containerCommandRequestProto.getCmdType(), Long.valueOf(containerCommandRequestProto.getContainerID()), containerCommandRequestProto.getPipelineID(), containerCommandRequestProto.getTraceID(), th});
            }
            this.metrics.decrPendingContainerOpsMetrics(containerCommandRequestProto.getCmdType());
            this.metrics.addContainerOpsLatency(containerCommandRequestProto.getCmdType(), System.currentTimeMillis() - currentTimeMillis);
        }).thenApply(raftClientReply2 -> {
            try {
                if (!raftClientReply2.isSuccess()) {
                    RaftException exception = raftClientReply2.getException();
                    Preconditions.checkNotNull(exception, "Raft reply failure but no exception propagated.");
                    throw new CompletionException((Throwable) exception);
                }
                ContainerProtos.ContainerCommandResponseProto parseFrom = ContainerProtos.ContainerCommandResponseProto.parseFrom(raftClientReply2.getMessage().getContent());
                UUID datanodeId = RatisHelper.toDatanodeId(raftClientReply2.getReplierId());
                if (parseFrom.getResult() == ContainerProtos.Result.SUCCESS) {
                    updateCommitInfosMap(raftClientReply2.getCommitInfos());
                }
                xceiverClientReply.setLogIndex(raftClientReply2.getLogIndex());
                addDatanodetoReply(datanodeId, xceiverClientReply);
                return parseFrom;
            } catch (InvalidProtocolBufferException e) {
                throw new CompletionException((Throwable) e);
            }
        }));
        return xceiverClientReply;
    }

    public Map<DatanodeDetails, ContainerProtos.ContainerCommandResponseProto> sendCommandOnAllNodes(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) {
        throw new UnsupportedOperationException("Operation Not supported for ratis client");
    }

    public DataStreamApi getDataStreamApi() {
        return getClient().getDataStreamApi();
    }
}
