/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.testutils;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.jupiter.api.Assertions;

public class CommonTestUtils {
    /**
     * Retry interval constant with the value 100.
     *
     * <p>NOTE(review): nothing in this file refers to this field by name. The
     * {@code waitUntilCondition} overloads that take no {@code retryIntervalMillis} argument pass
     * the literal {@code 100L} instead, so this is presumably the same value (in milliseconds)
     * that was inlined at compile time - confirm.
     */
    private static final long RETRY_INTERVAL = 100L;

    /**
     * Returns the class path of the running JVM as reported by its runtime MX bean.
     *
     * @return the class path string of the current JVM
     */
    public static String getCurrentClasspath() {
        return ManagementFactory.getRuntimeMXBean().getClassPath();
    }

    /**
     * Creates a temporary {@code *-log4j.properties} file and fills it with the debug logging
     * configuration written by {@link #printLog4jDebugConfig(File)}.
     *
     * <p>The file is registered for deletion when the JVM exits.
     *
     * @return the newly created properties file
     * @throws IOException if the temporary file cannot be created or written
     */
    public static File createTemporaryLog4JProperties() throws IOException {
        final String randomPrefix = FileUtils.getRandomFilename("");
        final File propertiesFile = File.createTempFile(randomPrefix, "-log4j.properties");
        propertiesFile.deleteOnExit();
        printLog4jDebugConfig(propertiesFile);
        return propertiesFile;
    }

    /**
     * Locates a working {@code java} executable below the running JVM's {@code java.home}.
     *
     * <p>Two candidates are probed in order: {@code <java.home>/java} and
     * {@code <java.home>/bin/java}. A candidate is accepted when launching it with
     * {@code -version} terminates with exit code 0.
     *
     * <p>If the calling thread is interrupted, probing stops, the thread's interrupt flag is left
     * set, and {@code null} is returned.
     *
     * @return the absolute path of the first working candidate, or {@code null} if none works
     */
    public static String getJavaCommandPath() {
        File javaHome = new File(System.getProperty("java.home"));
        String[] candidates = {
            new File(javaHome, "java").getAbsolutePath(),
            new File(new File(javaHome, "bin"), "java").getAbsolutePath()
        };
        for (String candidate : candidates) {
            if (runsJavaVersion(candidate)) {
                return candidate;
            }
            if (Thread.currentThread().isInterrupted()) {
                // A pending interrupt would abort every further waitFor(); stop probing.
                break;
            }
        }
        return null;
    }

    /** Returns whether launching {@code javaPath -version} succeeds with exit code 0. */
    private static boolean runsJavaVersion(String javaPath) {
        Process process = null;
        try {
            process = new ProcessBuilder(javaPath, "-version").start();
            return process.waitFor() == 0;
        } catch (InterruptedException e) {
            // Do not leave the child running, and keep the interrupt visible to the caller.
            if (process != null) {
                process.destroy();
            }
            Thread.currentThread().interrupt();
            return false;
        } catch (IOException | RuntimeException ignored) {
            // The candidate does not exist or cannot be executed; treat it as "not usable".
            return false;
        }
    }

    /**
     * Writes a fixed Log4j properties configuration to the given file.
     *
     * <p>The configuration sets the root logger to INFO with a console appender targeting
     * {@code SYSTEM_ERR}, and switches the Jetty and ZooKeeper loggers off. Each entry is written
     * on its own line using the platform line separator.
     *
     * @param file the file to (over)write
     * @throws IOException if the file cannot be opened for writing
     */
    public static void printLog4jDebugConfig(File file) throws IOException {
        final String[] configLines = {
            "rootLogger.level = INFO",
            "rootLogger.appenderRef.console.ref = ConsoleAppender",
            "appender.console.name = ConsoleAppender",
            "appender.console.type = CONSOLE",
            "appender.console.target = SYSTEM_ERR",
            "appender.console.layout.type = PatternLayout",
            "appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-4r [%t] %-5p %c %x - %m%n",
            "logger.jetty.name = org.eclipse.jetty.util.log",
            "logger.jetty.level = OFF",
            "logger.zookeeper.name = org.apache.zookeeper",
            "logger.zookeeper.level = OFF"
        };
        try (PrintWriter out = new PrintWriter(new FileWriter(file))) {
            for (String configLine : configLines) {
                out.println(configLine);
            }
            out.flush();
        }
    }

