/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.validation;

import java.util.Collection;
import java.util.HashMap;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorFinishedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks, based on the {@link OperatorFinishedEvent}s found in a test job's event queue, that the
 * last value sent by every subtask of a tracked upstream operator was received by at least one
 * subtask of the tracked downstream operator on the same job edge.
 */
public class TestJobDataFlowValidator {
    private static final Logger LOG = LoggerFactory.getLogger(TestJobDataFlowValidator.class);

    /**
     * Validates the data flow over every edge of the job graph whose two ends both contain an
     * operator listed in {@code testJob.operatorsWithDataFlowTracking}.
     *
     * <p>Edges with an untracked end are skipped, and so are edges whose upstream operator is
     * listed in {@code testJob.sources}.
     *
     * @param testJob the job providing the event queue, the job graph, the set of sources and the
     *     set of operators with data flow tracking
     * @param withDrain if {@code true}, finish events must exist for both ends of each checked
     *     edge and the last value sent by each upstream subtask must have been received by some
     *     downstream subtask; if {@code false}, it is asserted that no finish events were
     *     collected for the operators of each checked edge
     * @throws AssertionError if any of the above expectations is violated
     */
    public static void checkDataFlow(TestJobWithDescription testJob, boolean withDrain) {
        // operator ID -> (subtask index -> finish event reported by that subtask)
        Map<String, Map<Integer, OperatorFinishedEvent>> finishEvents = new HashMap<>();
        for (TestEvent ev : testJob.eventQueue.getAll()) {
            if (ev instanceof OperatorFinishedEvent) {
                finishEvents
                        .computeIfAbsent(ev.operatorId, ign -> new HashMap<>())
                        .put(ev.subtaskIndex, (OperatorFinishedEvent) ev);
            }
        }

        for (JobVertex upstream : testJob.jobGraph.getVertices()) {
            for (IntermediateDataSet produced : upstream.getProducedDataSets()) {
                JobEdge edge = produced.getConsumer();
                Optional<String> upstreamIDOptional = getTrackedOperatorID(upstream, true, testJob);
                Optional<String> downstreamIDOptional =
                        getTrackedOperatorID(edge.getTarget(), false, testJob);
                if (!upstreamIDOptional.isPresent() || !downstreamIDOptional.isPresent()) {
                    LOG.debug("Ignoring edge (untracked operator): {}", edge);
                    continue;
                }
                String upstreamID = upstreamIDOptional.get();
                String downstreamID = downstreamIDOptional.get();
                if (testJob.sources.contains(upstreamID)) {
                    LOG.debug("Legacy sources do not have the finish() method and thus do not emit FinishEvent");
                    continue;
                }
                checkDataFlow(upstreamID, downstreamID, edge, finishEvents, withDrain);
            }
        }
    }

    /**
     * Checks a single edge between two tracked operators.
     *
     * <p>The finish events of the downstream operator are looked up (and asserted on) before those
     * of the upstream operator. Without draining nothing is checked beyond those lookups.
     *
     * @param upstreamID ID of the tracked operator on the producing side of the edge
     * @param downstreamID ID of the tracked operator on the consuming side of the edge
     * @param edge the edge being checked; only used for logging
     * @param finishEvents collected finish events, keyed by operator ID and then by subtask index
     * @param withDrain whether finish events are expected to be present
     */
    private static void checkDataFlow(
            String upstreamID,
            String downstreamID,
            JobEdge edge,
            Map<String, Map<Integer, OperatorFinishedEvent>> finishEvents,
            boolean withDrain) {
        LOG.debug(
                "Checking {} edge\n  from {} ({})\n  to {} ({})",
                edge.getDistributionPattern(),
                edge.getSource().getProducer().getName(),
                upstreamID,
                edge.getTarget().getName(),
                downstreamID);

        Map<Integer, OperatorFinishedEvent> downstreamFinishInfo =
                getForOperator(downstreamID, finishEvents, withDrain);
        Map<Integer, OperatorFinishedEvent> upstreamFinishInfo =
                getForOperator(upstreamID, finishEvents, withDrain);
        if (!withDrain) {
            // Both lookups above have already asserted that no finish info exists.
            return;
        }

        upstreamFinishInfo.forEach(
                (upstreamIndex, upstreamInfo) -> {
                    // Build the (potentially large) failure message only when the check fails.
                    if (!anySubtaskReceived(
                            upstreamID,
                            upstreamIndex,
                            upstreamInfo.lastSent,
                            downstreamFinishInfo.values())) {
                        Assert.fail(
                                String.format(
                                        "No downstream received %s from %s[%d]; received: %s",
                                        upstreamInfo.lastSent,
                                        upstreamID,
                                        upstreamIndex,
                                        downstreamFinishInfo));
                    }
                });
    }

