/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Test;

/**
 * Integration test that runs a job at one parallelism until it fails artificially, then
 * restores a second job with a different parallelism from the most recent completed
 * checkpoint written by the first job.
 */
public class ChangelogPeriodicMaterializationRescaleITCase
        extends ChangelogPeriodicMaterializationTestBase {

    public ChangelogPeriodicMaterializationRescaleITCase(AbstractStateBackend delegatedStateBackend) {
        super(delegatedStateBackend);
    }

    /** Restores with a higher parallelism (2 -> 4) than the job that wrote the checkpoint. */
    @Test
    public void testRescaleOut() throws Exception {
        this.testRescale(2, 4);
    }

    /** Restores with a lower parallelism (4 -> 2) than the job that wrote the checkpoint. */
    @Test
    public void testRescaleIn() throws Exception {
        this.testRescale(4, 2);
    }

    /**
     * Runs the first job with {@code firstParallelism} until it throws an artificial failure,
     * then submits a second job with {@code secondParallelism} that restores from the most
     * recent completed checkpoint found in the first job's checkpoint folder.
     *
     * @param firstParallelism parallelism of the job that produces the checkpoint
     * @param secondParallelism parallelism of the job that restores from that checkpoint
     * @throws Exception if building, submitting or verifying either job fails
     */
    private void testRescale(int firstParallelism, int secondParallelism) throws Exception {
        final File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        JobID firstJobID = this.generateJobID();
        // State handle IDs observed in the first job's checkpoint folder. Held in a shared
        // reference so both anonymous sources below can read and update the same set.
        final SharedReference<Set<StateHandleID>> currentMaterializationId =
                this.sharedObjects.add(ConcurrentHashMap.newKeySet());
        JobGraph firstJobGraph = this.buildJobGraph(
                this.getEnv(firstCheckpointFolder, firstParallelism),
                new ChangelogPeriodicMaterializationTestBase.ControlledSource() {

                    @Override
                    protected void beforeElement(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                        if (this.currentIndex == 2500) {
                            // Hold the source until at least one checkpoint has completed and
                            // the checkpoint folder contains at least one state handle ID.
                            this.waitWhile(() -> {
                                if (this.completedCheckpointNum.get() <= 0) {
                                    return true;
                                }
                                Set<StateHandleID> allMaterializationId =
                                        ChangelogPeriodicMaterializationTestBase.getAllStateHandleId(firstCheckpointFolder);
                                if (!allMaterializationId.isEmpty()) {
                                    // Remember the IDs so the second job can detect different ones.
                                    currentMaterializationId.get().addAll(allMaterializationId);
                                    return false;
                                }
                                return true;
                            });
                        } else if (this.currentIndex > 2500) {
                            this.throwArtificialFailure();
                        }
                    }
                },
                firstJobID);
        try {
            this.cluster.getMiniCluster().submitJob(firstJobGraph).get();
            this.cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get();
        } catch (Exception ex) {
            // Only the artificial failure thrown by the source is an acceptable error here.
            Preconditions.checkState(
                    ExceptionUtils.findThrowable(ex, ChangelogPeriodicMaterializationTestBase.ArtificialFailure.class)
                            .isPresent());
        }

        final File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
        JobID secondJobId = this.generateJobID();
        JobGraph jobGraph = this.buildJobGraph(
                this.getEnv(secondCheckpointFolder, secondParallelism),
                new ChangelogPeriodicMaterializationTestBase.ControlledSource() {

                    @Override
                    protected void beforeElement(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                        if (this.currentIndex == 5000) {
                            // Hold the source until the second folder contains state handle IDs
                            // that differ from the set recorded during the first job.
                            this.waitWhile(() -> {
                                Set<StateHandleID> allMaterializationId =
                                        ChangelogPeriodicMaterializationTestBase.getAllStateHandleId(secondCheckpointFolder);
                                return allMaterializationId.isEmpty()
                                        || currentMaterializationId.get().equals(allMaterializationId);
                            });
                        }
                    }
                },
                secondJobId);
        // Restore the rescaled job from the latest completed checkpoint of the first job.
        File checkpoint = TestUtils.getMostRecentCompletedCheckpoint(firstCheckpointFolder);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(checkpoint.getPath()));
        this.waitAndAssert(jobGraph);
    }

    /**
     * Creates an execution environment that checkpoints into {@code checkpointFolder}, retains
     * externalized checkpoints on cancellation and runs with the given parallelism.
     *
     * @param checkpointFolder folder handed to the base class environment factory
     * @param parallelism parallelism to set on the returned environment
     * @return the configured environment
     */
    private StreamExecutionEnvironment getEnv(File checkpointFolder, int parallelism) {
        // NOTE(review): the numeric arguments (50, 0, 20, 0) are defined by the base class
        // overload, which is not visible here - confirm their meaning there.
        StreamExecutionEnvironment env =
                this.getEnv((StateBackend) this.delegatedStateBackend, checkpointFolder, 50L, 0, 20L, 0);
        env.getCheckpointConfig()
                .setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(parallelism);
        return env;
    }
}

