package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.class */
public class SinkWriterOperatorTest extends TestLogger {

    /**
     * Test parameter supplied by {@link #data()}. When {@code true}, {@code getBuilder} calls
     * {@code withWriterState()} on the sink builder; several tests also branch on it to pick
     * their expected output.
     */
    @Parameterized.Parameter
    public boolean stateful;

    /**
     * Writer that keeps the inherited {@code elements} buffer to itself until it is asked to
     * flush, at which point the whole buffer is handed over and replaced by a fresh one.
     */
    private static class BufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer> {
        private BufferingSinkWriter() {
        }

        /**
         * Returns the buffered elements only on flush.
         *
         * @param flush whether the caller requests all buffered elements
         * @return an empty list when {@code flush} is {@code false}; otherwise the current
         *     buffer, which is detached from this writer
         */
        @Override
        public List<String> prepareCommit(boolean flush) {
            if (!flush) {
                return Collections.emptyList();
            }
            // Hand over the current buffer and start a new one so later writes do not
            // leak into the list that was just returned.
            final List<String> flushed = this.elements;
            this.elements = new ArrayList<>();
            return flushed;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest$DummySinkOperator.class */
    private static class DummySinkOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor<>(DUMMY_SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);
        ListState<String> sinkState;

        private DummySinkOperator() {
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.sinkState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(SINK_STATE_DESC), TestSink.StringCommittableSerializer.INSTANCE);
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.sinkState.add(streamRecord.getValue());
        }
    }

    /**
     * Writer that records whether {@code prepareCommit} was ever invoked and never returns any
     * committables. It also exposes the inherited {@code elements} and {@code watermarks} lists.
     */
    private static class PreBarrierSinkWriter extends TestSink.DefaultSinkWriter<Integer> {
        /** Flipped to {@code true} by the first call to {@link #prepareCommit(boolean)}. */
        private boolean receivedPreCommit = false;

        private PreBarrierSinkWriter() {
        }

        /** @return the inherited list of written elements */
        public List<String> getElements() {
            return elements;
        }

        /** @return the inherited list of watermarks */
        public List<Watermark> getWatermarks() {
            return watermarks;
        }

        /** @return whether {@link #prepareCommit(boolean)} has been called at least once */
        public boolean hasReceivedPreCommit() {
            return receivedPreCommit;
        }

        /**
         * Marks the pre-commit as received and produces no committables, whatever the flag.
         *
         * @param flush ignored
         * @return always an empty list
         */
        @Override
        public List<String> prepareCommit(boolean flush) {
            receivedPreCommit = true;
            return Collections.emptyList();
        }
    }

    /**
     * Buffering writer that additionally exposes its buffer as writer state: it remembers the
     * id of the last checkpoint it was snapshotted for and reloads its buffer on restore.
     */
    public static class SnapshottingBufferingSinkWriter extends BufferingSinkWriter {
        /** Value of {@link #lastCheckpointId} while {@code snapshotState} has not been called. */
        public static final int NOT_SNAPSHOTTED = -1;
        /** Id passed to the most recent {@code snapshotState} call, or {@link #NOT_SNAPSHOTTED}. */
        long lastCheckpointId;

        private SnapshottingBufferingSinkWriter() {
            super();
            // Use the named sentinel so the initial value cannot drift from the constant.
            this.lastCheckpointId = NOT_SNAPSHOTTED;
        }

        /**
         * Records the checkpoint id and returns the current buffer as the writer state.
         *
         * @param checkpointId id of the checkpoint being taken
         * @return the live {@code elements} list (not a copy)
         * @throws IOException declared by the overridden method; never thrown here
         */
        @Override
        public List<String> snapshotState(long checkpointId) throws IOException {
            this.lastCheckpointId = checkpointId;
            return this.elements;
        }

        /**
         * Replaces the buffer with a mutable copy of the restored state.
         *
         * @param restoredElements elements recovered from a previous snapshot
         */
        @Override
        void restoredFrom(List<String> restoredElements) {
            this.elements = new ArrayList<>(restoredElements);
        }
    }

