package org.apache.flink.connector.base.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.function.Consumer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.BasicRequestInfo;
import org.apache.flink.connector.base.sink.writer.strategy.BasicResultInfo;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Preconditions;

/**
 * Base class for sink writers that buffer converted request entries and submit them to a
 * destination in batches through an asynchronous callback.
 *
 * <p>Incoming elements are converted with the configured {@link ElementConverter} and appended to
 * an in-memory buffer. A batch is taken from the head of the buffer and passed to {@link
 * #submitRequestEntries(List, Consumer)} when the buffer holds enough entries or bytes, when the
 * buffer is full, when the buffering timer fires, or when {@link #flush(boolean)} is called with
 * {@code true}. Entries that the implementation reports back through the callback are put back at
 * the head of the buffer so they are sent again first.
 *
 * <p>The buffered (not yet submitted) entries are exposed through {@link #snapshotState(long)} and
 * can be handed back to the constructor to refill the buffer.
 *
 * @param <InputT> type of the elements passed to {@link #write(Object, SinkWriter.Context)}
 * @param <RequestEntryT> type of the converted entries that are buffered and submitted
 */
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {

    /** Executor taken from the init context; used to yield while blocked and to run callbacks. */
    private final MailboxExecutor mailboxExecutor;

    /** Time service taken from the init context; used to schedule the buffering timer. */
    private final ProcessingTimeService timeService;

    /** {@code System.currentTimeMillis()} taken when the most recently completed request was sent. */
    private long lastSendTimestamp = 0L;

    /** {@code System.currentTimeMillis()} taken when the most recent request completed. */
    private long ackTime = Long.MAX_VALUE;

    private final SinkWriterMetricGroup metrics;
    private final Counter numBytesOutCounter;
    private final Counter numRecordsOutCounter;

    /** Decides whether a new request may be sent and caps the batch size. */
    private final RateLimitingStrategy rateLimitingStrategy;

    private final int maxBatchSize;
    private final int maxBufferedRequests;
    private final long maxBatchSizeInBytes;
    private final long maxTimeInBufferMS;
    private final long maxRecordSizeInBytes;

    private final ElementConverter<InputT, RequestEntryT> elementConverter;

    /** Entries waiting to be submitted; batches are taken from the head. */
    private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries =
            new ArrayDeque<>();

    /** Number of submitted requests whose completion callback has not run yet. */
    private int inFlightRequestsCount;

    /** Sum of the sizes of all entries currently held in {@link #bufferedRequestEntries}. */
    private long bufferedRequestEntriesTotalSizeInBytes;

    /** Whether a buffering timer is registered and has not fired yet. */
    private boolean existsActiveTimerCallback = false;

    /** Hands an exception to the mailbox executor, where it is rethrown. */
    private final Consumer<Exception> fatalExceptionCons;

    /**
     * Sends one batch of entries to the destination.
     *
     * <p>The implementation must eventually invoke {@code requestToRetry} once for the batch,
     * passing the entries that should be put back into the buffer (an empty list if none). The
     * invocation marks the request as completed.
     *
     * @param requestEntries the batch to send
     * @param requestToRetry callback accepting the entries to re-queue
     */
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);

    /**
     * Returns the size of a single entry in bytes. The value is compared against the configured
     * maximum record size and accumulated towards the byte limit of a batch.
     *
     * @param requestEntry the entry to measure
     * @return the size of the entry in bytes
     */
    protected abstract long getSizeInBytes(RequestEntryT requestEntry);

    /**
     * Creates a writer with an empty initial buffer.
     *
     * @param elementConverter converts incoming elements into request entries
     * @param context init context providing executor, time service and metrics
     * @param maxBatchSize maximum number of entries per batch
     * @param maxInFlightRequests maximum number of in-flight requests
     * @param maxBufferedRequests maximum number of buffered entries
     * @param maxBatchSizeInBytes maximum size of a batch in bytes
     * @param maxTimeInBufferMS delay in milliseconds of the buffering timer
     * @param maxRecordSizeInBytes maximum size of a single entry in bytes
     * @deprecated use the constructor taking a {@link WriterInitContext} and an {@link
     *     AsyncSinkWriterConfiguration}
     */
    @Deprecated
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes) {
        this(
                elementConverter,
                context,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes,
                Collections.emptyList());
    }

    /**
     * Creates a writer whose buffer is pre-filled from previously snapshotted state.
     *
     * @param elementConverter converts incoming elements into request entries
     * @param context init context providing executor, time service and metrics
     * @param maxBatchSize maximum number of entries per batch
     * @param maxInFlightRequests maximum number of in-flight requests
     * @param maxBufferedRequests maximum number of buffered entries
     * @param maxBatchSizeInBytes maximum size of a batch in bytes
     * @param maxTimeInBufferMS delay in milliseconds of the buffering timer
     * @param maxRecordSizeInBytes maximum size of a single entry in bytes
     * @param states buffered entries to restore into the buffer
     * @deprecated use the constructor taking a {@link WriterInitContext} and an {@link
     *     AsyncSinkWriterConfiguration}
     */
    @Deprecated
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            Collection<BufferedRequestState<RequestEntryT>> states) {
        this(
                elementConverter,
                context,
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(maxBatchSize)
                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
                        .setMaxInFlightRequests(maxInFlightRequests)
                        .setMaxBufferedRequests(maxBufferedRequests)
                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
                        .build(),
                states);
    }

    /**
     * Creates a writer from a configuration object, validates the configuration, registers the
     * send-time gauge, opens the element converter and restores the given state into the buffer.
     *
     * @param elementConverter converts incoming elements into request entries
     * @param context init context providing executor, time service and metrics
     * @param configuration batching, buffering and rate limiting settings
     * @param states buffered entries to restore into the buffer
     * @throws IllegalArgumentException if a limit is not positive, if the buffer limit is not
     *     strictly greater than the batch limit, or if the batch byte limit is smaller than the
     *     record byte limit
     * @deprecated use the constructor taking a {@link WriterInitContext}
     */
    @Deprecated
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            Sink.InitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<RequestEntryT>> states) {
        this.elementConverter = elementConverter;
        this.mailboxExecutor = context.getMailboxExecutor();
        this.timeService = context.getProcessingTimeService();

        Preconditions.checkNotNull(elementConverter);
        Preconditions.checkArgument(configuration.getMaxBatchSize() > 0);
        Preconditions.checkArgument(configuration.getMaxBufferedRequests() > 0);
        Preconditions.checkArgument(configuration.getMaxBatchSizeInBytes() > 0);
        Preconditions.checkArgument(configuration.getMaxTimeInBufferMS() > 0);
        Preconditions.checkArgument(configuration.getMaxRecordSizeInBytes() > 0);
        Preconditions.checkArgument(
                configuration.getMaxBufferedRequests() > configuration.getMaxBatchSize(),
                "The maximum number of requests that may be buffered should be strictly greater than the maximum number of requests per batch.");
        Preconditions.checkArgument(
                configuration.getMaxBatchSizeInBytes() >= configuration.getMaxRecordSizeInBytes(),
                "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.");
        Preconditions.checkNotNull(configuration.getRateLimitingStrategy());

        this.maxBatchSize = configuration.getMaxBatchSize();
        this.maxBufferedRequests = configuration.getMaxBufferedRequests();
        this.maxBatchSizeInBytes = configuration.getMaxBatchSizeInBytes();
        this.maxTimeInBufferMS = configuration.getMaxTimeInBufferMS();
        this.maxRecordSizeInBytes = configuration.getMaxRecordSizeInBytes();
        this.rateLimitingStrategy = configuration.getRateLimitingStrategy();

        this.inFlightRequestsCount = 0;
        this.bufferedRequestEntriesTotalSizeInBytes = 0L;

        this.metrics = context.metricGroup();
        // Reports the time between sending and acknowledging the most recently completed request.
        this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
        this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
        this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();

        // The exception is rethrown from inside a task run by the mailbox executor rather than
        // on whichever thread reported it.
        this.fatalExceptionCons =
                exception ->
                        this.mailboxExecutor.execute(
                                () -> {
                                    throw exception;
                                },
                                "A fatal exception occurred in the sink that cannot be recovered from or should not be retried.");

        elementConverter.open(context);
        initializeState(states);
    }

    /**
     * Creates a writer from a {@link WriterInitContext} by wrapping it into a {@link
     * Sink.InitContext}.
     *
     * @param elementConverter converts incoming elements into request entries
     * @param context writer init context providing executor, time service and metrics
     * @param configuration batching, buffering and rate limiting settings
     * @param states buffered entries to restore into the buffer
     */
    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            WriterInitContext context,
            AsyncSinkWriterConfiguration configuration,
            Collection<BufferedRequestState<RequestEntryT>> states) {
        // The explicit cast pins overload resolution to the Sink.InitContext constructor.
        this(
                elementConverter,
                (Sink.InitContext) new Sink.InitContextWrapper(context),
                configuration,
                states);
    }

    /**
     * Registers a timer {@code maxTimeInBufferMS} after the current processing time that flushes
     * until the buffer is empty.
     */
    private void registerCallback() {
        long triggerTime = this.timeService.getCurrentProcessingTime() + this.maxTimeInBufferMS;
        this.timeService.registerTimer(
                triggerTime,
                timestamp -> {
                    this.existsActiveTimerCallback = false;
                    while (!this.bufferedRequestEntries.isEmpty()) {
                        flush();
                    }
                });
        this.existsActiveTimerCallback = true;
    }

    /**
     * Converts the element, appends it to the buffer and sends batches while the batching
     * thresholds are met and the rate limiting strategy allows it. If the buffer is already full,
     * batches are flushed first until there is room.
     *
     * @param element the element to write
     * @param context the context passed on to the element converter
     * @throws IllegalArgumentException if the converted entry is larger than the configured maximum
     *     record size
     */
    @Override
    public void write(InputT element, SinkWriter.Context context)
            throws IOException, InterruptedException {
        while (this.bufferedRequestEntries.size() >= this.maxBufferedRequests) {
            flush();
        }
        addEntryToBuffer(this.elementConverter.apply(element, context), false);
        nonBlockingFlush();
    }

    /**
     * Sends batches for as long as the rate limiting strategy does not block and the buffer holds
     * at least one full batch, measured either in entries or in bytes.
     */
    private void nonBlockingFlush() throws InterruptedException {
        while (!this.rateLimitingStrategy.shouldBlock(createRequestInfo())
                && (this.bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                        || this.bufferedRequestEntriesTotalSizeInBytes
                                >= this.maxBatchSizeInBytes)) {
            flush();
        }
    }

    /** Describes the next request using the number of entries that would currently be batched. */
    private BasicRequestInfo createRequestInfo() {
        return new BasicRequestInfo(getNextBatchSize());
    }

    /**
     * Sends one batch. Yields to the mailbox executor for as long as the rate limiting strategy
     * blocks, then takes the next batch from the buffer and submits it. Does nothing if the batch
     * turns out to be empty.
     */
    private void flush() throws InterruptedException {
        BasicRequestInfo requestInfo = createRequestInfo();
        while (this.rateLimitingStrategy.shouldBlock(requestInfo)) {
            this.mailboxExecutor.yield();
            // The buffer may have changed while yielding, so the request is described again.
            requestInfo = createRequestInfo();
        }

        List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
        if (batch.isEmpty()) {
            return;
        }

        int batchSize = requestInfo.getBatchSize();
        long requestTimestamp = System.currentTimeMillis();
        // Completion is handed to the mailbox executor instead of running on the calling thread.
        Consumer<List<RequestEntryT>> requestToRetry =
                entriesToRetry ->
                        this.mailboxExecutor.execute(
                                () -> completeRequest(entriesToRetry, batchSize, requestTimestamp),
                                "Mark in-flight request as completed and requeue %d request entries",
                                entriesToRetry.size());

        this.rateLimitingStrategy.registerInFlightRequest(requestInfo);
        this.inFlightRequestsCount++;
        submitRequestEntries(batch, requestToRetry);
    }

    /** Returns the number of entries the next batch may hold given the current buffer content. */
    private int getNextBatchSize() {
        return Math.min(getNextBatchSizeLimit(), this.bufferedRequestEntries.size());
    }

    /**
     * Removes up to {@code requestInfo.getBatchSize()} entries from the head of the buffer,
     * stopping early if adding the next entry would exceed the byte limit of a batch, and updates
     * the outgoing record and byte counters.
     *
     * @param requestInfo describes the request; its batch size must not exceed the buffer size
     * @return the entries removed from the buffer, in buffer order
     */
    private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
        List<RequestEntryT> batch = new ArrayList<>(requestInfo.getBatchSize());
        long batchSizeInBytes = 0L;
        for (int i = 0; i < requestInfo.getBatchSize(); i++) {
            long entrySize = this.bufferedRequestEntries.peek().getSize();
            if (batchSizeInBytes + entrySize > this.maxBatchSizeInBytes) {
                break;
            }
            batch.add(this.bufferedRequestEntries.remove().getRequestEntry());
            this.bufferedRequestEntriesTotalSizeInBytes -= entrySize;
            batchSizeInBytes += entrySize;
        }
        this.numRecordsOutCounter.inc(batch.size());
        this.numBytesOutCounter.inc(batchSizeInBytes);
        return batch;
    }

    /**
     * Marks a request as completed, reports the result to the rate limiting strategy, puts the
     * returned entries back at the head of the buffer and tries to send further batches.
     *
     * @param entriesToRetry entries reported back by the implementation for re-queueing
     * @param batchSize batch size that was registered for the request
     * @param requestStartTime {@code System.currentTimeMillis()} taken when the request was sent
     */
    private void completeRequest(
            List<RequestEntryT> entriesToRetry, int batchSize, long requestStartTime)
            throws InterruptedException {
        this.lastSendTimestamp = requestStartTime;
        this.ackTime = System.currentTimeMillis();
        this.inFlightRequestsCount--;
        this.rateLimitingStrategy.registerCompletedRequest(
                new BasicResultInfo(entriesToRetry.size(), batchSize));

        // Walk backwards: each entry is inserted at the head, so the original order is restored.
        ListIterator<RequestEntryT> iterator = entriesToRetry.listIterator(entriesToRetry.size());
        while (iterator.hasPrevious()) {
            addEntryToBuffer(iterator.previous(), true);
        }
        nonBlockingFlush();
    }

    /** Wraps the entry together with its size and adds it to the buffer. */
    private void addEntryToBuffer(RequestEntryT requestEntry, boolean insertAtHead) {
        addEntryToBuffer(
                new RequestEntryWrapper<>(requestEntry, getSizeInBytes(requestEntry)),
                insertAtHead);
    }

    /**
     * Adds a wrapped entry to the head or the tail of the buffer and registers the buffering
     * timer if the buffer was empty and no timer is pending.
     *
     * @param entry the wrapped entry to buffer
     * @param insertAtHead {@code true} to insert at the head, {@code false} to append
     * @throws IllegalArgumentException if the entry is larger than the configured maximum record
     *     size
     */
    private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead) {
        if (this.bufferedRequestEntries.isEmpty() && !this.existsActiveTimerCallback) {
            registerCallback();
        }

        if (entry.getSize() > this.maxRecordSizeInBytes) {
            throw new IllegalArgumentException(
                    String.format(
                            "The request entry sent to the buffer was of size [%s], when the maxRecordSizeInBytes was set to [%s].",
                            entry.getSize(), this.maxRecordSizeInBytes));
        }

        if (insertAtHead) {
            this.bufferedRequestEntries.addFirst(entry);
        } else {
            this.bufferedRequestEntries.add(entry);
        }
        this.bufferedRequestEntriesTotalSizeInBytes += entry.getSize();
    }

    /**
     * Waits until no request is in flight. If {@code flush} is {@code true}, additionally sends
     * batches until the buffer is empty.
     *
     * @param flush whether buffered entries must be sent as well
     */
    @Override
    public void flush(boolean flush) throws InterruptedException {
        while (this.inFlightRequestsCount > 0
                || (!this.bufferedRequestEntries.isEmpty() && flush)) {
            yieldIfThereExistsInFlightRequests();
            if (flush) {
                flush();
            }
        }
    }

    /** Yields once to the mailbox executor if at least one request is in flight. */
    private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
        if (this.inFlightRequestsCount > 0) {
            this.mailboxExecutor.yield();
        }
    }

    /**
     * Returns a single state object built from the entries currently held in the buffer.
     *
     * @param checkpointId not used by this implementation
     * @return a one-element list holding the buffered entries
     */
    @Override
    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
        return Collections.singletonList(new BufferedRequestState<>(this.bufferedRequestEntries));
    }

    /** Appends all entries of the given states to the buffer, in iteration order. */
    private void initializeState(Collection<BufferedRequestState<RequestEntryT>> states) {
        for (BufferedRequestState<RequestEntryT> state : states) {
            for (RequestEntryWrapper<RequestEntryT> entry : state.getBufferedRequestEntries()) {
                addEntryToBuffer(entry, false);
            }
        }
    }

    /** Does nothing in this base class. */
    @Override
    public void close() {}

    /** Returns the smaller of the configured batch size and the rate limiting strategy's cap. */
    private int getNextBatchSizeLimit() {
        return Math.min(this.maxBatchSize, this.rateLimitingStrategy.getMaxBatchSize());
    }

    /**
     * Returns the consumer to pass an unrecoverable exception to; the exception is rethrown from a
     * task run by the mailbox executor.
     *
     * @return the fatal exception consumer
     */
    protected Consumer<Exception> getFatalExceptionCons() {
        return this.fatalExceptionCons;
    }
}
