package org.apache.flink.test.streaming.api.datastream;

import java.lang.invoke.SerializedLambda;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase.class */
public class FinishedSourcesWatermarkITCase extends TestLogger {
    private static final AtomicLong CHECKPOINT_10_WATERMARK = new AtomicLong(Watermark.MAX_WATERMARK.getTimestamp());
    private static final AtomicBoolean DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK = new AtomicBoolean();

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase$LongRunningSource.class */
    private static class LongRunningSource extends RichSourceFunction<String> implements CheckpointListener {
        private volatile boolean isRunning;
        private long lastEmittedWatermark;

        private LongRunningSource() {
            this.isRunning = true;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            while (this.isRunning && !FinishedSourcesWatermarkITCase.DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.get()) {
                synchronized (sourceContext.getCheckpointLock()) {
                    this.lastEmittedWatermark = Math.max(System.currentTimeMillis(), this.lastEmittedWatermark);
                    sourceContext.emitWatermark(new Watermark(this.lastEmittedWatermark));
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (j == 5) {
                throw new RuntimeException("Force recovery");
            }
            if (j > 10) {
                FinishedSourcesWatermarkITCase.CHECKPOINT_10_WATERMARK.set(Math.min(this.lastEmittedWatermark, FinishedSourcesWatermarkITCase.CHECKPOINT_10_WATERMARK.get()));
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase$NoopCoProcessFunction.class */
    private static class NoopCoProcessFunction extends CoProcessFunction<String, String, String> {
        private NoopCoProcessFunction() {
        }

        public void processElement1(String str, CoProcessFunction<String, String, String>.Context context, Collector<String> collector) {
        }

        public void processElement2(String str, CoProcessFunction<String, String, String>.Context context, Collector<String> collector) {
        }

        public /* bridge */ /* synthetic */ void processElement2(Object obj, CoProcessFunction.Context context, Collector collector) throws Exception {
            processElement2((String) obj, (CoProcessFunction<String, String, String>.Context) context, (Collector<String>) collector);
        }

        public /* bridge */ /* synthetic */ void processElement1(Object obj, CoProcessFunction.Context context, Collector collector) throws Exception {
            processElement1((String) obj, (CoProcessFunction<String, String, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase$ShortLivedEmptySource.class */
    private static class ShortLivedEmptySource extends RichSourceFunction<String> {
        private ShortLivedEmptySource() {
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase$SinkWaitingForWatermark.class */
    private static class SinkWaitingForWatermark implements SinkFunction<String> {
        private SinkWaitingForWatermark() {
        }

        public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
            if (watermark.getTimestamp() > FinishedSourcesWatermarkITCase.CHECKPOINT_10_WATERMARK.get()) {
                FinishedSourcesWatermarkITCase.DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.set(true);
            }
        }
    }

    @Test
    public void testTwoConsecutiveFinishedTasksShouldPropagateMaxWatermark() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
        createLocalEnvironment.disableOperatorChaining();
        createLocalEnvironment.enableCheckpointing(200L);
        createLocalEnvironment.addSource(new LongRunningSource(), "Long Running Source").connect(createLocalEnvironment.addSource(new ShortLivedEmptySource(), "Short Lived Source").map(str -> {
            return str;
        }).name("Empty Stream Map")).process(new NoopCoProcessFunction()).name("Join").addSink(new SinkWaitingForWatermark());
        createLocalEnvironment.execute();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1954490041:
                if (implMethodName.equals("lambda$testTwoConsecutiveFinishedTasksShouldPropagateMaxWatermark$e8f65cf2$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/String;")) {
                    return str -> {
                        return str;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
