package org.apache.flink.runtime.webmonitor.history;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.webmonitor.history.HistoryServer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.class */
class HistoryServerArchiveFetcher {
    /** Logger used by every method of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
    /** Creates the generator that writes the combined job overview in updateJobOverview. */
    private static final JsonFactory jacksonFactory = new JsonFactory();
    /** Parses archived JSON and reads/writes {@link MultipleJobsDetails} objects. */
    private static final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
    /** Suffix appended to the name of every file written by processArchive. */
    private static final String JSON_FILE_ENDING = ".json";
    /** Locations that fetchArchives scans for job archives. */
    private final List<HistoryServer.RefreshLocation> refreshDirs;
    /** Receives the events collected during one fetchArchives run. */
    private final Consumer<ArchiveEvent> jobArchiveEventListener;
    /** Whether local files of jobs whose archive is no longer listed get cleaned up. */
    private final boolean processExpiredArchiveDeletion;
    /** Whether archives beyond maxHistorySize get deleted; set to (maxHistorySize > 0). */
    private final boolean processBeyondLimitArchiveDeletion;
    /** Number of archives per refresh directory after which further archives count as "beyond limit". */
    private final int maxHistorySize;
    /** For each refresh directory, the names of the archives that were already processed. */
    private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
    /** Root directory below which the extracted JSON files are written. */
    private final File webDir;
    /** The "jobs" directory below webDir. */
    private final File webJobDir;
    /** The "overviews" directory below webDir; holds one overview file per job. */
    private final File webOverviewDir;

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher$ArchiveEvent.class */
    /** Describes one change (creation or deletion) affecting the local files of a job archive. */
    public static class ArchiveEvent {
        /** Identifier of the job the event refers to. */
        private final String jobID;
        /** Kind of change that happened. */
        private final ArchiveEventType operation;

        /**
         * @param jobID identifier of the affected job
         * @param operation kind of change that happened
         */
        ArchiveEvent(String jobID, ArchiveEventType operation) {
            this.jobID = jobID;
            this.operation = operation;
        }

        /** Returns the identifier of the affected job. */
        public String getJobID() {
            return jobID;
        }

