/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

public class DispatcherResourceCleanupTest
extends TestLogger {
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
    private static final Time timeout = Time.seconds((long)10L);
    private static TestingRpcService rpcService;
    private JobID jobId;
    private JobGraph jobGraph;
    private Configuration configuration;
    private SingleRunningJobsRegistry runningJobsRegistry;
    private TestingHighAvailabilityServices highAvailabilityServices;
    private OneShotLatch clearedJobLatch;
    private TestingDispatcher dispatcher;
    private DispatcherGateway dispatcherGateway;
    private BlobServer blobServer;
    private PermanentBlobKey permanentBlobKey;
    private File blobFile;
    private CompletableFuture<BlobKey> storedHABlobFuture;
    private CompletableFuture<JobID> deleteAllHABlobsFuture;
    private CompletableFuture<JobID> cleanupJobFuture;
    private CompletableFuture<JobID> cleanupJobHADataFuture;
    private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
    private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @Before
    public void setup() throws Exception {
        this.jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        this.jobId = this.jobGraph.getJobID();
        this.configuration = new Configuration();
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.clearedJobLatch = new OneShotLatch();
        this.runningJobsRegistry = new SingleRunningJobsRegistry(this.jobId, this.clearedJobLatch);
        this.highAvailabilityServices.setRunningJobsRegistry(this.runningJobsRegistry);
        this.cleanupJobHADataFuture = new CompletableFuture();
        this.highAvailabilityServices.setCleanupJobDataFuture(this.cleanupJobHADataFuture);
        this.storedHABlobFuture = new CompletableFuture();
        this.deleteAllHABlobsFuture = new CompletableFuture();
        TestingBlobStore testingBlobStore = new TestingBlobStoreBuilder().setPutFunction(putArguments -> this.storedHABlobFuture.complete((BlobKey)putArguments.f2)).setDeleteAllFunction(this.deleteAllHABlobsFuture::complete).createTestingBlobStore();
        this.cleanupJobFuture = new CompletableFuture();
        this.blobServer = new TestingBlobServer(this.configuration, testingBlobStore, this.cleanupJobFuture);
        this.permanentBlobKey = this.blobServer.putPermanent(this.jobId, new byte[256]);
        this.jobGraph.addUserJarBlobKey(this.permanentBlobKey);
        this.blobFile = this.blobServer.getStorageLocation(this.jobId, (BlobKey)this.permanentBlobKey);
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)true));
        Assert.assertThat((Object)this.storedHABlobFuture.get(), (Matcher)Matchers.equalTo((Object)this.permanentBlobKey));
    }

    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception {
        return this.startDispatcherAndSubmitJob(0);
    }

    private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob(int numBlockingJobManagerRunners) throws Exception {
        TestingJobManagerRunnerFactory testingJobManagerRunnerFactoryNG = new TestingJobManagerRunnerFactory(numBlockingJobManagerRunners);
        this.startDispatcher(testingJobManagerRunnerFactoryNG);
        this.submitJob();
        return testingJobManagerRunnerFactoryNG;
    }

    private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
        MemoryExecutionGraphInfoStore archivedExecutionGraphStore = new MemoryExecutionGraphInfoStore();
        this.dispatcher = new TestingDispatcher(rpcService, DispatcherId.generate(), Collections.emptyList(), (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), new DispatcherServices(this.configuration, (HighAvailabilityServices)this.highAvailabilityServices, () -> CompletableFuture.completedFuture(resourceManagerGateway), this.blobServer, heartbeatServices, (ExecutionGraphInfoStore)archivedExecutionGraphStore, (FatalErrorHandler)this.testingFatalErrorHandlerResource.getFatalErrorHandler(), this.historyServerArchivist, null, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), this.jobGraphWriter, jobManagerRunnerFactory, (Executor)ForkJoinPool.commonPool()));
        this.dispatcher.start();
        this.dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
    }

    @After
    public void teardown() throws Exception {
        if (this.dispatcher != null) {
            this.dispatcher.close();
        }
    }

    @AfterClass
    public static void teardownClass() throws ExecutionException, InterruptedException {
        if (rpcService != null) {
            rpcService.stopService().get();
        }
    }

    @Test
    public void testBlobServerCleanupWhenJobFinished() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        this.finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        this.assertThatHABlobsHaveBeenRemoved();
    }

    private void assertThatHABlobsHaveBeenRemoved() throws InterruptedException, ExecutionException {
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)false));
    }

    private void submitJob() throws InterruptedException, ExecutionException {
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        submissionFuture.get();
    }

    @Test
    public void testBlobServerCleanupWhenJobNotFinished() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        this.suspendJob(testingJobManagerRunner);
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)false));
        try {
            this.deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not delete the HA blobs.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
        this.startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception")));
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        try {
            submissionFuture.get();
            Assert.fail((String)"Job submission was expected to fail.");
        }
        catch (ExecutionException ee) {
            Assert.assertThat((Object)ee, (Matcher)FlinkMatchers.containsCause(JobSubmissionException.class));
        }
        this.assertThatHABlobsHaveBeenRemoved();
    }

    @Test
    public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
        this.startDispatcherAndSubmitJob();
        this.dispatcher.closeAsync().get();
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)false));
        try {
            this.deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not delete the HA blobs.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testHACleanupWhenJobFinishedWhileClosingDispatcher() throws Exception {
        TestingJobManagerRunner testingJobManagerRunner = TestingJobManagerRunner.newBuilder().setBlockingTermination(true).setJobId(this.jobId).build();
        ArrayDeque<TestingJobManagerRunner> jobManagerRunners = new ArrayDeque<TestingJobManagerRunner>(Arrays.asList(testingJobManagerRunner));
        this.startDispatcher(new QueueJobManagerRunnerFactory(jobManagerRunners));
        this.submitJob();
        CompletableFuture dispatcherTerminationFuture = this.dispatcher.closeAsync();
        testingJobManagerRunner.getCloseAsyncCalledLatch().await();
        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.FINISHED).build()));
        testingJobManagerRunner.completeTerminationFuture();
        dispatcherTerminationFuture.get();
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.is((Object)this.jobId));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.get(), (Matcher)Matchers.is((Object)this.jobId));
    }

    @Test
    public void testRunningJobsRegistryCleanup() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        this.runningJobsRegistry.setJobRunning(this.jobId);
        Assert.assertThat((Object)this.runningJobsRegistry.contains(this.jobId), (Matcher)Matchers.is((Object)true));
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).setJobID(this.jobId).build()));
        this.clearedJobLatch.await();
        Assert.assertThat((Object)this.runningJobsRegistry.contains(this.jobId), (Matcher)Matchers.is((Object)false));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobSubmissionUnderSameJobId() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(1);
        this.runningJobsRegistry.setJobRunning(this.jobId);
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        this.suspendJob(testingJobManagerRunner);
        testingJobManagerRunner.getCloseAsyncCalledLatch().await();
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        try {
            submissionFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"The job submission future should not complete until the previous JobManager termination future has been completed.");
        }
        catch (TimeoutException timeoutException) {
        }
        finally {
            testingJobManagerRunner.completeTerminationFuture();
        }
        Assert.assertThat(submissionFuture.get(), (Matcher)Matchers.equalTo((Object)Acknowledge.get()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exception {
        TestingJobManagerRunnerFactory testingJobManagerRunnerFactoryNG = this.startDispatcherAndSubmitJob();
        CompletableFuture submissionFuture = this.dispatcherGateway.submitJob(this.jobGraph, timeout);
        try {
            try {
                submissionFuture.get();
                Assert.fail((String)"Expected a DuplicateJobSubmissionFailure.");
            }
            catch (ExecutionException ee) {
                Assert.assertThat((Object)ExceptionUtils.findThrowable((Throwable)ee, DuplicateJobSubmissionException.class).isPresent(), (Matcher)Matchers.is((Object)true));
            }
            this.assertThatHABlobsHaveNotBeenRemoved();
        }
        finally {
            this.finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner());
        }
        this.assertThatHABlobsHaveBeenRemoved();
    }

    @Test
    public void testHaDataCleanupWhenJobFinished() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        TestingJobManagerRunner jobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        this.finishJob(jobManagerRunner);
        JobID jobID = this.cleanupJobHADataFuture.get(2000L, TimeUnit.MILLISECONDS);
        Assert.assertThat((Object)jobID, (Matcher)Matchers.is((Object)this.jobId));
    }

    @Test
    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        TestingJobManagerRunner jobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        this.suspendJob(jobManagerRunner);
        try {
            this.cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not delete the HA data for job.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        Assert.assertThat((Object)this.cleanupJobHADataFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
        this.terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.FINISHED);
    }

    private void suspendJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
        this.terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.SUSPENDED);
    }

    private void terminateJobWithState(TestingJobManagerRunner takeCreatedJobManagerRunner, JobStatus state) {
        takeCreatedJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(state).build()));
    }

    private void assertThatHABlobsHaveNotBeenRemoved() {
        Assert.assertThat((Object)this.cleanupJobFuture.isDone(), (Matcher)Matchers.is((Object)false));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.isDone(), (Matcher)Matchers.is((Object)false));
        Assert.assertThat((Object)this.blobFile.exists(), (Matcher)Matchers.is((Object)true));
    }

    @Test
    public void testDispatcherTerminationTerminatesRunningJobMasters() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        this.dispatcher.closeAsync().get();
        TestingJobManagerRunner jobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        Assert.assertThat((Object)jobManagerRunner.getTerminationFuture().isDone(), (Matcher)Matchers.is((Object)true));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDispatcherTerminationWaitsForJobMasterTerminations() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(1);
        CompletableFuture dispatcherTerminationFuture = this.dispatcher.closeAsync();
        try {
            dispatcherTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"We should not terminate before all running JobMasters have terminated.");
        }
        catch (TimeoutException timeoutException) {
        }
        finally {
            jobManagerRunnerFactory.takeCreatedJobManagerRunner().completeTerminationFuture();
        }
        dispatcherTerminationFuture.get();
    }

    @Test
    public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception {
        this.jobGraphWriter = TestingJobGraphStore.newBuilder().setRemoveJobGraphConsumer((ThrowingConsumer<JobID, ? extends Exception>)((ThrowingConsumer)ignored -> {
            throw new Exception("Failed to Remove future");
        })).withAutomaticStart().build();
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.CANCELED).build();
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.isDone(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception {
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob();
        ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder().setJobID(this.jobId).setState(JobStatus.CANCELED).build();
        TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
        testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph));
        Assert.assertThat((Object)this.cleanupJobFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
        Assert.assertThat((Object)this.deleteAllHABlobsFuture.get(), (Matcher)Matchers.equalTo((Object)this.jobId));
    }

    @Test
    public void testArchivingFinishedJobToHistoryServer() throws Exception {
        OneShotLatch archivingLatch = new OneShotLatch();
        CompletableFuture<Acknowledge> archiveFuture = new CompletableFuture<Acknowledge>();
        this.historyServerArchivist = executionGraphInfo -> {
            archivingLatch.trigger();
            return archiveFuture;
        };
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(0);
        this.finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        archivingLatch.await();
        this.assertThatNoCleanupWasTriggered();
        archiveFuture.complete(Acknowledge.get());
        this.assertGlobalCleanupTriggered(this.jobId);
    }

    @Test
    public void testNotArchivingSuspendedJobToHistoryServer() throws Exception {
        AtomicBoolean isArchived = new AtomicBoolean(false);
        this.historyServerArchivist = executionGraphInfo -> {
            isArchived.set(true);
            return CompletableFuture.completedFuture(Acknowledge.get());
        };
        TestingJobManagerRunnerFactory jobManagerRunnerFactory = this.startDispatcherAndSubmitJob(0);
        this.suspendJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner());
        this.assertLocalCleanupTriggered(this.jobId);
        this.dispatcher.getJobTerminationFuture(this.jobId, Time.hours((long)1L)).join();
        Assert.assertFalse((String)"Archiving should not be triggered for a non-globally terminal job.", (boolean)isArchived.get());
    }

    private void assertThatNoCleanupWasTriggered() {
        Assert.assertFalse((boolean)this.cleanupJobFuture.isDone());
        Assert.assertFalse((boolean)this.deleteAllHABlobsFuture.isDone());
        Assert.assertFalse((boolean)this.cleanupJobHADataFuture.isDone());
    }

    private void assertLocalCleanupTriggered(JobID jobId) throws ExecutionException, InterruptedException {
        Assert.assertEquals((Object)this.cleanupJobFuture.get(), (Object)jobId);
        Assert.assertFalse((boolean)this.deleteAllHABlobsFuture.isDone());
        Assert.assertFalse((boolean)this.cleanupJobHADataFuture.isDone());
    }

    private void assertGlobalCleanupTriggered(JobID jobId) throws ExecutionException, InterruptedException {
        Assert.assertEquals((Object)this.cleanupJobFuture.get(), (Object)jobId);
        Assert.assertEquals((Object)this.deleteAllHABlobsFuture.get(), (Object)jobId);
        Assert.assertEquals((Object)this.cleanupJobHADataFuture.get(), (Object)jobId);
    }

    private class FailingJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final Exception testException;

        public FailingJobManagerRunnerFactory(FlinkException testException) {
            this.testException = testException;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
            throw this.testException;
        }
    }

    private static final class QueueJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final Queue<? extends JobManagerRunner> jobManagerRunners;

        private QueueJobManagerRunnerFactory(Queue<? extends JobManagerRunner> jobManagerRunners) {
            this.jobManagerRunners = jobManagerRunners;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) {
            return Optional.ofNullable(this.jobManagerRunners.poll()).orElseThrow(() -> new IllegalStateException("Cannot create more JobManagerRunners."));
        }
    }

    private static final class TestingBlobServer
    extends BlobServer {
        private final CompletableFuture<JobID> cleanupJobFuture;

        public TestingBlobServer(Configuration config, BlobStore blobStore, CompletableFuture<JobID> cleanupJobFuture) throws IOException {
            super(config, blobStore);
            this.cleanupJobFuture = cleanupJobFuture;
        }

        public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
            boolean result = super.cleanupJob(jobId, cleanupBlobStoreFiles);
            this.cleanupJobFuture.complete(jobId);
            return result;
        }
    }

    private static final class SingleRunningJobsRegistry
    implements RunningJobsRegistry {
        @Nonnull
        private final JobID expectedJobId;
        @Nonnull
        private final OneShotLatch clearedJobLatch;
        private RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.PENDING;
        private boolean containsJob = false;

        private SingleRunningJobsRegistry(@Nonnull JobID expectedJobId, @Nonnull OneShotLatch clearedJobLatch) {
            this.expectedJobId = expectedJobId;
            this.clearedJobLatch = clearedJobLatch;
        }

        public void setJobRunning(JobID jobID) {
            this.checkJobId(jobID);
            this.containsJob = true;
            this.jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.RUNNING;
        }

        private void checkJobId(JobID jobID) {
            Preconditions.checkArgument((boolean)this.expectedJobId.equals((Object)jobID));
        }

        public void setJobFinished(JobID jobID) {
            this.checkJobId(jobID);
            this.containsJob = true;
            this.jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.DONE;
        }

        public RunningJobsRegistry.JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
            this.checkJobId(jobID);
            return this.jobSchedulingStatus;
        }

        public boolean contains(JobID jobId) {
            this.checkJobId(jobId);
            return this.containsJob;
        }

        public void clearJob(JobID jobID) {
            this.checkJobId(jobID);
            this.containsJob = false;
            this.clearedJobLatch.trigger();
        }
    }
}

