/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public abstract class ChangelogPeriodicMaterializationTestBase
extends TestLogger {
    // Cluster sizing. Not referenced elsewhere in this class: setup() passes the same
    // values (1 and 4) as literals.
    private static final int NUM_TASK_MANAGERS = 1;
    private static final int NUM_TASK_SLOTS = 4;
    // Same value as NUM_TASK_SLOTS, but visible to subclasses.
    protected static final int NUM_SLOTS = 4;
    // Same value that ControlledSource passes to initSourceData when building its source list.
    protected static final int TOTAL_ELEMENTS = 10000;
    // State backend for the current parameterized run; assigned once in the constructor.
    protected final AbstractStateBackend delegatedStateBackend;
    // Mini cluster created in setup() and shut down in tearDown().
    protected MiniClusterWithClientResource cluster;
    // Class-level temporary folder; configure() creates a new sub-folder in it and hands
    // that folder to FsStateChangelogStorageFactory.
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    // Not referenced in this class; presumably used by subclasses to share objects with
    // job code - TODO confirm.
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /**
     * Supplies the state backends the parameterized runner instantiates this test with:
     * one {@link HashMapStateBackend} and two {@link EmbeddedRocksDBStateBackend}
     * instances, constructed with {@code true} and {@code false} respectively.
     *
     * <p>NOTE(review): the boolean presumably toggles incremental checkpointing - confirm
     * against the EmbeddedRocksDBStateBackend constructor documentation.
     *
     * @return the fixed-size list of backends to run the test with
     */
    @Parameterized.Parameters(name="delegated state backend type ={0}")
    public static Collection<AbstractStateBackend> parameter() {
        final List<AbstractStateBackend> backends =
                Arrays.asList(
                        new HashMapStateBackend(),
                        new EmbeddedRocksDBStateBackend(true),
                        new EmbeddedRocksDBStateBackend(false));
        return backends;
    }

    /**
     * Creates the test instance for one parameterized run.
     *
     * @param delegatedStateBackend the state backend stored in {@link #delegatedStateBackend}
     *     for use by this run
     */
    public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) {
        this.delegatedStateBackend = delegatedStateBackend;
    }

    /**
     * Starts a fresh mini cluster before each test.
     *
     * <p>The cluster is built from {@link #configure()}, then started, and finally the
     * mini cluster's {@code overrideRestoreModeForChangelogStateBackend()} hook is invoked.
     *
     * @throws Exception if building the configuration or starting the cluster fails
     */
    @Before
    public void setup() throws Exception {
        // A single TaskManager offering four slots.
        final MiniClusterResourceConfiguration resourceConfiguration =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(this.configure())
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(4)
                        .build();

        this.cluster = new MiniClusterWithClientResource(resourceConfiguration);
        this.cluster.before();
        this.cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
    }

    /**
     * Shuts down the mini cluster and clears the results collected by {@link CollectionSink}.
     *
     * <p>JUnit 4 runs {@code @After} methods even when a {@code @Before} method threw, so
     * {@link #cluster} may still be {@code null} here if {@link #setup()} failed early; the
     * null guard keeps this method from masking the original failure with a
     * {@link NullPointerException}. The sink's static result map is cleared in a
     * {@code finally} block so that a failing cluster shutdown cannot leak results into
     * the next test.
     *
     * @throws IOException declared for compatibility with existing overrides/callers
     */
    @After
    public void tearDown() throws IOException {
        try {
            if (this.cluster != null) {
                this.cluster.after();
            }
        } finally {
            // Always reset the static state shared between tests.
            CollectionSink.clearExpectedResult();
        }
    }

    /**
     * Builds a stream execution environment with checkpointing and the changelog state
     * backend enabled.
     *
     * @param stateBackend the state backend to set on the environment
     * @param checkpointFile location whose URI is used as checkpoint storage
     * @param checkpointInterval interval passed to {@code enableCheckpointing}
     * @param restartAttempts number of attempts for the fixed-delay restart strategy
     *     (the delay is 0)
     * @param materializationInterval periodic materialization interval in milliseconds
     * @param materializationMaxFailure value for
     *     {@code StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED}
     * @return the configured environment
     */
    protected StreamExecutionEnvironment getEnv(StateBackend stateBackend, File checkpointFile, long checkpointInterval, int restartAttempts, long materializationInterval, int materializationMaxFailure) {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing: changelog enabled, file-based storage, aligned checkpoints only.
        environment
                .enableCheckpointing(checkpointInterval)
                .enableChangelogStateBackend(true)
                .getCheckpointConfig()
                .setCheckpointStorage(checkpointFile.toURI());
        environment.getCheckpointConfig().enableUnalignedCheckpoints(false);

        environment
                .setStateBackend(stateBackend)
                .setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, 0L));

        // Changelog materialization settings are applied last, on top of the above.
        environment.configure(
                new Configuration()
                        .set(
                                StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
                                Duration.ofMillis(materializationInterval))
                        .set(
                                StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
                                materializationMaxFailure));
        return environment;
    }

    /**
     * Wires the test pipeline (source, key-by on the element itself, {@link CountMapper},
     * {@link CollectionSink}) into the given environment and returns its job graph.
     *
     * @param env environment the pipeline is added to
     * @param controlledSource the source feeding the pipeline
     * @param jobId id the resulting job graph is created with
     * @return the job graph built from the environment's stream graph
     */
    protected JobGraph buildJobGraph(StreamExecutionEnvironment env, ControlledSource controlledSource, JobID jobId) {
        env.addSource(controlledSource)
                // Each element is its own key.
                .keyBy(value -> value)
                .map(new CountMapper())
                .addSink(new CollectionSink())
                // The parallelism of 1 is applied to the sink.
                .setParallelism(1);
        return env.getStreamGraph().getJobGraph(jobId);
    }

    /**
     * Runs the job to completion and verifies that the sink collected exactly the counts
     * expected from the source data.
     *
     * @param jobGraph the job to submit and wait for
     * @throws Exception if the job fails or does not finish successfully
     */
    protected void waitAndAssert(JobGraph jobGraph) throws Exception {
        this.waitUntilJobFinished(jobGraph);
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes the
        // failure message report the source-derived map as "expected" and the sink map as
        // "actual".
        Assert.assertEquals(ControlledSource.getExpectedResult(), CollectionSink.getActualResult());
    }

    /**
     * Creates a job id from 16 random bytes.
     *
     * @return a {@link JobID} built from the random bytes
     */
    protected JobID generateJobID() {
        final byte[] idBytes = new byte[16];
        ThreadLocalRandom.current().nextBytes(idBytes);
        return JobID.fromByteArray(idBytes);
    }

    /**
     * Collects the ids of all materialized state handles referenced by the most recent
     * completed checkpoint found under {@code checkpointFile}.
     *
     * <p>Every managed keyed state handle in the checkpoint metadata is cast to
     * {@link ChangelogStateBackendHandle}; a handle of any other type results in a
     * {@link ClassCastException}.
     *
     * @param checkpointFile location searched for the most recent completed checkpoint
     * @return the ids of the materialized state handles, or an empty set if there is no
     *     completed checkpoint or it references no materialized handles
     * @throws IOException if the checkpoint lookup or metadata loading fails
     */
    protected static Set<StateHandleID> getAllStateHandleId(File checkpointFile) throws IOException {
        final Optional<File> latestCheckpoint = TestUtils.getMostRecentCompletedCheckpointMaybe(checkpointFile);
        if (!latestCheckpoint.isPresent()) {
            return Collections.emptySet();
        }

        final CheckpointMetadata metadata = TestUtils.loadCheckpointMetadata(latestCheckpoint.get().toString());
        final Set<StateHandleID> handleIds =
                metadata.getOperatorStates().stream()
                        .flatMap(state -> state.getStates().stream())
                        .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
                        .map(ChangelogStateBackendHandle.class::cast)
                        .flatMap(changelogHandle -> changelogHandle.getMaterializedStateHandles().stream())
                        .map(KeyedStateHandle::getStateHandleId)
                        .collect(Collectors.toSet());

        return handleIds.isEmpty() ? Collections.emptySet() : handleIds;
    }

    /**
     * Submits the job to the mini cluster, blocks until its result is available, and
     * checks that it succeeded.
     *
     * @param jobGraph the job to submit
     * @throws Exception the job's serialized failure cause if the result carries one, or
     *     any error raised while submitting or waiting
     */
    private void waitUntilJobFinished(JobGraph jobGraph) throws Exception {
        final JobSubmissionResult submission = this.cluster.getMiniCluster().submitJob(jobGraph).get();
        final JobResult result = this.cluster.getMiniCluster().requestJobResult(submission.getJobID()).get();

        // Surface the job's own failure instead of a bare status mismatch.
        final Optional<SerializedThrowable> failure = result.getSerializedThrowable();
        if (failure.isPresent()) {
            throw failure.get();
        }
        Assert.assertSame(ApplicationStatus.SUCCEEDED, result.getApplicationStatus());
    }

    /**
     * Builds the cluster configuration: retains up to 200 checkpoints and points the
     * changelog storage factory at a newly created folder under {@link #TEMPORARY_FOLDER}.
     *
     * @return the cluster configuration
     * @throws IOException if the temporary folder cannot be created
     */
    private Configuration configure() throws IOException {
        final Configuration clusterConfig = new Configuration();
        clusterConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 200);

        final File changelogFolder = TEMPORARY_FOLDER.newFolder();
        FsStateChangelogStorageFactory.configure(clusterConfig, changelogFolder);
        return clusterConfig;
    }

    /**
     * Checked exception without message or cause; it is what
     * {@code ControlledSource.throwArtificialFailure()} throws.
     */
    protected static class ArtificialFailure
    extends Exception {
        /** Creates the failure with no detail message. */
        protected ArtificialFailure() {
            super();
        }
    }

    /**
     * A serializable {@link FunctionWithException} whose input and output share the type
     * {@code T} and which may throw any {@link Exception}.
     *
     * @param <T> the type that is both consumed and produced
     */
    @FunctionalInterface
    protected static interface SerializableFunctionWithException<T>
            extends FunctionWithException<T, T, Exception>, Serializable {
    }

    /**
     * A serializable supplier of a {@code boolean} that is allowed to throw a checked
     * exception; this is the condition type accepted by {@code ControlledSource.waitWhile}.
     */
    @FunctionalInterface
    protected static interface SerializableBooleanSupplierWithException extends Serializable {
        /**
         * Evaluates the condition.
         *
         * @return the current value of the condition
         * @throws Exception if evaluating the condition fails
         */
        boolean getAsBoolean() throws Exception;
    }

    /**
     * State backend that wraps another {@link AbstractStateBackend}.
     *
     * <p>The keyed state backend it creates forwards every operation to the keyed backend
     * created by the wrapped backend, with one exception: the future returned by
     * {@code snapshot(...)} is first passed through {@code snapshotResultFunction}, which
     * lets a caller replace or alter the snapshot result. Operator state backends are
     * created by the wrapped backend directly.
     */
    protected static class DelegatedStateBackendWrapper
    extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;
        // The wrapped backend ("Stata" is a typo for "State" in the identifier).
        private final AbstractStateBackend delegatedStataBackend;
        // Applied to every keyed snapshot future before it is returned.
        private final SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> snapshotResultFunction;

        /**
         * @param delegatedStataBackend the backend all work is forwarded to
         * @param snapshotResultFunction function applied to the future returned by the
         *     delegated keyed backend's {@code snapshot(...)}
         */
        public DelegatedStateBackendWrapper(AbstractStateBackend delegatedStataBackend, SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> snapshotResultFunction) {
            this.delegatedStataBackend = delegatedStataBackend;
            this.snapshotResultFunction = snapshotResultFunction;
        }

        /**
         * Creates the delegated keyed backend with the given arguments and returns an
         * anonymous {@link AbstractKeyedStateBackend} that forwards to it.
         *
         * @throws IOException if the wrapped backend fails to create its keyed backend
         */
        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws IOException {
            final AbstractKeyedStateBackend delegatedKeyedStateBackend = this.delegatedStataBackend.createKeyedStateBackend(env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry, ttlTimeProvider, metricGroup, stateHandles, cancelStreamRegistry);
            // The wrapper is constructed with the delegate's latency-tracking config and key context.
            return new AbstractKeyedStateBackend<K>(kvStateRegistry, keySerializer, env.getUserCodeClassLoader().asClassLoader(), env.getExecutionConfig(), ttlTimeProvider, delegatedKeyedStateBackend.getLatencyTrackingStateConfig(), cancelStreamRegistry, delegatedKeyedStateBackend.getKeyContext()){

                // Forwards only to the delegate; the wrapper's own setCurrentKey is not invoked.
                public void setCurrentKey(K newKey) {
                    delegatedKeyedStateBackend.setCurrentKey(newKey);
                }

                public void notifyCheckpointComplete(long checkpointId) throws Exception {
                    delegatedKeyedStateBackend.notifyCheckpointComplete(checkpointId);
                }

                @Nonnull
                public SavepointResources<K> savepoint() throws Exception {
                    return delegatedKeyedStateBackend.savepoint();
                }

                public int numKeyValueStateEntries() {
                    return delegatedKeyedStateBackend.numKeyValueStateEntries();
                }

                public <N> Stream<K> getKeys(String state, N namespace) {
                    return delegatedKeyedStateBackend.getKeys(state, namespace);
                }

                public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
                    return delegatedKeyedStateBackend.getKeysAndNamespaces(state);
                }

                @Nonnull
                public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
                    return (IS)delegatedKeyedStateBackend.createInternalState(namespaceSerializer, stateDesc, snapshotTransformFactory);
                }

                @Nonnull
                public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
                    return delegatedKeyedStateBackend.create(stateName, byteOrderedElementSerializer);
                }

                // The only non-trivial forwarding: the delegate's snapshot future is handed to
                // snapshotResultFunction and whatever that function returns is returned to the caller.
                @Nonnull
                public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                    RunnableFuture snapshotResultRunnableFuture = delegatedKeyedStateBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
                    return (RunnableFuture)snapshotResultFunction.apply(snapshotResultRunnableFuture);
                }

                // Disposes the wrapper's own resources first, then the delegate's.
                public void dispose() {
                    super.dispose();
                    delegatedKeyedStateBackend.dispose();
                }

                // Closes the wrapper first; if super.close() throws, the delegate is not closed.
                public void close() throws IOException {
                    super.close();
                    delegatedKeyedStateBackend.close();
                }
            };
        }

        /** Creates the operator state backend directly through the wrapped backend. */
        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            return this.delegatedStataBackend.createOperatorStateBackend(env, operatorIdentifier, stateHandles, cancelStreamRegistry);
        }
    }

    /**
     * Maps each input value to a pair of the value and a running count that is kept in a
     * {@link ValueState} named {@code "countState"} and incremented on every call.
     */
    protected static class CountMapper
    extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        /** Running count; obtained from the runtime context in {@link #open}. */
        private ValueState<Integer> countState;

        protected CountMapper() {
        }

        /** Looks up the {@code "countState"} value state from the runtime context. */
        public void open(Configuration parameters) throws Exception {
            this.countState = this.getRuntimeContext().getState(new ValueStateDescriptor<>("countState", Integer.class));
        }

        /**
         * Increments the stored count (starting at 1 when nothing is stored yet) and emits
         * it together with the input value.
         *
         * @param value the incoming element
         * @return a tuple of {@code value} and the updated count
         */
        public Tuple2<Integer, Integer> map(Integer value) throws Exception {
            final Integer previous = this.countState.value();
            final Integer updated;
            if (previous == null) {
                // First element seen since the state was empty.
                updated = 1;
            } else {
                updated = previous + 1;
            }
            this.countState.update(updated);
            return Tuple2.of(value, updated);
        }
    }

    /**
     * Sink that records, per key ({@code f0}), the largest count ({@code f1}) it has
     * received. Results live in a static map so the test can read them after the job
     * has finished; {@link #clearExpectedResult()} resets the map between tests.
     */
    protected static class CollectionSink
    implements SinkFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        // Holds what the sink actually received (exposed through getActualResult()).
        // NOTE(review): a plain HashMap shared through a static field; presumably written
        // by the sink's task thread and read by the test thread after job completion -
        // confirm that this hand-over is safely published.
        private static final Map<Integer, Integer> actualResult = new HashMap<>();

        protected CollectionSink() {
        }

        /** Stores {@code value.f1} for key {@code value.f0}, keeping the maximum seen so far. */
        public void invoke(Tuple2<Integer, Integer> value, SinkFunction.Context context) throws Exception {
            actualResult.merge(value.f0, value.f1, (stored, incoming) -> Math.max(stored, incoming));
        }

        /** Removes all collected results. The method name is kept for existing callers. */
        public static void clearExpectedResult() {
            actualResult.clear();
        }

        /**
         * @return the live map of collected results (not a copy)
         */
        public static Map<Integer, Integer> getActualResult() {
            return actualResult;
        }
    }

    /**
     * Source that emits a fixed, randomly generated list of integers, each reduced modulo
     * 100, and checkpoints both its read position and the number of completed checkpoints
     * it has been notified of. Subclasses can hook into the emission loop through
     * {@link #beforeElement}.
     */
    protected static class ControlledSource
    extends RichSourceFunction<Integer>
    implements CheckpointedFunction,
    CheckpointListener {
        private static final long serialVersionUID = 1L;
        /** Emitted values, and the keys of the expected result, are elements modulo this bound. */
        private static final int KEY_BOUND = 100;
        // 10000 is the same value as TOTAL_ELEMENTS in the enclosing class.
        private static final List<Integer> sourceList = Collections.unmodifiableList(ControlledSource.initSourceData(10000));
        /** Index of the next element of the source list to emit. */
        protected volatile int currentIndex;
        /** Number of checkpoint-complete notifications received (seeded from state on restore). */
        protected final AtomicInteger completedCheckpointNum = new AtomicInteger();
        protected volatile boolean isCanceled;
        private transient ListState<Integer> currentIndexState;
        private transient ListState<Integer> completedCheckpointNumState;

        /**
         * Generates {@code totalNum} random integers in {@code [0, totalNum)}.
         *
         * @param totalNum number of elements and exclusive upper bound of each element
         * @return a new mutable list with the generated elements
         */
        public static List<Integer> initSourceData(int totalNum) {
            final List<Integer> generated = new ArrayList<>(totalNum);
            final ThreadLocalRandom random = ThreadLocalRandom.current();
            while (generated.size() < totalNum) {
                generated.add(random.nextInt(totalNum));
            }
            return generated;
        }

        /**
         * Computes, for every emitted key (element modulo 100), how many source elements
         * map to it.
         *
         * @return a map from key to occurrence count
         */
        public static Map<Integer, Integer> getExpectedResult() {
            return sourceList.stream().collect(Collectors.toConcurrentMap(value -> value % KEY_BOUND, value -> 1, Integer::sum));
        }

        /** Writes the current read position and completed-checkpoint counter into operator state. */
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.currentIndexState.update(Collections.singletonList(this.currentIndex));
            this.completedCheckpointNumState.update(Collections.singletonList(this.completedCheckpointNum.get()));
        }

        /**
         * Registers the two operator list states and, when restoring, reads the first entry
         * of each back into the in-memory fields.
         */
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.currentIndexState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("currentIndexState", Integer.class));
            this.completedCheckpointNumState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("completedCheckpointNumState", Integer.class));
            if (!context.isRestored()) {
                return;
            }
            this.currentIndex = Iterables.get(this.currentIndexState.get(), 0);
            // Only takes effect while the counter still holds its initial value 0.
            this.completedCheckpointNum.compareAndSet(0, Iterables.get(this.completedCheckpointNumState.get(), 0));
        }

        /**
         * Emits the remaining source elements until the list is exhausted or the source is
         * canceled. {@link #beforeElement} runs outside the checkpoint lock before every
         * emission attempt; the position update and the emission happen under the lock.
         *
         * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
         * behaviour change" for this method; compare with the upstream source.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.hasPendingElements()) {
                this.beforeElement(ctx);
                synchronized (ctx.getCheckpointLock()) {
                    // Re-check: beforeElement may have blocked while the source was canceled.
                    if (this.hasPendingElements()) {
                        final int position = this.currentIndex;
                        this.currentIndex = position + 1;
                        final int element = sourceList.get(position);
                        ctx.collect(element % KEY_BOUND);
                    }
                }
            }
        }

        /** Returns whether the source is still running and has elements left to emit. */
        private boolean hasPendingElements() {
            return !this.isCanceled && this.currentIndex < sourceList.size();
        }

        /** Hook invoked before each emission attempt; does nothing in this class. */
        protected void beforeElement(SourceFunction.SourceContext<Integer> ctx) throws Exception {
        }

        /** Polls {@code supplier} every 10 ms and returns once it yields {@code false}. */
        protected void waitWhile(SerializableBooleanSupplierWithException supplier) throws Exception {
            while (supplier.getAsBoolean()) {
                Thread.sleep(10L);
            }
        }

        /** Always throws a new {@link ArtificialFailure}. */
        protected void throwArtificialFailure() throws Exception {
            throw new ArtificialFailure();
        }

        public void cancel() {
            this.isCanceled = true;
        }

        /** Counts the notification; the checkpoint id itself is not used. */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.completedCheckpointNum.getAndIncrement();
        }
    }
}

