package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.SequenceNumberRange;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/changelog/fs/FsStateChangelogWriter.class */
public class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandleStreamImpl> {
    private static final long DUMMY_PERSIST_CHECKPOINT = -1;
    private final UUID logId;
    private final KeyGroupRange keyGroupRange;
    private final StateChangeUploadScheduler uploader;
    private final long preEmptivePersistThresholdInBytes;
    private long activeChangeSetSize;

    @Nullable
    private Tuple2<SequenceNumber, Throwable> highestFailed;
    private boolean closed;
    private final MailboxExecutor mailboxExecutor;
    private final TaskChangelogRegistry changelogRegistry;

    @Nonnull
    private final LocalRecoveryConfig localRecoveryConfig;
    private final LocalChangelogRegistry localChangelogRegistry;
    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogWriter.class);
    private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0);
    private final List<UploadCompletionListener> uploadCompletionListeners = new ArrayList();
    private SequenceNumber activeSequenceNumber = INITIAL_SQN;
    private SequenceNumber lowestSequenceNumber = INITIAL_SQN;
    private SequenceNumber highestSequenceNumber = SequenceNumber.of(Long.MAX_VALUE);
    private List<StateChange> activeChangeSet = new ArrayList();
    private final NavigableMap<SequenceNumber, StateChangeSet> notUploaded = new TreeMap();
    private final NavigableMap<SequenceNumber, UploadResult> uploaded = new TreeMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/changelog/fs/FsStateChangelogWriter$UploadCompletionListener.class */
    public final class UploadCompletionListener {
        private final NavigableMap<SequenceNumber, UploadResult> uploaded;
        private final CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> completionFuture;
        private final KeyGroupRange keyGroupRange;
        private final SequenceNumberRange changeRange;

        private UploadCompletionListener(KeyGroupRange keyGroupRange, SequenceNumberRange sequenceNumberRange, Map<SequenceNumber, UploadResult> map, CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> completableFuture) {
            Preconditions.checkArgument(!sequenceNumberRange.isEmpty(), "Empty change range not allowed: %s", new Object[]{sequenceNumberRange});
            this.uploaded = new TreeMap(map);
            this.completionFuture = completableFuture;
            this.keyGroupRange = keyGroupRange;
            this.changeRange = sequenceNumberRange;
        }

        public boolean onSuccess(List<UploadResult> list) {
            long j = 0;
            for (UploadResult uploadResult : list) {
                if (this.changeRange.contains(uploadResult.sequenceNumber)) {
                    this.uploaded.put(uploadResult.sequenceNumber, uploadResult);
                    j += uploadResult.getSize();
                    if (this.uploaded.size() == this.changeRange.size()) {
                        this.completionFuture.complete(FsStateChangelogWriter.this.buildSnapshotResult(this.keyGroupRange, this.uploaded, j));
                        return true;
                    }
                }
            }
            return false;
        }

        public boolean onFailure(List<SequenceNumber> list, Throwable th) {
            IOException iOException = th instanceof IOException ? (IOException) th : new IOException(th);
            Iterator<SequenceNumber> it = list.iterator();
            while (it.hasNext()) {
                if (this.changeRange.contains(it.next())) {
                    this.completionFuture.completeExceptionally(iOException);
                    return true;
                }
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FsStateChangelogWriter(UUID uuid, KeyGroupRange keyGroupRange, StateChangeUploadScheduler stateChangeUploadScheduler, long j, MailboxExecutor mailboxExecutor, TaskChangelogRegistry taskChangelogRegistry, LocalRecoveryConfig localRecoveryConfig, LocalChangelogRegistry localChangelogRegistry) {
        this.logId = uuid;
        this.keyGroupRange = keyGroupRange;
        this.uploader = stateChangeUploadScheduler;
        this.preEmptivePersistThresholdInBytes = j;
        this.mailboxExecutor = mailboxExecutor;
        this.changelogRegistry = taskChangelogRegistry;
        this.localRecoveryConfig = localRecoveryConfig;
        this.localChangelogRegistry = localChangelogRegistry;
    }

    public void appendMeta(byte[] bArr) throws IOException {
        if (this.closed) {
            LOG.warn("{} is closed.", this.logId);
            return;
        }
        LOG.trace("append metadata to {}: {} bytes", this.logId, Integer.valueOf(bArr.length));
        this.activeChangeSet.add(StateChange.ofMetadataChange(bArr));
        preEmptiveFlushIfNeeded(bArr);
    }

    public void append(int i, byte[] bArr) throws IOException {
        LOG.trace("append to {}: keyGroup={} {} bytes", new Object[]{this.logId, Integer.valueOf(i), Integer.valueOf(bArr.length)});
        if (this.closed) {
            LOG.warn("{} is closed.", this.logId);
        } else {
            this.activeChangeSet.add(StateChange.ofDataChange(i, bArr));
            preEmptiveFlushIfNeeded(bArr);
        }
    }

    public SequenceNumber initialSequenceNumber() {
        return INITIAL_SQN;
    }

    public SequenceNumber nextSequenceNumber() {
        rollover();
        LOG.trace("query {} sqn: {}", this.logId, this.activeSequenceNumber);
        return this.activeSequenceNumber;
    }

    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(SequenceNumber sequenceNumber, long j) throws IOException {
        LOG.debug("persist {} starting from sqn {} (incl.), active sqn: {}", new Object[]{this.logId, sequenceNumber, this.activeSequenceNumber});
        return persistInternal(sequenceNumber, j);
    }

    private void preEmptiveFlushIfNeeded(byte[] bArr) throws IOException {
        this.activeChangeSetSize += bArr.length;
        if (this.activeChangeSetSize >= this.preEmptivePersistThresholdInBytes) {
            LOG.debug("pre-emptively flush {}MB of appended changes to the common store", Long.valueOf((this.activeChangeSetSize / 1024) / 1024));
            persistInternal(this.notUploaded.isEmpty() ? this.activeSequenceNumber : this.notUploaded.firstKey(), DUMMY_PERSIST_CHECKPOINT);
        }
    }

    private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistInternal(SequenceNumber sequenceNumber, long j) throws IOException {
        ensureCanPersist(sequenceNumber);
        rollover();
        Map<SequenceNumber, StateChangeSet> drainTailMap = drainTailMap(this.notUploaded, sequenceNumber);
        NavigableMap<SequenceNumber, UploadResult> tailMap = this.uploaded.tailMap(sequenceNumber, true);
        LOG.debug("collected readyToReturn: {}, toUpload: {}, checkpointId: {}.", new Object[]{tailMap, drainTailMap, Long.valueOf(j)});
        if (j != DUMMY_PERSIST_CHECKPOINT) {
            for (UploadResult uploadResult : tailMap.values()) {
                if (uploadResult.localStreamHandle != null) {
                    this.localChangelogRegistry.register(uploadResult.localStreamHandle, j);
                }
            }
        }
        SequenceNumberRange generic = SequenceNumberRange.generic(sequenceNumber, this.activeSequenceNumber);
        if (generic.size() == tailMap.size()) {
            Preconditions.checkState(drainTailMap.isEmpty());
            return CompletableFuture.completedFuture(buildSnapshotResult(this.keyGroupRange, tailMap, 0L));
        }
        CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> completableFuture = new CompletableFuture<>();
        this.uploadCompletionListeners.add(new UploadCompletionListener(this.keyGroupRange, generic, tailMap, completableFuture));
        if (!drainTailMap.isEmpty()) {
            this.uploader.upload(new StateChangeUploadScheduler.UploadTask(drainTailMap.values(), list -> {
                handleUploadSuccess(list, j);
            }, this::handleUploadFailure));
        }
        return completableFuture;
    }

    private void handleUploadFailure(List<SequenceNumber> list, Throwable th) {
        this.mailboxExecutor.execute(() -> {
            if (this.closed) {
                return;
            }
            this.uploadCompletionListeners.removeIf(uploadCompletionListener -> {
                return uploadCompletionListener.onFailure(list, th);
            });
            list.stream().max(Comparator.naturalOrder()).filter(sequenceNumber -> {
                return sequenceNumber.compareTo(this.lowestSequenceNumber) >= 0;
            }).filter(sequenceNumber2 -> {
                return this.highestFailed == null || sequenceNumber2.compareTo(this.highestFailed.f0) > 0;
            }).ifPresent(sequenceNumber3 -> {
                this.highestFailed = Tuple2.of(sequenceNumber3, th);
            });
        }, "handleUploadFailure");
    }

    private void handleUploadSuccess(List<UploadResult> list, long j) {
        this.mailboxExecutor.execute(() -> {
            if (this.closed) {
                list.forEach(uploadResult -> {
                    IOUtils.closeAllQuietly(new AutoCloseable[]{() -> {
                        uploadResult.getStreamStateHandle().discardState();
                    }});
                });
                return;
            }
            this.uploadCompletionListeners.removeIf(uploadCompletionListener -> {
                return uploadCompletionListener.onSuccess(list);
            });
            Iterator it = list.iterator();
            while (it.hasNext()) {
                UploadResult uploadResult2 = (UploadResult) it.next();
                if (j != DUMMY_PERSIST_CHECKPOINT && uploadResult2.localStreamHandle != null) {
                    this.localChangelogRegistry.register(uploadResult2.localStreamHandle, j);
                }
                SequenceNumber sequenceNumber = uploadResult2.sequenceNumber;
                if (sequenceNumber.compareTo(this.lowestSequenceNumber) < 0 || sequenceNumber.compareTo(this.highestSequenceNumber) >= 0) {
                    this.changelogRegistry.release(uploadResult2.streamStateHandle);
                    if (uploadResult2.localStreamHandle != null) {
                        this.changelogRegistry.release(uploadResult2.localStreamHandle);
                    }
                } else {
                    this.uploaded.put(sequenceNumber, uploadResult2);
                }
            }
        }, "handleUploadSuccess");
    }

    public void close() {
        LOG.debug("close {}", this.logId);
        Preconditions.checkState(!this.closed);
        this.closed = true;
        this.activeChangeSet.clear();
        this.activeChangeSetSize = 0L;
        this.notUploaded.clear();
        this.uploaded.clear();
    }

    public void truncate(SequenceNumber sequenceNumber) {
        LOG.debug("truncate {} to sqn {} (excl.)", this.logId, sequenceNumber);
        Preconditions.checkArgument(sequenceNumber.compareTo(this.activeSequenceNumber) <= 0);
        this.lowestSequenceNumber = sequenceNumber;
        this.notUploaded.headMap(this.lowestSequenceNumber, false).clear();
        SortedMap<SequenceNumber, UploadResult> headMap = this.uploaded.headMap(sequenceNumber);
        notifyStateNotUsed(headMap);
        headMap.clear();
    }

    public void truncateAndClose(SequenceNumber sequenceNumber) {
        LOG.debug("truncate {} tail from sqn {} (incl.)", this.logId, sequenceNumber);
        this.highestSequenceNumber = sequenceNumber;
        notifyStateNotUsed(this.uploaded.tailMap(sequenceNumber));
        close();
    }

    private void rollover() {
        if (this.activeChangeSet.isEmpty()) {
            return;
        }
        this.notUploaded.put(this.activeSequenceNumber, new StateChangeSet(this.logId, this.activeSequenceNumber, this.activeChangeSet));
        this.activeSequenceNumber = this.activeSequenceNumber.next();
        LOG.debug("bump active sqn to {}", this.activeSequenceNumber);
        this.activeChangeSet = new ArrayList();
        this.activeChangeSetSize = 0L;
    }

    public void confirm(SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j) {
        Preconditions.checkState(sequenceNumber.compareTo(sequenceNumber2) <= 0, "Invalid confirm range: [%s,%s)", new Object[]{sequenceNumber, sequenceNumber2});
        Preconditions.checkState(sequenceNumber.compareTo(this.activeSequenceNumber) <= 0 && sequenceNumber2.compareTo(this.activeSequenceNumber) <= 0, "Invalid confirm range: [%s,%s), active sqn: %s", new Object[]{sequenceNumber, sequenceNumber2, this.activeSequenceNumber});
        LOG.debug("Confirm [{}, {})", sequenceNumber, sequenceNumber2);
        Stream<R> map = this.uploaded.subMap(sequenceNumber, sequenceNumber2).values().stream().map((v0) -> {
            return v0.getStreamStateHandle();
        });
        TaskChangelogRegistry taskChangelogRegistry = this.changelogRegistry;
        taskChangelogRegistry.getClass();
        map.forEach(taskChangelogRegistry::stopTracking);
        this.uploaded.subMap(sequenceNumber, sequenceNumber2).values().stream().map((v0) -> {
            return v0.getLocalStreamHandleStateHandle();
        }).filter(streamStateHandle -> {
            return streamStateHandle != null;
        }).forEach(streamStateHandle2 -> {
            this.changelogRegistry.stopTracking(streamStateHandle2);
        });
        this.localChangelogRegistry.discardUpToCheckpoint(j);
    }

    public void reset(SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j) {
        this.localChangelogRegistry.discardUpToCheckpoint(j + 1);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(KeyGroupRange keyGroupRange, NavigableMap<SequenceNumber, UploadResult> navigableMap, long j) {
        ArrayList arrayList = new ArrayList();
        long j2 = 0;
        for (UploadResult uploadResult : navigableMap.values()) {
            arrayList.add(Tuple2.of(uploadResult.getStreamStateHandle(), Long.valueOf(uploadResult.getOffset())));
            j2 += uploadResult.getSize();
        }
        ChangelogStateHandleStreamImpl changelogStateHandleStreamImpl = new ChangelogStateHandleStreamImpl(arrayList, keyGroupRange, j2, j, FsStateChangelogStorageFactory.IDENTIFIER);
        if (this.localRecoveryConfig.isLocalRecoveryEnabled()) {
            long j3 = 0;
            ArrayList arrayList2 = new ArrayList();
            for (UploadResult uploadResult2 : navigableMap.values()) {
                if (uploadResult2.getLocalStreamHandleStateHandle() != null) {
                    arrayList2.add(Tuple2.of(uploadResult2.getLocalStreamHandleStateHandle(), Long.valueOf(uploadResult2.getLocalOffset())));
                    j3 += uploadResult2.getSize();
                }
            }
            if (arrayList2.size() == arrayList.size()) {
                return SnapshotResult.withLocalState(changelogStateHandleStreamImpl, new ChangelogStateHandleStreamImpl(arrayList2, keyGroupRange, j3, 0L, FsStateChangelogStorageFactory.IDENTIFIER));
            }
            LOG.warn("local handles are different from remote");
        }
        return SnapshotResult.of(changelogStateHandleStreamImpl);
    }

    @VisibleForTesting
    SequenceNumber lastAppendedSqnUnsafe() {
        return this.activeSequenceNumber;
    }

    private void ensureCanPersist(SequenceNumber sequenceNumber) throws IOException {
        Preconditions.checkNotNull(sequenceNumber);
        if (this.highestFailed != null && ((SequenceNumber) this.highestFailed.f0).compareTo(sequenceNumber) >= 0) {
            throw new IOException("The upload for " + this.highestFailed.f0 + " has already failed previously", (Throwable) this.highestFailed.f1);
        }
        if (this.lowestSequenceNumber.compareTo(sequenceNumber) > 0) {
            throw new IllegalArgumentException(String.format("Requested changes were truncated (requested: %s, truncated: %s)", sequenceNumber, this.lowestSequenceNumber));
        }
        if (this.activeSequenceNumber.compareTo(sequenceNumber) < 0) {
            throw new IllegalArgumentException(String.format("Requested changes were not yet appended (requested: %s, appended: %s)", sequenceNumber, this.activeSequenceNumber));
        }
    }

    private static Map<SequenceNumber, StateChangeSet> drainTailMap(NavigableMap<SequenceNumber, StateChangeSet> navigableMap, SequenceNumber sequenceNumber) {
        NavigableMap<SequenceNumber, StateChangeSet> tailMap = navigableMap.tailMap(sequenceNumber, true);
        HashMap hashMap = new HashMap(tailMap);
        tailMap.clear();
        return hashMap;
    }

    private void notifyStateNotUsed(Map<SequenceNumber, UploadResult> map) {
        LOG.trace("Uploaded state to discard: {}", map);
        for (UploadResult uploadResult : map.values()) {
            this.changelogRegistry.release(uploadResult.streamStateHandle);
            if (uploadResult.localStreamHandle != null) {
                this.changelogRegistry.release(uploadResult.localStreamHandle);
            }
        }
    }
}