    /**
     * Waits for the condition using a retry interval of 100 ms and the default error message.
     *
     * @param condition the condition to poll
     * @param timeout the deadline after which waiting is abandoned
     * @throws Exception whatever the four-argument overload throws
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout) throws Exception {
        final long defaultRetryIntervalMillis = 100L;
        waitUntilCondition(condition, timeout, defaultRetryIntervalMillis);
    }

    /**
     * Waits for the condition with the given retry interval and the default error message
     * {@code "Condition was not met in given timeout."}.
     *
     * @param condition the condition to poll
     * @param timeout the deadline after which waiting is abandoned
     * @param retryIntervalMillis pause between two polls, in milliseconds
     * @throws Exception whatever the four-argument overload throws
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, long retryIntervalMillis) throws Exception {
        final String defaultErrorMsg = "Condition was not met in given timeout.";
        waitUntilCondition(condition, timeout, retryIntervalMillis, defaultErrorMsg);
    }

    /**
     * Waits for the condition using a retry interval of 100 ms and a caller-supplied error
     * message.
     *
     * @param condition the condition to poll
     * @param timeout the deadline after which waiting is abandoned
     * @param errorMsg message passed on to the four-argument overload
     * @throws Exception whatever the four-argument overload throws
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, String errorMsg) throws Exception {
        final long defaultRetryIntervalMillis = 100L;
        waitUntilCondition(condition, timeout, defaultRetryIntervalMillis, errorMsg);
    }

    /**
     * Polls {@code condition} until it returns {@code true} or the deadline has no time left.
     *
     * <p>Between two polls the calling thread sleeps for {@code retryIntervalMillis}, capped at
     * the time remaining on the deadline. The condition is only evaluated while the deadline still
     * has time left.
     *
     * <p>The method returns normally whenever the condition was observed to be {@code true}, even
     * if the deadline expires immediately afterwards. Deciding success by re-checking the deadline
     * would report a timeout for a condition that was in fact met.
     *
     * @param condition the condition to poll
     * @param timeout the deadline after which waiting is abandoned
     * @param retryIntervalMillis pause between two polls, in milliseconds
     * @param errorMsg message of the {@link TimeoutException} thrown on timeout
     * @throws TimeoutException if the deadline ran out without the condition returning
     *     {@code true}
     * @throws Exception any exception thrown by {@code condition} or by the sleep
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, long retryIntervalMillis, String errorMsg) throws Exception {
        boolean conditionMet = false;
        while (timeout.hasTimeLeft()) {
            if (condition.get()) {
                conditionMet = true;
                break;
            }
            long timeLeft = Math.max(0L, timeout.timeLeft().toMillis());
            Thread.sleep(Math.min(retryIntervalMillis, timeLeft));
        }
        if (!conditionMet) {
            throw new TimeoutException(errorMsg);
        }
    }

    /**
     * Waits until all tasks of the given job are running, fetching the execution graph from the
     * mini cluster on every poll.
     *
     * @param miniCluster the cluster executing the job
     * @param jobId the job to watch
     * @param allowFinished whether a FINISHED subtask counts as acceptable
     * @throws Exception whatever the supplier-based overload throws
     */
    public static void waitForAllTaskRunning(MiniCluster miniCluster, JobID jobId, boolean allowFinished) throws Exception {
        SupplierWithException<AccessExecutionGraph, Exception> graphSupplier = () -> getGraph(miniCluster, jobId);
        waitForAllTaskRunning(graphSupplier, allowFinished);
    }