        /** Returns the kind of change this event describes. */
        public ArchiveEventType getType() {
            return operation;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher$ArchiveEventType.class */
    /** Kinds of changes reported through {@link ArchiveEvent}. */
    public enum ArchiveEventType {
        /** Reported by fetchArchives after an archive was processed successfully. */
        CREATED,
        /** Reported by cleanupExpiredJobs after the local files of a job were removed. */
        DELETED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a fetcher, prepares the local directory layout and writes an initial combined
     * job overview.
     *
     * @param refreshDirs locations to scan for job archives; must not be null
     * @param webDir root directory for the extracted files; must not be null
     * @param jobArchiveEventListener receives the events produced by each fetch
     * @param cleanupExpiredArchives whether local files of no-longer-listed archives are removed
     * @param maxHistorySize archive limit per refresh directory; the limit is only enforced
     *     when this value is greater than zero
     * @throws IOException if the "jobs" or "overviews" directory cannot be created
     */
    public HistoryServerArchiveFetcher(
            List<HistoryServer.RefreshLocation> refreshDirs,
            File webDir,
            Consumer<ArchiveEvent> jobArchiveEventListener,
            boolean cleanupExpiredArchives,
            int maxHistorySize)
            throws IOException {
        this.refreshDirs = Preconditions.checkNotNull(refreshDirs);
        this.jobArchiveEventListener = jobArchiveEventListener;
        this.processExpiredArchiveDeletion = cleanupExpiredArchives;
        this.maxHistorySize = maxHistorySize;
        this.processBeyondLimitArchiveDeletion = maxHistorySize > 0;

        // Every refresh directory starts out with an empty set of known archives.
        this.cachedArchivesPerRefreshDirectory = new HashMap<>();
        for (HistoryServer.RefreshLocation location : refreshDirs) {
            this.cachedArchivesPerRefreshDirectory.put(location.getPath(), new HashSet<>());
        }

        this.webDir = Preconditions.checkNotNull(webDir);
        this.webJobDir = new File(webDir, "jobs");
        Files.createDirectories(this.webJobDir.toPath());
        this.webOverviewDir = new File(webDir, "overviews");
        Files.createDirectories(this.webOverviewDir.toPath());

        updateJobOverview(this.webOverviewDir, webDir);

        if (LOG.isInfoEnabled()) {
            for (HistoryServer.RefreshLocation location : refreshDirs) {
                LOG.info("Monitoring directory {} for archived jobs.", location.getPath());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Scans all refresh directories once, extracts archives that were not processed yet,
     * cleans up local files for archives that disappeared or exceed the size limit, and
     * finally notifies the listener about every change.
     *
     * <p>Any exception is logged and swallowed, so this method never throws.
     */
    public void fetchArchives() {
        try {
            LOG.debug("Starting archive fetching.");
            List<ArchiveEvent> events = new ArrayList<>();

            // Start with a copy of everything that is cached. Each archive still present in
            // its directory is removed from the copy below, so what remains afterwards are
            // the jobs whose archive is gone. A copy is required because cleanupExpiredJobs
            // later removes these names from the cache itself.
            Map<Path, Set<String>> jobsToRemove = new HashMap<>();
            this.cachedArchivesPerRefreshDirectory.forEach(
                    (dir, cachedArchives) -> jobsToRemove.put(dir, new HashSet<>(cachedArchives)));

            Map<Path, Set<Path>> archivesBeyondSizeLimit = new HashMap<>();

            for (HistoryServer.RefreshLocation refreshLocation : this.refreshDirs) {
                Path refreshDir = refreshLocation.getPath();
                LOG.debug("Checking archive directory {}.", refreshDir);

                FileStatus[] jobArchives;
                try {
                    jobArchives = listArchives(refreshLocation.getFs(), refreshDir);
                } catch (IOException e) {
                    LOG.error("Failed to access job archive location for path {}.", refreshDir, e);
                    // The listing is unknown, so do not treat any job of this directory as expired.
                    jobsToRemove.remove(refreshDir);
                    continue;
                }

                int historySize = 0;
                for (FileStatus jobArchive : jobArchives) {
                    Path jobArchivePath = jobArchive.getPath();
                    String jobID = jobArchivePath.getName();
                    if (!isValidJobID(jobID, refreshDir)) {
                        continue;
                    }

                    // The archive still exists, so the job is not expired.
                    jobsToRemove.get(refreshDir).remove(jobID);

                    historySize++;
                    if (historySize > this.maxHistorySize && this.processBeyondLimitArchiveDeletion) {
                        archivesBeyondSizeLimit
                                .computeIfAbsent(refreshDir, ignored -> new HashSet<>())
                                .add(jobArchivePath);
                        continue;
                    }

                    if (this.cachedArchivesPerRefreshDirectory.get(refreshDir).contains(jobID)) {
                        LOG.trace("Ignoring archive {} because it was already fetched.", jobArchivePath);
                        continue;
                    }

                    LOG.info("Processing archive {}.", jobArchivePath);
                    try {
                        processArchive(jobID, jobArchivePath);
                        events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED));
                        this.cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID);
                        LOG.info("Processing archive {} finished.", jobArchivePath);
                    } catch (IOException e) {
                        LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
                        // Remove partially written files; the archive is not cached and is
                        // therefore attempted again on the next run.
                        deleteJobFiles(jobID);
                    }
                }
            }

            boolean hasExpiredJobs =
                    jobsToRemove.values().stream().flatMap(Set::stream).findAny().isPresent();
            if (hasExpiredJobs && this.processExpiredArchiveDeletion) {
                events.addAll(cleanupExpiredJobs(jobsToRemove));
            }
            if (!archivesBeyondSizeLimit.isEmpty() && this.processBeyondLimitArchiveDeletion) {
                events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
            }
            if (!events.isEmpty()) {
                updateJobOverview(this.webOverviewDir, this.webDir);
            }
            events.forEach(this.jobArchiveEventListener::accept);
            LOG.debug("Finished archive fetching.");
        } catch (Exception e) {
            LOG.error("Critical failure while fetching/processing job archives.", e);
        }
    }

    /**
     * Lists the entries of a refresh directory, most recently modified first.
     *
     * @param fileSystem file system that hosts the directory
     * @param path directory to list
     * @return the directory entries sorted by descending modification time; an empty array
     *     if the file system reports {@code null} for the listing
     * @throws IOException if the listing fails
     */
    private static FileStatus[] listArchives(FileSystem fileSystem, Path path) throws IOException {
        FileStatus[] archives = fileSystem.listStatus(path);
        if (archives == null) {
            return new FileStatus[0];
        }
        // A method reference is required here: with an implicitly typed lambda the element
        // type of the chained comparator would be inferred as Object and not compile.
        Arrays.sort(archives, Comparator.comparingLong(FileStatus::getModificationTime).reversed());
        return archives;
    }

    /**
     * Checks whether a file name found in a refresh directory parses as a job ID.
     *
     * @param jobId file name to check
     * @param refreshDir directory the file was found in; only used for logging
     * @return {@code true} if the name is a valid hex job ID, {@code false} otherwise
     */
    private static boolean isValidJobID(String jobId, Path refreshDir) {
        boolean valid;
        try {
            JobID.fromHexString(jobId);
            valid = true;
        } catch (IllegalArgumentException iae) {
            LOG.debug(
                    "Archive directory {} contained file with unexpected name {}. Ignoring file.",
                    refreshDir,
                    jobId,
                    iae);
            valid = false;
        }
        return valid;
    }

    /**
     * Extracts every JSON document contained in a job archive into the web directory.
     *
     * <p>Documents with path {@code /jobs/overview} go to the overview directory as
     * {@code <jobID>.json}. Documents with the legacy path {@code /joboverview} are converted
     * first and stored the same way. Everything else is stored below the web directory under
     * its own path plus the {@code .json} suffix.
     *
     * @param jobID job ID of the archive, used as file name for the overview
     * @param jobArchive location of the archive to read
     * @throws IOException if the archive cannot be read or a file cannot be written
     */
    private void processArchive(String jobID, Path jobArchive) throws IOException {
        for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive)) {
            String path = archive.getPath();
            String json = archive.getJson();

            File target;
            if (path.equals("/jobs/overview")) {
                target = new File(this.webOverviewDir, jobID + JSON_FILE_ENDING);
            } else if (path.equals("/joboverview")) {
                LOG.debug("Migrating legacy archive {}", jobArchive);
                json = convertLegacyJobOverview(json);
                target = new File(this.webOverviewDir, jobID + JSON_FILE_ENDING);
            } else {
                // NOTE(review): the path is taken from the archive contents without checking
                // that it stays below webDir; confirm that archives are trusted input.
                target = new File(this.webDir, path + JSON_FILE_ENDING);
            }

            try {
                Files.createDirectories(target.getParentFile().toPath());
            } catch (FileAlreadyExistsException ignored) {
                // Deliberately ignored: if the parent path really is unusable, creating the
                // file below fails with an IOException that is propagated to the caller.
            }

            Files.deleteIfExists(target.toPath());
            Files.createFile(target.toPath());

            // Write UTF-8 explicitly instead of the platform charset so non-ASCII content is
            // encoded the same way on every host. try-with-resources closes the writer on
            // every path and keeps the primary exception if close() fails as well.
            try (Writer writer = Files.newBufferedWriter(target.toPath(), StandardCharsets.UTF_8)) {
                writer.write(json);
                writer.flush();
            }
        }
    }

