/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.LongStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricMatchers;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/**
 * Integration test that runs a small streaming job on a mini cluster and checks the metrics
 * reported for the sink writer operator while the job is paused at well-defined points.
 */
public class SinkMetricsITCase
extends TestLogger {
    /** Name assigned to the sink via {@code .name(...)} in {@link #testMetrics()}. */
    private static final String TEST_SINK_NAME = "MetricTestSink";

    /**
     * Operator-name suffix of the writer; combined with {@link #TEST_SINK_NAME} as
     * {@code "<sink>: <writer>"} when looking up metric groups.
     * NOTE(review): presumably mirrors the name assigned by the sink translation layer - confirm.
     */
    private static final String DEFAULT_WRITER_NAME = "Writer";

    /** Number of slots offered by the single task manager of the mini cluster. */
    private static final int DEFAULT_PARALLELISM = 4;

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    // Must be initialised before MINI_CLUSTER_RESOURCE, which registers it in the configuration.
    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .setConfiguration(reporter.addToConfiguration(new Configuration()))
                            .build());

    /**
     * Runs a job that emits {@code numRecordsPerSplit} records per split, pauses it at two
     * records per split using a pair of barriers, and verifies the sink metrics at each pause.
     *
     * @throws Exception if the job fails or waiting on a barrier is interrupted/broken
     */
    @Test
    public void testMetrics() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fewer splits than subtasks: assertSinkMetrics expects exactly numSplits subtasks to
        // have written records, the remaining ones stay at zero.
        int numSplits = Math.max(1, env.getParallelism() - 2);
        int numRecordsPerSplit = 10;

        // Each barrier has one party per split plus one for this test thread, so the job and
        // the test advance in lock step around every metrics check.
        SharedReference<CyclicBarrier> beforeBarrier =
                this.sharedObjects.add(new CyclicBarrier(numSplits + 1));
        SharedReference<CyclicBarrier> afterBarrier =
                this.sharedObjects.add(new CyclicBarrier(numSplits + 1));
        int stopAtRecord1 = 4;
        int stopAtRecord2 = numRecordsPerSplit - 1;

        env.fromSequence(0L, numSplits - 1)
                // Every split value expands to the records 0 .. numRecordsPerSplit - 1.
                .<Long>flatMap(
                        (split, collector) ->
                                LongStream.range(0L, numRecordsPerSplit)
                                        .forEach(collector::collect))
                .returns(BasicTypeInfo.LONG_TYPE_INFO)
                .map(
                        i -> {
                            long positionInSplit = i % numRecordsPerSplit;
                            if (positionInSplit == stopAtRecord1
                                    || positionInSplit == stopAtRecord2) {
                                // Hold this record back until the test thread has inspected
                                // the metrics of everything emitted before it.
                                beforeBarrier.get().await();
                                afterBarrier.get().await();
                            }
                            return i;
                        })
                // NOTE(review): raw types kept on the sink expression because TestSink's
                // builder generics are not visible from this file - tighten if they allow it.
                .sinkTo(
                        (Sink)
                                TestSink.newBuilder()
                                        .setDefaultCommitter()
                                        .setWriter((TestSink.DefaultSinkWriter) new MetricWriter())
                                        .build())
                .name(TEST_SINK_NAME);

        JobClient jobClient = env.executeAsync();
        JobID jobId = jobClient.getJobID();

        // First pause: the record at index stopAtRecord1 is held back in the map function.
        beforeBarrier.get().await();
        this.assertSinkMetrics(jobId, stopAtRecord1, env.getParallelism(), numSplits);
        afterBarrier.get().await();

        // Second pause: the record at index stopAtRecord2 is held back in the map function.
        beforeBarrier.get().await();
        this.assertSinkMetrics(jobId, stopAtRecord2, env.getParallelism(), numSplits);
        afterBarrier.get().await();

        jobClient.getJobExecutionResult().get();
    }

    /**
     * Asserts the metrics of all writer subtasks of the given job.
     *
     * @param jobId job whose writer operator metric groups are inspected
     * @param processedRecordsPerSubtask number of records every active subtask must have written
     * @param parallelism expected number of writer metric groups
     * @param numSplits expected number of subtasks that have written at least one record
     */
    private void assertSinkMetrics(JobID jobId, long processedRecordsPerSubtask, int parallelism, int numSplits) {
        List<OperatorMetricGroup> groups =
                reporter.findOperatorMetricGroups(
                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
        MatcherAssert.assertThat(groups, Matchers.hasSize(parallelism));

        long expectedBytes = processedRecordsPerSubtask * MetricWriter.RECORD_SIZE_IN_BYTES;
        // MetricWriter counts an error for every even element (0, 2, 4, ...).
        long expectedErrors = (processedRecordsPerSubtask + 1L) / 2L;
        // The gauge reflects the last written element, i.e. index processedRecordsPerSubtask - 1.
        long expectedSendTime = (processedRecordsPerSubtask - 1L) * MetricWriter.BASE_SEND_TIME;

        int subtaskWithMetrics = 0;
        for (OperatorMetricGroup group : groups) {
            // Subtasks that received no split never update their metrics; skip them.
            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0L) {
                continue;
            }
            ++subtaskWithMetrics;
            assertCounter(group, "numRecordsOut", processedRecordsPerSubtask);
            assertCounter(group, "numBytesOut", expectedBytes);
            assertCounter(group, "numRecordsOutErrors", expectedErrors);
            // NOTE(review): MetricWriter only touches the "...Out" counters; the "...Send"
            // variants are expected to carry the same values - presumably mirrored by the
            // metric group implementation, confirm there.
            assertCounter(group, "numRecordsSend", processedRecordsPerSubtask);
            assertCounter(group, "numBytesSend", expectedBytes);
            assertCounter(group, "numRecordsSendErrors", expectedErrors);
            assertGauge(group, "currentSendTime", expectedSendTime);
        }
        MatcherAssert.assertThat(subtaskWithMetrics, CoreMatchers.equalTo(numSplits));
    }

    /** Asserts that the named metric of {@code group} is a counter holding {@code expected}. */
    private static void assertCounter(OperatorMetricGroup group, String metricName, long expected) {
        MatcherAssert.assertThat(
                reporter.getMetricsByGroup(group).get(metricName),
                MetricMatchers.isCounter(CoreMatchers.equalTo(expected)));
    }

    /** Asserts that the named metric of {@code group} is a gauge reporting {@code expected}. */
    private static void assertGauge(OperatorMetricGroup group, String metricName, long expected) {
        MatcherAssert.assertThat(
                reporter.getMetricsByGroup(group).get(metricName),
                MetricMatchers.isGauge(CoreMatchers.equalTo(expected)));
    }

    /**
     * Sink writer that updates the writer metric group on every element so the test can predict
     * the exact metric values from the number of written elements.
     */
    private static class MetricWriter
    extends TestSink.DefaultSinkWriter<Long> {
        /** Factor applied to the element value to derive the reported send time. */
        static final long BASE_SEND_TIME = 100L;

        /** Number of bytes accounted for every written element. */
        static final long RECORD_SIZE_IN_BYTES = 10L;

        private SinkWriterMetricGroup metricGroup;
        private long sendTime;

        /** Stores the writer metric group and registers the send-time gauge on it. */
        @Override
        public void init(Sink.InitContext context) {
            this.metricGroup = context.metricGroup();
            this.metricGroup.setCurrentSendTimeGauge(() -> this.sendTime);
        }

        /**
         * Delegates to the default writer, then records one output record, a fixed number of
         * output bytes, and an error for every even element.
         */
        @Override
        public void write(Long element, SinkWriter.Context context) {
            super.write(element, context);
            this.sendTime = element * BASE_SEND_TIME;
            this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
            if (element % 2L == 0L) {
                this.metricGroup.getNumRecordsOutErrorsCounter().inc();
            }
            this.metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
        }
    }
}

