/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.test.runtime.JobGraphRunningUtil;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests that run a small batch job whose edges are all blocking
 * ({@link GlobalStreamExchangeMode#ALL_EDGES_BLOCKING}) on a cluster started through
 * {@link JobGraphRunningUtil#execute}.
 *
 * <p>Every test builds the same {@code source -> rebalance -> map -> broadcast -> sink} pipeline
 * and varies only the number of records produced, the shuffle-related configuration, and whether
 * the sink deletes the files below {@link #TEMP_FOLDER} on its first attempt.
 */
public class BlockingShuffleITCase {
    /** The single record value emitted by {@link StringSource} and expected by {@link VerifySink}. */
    private static final String RECORD = "hello, world!";

    /** Number of task managers passed to {@link JobGraphRunningUtil#execute}. */
    private final int numTaskManagers = 2;

    /** Number of slots per task manager passed to {@link JobGraphRunningUtil#execute}. */
    private final int numSlotsPerTaskManager = 4;

    /** Root directory handed to the cluster as {@link CoreOptions#TMP_DIRS}. */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /**
     * Runs the job with one million records per source subtask.
     *
     * <p>The sort-shuffle minimum parallelism is raised to {@link Integer#MAX_VALUE}; judging by
     * the test name this is meant to keep the job on the bounded blocking shuffle — TODO confirm
     * against the option's documentation.
     */
    @Test
    public void testBoundedBlockingShuffle() throws Exception {
        JobGraph jobGraph = createJobGraph(1_000_000, false);
        Configuration configuration = getConfiguration();
        configuration.setInteger(
                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                Integer.MAX_VALUE);
        JobGraphRunningUtil.execute(
                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
    }

    /** Same setup as {@link #testBoundedBlockingShuffle()} but the sources emit no records. */
    @Test
    public void testBoundedBlockingShuffleWithoutData() throws Exception {
        JobGraph jobGraph = createJobGraph(0, false);
        Configuration configuration = getConfiguration();
        configuration.setInteger(
                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                Integer.MAX_VALUE);
        JobGraphRunningUtil.execute(
                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
    }

    /**
     * Runs the job with one million records per source subtask and the sort-shuffle minimum
     * buffer count set to 64.
     */
    @Test
    public void testSortMergeBlockingShuffle() throws Exception {
        Configuration configuration = getConfiguration();
        configuration.setInteger(
                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
        JobGraph jobGraph = createJobGraph(1_000_000, false);
        JobGraphRunningUtil.execute(
                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
    }

    /** Same setup as {@link #testSortMergeBlockingShuffle()} but the sources emit no records. */
    @Test
    public void testSortMergeBlockingShuffleWithoutData() throws Exception {
        Configuration configuration = getConfiguration();
        configuration.setInteger(
                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
        JobGraph jobGraph = createJobGraph(0, false);
        JobGraphRunningUtil.execute(
                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
    }

    /**
     * Runs an empty job whose sink deletes the files below {@link #TEMP_FOLDER} on its first
     * attempt, with the sort-shuffle minimum parallelism raised to {@link Integer#MAX_VALUE}.
     */
    @Test
    public void testDeletePartitionFileOfBoundedBlockingShuffle() throws Exception {
        Configuration configuration = getConfiguration();
        configuration.setInteger(
                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                Integer.MAX_VALUE);
        JobGraph jobGraph = createJobGraph(0, true);
        JobGraphRunningUtil.execute(
                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
    }

    /**
     * Runs an empty job whose sink deletes the files below {@link #TEMP_FOLDER} on its first
     * attempt, leaving the shuffle options at whatever {@link #getConfiguration()} provides.
     */
    @Test
    public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
        Configuration configuration = getConfiguration();
        JobGraph jobGraph = createJobGraph(0, true);
        JobGraphRunningUtil.execute(
                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
    }

    /**
     * Builds the base configuration shared by all tests.
     *
     * @return a fresh configuration with {@link CoreOptions#TMP_DIRS} pointing at
     *     {@link #TEMP_FOLDER} and the maximum network request backoff set to 100
     */
    private Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        // Values are passed with their natural types so the generic set(ConfigOption<T>, T)
        // overload can infer T from the option; an Object-typed value would not match.
        configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
        return configuration;
    }

    /**
     * Creates the batch job graph executed by every test.
     *
     * @param numRecordsToSend number of records each {@link StringSource} instance emits
     * @param deletePartitionFile forwarded to {@link VerifySink}; when {@code true} the sink
     *     deletes the files below {@link #TEMP_FOLDER} on its first attempt
     * @return the job graph generated from the stream graph, with all edges blocking and the job
     *     type set to {@link JobType#BATCH}
     */
    private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A single restart with no delay.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        env.setBufferTimeout(-1L);
        // 2 * 4 = 8, i.e. the total number of slots requested from JobGraphRunningUtil.
        env.setParallelism(numTaskManagers * numSlotsPerTaskManager);

        DataStreamSource<String> source = env.addSource(new StringSource(numRecordsToSend));
        source.rebalance()
                .map((MapFunction<String, String>) value -> value)
                .broadcast()
                .addSink(new VerifySink(deletePartitionFile));

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        streamGraph.setJobType(JobType.BATCH);
        return StreamingJobGraphGenerator.createJobGraph(streamGraph);
    }

    /**
     * Sink that asserts every received value equals {@link #RECORD} and, if requested, deletes
     * the regular files below {@link #TEMP_FOLDER} when it is opened for the first attempt.
     */
    private static class VerifySink
    extends RichSinkFunction<String> {
        private final boolean deletePartitionFile;

        VerifySink(boolean deletePartitionFile) {
            this.deletePartitionFile = deletePartitionFile;
        }

        /**
         * Deletes the files below {@link #TEMP_FOLDER} if deletion was requested and this is the
         * first attempt (attempt number 0); otherwise does nothing.
         *
         * @param parameters unused
         * @throws Exception if a file cannot be deleted
         */
        public void open(Configuration parameters) throws Exception {
            if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
                return;
            }
            // Serialize the deletion across all sink instances loaded by the same class loader.
            synchronized (BlockingShuffleITCase.class) {
                deleteFiles(TEMP_FOLDER.getRoot());
            }
        }

        /**
         * Verifies that the incoming value is exactly {@link #RECORD}.
         *
         * @param value the received record
         * @param context unused
         */
        public void invoke(String value, SinkFunction.Context context) throws Exception {
            Assert.assertEquals(RECORD, value);
        }

        /**
         * Recursively deletes every non-directory entry below {@code root}. Directories
         * themselves are left in place.
         *
         * @param root directory to walk; nothing happens if it cannot be listed or is empty
         * @throws IOException if deleting a file fails
         */
        private static void deleteFiles(File root) throws IOException {
            File[] files = root.listFiles();
            if (files == null || files.length == 0) {
                return;
            }
            for (File file : files) {
                if (file.isDirectory()) {
                    deleteFiles(file);
                } else {
                    Files.deleteIfExists(file.toPath());
                }
            }
        }
    }

    /** Parallel source that emits {@link #RECORD} a fixed number of times per instance. */
    private static class StringSource
    implements ParallelSourceFunction<String> {
        /** Cleared by {@link #cancel()} to stop the emit loop. */
        private volatile boolean isRunning = true;

        /** Remaining number of records this instance still has to emit. */
        private int numRecordsToSend;

        StringSource(int numRecordsToSend) {
            this.numRecordsToSend = numRecordsToSend;
        }

        /**
         * Emits {@link #RECORD} until the configured count is exhausted or the source is
         * cancelled.
         *
         * @param ctx context used to emit records
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (isRunning && numRecordsToSend-- > 0) {
                ctx.collect(RECORD);
            }
        }

        /** Requests the emit loop in {@link #run} to stop. */
        public void cancel() {
            isRunning = false;
        }
    }
}