    /**
     * Writer that parks written elements in a private cache and moves them into the inherited
     * {@code elements} list only when its processing-time timer fires. The timer is first
     * registered for time {@link #TIMER_INTERVAL} and re-registers itself on every firing.
     */
    private static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer> implements Sink.ProcessingTimeService.ProcessingTimeCallback {
        /** Distance between two timer firings, in the time unit of the processing-time service. */
        private static final long TIMER_INTERVAL = 1000L;

        /** Elements written since the last timer firing. */
        private final List<String> cachedCommittables;

        private TimeBasedBufferingSinkWriter() {
            this.cachedCommittables = new ArrayList<>();
        }

        /**
         * Caches the element, rendered as the string form of the tuple
         * (element, timestamp, current watermark).
         *
         * @param num element to write
         * @param context supplies the element timestamp and the current watermark
         */
        @Override
        public void write(Integer num, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of(num, context.timestamp(), Long.valueOf(context.currentWatermark())).toString());
        }

        /**
         * Stores the timer service via the superclass and schedules the first timer.
         *
         * @param processingTimeService service used to register timers
         */
        @Override
        public void setProcessingTimerService(Sink.ProcessingTimeService processingTimeService) {
            super.setProcessingTimerService(processingTimeService);
            this.processingTimerService.registerProcessingTimer(TIMER_INTERVAL, this);
        }

