package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Stream;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.class */
public class RocksIncrementalSnapshotStrategy<K> extends RocksDBSnapshotStrategyBase<K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
    private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot";

    @Nonnull
    private final SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedSstFiles;
    private long lastCompletedCheckpointId;
    private final RocksDBStateUploader stateUploader;

    /**
     * Lookup table backing the {@code switch} statements over
     * {@link SnapshotType.SharingFilesStrategy} in this file.
     *
     * <p>The array is indexed by enum ordinal and holds the case label for each constant:
     * {@code FORWARD_BACKWARD -> 1}, {@code FORWARD -> 2}, {@code NO_SHARING -> 3}. A slot that is
     * never assigned keeps the array default of {@code 0}, which matches no case label used in
     * this file and therefore ends up in the {@code default} branch of those switches.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per enum constant known at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy = new int[SnapshotType.SharingFilesStrategy.values().length];

        static {
            // Each constant is registered inside its own guard so that one failing lookup
            // does not prevent the remaining constants from being registered.
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[SnapshotType.SharingFilesStrategy.FORWARD_BACKWARD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot simply keeps its default value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[SnapshotType.SharingFilesStrategy.FORWARD.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot simply keeps its default value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$SnapshotType$SharingFilesStrategy[SnapshotType.SharingFilesStrategy.NO_SHARING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot simply keeps its default value 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.class */
    public final class RocksDBIncrementalSnapshotOperation extends RocksDBSnapshotStrategyBase<K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources>.RocksDBSnapshotOperation {

        @Nonnull
        private final RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot;

        @Nonnull
        private final SnapshotType.SharingFilesStrategy sharingFilesStrategy;

        /**
         * Creates the asynchronous part of one incremental snapshot.
         *
         * @param checkpointId id of the checkpoint this operation belongs to
         * @param streamFactory factory handed to the base operation for writing checkpoint streams
         * @param localBackupDirectory directory handed to the base operation as the local snapshot location
         * @param previousSnapshot files of an earlier snapshot that may be reused instead of re-uploaded
         * @param sharingFilesStrategy file sharing mode requested for this snapshot
         * @param stateMetaInfoSnapshots meta information snapshots of the registered states
         */
        private RocksDBIncrementalSnapshotOperation(
                long checkpointId,
                @Nonnull CheckpointStreamFactory streamFactory,
                @Nonnull SnapshotDirectory localBackupDirectory,
                @Nonnull RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot,
                @Nonnull SnapshotType.SharingFilesStrategy sharingFilesStrategy,
                @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
            super(checkpointId, streamFactory, localBackupDirectory, stateMetaInfoSnapshots);
            this.sharingFilesStrategy = sharingFilesStrategy;
            this.previousSnapshot = previousSnapshot;
        }

        /**
         * Runs the asynchronous part of the incremental snapshot: writes the state meta data,
         * uploads the files found in the local backup directory and assembles the resulting
         * state handles.
         *
         * <p>On success the handles that were reused from the previous snapshot are reported to
         * the checkpoint stream factory exactly once. If anything fails before the result has
         * been assembled, the incomplete snapshot is cleaned up and the failure is propagated.
         *
         * @param closeableRegistry registry passed on to the meta data writer and the file uploads
         * @return the job-manager-owned handle, paired with a task-local handle when
         *     {@code getLocalSnapshot} yields one
         * @throws Exception if writing the meta data or uploading a file fails
         */
        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry closeableRegistry) throws Exception {
            boolean completed = false;
            // Filled by uploadSnapshotFiles: reused + freshly uploaded SST files, and all other files.
            final List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFiles = new ArrayList<>();
            final List<IncrementalKeyedStateHandle.HandleAndLocalPath> miscFiles = new ArrayList<>();
            // Handles taken over from the previous snapshot; reported to the stream factory on success.
            final List<StreamStateHandle> reusedHandles = new ArrayList<>();
            try {
                final SnapshotResult<StreamStateHandle> metaStateHandle = RocksIncrementalSnapshotStrategy.this.materializeMetaData(closeableRegistry, this.tmpResourcesRegistry, this.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
                Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");

                // Reported size = meta data size + size of the files uploaded by this snapshot.
                final long checkpointedSize = metaStateHandle.getStateSize() + uploadSnapshotFiles(sstFiles, miscFiles, closeableRegistry, this.tmpResourcesRegistry, reusedHandles);

                final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, metaStateHandle.getJobManagerOwnedSnapshot(), checkpointedSize);

                final SnapshotResult<KeyedStateHandle> snapshotResult = getLocalSnapshot(metaStateHandle.getTaskLocalSnapshot(), sstFiles)
                        .map(localHandle -> SnapshotResult.<KeyedStateHandle>withLocalState(jmIncrementalKeyedStateHandle, localHandle))
                        .orElseGet(() -> SnapshotResult.<KeyedStateHandle>of(jmIncrementalKeyedStateHandle));

                completed = true;
                return snapshotResult;
            } finally {
                // A finally block (instead of catch-and-rethrow) guarantees that exactly one of
                // the two follow-up actions runs, and that it runs only once.
                if (completed) {
                    this.checkpointStreamFactory.reusePreviousStateHandle(reusedHandles);
                } else {
                    RocksIncrementalSnapshotStrategy.this.cleanupIncompleteSnapshot(this.tmpResourcesRegistry, this.localBackupDirectory);
                }
            }
        }

        /**
         * Uploads the content of the local backup directory to the checkpoint file system.
         *
         * <p>Files are split into SST files and all other files. SST files that the previous
         * snapshot already uploaded and that the stream factory allows to reuse are not uploaded
         * again; their handles are added to {@code sstFiles} and {@code reusedHandles} directly.
         * Unless the sharing strategy is {@code NO_SHARING}, the final SST file list is also
         * recorded under this checkpoint id in the strategy's {@code uploadedSstFiles} map.
         *
         * @param sstFiles output list receiving reused and freshly uploaded SST file handles
         * @param miscFiles output list receiving the handles of all non-SST files
         * @param snapshotCloseableRegistry registry passed to the uploader for the upload streams
         * @param tmpRegistry second registry passed to the uploader
         * @param reusedHandles output list receiving the handles reused from the previous snapshot
         * @return accumulated state size of the files uploaded by this call; {@code 0} if the
         *     directory listing is {@code null}
         * @throws IllegalStateException if the local backup directory does not exist
         * @throws IllegalArgumentException if the sharing strategy is not one of the known constants
         * @throws Exception if an upload fails
         */
        private long uploadSnapshotFiles(
                @Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFiles,
                @Nonnull List<IncrementalKeyedStateHandle.HandleAndLocalPath> miscFiles,
                @Nonnull CloseableRegistry snapshotCloseableRegistry,
                @Nonnull CloseableRegistry tmpRegistry,
                @Nonnull List<StreamStateHandle> reusedHandles) throws Exception {
            Preconditions.checkState(this.localBackupDirectory.exists());

            final Path[] files = this.localBackupDirectory.listDirectory();
            if (files == null) {
                return 0L;
            }

            final List<Path> sstFilePaths = new ArrayList<>(files.length);
            final List<Path> miscFilePaths = new ArrayList<>(files.length);
            createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);

            final CheckpointedStateScope stateScope =
                    this.sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
                            ? CheckpointedStateScope.EXCLUSIVE
                            : CheckpointedStateScope.SHARED;

            // At this point sstFiles holds only the handles reused from the previous snapshot;
            // remember them before the freshly uploaded ones are appended below.
            sstFiles.stream()
                    .map(IncrementalKeyedStateHandle.HandleAndLocalPath::getHandle)
                    .forEach(reusedHandles::add);

            long uploadedSize = 0L;

            final List<IncrementalKeyedStateHandle.HandleAndLocalPath> uploadedSstFiles = RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpRegistry);
            uploadedSize += uploadedSstFiles.stream()
                    .mapToLong(IncrementalKeyedStateHandle.HandleAndLocalPath::getStateSize)
                    .sum();
            sstFiles.addAll(uploadedSstFiles);

            final List<IncrementalKeyedStateHandle.HandleAndLocalPath> uploadedMiscFiles = RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, stateScope, snapshotCloseableRegistry, tmpRegistry);
            uploadedSize += uploadedMiscFiles.stream()
                    .mapToLong(IncrementalKeyedStateHandle.HandleAndLocalPath::getStateSize)
                    .sum();
            miscFiles.addAll(uploadedMiscFiles);

            synchronized (RocksIncrementalSnapshotStrategy.this.uploadedSstFiles) {
                switch (this.sharingFilesStrategy) {
                    case FORWARD_BACKWARD:
                    case FORWARD:
                        RocksIncrementalSnapshotStrategy.this.uploadedSstFiles.put(this.checkpointId, Collections.unmodifiableList(sstFiles));
                        break;
                    case NO_SHARING:
                        // Nothing is recorded for later snapshots in this mode.
                        break;
                    default:
                        throw new IllegalArgumentException("Unsupported sharing files strategy: " + this.sharingFilesStrategy);
                }
            }
            return uploadedSize;
        }

        /**
         * Sorts the files of the local backup directory into the three output lists.
         *
         * <ul>
         *   <li>An SST file that the previous snapshot already uploaded, and whose handle the
         *       stream factory allows to reuse, is added to {@code sstFiles} with that handle.
         *   <li>Any other SST file is added to {@code sstFilePaths}.
         *   <li>Every non-SST file is added to {@code miscFilePaths}.
         * </ul>
         *
         * @param files files to classify
         * @param sstFiles output list for reusable, already uploaded SST files
         * @param sstFilePaths output list for SST files not covered by {@code sstFiles}
         * @param miscFilePaths output list for all files without the SST suffix
         */
        private void createUploadFilePaths(
                Path[] files,
                List<IncrementalKeyedStateHandle.HandleAndLocalPath> sstFiles,
                List<Path> sstFilePaths,
                List<Path> miscFilePaths) {
            for (Path file : files) {
                final String fileName = file.getFileName().toString();

                if (!fileName.endsWith(RocksSnapshotUtil.SST_FILE_SUFFIX)) {
                    miscFilePaths.add(file);
                    continue;
                }

                final Optional<StreamStateHandle> previouslyUploaded = this.previousSnapshot.getUploaded(fileName);
                final boolean reusable =
                        previouslyUploaded.isPresent()
                                && this.checkpointStreamFactory.couldReuseStateHandle(previouslyUploaded.get());
                if (reusable) {
                    sstFiles.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(previouslyUploaded.get(), fileName));
                } else {
                    sstFilePaths.add(file);
                }
            }
        }

        /**
         * Accessor constructor that forwards to the private constructor of this class.
         *
         * <p>The first and the last parameter are not read; only the six arguments in between
         * are passed on, in the same order.
         */
        /* synthetic */ RocksDBIncrementalSnapshotOperation(RocksIncrementalSnapshotStrategy rocksIncrementalSnapshotStrategy, long j, CheckpointStreamFactory checkpointStreamFactory, SnapshotDirectory snapshotDirectory, RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot, SnapshotType.SharingFilesStrategy sharingFilesStrategy, List list, AnonymousClass1 anonymousClass1) {
            this(j, checkpointStreamFactory, snapshotDirectory, previousSnapshot, sharingFilesStrategy, list);
        }
    }

    /**
     * Creates the incremental snapshot strategy.
     *
     * <p>The first nine arguments after the description are forwarded unchanged to the base
     * class constructor.
     *
     * @param rocksDB forwarded to the base class
     * @param resourceGuard forwarded to the base class
     * @param typeSerializer forwarded to the base class
     * @param linkedHashMap forwarded to the base class
     * @param keyGroupRange forwarded to the base class
     * @param i forwarded to the base class (NOTE(review): presumably the number of key-group
     *     prefix bytes; confirm against the base class)
     * @param localRecoveryConfig forwarded to the base class
     * @param file forwarded to the base class
     * @param uuid forwarded to the base class
     * @param sortedMap SST file handles already uploaded per checkpoint id; the entries are
     *     copied, so later changes to the argument are not reflected in this strategy
     * @param rocksDBStateUploader uploader used to transfer snapshot files; closed by {@link #close()}
     * @param j initial value for the id of the last completed checkpoint
     */
    public RocksIncrementalSnapshotStrategy(@Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int i, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull File file, @Nonnull UUID uuid, @Nonnull SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> sortedMap, @Nonnull RocksDBStateUploader rocksDBStateUploader, long j) {
        super(DESCRIPTION, rocksDB, resourceGuard, typeSerializer, linkedHashMap, keyGroupRange, i, localRecoveryConfig, file, uuid);
        // Private, typed copy: this map is mutated and used as a lock by the methods of this class.
        this.uploadedSstFiles = new TreeMap<>(sortedMap);
        this.stateUploader = rocksDBStateUploader;
        this.lastCompletedCheckpointId = j;
    }

    /**
     * Builds the supplier that performs the asynchronous part of an incremental snapshot.
     *
     * <p>If the resources carry no state meta information, the returned supplier yields an
     * empty snapshot result. Otherwise the previous snapshot is used as the base for file reuse
     * only when the sharing strategy is {@code FORWARD_BACKWARD}; for {@code FORWARD} and
     * {@code NO_SHARING} an empty previous snapshot is used.
     *
     * @param nativeRocksDBSnapshotResources resources prepared for this snapshot
     * @param j id of the checkpoint
     * @param j2 value that is only reported in the debug message for empty state
     *     (NOTE(review): presumably the checkpoint timestamp; confirm against the interface)
     * @param checkpointStreamFactory factory used to write the checkpoint streams
     * @param checkpointOptions options that determine the file sharing strategy
     * @return supplier producing the snapshot result
     * @throws IllegalArgumentException if the sharing strategy is not one of the known constants
     */
    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources nativeRocksDBSnapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (nativeRocksDBSnapshotResources.stateMetaInfoSnapshots.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", j2);
            }
            return registry -> SnapshotResult.empty();
        }

        final SnapshotType.SharingFilesStrategy sharingFilesStrategy = checkpointOptions.getCheckpointType().getSharingFilesStrategy();
        final RocksDBSnapshotStrategyBase.PreviousSnapshot previousSnapshot;
        switch (sharingFilesStrategy) {
            case FORWARD_BACKWARD:
                previousSnapshot = nativeRocksDBSnapshotResources.previousSnapshot;
                break;
            case FORWARD:
            case NO_SHARING:
                // No file of an earlier snapshot may be referenced in these modes.
                previousSnapshot = EMPTY_PREVIOUS_SNAPSHOT;
                break;
            default:
                throw new IllegalArgumentException("Unsupported sharing files strategy: " + sharingFilesStrategy);
        }

        return new RocksDBIncrementalSnapshotOperation(j, checkpointStreamFactory, nativeRocksDBSnapshotResources.snapshotDirectory, previousSnapshot, sharingFilesStrategy, nativeRocksDBSnapshotResources.stateMetaInfoSnapshots);
    }

    /**
     * Records a completed checkpoint and drops the bookkeeping of all older ones.
     *
     * <p>The notification is ignored unless {@code j} is greater than the last completed
     * checkpoint id and the uploaded-files map holds an entry for {@code j}.
     *
     * @param j id of the checkpoint that completed
     */
    public void notifyCheckpointComplete(long j) {
        synchronized (this.uploadedSstFiles) {
            final boolean isNewer = j > this.lastCompletedCheckpointId;
            if (!isNewer || !this.uploadedSstFiles.containsKey(j)) {
                return;
            }

            // Remove every entry that belongs to a checkpoint older than the completed one.
            final Iterator<Long> checkpointIds = this.uploadedSstFiles.keySet().iterator();
            while (checkpointIds.hasNext()) {
                if (checkpointIds.next() < j) {
                    checkpointIds.remove();
                }
            }
            this.lastCompletedCheckpointId = j;
        }
    }

    /**
     * Forgets the files recorded for an aborted checkpoint, if any were recorded.
     *
     * @param j id of the checkpoint that was aborted
     */
    public void notifyCheckpointAborted(long j) {
        final Long abortedCheckpointId = j;
        synchronized (this.uploadedSstFiles) {
            this.uploadedSstFiles.remove(abortedCheckpointId);
        }
    }

    /**
     * Closes the state uploader held by this strategy.
     *
     * @throws IOException declared by this method and propagated unchanged if thrown by the
     *     uploader's {@code close()}
     */
    @Override // org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase, java.lang.AutoCloseable
    public void close() throws IOException {
        this.stateUploader.close();
    }

    /**
     * Collects the meta information of all registered states and determines the base for the
     * incremental snapshot.
     *
     * @param j id of the checkpoint being taken (used for logging only)
     * @param list output list that receives one meta info snapshot per registered state
     * @return previous snapshot built from the files recorded for the last completed checkpoint;
     *     the recorded collection may be {@code null} if nothing was recorded for that id
     */
    @Override
    protected RocksDBSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long j, @Nonnull List<StateMetaInfoSnapshot> list) {
        final long baseCheckpointId;
        final Collection<IncrementalKeyedStateHandle.HandleAndLocalPath> confirmedSstFiles;

        // Read the id and its file list under the same lock so that both belong together.
        synchronized (this.uploadedSstFiles) {
            baseCheckpointId = this.lastCompletedCheckpointId;
            confirmedSstFiles = this.uploadedSstFiles.get(baseCheckpointId);
            LOG.trace("Use confirmed SST files for checkpoint {}: {}", j, confirmedSstFiles);
        }

        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) confirmed files as base: {}.", j, baseCheckpointId, confirmedSstFiles);

        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo stateInfo : this.kvStateInformation.values()) {
            list.add(stateInfo.metaInfo.snapshot());
        }
        return new RocksDBSnapshotStrategyBase.PreviousSnapshot(confirmedSstFiles);
    }
}
