/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.util.TestExpandingSink;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={TestLoggerExtension.class})
class StreamingJobGraphGeneratorSourceSinkTest {
    private StreamExecutionEnvironment env;

    StreamingJobGraphGeneratorSourceSinkTest() {
    }

    @BeforeEach
    void setUp() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    @Test
    void testLegacySource() {
        this.env.fromElements((Object[])new Integer[]{0, 1}).map((MapFunction & Serializable)i -> i);
        List<JobVertex> verticesSorted = this.getJobVertices();
        JobVertex sourceVertex = verticesSorted.get(0);
        Assertions.assertThat((boolean)sourceVertex.containsSources()).isTrue();
        Assertions.assertThat((boolean)sourceVertex.containsSinks()).isFalse();
    }

    @Test
    void testNewSource() {
        this.env.fromSequence(0L, 1L).map((MapFunction & Serializable)i -> i);
        List<JobVertex> verticesSorted = this.getJobVertices();
        JobVertex sourceVertex = verticesSorted.get(0);
        Assertions.assertThat((boolean)sourceVertex.containsSources()).isTrue();
        Assertions.assertThat((boolean)sourceVertex.containsSinks()).isFalse();
    }

    @Test
    void testMultiInputSource() {
        DataStreamSource source1 = this.env.fromSequence(0L, 1L);
        DataStreamSource source2 = this.env.fromSequence(0L, 1L);
        MultipleInputTransformation multiInputTransform = new MultipleInputTransformation("multi-input-operator", (StreamOperatorFactory)new StreamingJobGraphGeneratorTest.UnusedOperatorFactory(), Types.LONG, this.env.getParallelism());
        multiInputTransform.addInput(source1.map((MapFunction & Serializable)i -> i).getTransformation());
        multiInputTransform.addInput(source2.getTransformation());
        multiInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
        this.env.addOperator((Transformation)multiInputTransform);
        List<JobVertex> verticesSorted = this.getJobVertices();
        JobVertex source1Vertex = verticesSorted.get(0);
        Assertions.assertThat((boolean)source1Vertex.containsSources()).isTrue();
        Assertions.assertThat((boolean)source1Vertex.containsSinks()).isFalse();
        JobVertex multiInputVertex = verticesSorted.get(1);
        Assertions.assertThat((boolean)multiInputVertex.containsSources()).isTrue();
        Assertions.assertThat((boolean)multiInputVertex.containsSinks()).isFalse();
    }

    @Test
    void testLegacySink() {
        this.env.fromElements((Object[])new Integer[]{0, 1}).map((MapFunction & Serializable)i -> i).startNewChain().addSink((SinkFunction)new SinkFunction<Integer>(){});
        List<JobVertex> verticesSorted = this.getJobVertices();
        JobVertex sinkVertex = verticesSorted.get(1);
        Assertions.assertThat((boolean)sinkVertex.containsSources()).isFalse();
        Assertions.assertThat((boolean)sinkVertex.containsSinks()).isTrue();
    }

    @Test
    void testNewSink() {
        this.env.fromElements((Object[])new Integer[]{0, 1}).disableChaining().sinkTo((Sink)new TestExpandingSink());
        List<JobVertex> verticesSorted = this.getJobVertices();
        JobVertex sinkVertex = verticesSorted.get(1);
        Assertions.assertThat((boolean)sinkVertex.containsSources()).isFalse();
        Assertions.assertThat((boolean)sinkVertex.containsSinks()).isTrue();
    }

    @Test
    void testNewSinkWithSinkTopology() {
        this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        this.env.fromElements((Object[])new Integer[]{0, 1}).disableChaining().sinkTo((Sink)new TestExpandingSink());
        List<JobVertex> verticesSorted = this.getJobVertices();
        JobVertex sinkWriterVertex = verticesSorted.get(1);
        Assertions.assertThat((boolean)sinkWriterVertex.containsSources()).isFalse();
        Assertions.assertThat((boolean)sinkWriterVertex.containsSinks()).isTrue();
        JobVertex sinkCommitterVertex = verticesSorted.get(2);
        Assertions.assertThat((boolean)sinkCommitterVertex.containsSources()).isFalse();
        Assertions.assertThat((boolean)sinkCommitterVertex.containsSinks()).isTrue();
        JobVertex sinkPostCommitterVertex = verticesSorted.get(3);
        Assertions.assertThat((boolean)sinkPostCommitterVertex.containsSources()).isFalse();
        Assertions.assertThat((boolean)sinkPostCommitterVertex.containsSinks()).isTrue();
    }

    @Test
    void testChainedSourceSink() {
        this.env.setParallelism(1);
        this.env.fromElements((Object[])new Integer[]{0, 1}).sinkTo((Sink)new TestExpandingSink());
        List<JobVertex> verticesSorted = this.getJobVertices();
        JobVertex sourceSinkVertex = verticesSorted.get(0);
        Assertions.assertThat((boolean)sourceSinkVertex.containsSources()).isTrue();
        Assertions.assertThat((boolean)sourceSinkVertex.containsSinks()).isTrue();
    }

    private List<JobVertex> getJobVertices() {
        StreamGraph streamGraph = this.env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        return jobGraph.getVerticesSortedTopologicallyFromSources();
    }
}