    /**
     * Deletes the given archives from their file systems and then removes the local files of
     * the corresponding jobs.
     *
     * <p>A failed archive deletion is only logged; the job's local files are cleaned up
     * regardless.
     *
     * @param archivesPerRefreshDir archive locations to delete, grouped by refresh directory
     * @return one {@code DELETED} event per job, as produced by cleanupExpiredJobs
     */
    private List<ArchiveEvent> cleanupJobsBeyondSizeLimit(Map<Path, Set<Path>> archivesPerRefreshDir) {
        final Map<Path, Set<String>> jobIdsPerRefreshDir = new HashMap<>();

        archivesPerRefreshDir.forEach(
                (refreshDir, archives) -> {
                    final Set<String> jobIds = new HashSet<>();
                    for (Path archive : archives) {
                        jobIds.add(archive.getName());
                        try {
                            archive.getFileSystem().delete(archive, false);
                        } catch (IOException ioe) {
                            LOG.warn("Could not delete old archive " + archive, ioe);
                        }
                    }
                    jobIdsPerRefreshDir.put(refreshDir, jobIds);
                });

        return cleanupExpiredJobs(jobIdsPerRefreshDir);
    }

    /**
     * Forgets the given jobs in the archive cache and deletes their local files.
     *
     * @param jobsToRemove job IDs to remove, grouped by refresh directory
     * @return one {@code DELETED} event per removed job
     */
    private List<ArchiveEvent> cleanupExpiredJobs(Map<Path, Set<String>> jobsToRemove) {
        final List<ArchiveEvent> deletionEvents = new ArrayList<>();
        LOG.info("Archive directories for jobs {} were deleted.", jobsToRemove);

        // First pass: drop the jobs from the cache of their refresh directory.
        for (Map.Entry<Path, Set<String>> entry : jobsToRemove.entrySet()) {
            this.cachedArchivesPerRefreshDirectory.get(entry.getKey()).removeAll(entry.getValue());
        }

        // Second pass: delete the local files and record an event for each job.
        for (Set<String> jobIds : jobsToRemove.values()) {
            for (String jobId : jobIds) {
                deleteJobFiles(jobId);
                deletionEvents.add(new ArchiveEvent(jobId, ArchiveEventType.DELETED));
            }
        }
        return deletionEvents;
    }

