/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.test.runtime.JobGraphRunningUtil;
import org.apache.flink.types.LongValue;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test that runs a two-vertex batch job (source -> sink, all-to-all, blocking result
 * partition) with blocking shuffle compression disabled, and verifies that every record sent by
 * the sources arrives unmodified at every sink.
 *
 * <p>The test is parameterized on whether the source's record writer is built with a
 * {@link BroadcastPartitioner} or with the builder's default channel selector.
 */
@RunWith(Parameterized.class)
public class ShuffleCompressionITCase {
    private static final int NUM_BUFFERS_TO_SEND = 1000;
    private static final int BUFFER_SIZE = 32768;
    private static final int BYTES_PER_RECORD = 12;
    // NOTE(review): presumably derived from the three constants above; the literal is kept
    // exactly as found so the amount of data sent does not change.
    private static final int NUM_RECORDS_TO_SEND = 2730667;
    private static final int NUM_TASKMANAGERS = 2;
    private static final int NUM_SLOTS = 4;
    private static final int PARALLELISM = 8;
    private static final LongValue RECORD_TO_SEND = new LongValue(4387942071694473832L);

    /**
     * Injected by the {@link Parameterized} runner. It is static because it is read from the
     * static nested {@link LongValueSource}, which has no reference to the test instance.
     */
    @Parameterized.Parameter
    public static boolean useBroadcastPartitioner = false;

    @Parameterized.Parameters(name="useBroadcastPartitioner = {0}")
    public static Boolean[] params() {
        return new Boolean[]{true, false};
    }

    /**
     * Runs the job with compression disabled and the sort-shuffle minimum parallelism raised to
     * {@link Integer#MAX_VALUE} (presumably so that the sort-merge implementation is never
     * selected - confirm against the option's documentation).
     */
    @Test
    public void testNoDataCompressionForBoundedBlockingShuffle() throws Exception {
        Configuration configuration = createUncompressedShuffleConfiguration();
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, Integer.MAX_VALUE);
        JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
        JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
    }

    /**
     * Runs the job with compression disabled and the sort-shuffle minimum parallelism left at its
     * default value.
     */
    @Test
    public void testNoDataCompressionForSortMergeBlockingShuffle() throws Exception {
        Configuration configuration = createUncompressedShuffleConfiguration();
        JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
        JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
    }

    /** Creates the configuration shared by all tests: compression off, one minute ask timeout. */
    private static Configuration createUncompressedShuffleConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
        // No cast on the value: the option's own type parameter must determine the value type.
        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1L));
        return configuration;
    }

    /**
     * Builds a batch job graph with a {@link LongValueSource} vertex connected all-to-all to a
     * {@link ResultVerifyingSink} vertex. Both vertices run with {@link #PARALLELISM} subtasks
     * and share one slot sharing group.
     *
     * @param resultPartitionType type of the result partition between source and sink
     * @param executionMode execution mode stored in the job's {@link ExecutionConfig}
     * @return the assembled job graph
     */
    private static JobGraph createJobGraph(ResultPartitionType resultPartitionType, ExecutionMode executionMode) throws IOException {
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();

        JobVertex source = new JobVertex("source");
        source.setInvokableClass(LongValueSource.class);
        source.setParallelism(PARALLELISM);
        source.setSlotSharingGroup(slotSharingGroup);

        JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(ResultVerifyingSink.class);
        sink.setParallelism(PARALLELISM);
        sink.setSlotSharingGroup(slotSharingGroup);
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setExecutionMode(executionMode);
        return JobGraphBuilder.newBatchJobGraphBuilder()
                .addJobVertices(Arrays.asList(source, sink))
                .setExecutionConfig(executionConfig)
                .build();
    }

    /**
     * Sink task that reads {@code PARALLELISM * NUM_RECORDS_TO_SEND} records from its first input
     * gate and checks that each one equals {@link #RECORD_TO_SEND}.
     */
    public static final class ResultVerifyingSink
    extends AbstractInvokable {
        public ResultVerifyingSink(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            MutableRecordReader<LongValue> reader = new MutableRecordReader<>(
                    (InputGate)this.getEnvironment().getInputGate(0),
                    new String[]{EnvironmentInformation.getTemporaryFileDirectory()});
            LongValue value = new LongValue();
            long expectedValue = RECORD_TO_SEND.getValue();
            int expectedRecords = PARALLELISM * NUM_RECORDS_TO_SEND;
            for (int i = 0; i < expectedRecords; ++i) {
                // If the input ends early, 'value' still holds the previous (correct) record, so
                // the equality check alone would not notice missing records. The failure message
                // is built only on failure to keep this hot loop cheap.
                if (!reader.next(value)) {
                    Assert.fail("Input ended after " + i + " of " + expectedRecords + " expected records.");
                }
                Assert.assertEquals(expectedValue, value.getValue());
            }
        }
    }

    /**
     * Source task that emits {@link #RECORD_TO_SEND} {@link #NUM_RECORDS_TO_SEND} times through
     * {@code broadcastEmit} on its first result partition writer.
     */
    public static final class LongValueSource
    extends AbstractInvokable {
        public LongValueSource(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            ResultPartitionWriter resultPartitionWriter = this.getEnvironment().getWriter(0);
            RecordWriterBuilder<LongValue> recordWriterBuilder = new RecordWriterBuilder<>();
            if (this.getEnvironment().getExecutionConfig().getExecutionMode() == ExecutionMode.PIPELINED) {
                recordWriterBuilder.setTimeout(100L);
            }
            if (useBroadcastPartitioner) {
                // The partitioner is declared for stream records rather than LongValue, so it can
                // only be supplied through an unchecked conversion, confined to this local.
                @SuppressWarnings({"unchecked", "rawtypes"})
                ChannelSelector<LongValue> broadcastSelector = new BroadcastPartitioner();
                recordWriterBuilder.setChannelSelector(broadcastSelector);
            }
            RecordWriter<LongValue> writer = recordWriterBuilder.build(resultPartitionWriter);
            try {
                for (int i = 0; i < NUM_RECORDS_TO_SEND; ++i) {
                    writer.broadcastEmit(RECORD_TO_SEND);
                }
                writer.flushAll();
            } finally {
                // Close the writer even when emitting fails.
                writer.close();
            }
        }
    }
}