    /**
     * Tells whether at least one of the given downstream finish events reports {@code
     * upstreamValue} as the last value received from subtask {@code upstreamIndex} of operator
     * {@code upstreamID}.
     */
    private static boolean anySubtaskReceived(
            String upstreamID,
            int upstreamIndex,
            long upstreamValue,
            Collection<OperatorFinishedEvent> downstreamFinishInfo) {
        return downstreamFinishInfo.stream()
                .anyMatch(event -> event.getLastReceived(upstreamID, upstreamIndex) == upstreamValue);
    }

    /**
     * Looks up the finish events of one operator and asserts that their presence matches the
     * draining mode.
     *
     * @param operatorId ID of the operator to look up
     * @param finishEvents collected finish events, keyed by operator ID and then by subtask index
     * @param withDrain if {@code true}, the events must be present; otherwise they must be absent
     * @return the events of the operator keyed by subtask index; {@code null} when {@code
     *     withDrain} is {@code false}
     * @throws AssertionError if the presence of the events does not match {@code withDrain}
     */
    private static Map<Integer, OperatorFinishedEvent> getForOperator(
            String operatorId,
            Map<String, Map<Integer, OperatorFinishedEvent>> finishEvents,
            boolean withDrain) {
        Map<Integer, OperatorFinishedEvent> events = finishEvents.get(operatorId);
        // The messages render the whole event map, so they are only built on failure.
        if (withDrain) {
            if (events == null) {
                Assert.fail(
                        String.format(
                                "Operator finish info wasn't collected with draining: %s (collected: %s)",
                                operatorId, finishEvents));
            }
        } else if (events != null) {
            // Delegates to assertNull so that the failure text stays exactly what JUnit produces.
            Assert.assertNull(
                    String.format(
                            "Operator finish info was collected without draining: %s (collected: %s)",
                            operatorId, finishEvents),
                    events);
        }
        return events;
    }

    /**
     * Finds the ID of a tracked operator inside the given vertex.
     *
     * <p>The operator ID list of the vertex is scanned forward from its first element when {@code
     * upstream} is {@code true} and backward from its last element otherwise; the first ID found
     * in {@code testJob.operatorsWithDataFlowTracking} wins. The user-defined operator ID is
     * preferred over the generated one.
     *
     * @param vertex the vertex whose operators are inspected
     * @param upstream selects the scan direction as described above
     * @param testJob the job providing the set of tracked operator IDs
     * @return the ID of the first tracked operator in scan order, or empty if there is none
     */
    private static Optional<String> getTrackedOperatorID(
            JobVertex vertex, boolean upstream, TestJobWithDescription testJob) {
        ListIterator<OperatorIDPair> iterator =
                vertex.getOperatorIDs()
                        .listIterator(upstream ? 0 : vertex.getOperatorIDs().size());
        while (upstream ? iterator.hasNext() : iterator.hasPrevious()) {
            OperatorIDPair idPair = upstream ? iterator.next() : iterator.previous();
            String id =
                    idPair.getUserDefinedOperatorID()
                            .orElse(idPair.getGeneratedOperatorID())
                            .toString();
            if (testJob.operatorsWithDataFlowTracking.contains(id)) {
                return Optional.of(id);
            }
        }
        return Optional.empty();
    }
}