    /**
     * Removes all local files belonging to a job: its overview file, its directory below the
     * jobs directory and its JSON file in the jobs directory.
     *
     * <p>Each step is best-effort; a failure is logged and the remaining steps still run.
     *
     * @param jobID ID of the job whose files are deleted
     */
    private void deleteJobFiles(String jobID) {
        final String jsonFileName = jobID + JSON_FILE_ENDING;

        final File overviewFile = new File(this.webOverviewDir, jsonFileName);
        try {
            Files.deleteIfExists(overviewFile.toPath());
        } catch (IOException ioe) {
            LOG.warn("Could not delete file from overview directory.", ioe);
        }

        final File jobDirectory = new File(this.webJobDir, jobID);
        try {
            FileUtils.deleteDirectory(jobDirectory);
        } catch (IOException ioe) {
            LOG.warn("Could not clean up job directory.", ioe);
        }

        final File jobFile = new File(this.webJobDir, jsonFileName);
        try {
            Files.deleteIfExists(jobFile.toPath());
        } catch (IOException ioe) {
            LOG.warn("Could not delete file from job directory.", ioe);
        }
    }

    /**
     * Converts a legacy job overview document into the JSON form of a
     * {@link MultipleJobsDetails} holding a single job.
     *
     * <p>Only the first element of the {@code finished} array is read. If the task counts
     * contain a {@code pending} field, its value is used as the SCHEDULED count and CREATED and
     * DEPLOYING stay zero; otherwise the three counts are read from their own fields.
     *
     * @param legacyOverview legacy overview JSON
     * @return the converted overview as a JSON string
     * @throws IOException if the input cannot be parsed or the result cannot be serialized
     */
    private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
        final JsonNode job = mapper.readTree(legacyOverview).get("finished").get(0);

        final JobID jobId = JobID.fromHexString(job.get("jid").asText());
        final String jobName = job.get("name").asText();
        final JobStatus jobState = JobStatus.valueOf(job.get("state").asText());
        final long startTime = job.get("start-time").asLong();
        final long endTime = job.get("end-time").asLong();
        final long duration = job.get("duration").asLong();
        final long lastModified = job.get("last-modification").asLong();

        final JsonNode tasks = job.get("tasks");
        final int totalTasks = tasks.get("total").asInt();

        final JsonNode pendingNode = tasks.get("pending");
        final boolean hasPendingField = pendingNode != null;
        final int created = hasPendingField ? 0 : tasks.get("created").asInt();
        final int scheduled = hasPendingField ? pendingNode.asInt() : tasks.get("scheduled").asInt();
        final int deploying = hasPendingField ? 0 : tasks.get("deploying").asInt();

        final int running = tasks.get("running").asInt();
        final int finished = tasks.get("finished").asInt();
        final int canceling = tasks.get("canceling").asInt();
        final int canceled = tasks.get("canceled").asInt();
        final int failed = tasks.get("failed").asInt();

        // Task counts are stored at the ordinal position of their execution state.
        final int[] tasksPerState = new int[ExecutionState.values().length];
        tasksPerState[ExecutionState.CREATED.ordinal()] = created;
        tasksPerState[ExecutionState.SCHEDULED.ordinal()] = scheduled;
        tasksPerState[ExecutionState.DEPLOYING.ordinal()] = deploying;
        tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
        tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
        tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
        tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
        tasksPerState[ExecutionState.FAILED.ordinal()] = failed;

        final JobDetails details =
                new JobDetails(
                        jobId,
                        jobName,
                        startTime,
                        endTime,
                        duration,
                        jobState,
                        lastModified,
                        tasksPerState,
                        totalTasks,
                        new HashMap<>());
        final MultipleJobsDetails overview = new MultipleJobsDetails(Collections.singleton(details));

        final StringWriter out = new StringWriter();
        mapper.writeValue(out, overview);
        return out.toString();
    }

    /**
     * Rebuilds the combined job overview from all per-job overview files.
     *
     * <p>Every file in the overview directory is read as a {@link MultipleJobsDetails}; the
     * jobs of all files are merged and written to the {@code /jobs/overview} file obtained
     * from {@code HistoryServer.createOrGetFile}. If the directory cannot be listed, nothing
     * is written. An {@link IOException} is logged and not propagated.
     *
     * @param webOverviewDir directory containing the per-job overview files
     * @param webDir root web directory that receives the combined overview
     */
    private static void updateJobOverview(File webOverviewDir, File webDir) {
        // try-with-resources closes the generator on the failure path as well, so a broken
        // overview file no longer leaks the handle of the output file.
        try (JsonGenerator generator =
                jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, "/jobs/overview"))) {
            File[] overviews = new File(webOverviewDir.getPath()).listFiles();
            if (overviews != null) {
                List<JobDetails> allJobs = new ArrayList<>(overviews.length);
                for (File overview : overviews) {
                    MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class);
                    allJobs.addAll(subJobs.getJobs());
                }
                mapper.writeValue(generator, new MultipleJobsDetails(allJobs));
            }
        } catch (IOException e) {
            LOG.error("Failed to update job overview.", e);
        }
    }
}