    /**
     * Requests the execution graph of a job from the mini cluster, waiting at most 60 seconds
     * for the answer.
     *
     * @param miniCluster the cluster to query
     * @param jobId the job whose graph is requested
     * @return the execution graph delivered by the cluster
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws ExecutionException if the request completed exceptionally
     * @throws TimeoutException if no answer arrived within 60 seconds
     */
    private static AccessExecutionGraph getGraph(MiniCluster miniCluster, JobID jobId) throws InterruptedException, ExecutionException, TimeoutException {
        final long timeoutSeconds = 60L;
        return miniCluster.getExecutionGraph(jobId).get(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * Waits until all tasks are running, using a deadline of one minute from now.
     *
     * @param executionGraphSupplier supplies the current execution graph on every poll
     * @param allowFinished whether a FINISHED subtask counts as acceptable
     * @throws Exception whatever the deadline-based overload throws
     */
    public static void waitForAllTaskRunning(SupplierWithException<AccessExecutionGraph, Exception> executionGraphSupplier, boolean allowFinished) throws Exception {
        Deadline oneMinuteFromNow = Deadline.fromNow(Duration.ofMinutes(1L));
        waitForAllTaskRunning(executionGraphSupplier, oneMinuteFromNow, allowFinished);
    }

    /**
     * Waits until the job is in state RUNNING and every subtask of every job vertex is RUNNING
     * (or FINISHED, when {@code allowFinished} is set).
     *
     * <p>On every poll a fresh graph is obtained from the supplier. If that graph is in a
     * globally terminal state, the wait is aborted via {@code Assertions.fail}, passing along the
     * graph's failure exception when one is available. A FINISHED subtask while
     * {@code allowFinished} is {@code false} aborts the wait with a {@link RuntimeException}.
     *
     * @param executionGraphSupplier supplies the current execution graph on every poll
     * @param timeout the deadline after which waiting is abandoned
     * @param allowFinished whether a FINISHED subtask counts as acceptable
     * @throws Exception if the wait fails or times out, or the supplier throws
     */
    public static void waitForAllTaskRunning(SupplierWithException<AccessExecutionGraph, Exception> executionGraphSupplier, Deadline timeout, boolean allowFinished) throws Exception {
        Predicate<AccessExecutionVertex> subtaskIsUp = subtask -> {
            switch (subtask.getExecutionState()) {
                case RUNNING:
                    return true;
                case FINISHED:
                    if (!allowFinished) {
                        throw new RuntimeException("Sub-Task finished unexpectedly" + subtask);
                    }
                    return true;
                default:
                    return false;
            }
        };
        SupplierWithException<Boolean, Exception> everythingRunning = () -> {
            AccessExecutionGraph graph = executionGraphSupplier.get();
            if (graph.getState().isGloballyTerminalState()) {
                ErrorInfo failureInfo = graph.getFailureInfo();
                String message = String.format("Graph is in globally terminal state (%s)", graph.getState());
                Throwable cause = failureInfo == null ? null : failureInfo.getException();
                Assertions.fail(message, cause);
            }
            if (graph.getState() != JobStatus.RUNNING) {
                return false;
            }
            return graph.getAllVertices().values().stream()
                    .allMatch(jobVertex -> Arrays.stream(jobVertex.getTaskVertices()).allMatch(subtaskIsUp));
        };
        waitUntilCondition(everythingRunning, timeout);
    }

    /**
     * Waits until the job details report no vertices in state RUNNING, i.e. the per-state map
     * has no entry for RUNNING or that entry is zero.
     *
     * @param jobDetailsSupplier supplies the current job details on every poll
     * @param timeout the deadline after which waiting is abandoned
     * @throws Exception if the wait times out or the supplier throws
     */
    public static void waitForNoTaskRunning(SupplierWithException<JobDetailsInfo, Exception> jobDetailsSupplier, Deadline timeout) throws Exception {
        SupplierWithException<Boolean, Exception> nothingRunning = () -> {
            Integer runningCount = jobDetailsSupplier.get().getJobVerticesPerState().get(ExecutionState.RUNNING);
            return runningCount == null || runningCount == 0;
        };
        waitUntilCondition(nothingRunning, timeout, "Some tasks are still running until timeout");
    }

    /**
     * Waits until the supplied job status is no longer INITIALIZING, using a deadline of one
     * minute from now.
     *
     * @param jobStatusSupplier supplies the current job status on every poll
     * @throws Exception whatever the deadline-based overload throws
     */
    public static void waitUntilJobManagerIsInitialized(SupplierWithException<JobStatus, Exception> jobStatusSupplier) throws Exception {
        Deadline oneMinuteFromNow = Deadline.fromNow(Duration.ofMinutes(1L));
        waitUntilJobManagerIsInitialized(jobStatusSupplier, oneMinuteFromNow);
    }

    /**
     * Waits until the supplied job status is no longer INITIALIZING, polling every 20 ms.
     *
     * @param jobStatusSupplier supplies the current job status on every poll
     * @param timeout the deadline after which waiting is abandoned
     * @throws Exception if the wait times out or the supplier throws
     */
    public static void waitUntilJobManagerIsInitialized(SupplierWithException<JobStatus, Exception> jobStatusSupplier, Deadline timeout) throws Exception {
        final long pollIntervalMillis = 20L;
        SupplierWithException<Boolean, Exception> leftInitializing = () -> jobStatusSupplier.get() != JobStatus.INITIALIZING;
        waitUntilCondition(leftInitializing, timeout, pollIntervalMillis);
    }

    /**
     * Waits until the job reports one of the expected statuses.
     *
     * <p>If the job reaches a terminal state that is not among the expected ones, the wait is
     * aborted with an {@link IllegalStateException}. In that case the job execution result is
     * fetched first, so that a failure retrieving it can be attached as the cause.
     *
     * @param client client of the job to watch
     * @param expectedStatus statuses that end the wait successfully
     * @param deadline the deadline after which waiting is abandoned
     * @throws Exception if the wait fails or times out
     */
    public static void waitForJobStatus(JobClient client, List<JobStatus> expectedStatus, Deadline deadline) throws Exception {
        SupplierWithException<Boolean, Exception> reachedExpectedStatus = () -> {
            final JobStatus status = client.getJobStatus().get();
            if (expectedStatus.contains(status)) {
                return true;
            }
            if (!status.isTerminalState()) {
                return false;
            }
            // Unexpected terminal state: surface the job's own failure as the cause if there is one.
            try {
                client.getJobExecutionResult().get();
            } catch (Exception cause) {
                throw new IllegalStateException(String.format("Job has entered %s state, but expecting %s", status, expectedStatus), cause);
            }
            throw new IllegalStateException(String.format("Job has entered a terminal state %s, but expecting %s", status, expectedStatus));
        };
        waitUntilCondition(reachedExpectedStatus, deadline);
    }

    /**
     * Cancels the job through the given client and blocks until the cancellation future
     * completes or the timeout elapses.
     *
     * @param client client of the job to cancel
     * @param timeout maximum time to wait for the cancellation to complete
     * @throws Exception if waiting on the cancellation future fails or times out
     */
    public static void terminateJob(JobClient client, Duration timeout) throws Exception {
        // The timeout is converted to milliseconds; any finer-grained part of the duration is dropped.
        client.cancel().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Waits (for at most one minute) until the subtasks of one job vertex have FINISHED while
     * the job itself is RUNNING.
     *
     * <p>A subtask found in a terminal state other than FINISHED aborts the wait with a
     * {@link RuntimeException}, as does a vertex id that is not part of the graph. Polls on which
     * the job is not RUNNING count as "not yet".
     *
     * @param miniCluster the cluster executing the job
     * @param job the job to watch
     * @param id the job vertex whose subtasks are inspected
     * @param allSubtasks {@code true} to require every subtask to be finished, {@code false} if
     *     a single finished subtask suffices
     * @throws Exception if the wait fails or times out
     */
    public static void waitForSubtasksToFinish(MiniCluster miniCluster, JobID job, JobVertexID id, boolean allSubtasks) throws Exception {
        Predicate<AccessExecutionVertex> hasFinished = subtask -> {
            ExecutionState state = subtask.getExecutionState();
            if (state != ExecutionState.FINISHED && state.isTerminal()) {
                throw new RuntimeException(String.format("Sub-Task %s is already in a terminal state %s", subtask, state));
            }
            return state == ExecutionState.FINISHED;
        };
        SupplierWithException<Boolean, Exception> subtasksFinished = () -> {
            AccessExecutionGraph graph = getGraph(miniCluster, job);
            if (graph.getState() != JobStatus.RUNNING) {
                return false;
            }
            AccessExecutionVertex[] subtasks = graph.getAllVertices().values().stream()
                    .filter(jobVertex -> jobVertex.getJobVertexId().equals(id))
                    .findAny()
                    .orElseThrow(() -> new RuntimeException("Vertex not found " + id))
                    .getTaskVertices();
            Stream<AccessExecutionVertex> subtaskStream = Arrays.stream(subtasks);
            if (allSubtasks) {
                return subtaskStream.allMatch(hasFinished);
            }
            return subtaskStream.anyMatch(hasFinished);
        };
        waitUntilCondition(subtasksFinished, Deadline.fromNow(Duration.ofMinutes(1L)));
    }

    /**
     * Compares two streams byte by byte until the first one is exhausted.
     *
     * <p>Streams that are not already {@link BufferedInputStream}s are wrapped in one before
     * reading. Neither stream is closed by this method.
     *
     * @param input1 the first stream
     * @param input2 the second stream
     * @return {@code true} if both streams deliver the same bytes and end at the same position
     * @throws IOException if reading from either stream fails
     */
    public static boolean isStreamContentEqual(InputStream input1, InputStream input2) throws IOException {
        InputStream left = input1 instanceof BufferedInputStream ? input1 : new BufferedInputStream(input1);
        InputStream right = input2 instanceof BufferedInputStream ? input2 : new BufferedInputStream(input2);
        int fromLeft;
        while ((fromLeft = left.read()) != -1) {
            if (fromLeft != right.read()) {
                return false;
            }
        }
        // The left stream is exhausted; the contents match only if the right one is as well.
        return right.read() == -1;
    }

    /**
     * Daemon thread named "Pipe Forwarder" that copies everything readable from an input stream
     * into a {@link StringWriter}.
     *
     * <p>The thread starts itself from the constructor. Each value returned by
     * {@code InputStream.read()} is written to the target as a single character, so no charset
     * decoding takes place.
     */
    public static class PipeForwarder
    extends Thread {
        private final StringWriter target;
        private final InputStream source;

        /**
         * Creates the forwarder and immediately starts it.
         *
         * @param source the stream to read from until it reports end of stream
         * @param target the writer receiving the forwarded data
         */
        public PipeForwarder(InputStream source, StringWriter target) {
            super("Pipe Forwarder");
            this.source = source;
            this.target = target;
            setDaemon(true);
            start();
        }

        /** Copies from source to target until end of stream or a read failure. */
        @Override
        public void run() {
            try {
                for (int value = source.read(); value != -1; value = source.read()) {
                    target.write(value);
                }
            }
            catch (IOException ignored) {
                // A read failure just ends the forwarding; data copied so far stays in the target.
            }
        }
    }
}

