/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent driver for a {@link TestJobWithDescription} running on a
 * {@link MiniClusterWithClientResource}.
 *
 * <p>An instance is obtained through {@link #execute}, which submits the job graph. The remaining
 * methods either wait for a condition (tasks running, an event, termination), send test commands
 * to operators through the job's command queue, or assert on the final job status. Most methods
 * return {@code this} so that calls can be chained.
 */
public class TestJobExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(TestJobExecutor.class);

    /** How long {@link #triggerFailover} waits for the failed subtask to start again. */
    private static final int FAILOVER_TIMEOUT_MS = 10000;

    private final MiniClusterWithClientResource miniClusterResource;
    private final TestJobWithDescription testJob;
    private final JobID jobID;

    private TestJobExecutor(TestJobWithDescription testJob, JobID jobID, MiniClusterWithClientResource miniClusterResource) {
        this.testJob = testJob;
        this.jobID = jobID;
        this.miniClusterResource = miniClusterResource;
    }

    /**
     * Submits the job graph of {@code testJob} to the cluster and waits for its tasks to run.
     *
     * @param testJob job description whose {@code jobGraph} is submitted
     * @param miniClusterResource cluster (and client) used to run the job
     * @return an executor bound to the submitted job
     * @throws Exception if the submission or the wait fails
     */
    public static TestJobExecutor execute(TestJobWithDescription testJob, MiniClusterWithClientResource miniClusterResource) throws Exception {
        LOG.debug("submitGraph: {}", testJob.jobGraph);
        JobID job = (JobID) miniClusterResource.getClusterClient().submitJob(testJob.jobGraph).get();
        // NOTE(review): the flag is false here but true in waitForAllRunning(); presumably it
        // controls whether already-finished tasks are tolerated - confirm against CommonTestUtils.
        CommonTestUtils.waitForAllTaskRunning((MiniCluster) miniClusterResource.getMiniCluster(), job, false);
        return new TestJobExecutor(testJob, job, miniClusterResource);
    }

    /**
     * Blocks until {@code CommonTestUtils.waitForAllTaskRunning} reports the job's tasks as running.
     *
     * @return this executor
     * @throws Exception if the wait fails
     */
    public TestJobExecutor waitForAllRunning() throws Exception {
        LOG.debug("waitForAllRunning in {}", this.jobID);
        CommonTestUtils.waitForAllTaskRunning((MiniCluster) this.miniClusterResource.getMiniCluster(), this.jobID, true);
        return this;
    }

    /**
     * Processes events from the job's event queue until one of the given type is seen.
     *
     * @param eventClass event type (or supertype) to wait for
     * @return this executor
     * @throws Exception if event handling fails
     */
    public TestJobExecutor waitForEvent(Class<? extends TestEvent> eventClass) throws Exception {
        LOG.debug("waitForEvent: {}", eventClass.getSimpleName());
        this.testJob.eventQueue.withHandler(event -> {
            if (eventClass.isAssignableFrom(event.getClass())) {
                return TestEventQueue.TestEventHandler.TestEventNextAction.STOP;
            }
            return TestEventQueue.TestEventHandler.TestEventNextAction.CONTINUE;
        });
        return this;
    }

    /**
     * Stops the job with a canonical-format savepoint written to a new sub-folder of {@code folder}.
     *
     * @param folder temporary folder in which the savepoint directory is created
     * @param withDrain forwarded to the cluster client's {@code stopWithSavepoint}
     * @return this executor
     * @throws Exception if creating the folder or stopping the job fails
     */
    public TestJobExecutor stopWithSavepoint(TemporaryFolder folder, boolean withDrain) throws Exception {
        LOG.debug("stopWithSavepoint: {} (withDrain: {})", folder, withDrain);
        String savepointDirectory = folder.newFolder().toString();
        this.miniClusterResource
                .getClusterClient()
                .stopWithSavepoint(this.jobID, withDrain, savepointDirectory, SavepointFormatType.CANONICAL)
                .get();
        return this;
    }

    /**
     * Dispatches {@code command} to the given operator through the job's command queue.
     *
     * @param operatorID target operator
     * @param command command to send
     * @param scope which subtasks of the operator should receive the command
     * @return this executor
     */
    public TestJobExecutor sendOperatorCommand(String operatorID, TestCommand command, TestCommandDispatcher.TestCommandScope scope) {
        LOG.debug("send command: {} to {}/{}", command, operatorID, scope);
        this.testJob.commandQueue.dispatch(command, scope, operatorID);
        return this;
    }

    /**
     * Sends {@link TestCommand#FAIL} to a single subtask of {@code operatorID}, waits until that
     * subtask reports that it started again, and then waits for all tasks to be running.
     *
     * @param operatorID operator to fail
     * @throws Exception if the subtask does not restart in time (as a {@link RuntimeException}
     *     describing the job status) or if any of the waits fails
     */
    public void triggerFailover(String operatorID) throws Exception {
        LOG.debug("sendCommand: {}", TestCommand.FAIL);
        BlockingQueue<TestEvent> queue = new LinkedBlockingQueue<>();
        Consumer<TestEvent> listener = queue::add;
        // Register the listener before dispatching so the acknowledgement cannot be missed.
        this.testJob.eventQueue.addListener(listener);
        this.testJob.commandQueue.dispatch(TestCommand.FAIL, TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK, operatorID);
        try {
            this.waitForFailover(queue);
        } catch (TimeoutException e) {
            this.handleFailoverTimeout(e);
        } finally {
            this.testJob.eventQueue.removeListener(listener);
        }
        this.waitForAllRunning();
    }

    /**
     * Waits until the subtask that acknowledged the FAIL command emits an
     * {@link OperatorStartedEvent} whose attempt number is not lower than the acknowledged one.
     *
     * @param queue events captured since the FAIL command was dispatched
     * @throws TimeoutException if no matching start event arrives within the failover timeout
     */
    private void waitForFailover(BlockingQueue<TestEvent> queue) throws Exception {
        Deadline deadline = Deadline.fromNow(Duration.ofMillis(FAILOVER_TIMEOUT_MS));
        // Identity of the subtask that acknowledged the FAIL command; unknown until the ack arrives.
        String operatorId = null;
        int subtaskId = -1;
        int attemptNumber = -1;
        while (deadline.hasTimeLeft()) {
            // poll() yields null on timeout; neither instanceof check matches null.
            TestEvent event = queue.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            if (event instanceof TestCommandAckEvent) {
                TestCommandAckEvent ack = (TestCommandAckEvent) event;
                if (ack.getCommand() == TestCommand.FAIL) {
                    operatorId = ack.operatorId;
                    subtaskId = ack.subtaskIndex;
                    attemptNumber = ack.getAttemptNumber();
                }
            } else if (event instanceof OperatorStartedEvent && operatorId != null) {
                OperatorStartedEvent started = (OperatorStartedEvent) event;
                boolean sameSubtask = started.operatorId.equals(operatorId) && started.subtaskIndex == subtaskId;
                if (sameSubtask && started.getAttemptNumber() >= attemptNumber) {
                    return;
                }
            }
        }
        throw new TimeoutException("No subtask restarted in " + FAILOVER_TIMEOUT_MS + "ms");
    }

    /**
     * Converts a failover timeout into a {@link RuntimeException} that reports the job status.
     *
     * <p>If the job result carries a failure, that failure becomes the cause and the timeout is
     * attached as a suppressed exception; otherwise the timeout itself is the cause. Either way
     * the original timeout stack trace is preserved.
     *
     * @param e the timeout raised while waiting for the subtask to restart
     * @throws RuntimeException always
     * @throws Exception if querying the job status or result fails
     */
    private void handleFailoverTimeout(TimeoutException e) throws Exception {
        String message = String.format("Unable to failover the job: %s; job status: %s", e.getMessage(), this.currentJobStatus());
        Optional<? extends Throwable> jobFailure = this.jobFailureCause();
        if (jobFailure.isPresent()) {
            RuntimeException failure = new RuntimeException(message, jobFailure.get());
            failure.addSuppressed(e);
            throw failure;
        }
        throw new RuntimeException(message, e);
    }

    /**
     * Broadcasts {@code command} through the job's command queue.
     *
     * @param command command to send
     * @param scope which subtasks should receive the command
     * @return this executor
     */
    public TestJobExecutor sendBroadcastCommand(TestCommand command, TestCommandDispatcher.TestCommandScope scope) {
        LOG.debug("sendCommand: {}", command);
        this.testJob.commandQueue.broadcast(command, scope);
        return this;
    }

    /**
     * Polls the job status until it is globally terminal. There is no upper bound on the wait.
     *
     * @return this executor
     * @throws Exception if querying the status fails or the thread is interrupted
     */
    public TestJobExecutor waitForTermination() throws Exception {
        LOG.debug("waitForTermination");
        while (!this.currentJobStatus().isGloballyTerminalState()) {
            Thread.sleep(1L);
        }
        return this;
    }

    /**
     * Asserts that the job status is {@link JobStatus#FINISHED}.
     *
     * @return this executor
     * @throws AssertionError if the job is in any other status; the job's failure, when the job
     *     result provides one, is attached as the cause
     * @throws Exception if querying the job status or result fails
     */
    public TestJobExecutor assertFinishedSuccessfully() throws Exception {
        LOG.debug("assertFinishedSuccessfully");
        JobStatus jobStatus = this.currentJobStatus();
        if (jobStatus != JobStatus.FINISHED) {
            String message = String.format("Job didn't finish successfully, status: %s", jobStatus);
            Optional<? extends Throwable> jobFailure = this.jobFailureCause();
            if (jobFailure.isPresent()) {
                throw new AssertionError(message, jobFailure.get());
            }
            Assert.fail(message);
        }
        return this;
    }

    /**
     * Waits for subtasks of the given vertex to finish.
     *
     * @param id vertex whose subtasks are awaited
     * @param scope {@code ALL_SUBTASKS} to request all subtasks; any other value passes
     *     {@code false} to {@code CommonTestUtils.waitForSubtasksToFinish}
     * @return this executor
     * @throws Exception if the wait fails
     */
    public TestJobExecutor waitForSubtasksToFinish(JobVertexID id, TestCommandDispatcher.TestCommandScope scope) throws Exception {
        LOG.debug("waitForSubtasksToFinish vertex {}, all subtasks: {}", id, scope);
        boolean allSubtasks = scope == TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS;
        CommonTestUtils.waitForSubtasksToFinish((MiniCluster) this.miniClusterResource.getMiniCluster(), this.jobID, id, allSubtasks);
        return this;
    }

    /** Fetches the current status of the job from the cluster client. */
    private JobStatus currentJobStatus() throws Exception {
        return (JobStatus) this.miniClusterResource.getClusterClient().getJobStatus(this.jobID).get();
    }

    /** Requests the job result from the cluster client and returns the failure it carries, if any. */
    private Optional<? extends Throwable> jobFailureCause() throws Exception {
        JobResult result = (JobResult) this.miniClusterResource.getClusterClient().requestJobResult(this.jobID).get();
        return result.getSerializedThrowable();
    }
}