        /**
         * Moves all cached elements into {@code elements} and schedules the next timer.
         *
         * @param time the time passed to this callback by the timer service
         * @throws IOException declared by the callback contract; never thrown here
         */
        @Override
        public void onProcessingTime(long time) throws IOException {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimerService.registerProcessingTimer(time + TIMER_INTERVAL, this);
        }
    }

    /**
     * Supplies the two values of the {@code stateful} test parameter.
     *
     * @return {@code true} followed by {@code false}
     */
    @Parameterized.Parameters(name = "Stateful: {0}")
    public static Collection<Object> data() {
        return Arrays.<Object>asList(Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * With the default (non-buffering) writer, a checkpoint alone is enough for both elements
     * to show up in the operator output; {@code endInput()} is never called.
     */
    @Test
    public void nonBufferingWriterEmitsWithoutFlush() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness =
                createTestHarness(new TestSink.DefaultSinkWriter<>());
        harness.open();

        harness.processWatermark(0L);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        // Checkpoint 1: pre-barrier hook followed by the snapshot itself.
        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 1L);

        final StreamElement[] expected = {
            new org.apache.flink.streaming.api.watermark.Watermark(0L),
            new StreamRecord<>(Tuple3.of(1, 1L, 0L).toString()),
            new StreamRecord<>(Tuple3.of(2, 2L, 0L).toString())
        };
        MatcherAssert.assertThat(
                SinkTestUtil.fromOutput(harness.getOutput()), Matchers.contains(expected));
    }

    /**
     * With the default (non-buffering) writer, ending the input emits both elements after the
     * forwarded watermark.
     */
    @Test
    public void nonBufferingWriterEmitsOnFlush() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness =
                createTestHarness(new TestSink.DefaultSinkWriter<>());
        harness.open();

        harness.processWatermark(0L);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        harness.endInput();

        final StreamElement[] expected = {
            new org.apache.flink.streaming.api.watermark.Watermark(0L),
            new StreamRecord<>(Tuple3.of(1, 1L, 0L).toString()),
            new StreamRecord<>(Tuple3.of(2, 2L, 0L).toString())
        };
        MatcherAssert.assertThat(
                SinkTestUtil.fromOutput(harness.getOutput()), Matchers.contains(expected));
    }

    /**
     * With the buffering writer, a checkpoint without {@code endInput()} leaves only the
     * watermark in the operator output.
     */
    @Test
    public void bufferingWriterDoesNotEmitWithoutFlush() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness =
                createTestHarness(new BufferingSinkWriter());
        harness.open();

        harness.processWatermark(0L);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        // Checkpoint 1: pre-barrier hook followed by the snapshot itself.
        harness.prepareSnapshotPreBarrier(1L);
        harness.snapshot(1L, 1L);

        final StreamElement[] expected = {
            new org.apache.flink.streaming.api.watermark.Watermark(0L)
        };
        MatcherAssert.assertThat(
                SinkTestUtil.fromOutput(harness.getOutput()), Matchers.contains(expected));
    }

    /**
     * With the buffering writer, ending the input releases both buffered elements after the
     * forwarded watermark.
     */
    @Test
    public void bufferingWriterEmitsOnFlush() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness =
                createTestHarness(new BufferingSinkWriter());
        harness.open();

        harness.processWatermark(0L);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        harness.endInput();

        final StreamElement[] expected = {
            new org.apache.flink.streaming.api.watermark.Watermark(0L),
            new StreamRecord<>(Tuple3.of(1, 1L, 0L).toString()),
            new StreamRecord<>(Tuple3.of(2, 2L, 0L).toString())
        };
        MatcherAssert.assertThat(
                SinkTestUtil.fromOutput(harness.getOutput()), Matchers.contains(expected));
    }

    /**
     * Exercises the timer-driven writer: nothing is emitted at the first pre-barrier call
     * (processing time 0), and both elements are in the output once processing time has been
     * advanced to 2001 and the input has ended. No watermark is processed, so the expected
     * tuples carry {@code Long.MIN_VALUE} as watermark.
     */
    @Test
    public void timeBasedBufferingSinkWriter() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness =
                createTestHarness(new TimeBasedBufferingSinkWriter());
        harness.open();
        harness.setProcessingTime(0L);

        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        // The writer's first timer is registered for time 1000, so at time 0 nothing is out yet.
        harness.prepareSnapshotPreBarrier(1L);
        final int emittedBeforeTimer = harness.getOutput().size();
        MatcherAssert.assertThat(Integer.valueOf(emittedBeforeTimer), Matchers.equalTo(0));

        // Move processing time past the registered timer(s), then checkpoint and finish.
        harness.getProcessingTimeService().setCurrentTime(2001L);
        harness.prepareSnapshotPreBarrier(2L);
        harness.endInput();

        final StreamElement[] expected = {
            new StreamRecord<>(Tuple3.of(1, 1L, Long.MIN_VALUE).toString()),
            new StreamRecord<>(Tuple3.of(2, 2L, Long.MIN_VALUE).toString())
        };
        MatcherAssert.assertThat(
                SinkTestUtil.fromOutput(harness.getOutput()), Matchers.contains(expected));
    }

    /**
     * Checks that watermarks are both forwarded to the operator output and recorded by the
     * writer in its {@code watermarks} list.
     */
    @Test
    public void watermarkPropagatedToSinkWriter() throws Exception {
        final TestSink.DefaultSinkWriter<Integer> writer = new TestSink.DefaultSinkWriter<>();
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness = createTestHarness(writer);
        harness.open();

        harness.processWatermark(0L);
        harness.processWatermark(1L);

        // Operator output uses the streaming Watermark type ...
        final StreamElement[] expectedOutput = {
            new org.apache.flink.streaming.api.watermark.Watermark(0L),
            new org.apache.flink.streaming.api.watermark.Watermark(1L)
        };
        MatcherAssert.assertThat(
                SinkTestUtil.fromOutput(harness.getOutput()), Matchers.contains(expectedOutput));

        // ... while the writer records the event-time Watermark type.
        final Watermark[] expectedInWriter = {new Watermark(0L), new Watermark(1L)};
        MatcherAssert.assertThat(writer.watermarks, Matchers.contains(expectedInWriter));
    }

    /**
     * Snapshots a buffering writer that holds two elements, restores the snapshot into a new
     * operator and ends the input. In the stateful case the writer must have seen checkpoint
     * 1 and the restored operator must emit the two elements; otherwise the writer is never
     * snapshotted and the restored operator emits nothing.
     */
    @Test
    public void stateIsRestored() throws Exception {
        final SnapshottingBufferingSinkWriter writer = new SnapshottingBufferingSinkWriter();
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness = createTestHarness(writer);
        harness.open();

        harness.processWatermark(0L);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        harness.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState snapshot = harness.snapshot(1L, 1L);

        // Buffered elements are not emitted before a flush; only the watermark is forwarded.
        MatcherAssert.assertThat(harness.getOutput(), Matchers.contains(new Object[]{new org.apache.flink.streaming.api.watermark.Watermark(0L)}));

        // Compare against the writer's own sentinel rather than a duplicated magic number.
        final long expectedCheckpointId =
                this.stateful ? 1L : SnapshottingBufferingSinkWriter.NOT_SNAPSHOTTED;
        MatcherAssert.assertThat(Long.valueOf(writer.lastCheckpointId), Matchers.equalTo(Long.valueOf(expectedCheckpointId)));

        harness.close();

        final OneInputStreamOperatorTestHarness<Integer, byte[]> restoredHarness =
                createTestHarness(new SnapshottingBufferingSinkWriter());
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        // Ending the input flushes whatever the restored writer holds.
        restoredHarness.endInput();

        final List<StreamElement> expectedAfterRestore = new ArrayList<>();
        if (this.stateful) {
            expectedAfterRestore.add(new StreamRecord<>(Tuple3.of(1, 1L, 0L).toString()));
            expectedAfterRestore.add(new StreamRecord<>(Tuple3.of(2, 2L, 0L).toString()));
        }
        MatcherAssert.assertThat(SinkTestUtil.fromOutput(restoredHarness.getOutput()), Matchers.equalTo(expectedAfterRestore));
    }

    /**
     * Verifies that a sink operator configured with a compatible state name picks up state
     * written by {@link DummySinkOperator}.
     *
     * <p>First run: the operator is restored from the dummy operator's snapshot, processes one
     * element and ends its input. In the stateful case the output must contain the ten
     * previous inputs plus the new element, otherwise only the new element. Second run: a
     * fresh operator restored from the first run's snapshot must emit only the elements it
     * processes itself.
     */
    @Test
    public void loadPreviousSinkState() throws Exception {
        // 1. Build the state of the previous ("dummy") sink.
        final List<String> previousSinkInputs = Arrays.asList("bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt");
        final OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState(
                new OneInputStreamOperatorTestHarness<>(new DummySinkOperator(), StringSerializer.INSTANCE),
                previousSinkInputs);

        // 2. Restore that state into the sink operator and let it process one element.
        final OneInputStreamOperatorTestHarness<Integer, byte[]> firstRun = createCompatibleSinkOperator();

        // Build the expectation in an explicitly mutable list: Collectors.toList() makes no
        // mutability guarantee, yet one more record is appended below.
        final List<StreamElement> expectedFirstRun = new ArrayList<>();
        if (this.stateful) {
            for (String previousInput : previousSinkInputs) {
                expectedFirstRun.add(new StreamRecord<>(previousInput));
            }
        }
        expectedFirstRun.add(new StreamRecord<>(Tuple3.of(1, 1, Long.MIN_VALUE).toString()));

        firstRun.initializeState(previousSinkState);
        firstRun.open();
        firstRun.processElement(1, 1L);
        firstRun.endInput();
        final OperatorSubtaskState firstRunSnapshot = firstRun.snapshot(1L, 1L);
        firstRun.close();

        MatcherAssert.assertThat(SinkTestUtil.fromOutput(firstRun.getOutput()), Matchers.containsInAnyOrder(expectedFirstRun.toArray()));

        // 3. Restore from the first run's snapshot: the previous inputs must not reappear.
        final OneInputStreamOperatorTestHarness<Integer, byte[]> secondRun = createCompatibleSinkOperator();
        final Object[] expectedSecondRun = {
            new StreamRecord<>(Tuple3.of(2, 2, Long.MIN_VALUE).toString()),
            new StreamRecord<>(Tuple3.of(3, 3, Long.MIN_VALUE).toString())
        };

        secondRun.initializeState(firstRunSnapshot);
        secondRun.open();
        secondRun.processElement(2, 2L);
        secondRun.processElement(3, 3L);
        secondRun.endInput();

        MatcherAssert.assertThat(SinkTestUtil.fromOutput(secondRun.getOutput()), Matchers.containsInAnyOrder(expectedSecondRun));
    }

    /**
     * Builds the operator with {@code false} as the second {@code createTestHarness} argument
     * and checks that the writer still gets its {@code prepareCommit} call at the pre-barrier
     * hook, and that it has seen both elements and the watermark.
     */
    @Test
    public void receivePreCommitWithoutCommitter() throws Exception {
        final PreBarrierSinkWriter writer = new PreBarrierSinkWriter();
        final OneInputStreamOperatorTestHarness<Integer, byte[]> harness = createTestHarness(writer, false);
        harness.open();

        harness.processWatermark(0L);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        harness.prepareSnapshotPreBarrier(1L);
        // Hamcrest assertion, matching the rest of this JUnit 4 test class.
        MatcherAssert.assertThat(
                "prepareCommit was not called during prepareSnapshotPreBarrier",
                writer.hasReceivedPreCommit());

        harness.snapshot(1L, 1L);

        final String[] expectedElements = {
            Tuple3.of(1, 1L, 0L).toString(),
            Tuple3.of(2, 2L, 0L).toString()
        };
        MatcherAssert.assertThat(writer.getElements(), Matchers.contains(expectedElements));

        final Watermark[] expectedWatermarks = {new Watermark(0L)};
        MatcherAssert.assertThat(writer.getWatermarks(), Matchers.contains(expectedWatermarks));
    }

    /**
     * Creates a harness around a sink operator whose sink declares the state name of
     * {@link DummySinkOperator} as compatible, so that operator's snapshot can be restored.
     *
     * @return a new, not yet opened test harness
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<Integer, byte[]> createCompatibleSinkOperator() throws Exception {
        // Reference the constant the dummy operator registers its state under instead of
        // repeating the literal, so both sides cannot drift apart.
        return new OneInputStreamOperatorTestHarness<>(
                (OneInputStreamOperatorFactory) new SinkOperatorFactory(
                        getBuilder(new SnapshottingBufferingSinkWriter())
                                .setCompatibleStateNames(DummySinkOperator.DUMMY_SINK_STATE_NAME)
                                .build(),
                        false,
                        true),
                (TypeSerializer) IntSerializer.INSTANCE);
    }

    /**
     * Convenience overload that passes {@code true} as the second argument of
     * {@link #createTestHarness(TestSink.DefaultSinkWriter, boolean)}.
     *
     * @param writer writer to install in the sink under test
     * @return a new, not yet opened test harness
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<Integer, byte[]> createTestHarness(TestSink.DefaultSinkWriter<Integer> writer) throws Exception {
        final boolean defaultFlag = true;
        return createTestHarness(writer, defaultFlag);
    }

    /**
     * Wraps a sink built around the given writer in a {@code SinkOperatorFactory} and a test
     * harness with an {@code Integer} input serializer.
     *
     * @param writer writer to install in the sink under test
     * @param shouldEmit forwarded as the third {@code SinkOperatorFactory} constructor
     *     argument (the second is fixed to {@code false}); presumably controls whether
     *     committables are emitted downstream — TODO confirm against SinkOperatorFactory
     * @return a new, not yet opened test harness
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<Integer, byte[]> createTestHarness(TestSink.DefaultSinkWriter<Integer> writer, boolean shouldEmit) throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                (OneInputStreamOperatorFactory) new SinkOperatorFactory(
                        getBuilder(writer).build(),
                        false,
                        shouldEmit),
                (TypeSerializer) IntSerializer.INSTANCE);
    }

    /**
     * Starts a {@code TestSink} builder with the given writer and the string committable
     * serializer, enabling writer state when the {@code stateful} test parameter is set.
     *
     * @param writer writer to install in the sink
     * @return the configured builder, not yet built
     */
    private TestSink.Builder<Integer> getBuilder(TestSink.DefaultSinkWriter<Integer> writer) {
        final TestSink.Builder<Integer> builder =
                TestSink.newBuilder()
                        .setWriter(writer)
                        .setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE);
        if (!this.stateful) {
            return builder;
        }
        builder.withWriterState();
        return builder;
    }
}
