package org.apache.flink.table.gateway.workflow.scheduler;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.header.materializedtable.RefreshMaterializedTableHeaders;
import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableParameters;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableRequestBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableResponseBody;
import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
import org.apache.flink.table.gateway.workflow.WorkflowInfo;
import org.apache.flink.table.shaded.org.quartz.CronScheduleBuilder;
import org.apache.flink.table.shaded.org.quartz.CronTrigger;
import org.apache.flink.table.shaded.org.quartz.Job;
import org.apache.flink.table.shaded.org.quartz.JobBuilder;
import org.apache.flink.table.shaded.org.quartz.JobDetail;
import org.apache.flink.table.shaded.org.quartz.JobExecutionContext;
import org.apache.flink.table.shaded.org.quartz.JobExecutionException;
import org.apache.flink.table.shaded.org.quartz.JobKey;
import org.apache.flink.table.shaded.org.quartz.Scheduler;
import org.apache.flink.table.shaded.org.quartz.TriggerBuilder;
import org.apache.flink.table.shaded.org.quartz.TriggerKey;
import org.apache.flink.table.shaded.org.quartz.impl.StdSchedulerFactory;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* Embedded Quartz-backed scheduler that creates, suspends, resumes and deletes cron-triggered refresh jobs for materialized tables. */
public class EmbeddedQuartzScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedQuartzScheduler.class);
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private Scheduler quartzScheduler;

    /* Quartz job whose execute method refreshes a materialized table through the SQL gateway REST endpoint. */
    public static class EmbeddedSchedulerJob implements Job {

        /**
         * Small REST client used by the scheduled job to talk to the SQL gateway endpoint.
         *
         * <p>All request methods block on the returned future, so they complete (or fail) before
         * returning. Instances own a {@link RestClient} and must be closed after use.
         */
        private static class SqlGatewayRestClient implements AutoCloseable {
            /** Pause between two fetch attempts while the gateway answers "not ready". */
            private static final long NOT_READY_POLL_INTERVAL_MS = 10L;

            private final String address;
            private final int port;
            private final RestClient restClient;

            /**
             * Creates a client for the given endpoint URL.
             *
             * @param str the REST endpoint URL of the SQL gateway
             * @throws Exception if the URL is malformed or the underlying client cannot be created
             */
            private SqlGatewayRestClient(String str) throws Exception {
                URL url = new URL(str);
                this.address = url.getHost();
                // URL#getPort() yields -1 when the URL carries no explicit port; fall back to the
                // protocol's default port instead of sending requests to an invalid port.
                int explicitPort = url.getPort();
                this.port = explicitPort != -1 ? explicitPort : url.getDefaultPort();
                this.restClient = RestClient.forUrl(new Configuration(), Executors.directExecutor(), url);
            }

            /**
             * Opens a gateway session.
             *
             * @param str the session name
             * @param map the session initialization properties
             * @return the handle of the newly opened session
             * @throws Exception if the request fails
             */
            private SessionHandle openSession(String str, Map<String, String> map) throws Exception {
                OpenSessionRequestBody requestBody = new OpenSessionRequestBody(str, map);
                OpenSessionResponseBody responseBody =
                        this.restClient
                                .sendRequest(
                                        this.address,
                                        this.port,
                                        OpenSessionHeaders.getInstance(),
                                        EmptyMessageParameters.getInstance(),
                                        requestBody)
                                .get();
                return new SessionHandle(UUID.fromString(responseBody.getSessionHandle()));
            }

            /**
             * Closes the given session and waits for the gateway's response.
             *
             * @throws Exception if the request fails
             */
            private void closeSession(SessionHandle sessionHandle) throws Exception {
                this.restClient
                        .sendRequest(
                                this.address,
                                this.port,
                                CloseSessionHeaders.getInstance(),
                                new SessionMessageParameters(sessionHandle),
                                EmptyRequestBody.getInstance())
                        .get();
            }

            /**
             * Closes the given operation and waits for the gateway's response.
             *
             * @throws Exception if the request fails
             */
            private void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
                this.restClient
                        .sendRequest(
                                this.address,
                                this.port,
                                CloseOperationHeaders.getInstance(),
                                new OperationMessageParameters(sessionHandle, operationHandle),
                                EmptyRequestBody.getInstance())
                        .get();
            }

            /**
             * Submits a refresh request for a materialized table.
             *
             * <p>The arguments {@code str2}, {@code map}, {@code map2} and {@code map3} are passed
             * through unchanged to {@link RefreshMaterializedTableRequestBody}, whose first
             * constructor argument is always {@code true} here.
             *
             * @param sessionHandle the session to run the refresh in
             * @param str the materialized table identifier
             * @return the handle of the submitted refresh operation
             * @throws Exception if the request fails
             */
            private OperationHandle refreshMaterializedTable(SessionHandle sessionHandle, String str, String str2, Map<String, String> map, Map<String, String> map2, Map<String, String> map3) throws Exception {
                RefreshMaterializedTableRequestBody requestBody =
                        new RefreshMaterializedTableRequestBody(true, str2, map, map2, map3);
                RefreshMaterializedTableResponseBody responseBody =
                        this.restClient
                                .sendRequest(
                                        this.address,
                                        this.port,
                                        RefreshMaterializedTableHeaders.getInstance(),
                                        new RefreshMaterializedTableParameters(sessionHandle, str),
                                        requestBody)
                                .get();
                return new OperationHandle(UUID.fromString(responseBody.getOperationHandle()));
            }

            /**
             * Fetches every result row of an operation, following the next-result token until the
             * gateway no longer returns one.
             *
             * <p>While the gateway answers with {@link NotReadyFetchResultResponse} the same token
             * is retried after a short sleep.
             *
             * @return all fetched rows in fetch order
             * @throws Exception if a fetch request fails or the thread is interrupted while waiting
             */
            private List<RowData> fetchOperationAllResults(SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
                List<RowData> rows = new ArrayList<>();
                Long token = 0L;
                while (token != null) {
                    FetchResultsResponseBody response = fetchOperationResults(sessionHandle, operationHandle, token);
                    if (response instanceof NotReadyFetchResultResponse) {
                        Thread.sleep(NOT_READY_POLL_INTERVAL_MS);
                        continue;
                    }
                    rows.addAll(response.getResults().getData());
                    // A null token ends the loop.
                    token = SqlGatewayRestEndpointUtils.parseToken(response.getNextResultUri());
                }
                return rows;
            }

            /** Fetches one batch of results for the given token in JSON row format. */
            private FetchResultsResponseBody fetchOperationResults(SessionHandle sessionHandle, OperationHandle operationHandle, Long l) throws Exception {
                FetchResultsMessageParameters parameters =
                        new FetchResultsMessageParameters(sessionHandle, operationHandle, l, RowFormat.JSON);
                return this.restClient
                        .sendRequest(
                                this.address,
                                this.port,
                                FetchResultsHeaders.getDefaultInstance(),
                                parameters,
                                EmptyRequestBody.getInstance())
                        .get();
            }

            /** Closes the underlying REST client; failures are logged and not rethrown. */
            @Override
            public void close() {
                try {
                    this.restClient.close();
                } catch (Exception e) {
                    EmbeddedQuartzScheduler.LOG.error("Failed to close rest client.", e);
                }
            }
        }

        /**
         * Runs one refresh of the materialized table described by the {@link WorkflowInfo} stored
         * in the job's data map.
         *
         * <p>The method opens a gateway session, submits the refresh operation, waits for its
         * results and stores a success message as the job result. The first column of the first
         * result row is reported as the job id. The operation, the session and the REST client are
         * always released afterwards; each release step is attempted independently so that a
         * failure in one step does not leak the remaining resources.
         *
         * @param jobExecutionContext the Quartz execution context of this firing
         * @throws JobExecutionException if any step of the refresh fails
         */
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            SessionHandle sessionHandle = null;
            OperationHandle operationHandle = null;
            SqlGatewayRestClient gatewayClient = null;
            try {
                String workflowJson = jobExecutionContext.getJobDetail().getJobDataMap().getString(QuartzSchedulerUtils.WORKFLOW_INFO);
                WorkflowInfo workflowInfo = (WorkflowInfo) QuartzSchedulerUtils.fromJson(workflowJson, WorkflowInfo.class);
                LOG.info("Execute refresh operation for workflow: {}.", workflowInfo);

                String scheduleTime = QuartzSchedulerUtils.dateToString(jobExecutionContext.getScheduledFireTime());
                String tableIdentifier = workflowInfo.getMaterializedTableIdentifier();

                gatewayClient = new SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
                sessionHandle = gatewayClient.openSession(
                        String.format("%s-quartz-refresh-session-%s", tableIdentifier, scheduleTime),
                        workflowInfo.getInitConfig());
                operationHandle = gatewayClient.refreshMaterializedTable(
                        sessionHandle,
                        tableIdentifier,
                        scheduleTime,
                        workflowInfo.getDynamicOptions(),
                        Collections.emptyMap(),
                        workflowInfo.getExecutionConfig());

                List<RowData> results = gatewayClient.fetchOperationAllResults(sessionHandle, operationHandle);
                if (results.isEmpty()) {
                    // Fail with a descriptive message instead of a bare IndexOutOfBoundsException.
                    throw new IllegalStateException(
                            String.format("Refresh operation for materialized table %s returned no result.", tableIdentifier));
                }
                String jobId = results.get(0).getString(0).toString();

                LOG.info("Successfully execute refresh operation for materialized table: {} with job id: {}.", tableIdentifier, jobId);
                jobExecutionContext.setResult("Successfully execute refresh operation for materialized table: " + tableIdentifier + " with job id: " + jobId);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the thread that runs this job.
                    Thread.currentThread().interrupt();
                }
                LOG.error("Failed to execute refresh operation for workflow.", e);
                throw new JobExecutionException(e.getMessage(), e);
            } finally {
                releaseResources(gatewayClient, sessionHandle, operationHandle);
            }
        }

        /**
         * Closes the operation, the session and the REST client, in that order. Failures are
         * logged and never propagated, and a failing step does not prevent the later ones.
         */
        private static void releaseResources(SqlGatewayRestClient client, SessionHandle sessionHandle, OperationHandle operationHandle) {
            if (client == null) {
                return;
            }
            try {
                if (operationHandle != null) {
                    try {
                        client.closeOperation(sessionHandle, operationHandle);
                    } catch (Exception e) {
                        logCleanupFailure("Failed to close operation.", e);
                    }
                }
                if (sessionHandle != null) {
                    try {
                        client.closeSession(sessionHandle);
                    } catch (Exception e) {
                        logCleanupFailure("Failed to close session.", e);
                    }
                }
            } finally {
                client.close();
            }
        }

        /** Logs a cleanup failure and restores the interrupt flag if the failure was an interrupt. */
        private static void logCleanupFailure(String message, Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.error(message, e);
        }
    }

    /**
     * Creates the Quartz scheduler from the configuration produced by
     * {@link QuartzSchedulerUtils#initializeQuartzSchedulerConfig()} and starts it.
     *
     * @throws SchedulerException if Quartz fails to create or start the scheduler
     */
    public void start() {
        Properties quartzConfig = QuartzSchedulerUtils.initializeQuartzSchedulerConfig();
        try {
            this.quartzScheduler = new StdSchedulerFactory(quartzConfig).getScheduler();
            this.quartzScheduler.start();
            LOG.info("Start quartz scheduler successfully.");
        } catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            String message = String.format("Failed to start quartz scheduler with config: %s.", quartzConfig);
            // Log the cause as well, consistent with the other operations of this class.
            LOG.error(message, e);
            throw new SchedulerException(message, e);
        }
    }

    /**
     * Shuts down the Quartz scheduler. Does nothing when no scheduler has been created, i.e.
     * when {@link #start()} was never called or failed before a scheduler was obtained.
     *
     * @throws SchedulerException if Quartz fails to shut the scheduler down
     */
    public void stop() {
        if (this.quartzScheduler == null) {
            // Nothing to shut down; avoid a NullPointerException on stop-before-start.
            return;
        }
        try {
            this.quartzScheduler.shutdown();
        } catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to shutdown quartz scheduler.", e);
            throw new SchedulerException("Failed to shutdown quartz scheduler.", e);
        }
    }

    /**
     * Registers a cron-triggered Quartz job for the materialized table of the given workflow.
     *
     * <p>The job key is derived from the materialized table identifier, the trigger reuses the
     * job's name and group, and the workflow is stored as JSON in the job data map. The whole
     * operation runs under the write lock.
     *
     * @param workflowInfo the workflow to schedule
     * @param str the cron expression of the schedule
     * @return the detail of the newly scheduled job
     * @throws SchedulerException if a job for the table already exists or Quartz rejects the job
     */
    public JobDetail createScheduleWorkflow(WorkflowInfo workflowInfo, String str) throws SchedulerException {
        final String tableIdentifier = workflowInfo.getMaterializedTableIdentifier();
        final JobKey jobKey = QuartzSchedulerUtils.getJobKey(tableIdentifier);
        lock.writeLock().lock();
        try {
            if (quartzScheduler.checkExists(jobKey)) {
                LOG.error("Materialized table {} quartz schedule job already exist, job info: {}.", tableIdentifier, jobKey);
                throw new SchedulerException(String.format("Materialized table %s quartz schedule job already exist, job info: %s.", tableIdentifier, jobKey));
            }

            JobDetail jobDetail = JobBuilder.newJob(EmbeddedSchedulerJob.class).withIdentity(jobKey).build();
            jobDetail.getJobDataMap().put(QuartzSchedulerUtils.WORKFLOW_INFO, QuartzSchedulerUtils.toJson(workflowInfo));

            TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup());
            CronTrigger cronTrigger =
                    (CronTrigger) TriggerBuilder.newTrigger()
                            .withIdentity(triggerKey)
                            .withSchedule(CronScheduleBuilder.cronSchedule(str).withMisfireHandlingInstructionIgnoreMisfires())
                            .forJob(jobDetail)
                            .build();
            quartzScheduler.scheduleJob(jobDetail, cronTrigger);

            LOG.info("Create quartz schedule job for materialized table {} successfully, job info: {}, cron expression: {}.", tableIdentifier, jobKey, str);
            return jobDetail;
        } catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to create quartz schedule job for materialized table {}.", tableIdentifier, e);
            throw new SchedulerException(String.format("Failed to create quartz schedule job for materialized table %s.", tableIdentifier), e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Pauses the Quartz job identified by the given name and group, under the write lock.
     *
     * @param str the job name
     * @param str2 the job group
     * @throws SchedulerException if the job does not exist or Quartz fails to pause it
     */
    public void suspendScheduleWorkflow(String str, String str2) throws SchedulerException {
        final JobKey jobKey = JobKey.jobKey(str, str2);
        lock.writeLock().lock();
        try {
            String missingJobMessage = String.format("Failed to suspend a non-existent quartz schedule job: %s.", jobKey);
            checkJobExists(jobKey, missingJobMessage);
            quartzScheduler.pauseJob(jobKey);
        } catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to suspend quartz schedule job: {}.", jobKey, e);
            throw new SchedulerException(String.format("Failed to suspend quartz schedule job: %s.", jobKey), e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Resumes the Quartz job identified by the given name and group.
     *
     * <p>Without dynamic options ({@code null} or empty map) the existing job is simply resumed.
     * Otherwise the stored {@link WorkflowInfo} is rebuilt with the given dynamic options, the
     * existing job is deleted and a new job is created with the cron expression of the old
     * trigger.
     *
     * @param str the job name
     * @param str2 the job group
     * @param map the dynamic options to apply; {@code null} is treated like an empty map
     * @throws SchedulerException if the job does not exist or Quartz fails to resume or recreate it
     */
    public void resumeScheduleWorkflow(String str, String str2, Map<String, String> map) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(str, str2);
        this.lock.writeLock().lock();
        try {
            checkJobExists(jobKey, String.format("Failed to resume a non-existent quartz schedule job: %s.", jobKey));
            if (map == null || map.isEmpty()) {
                this.quartzScheduler.resumeJob(jobKey);
                return;
            }

            // Read everything needed from the old job before it is deleted.
            String storedWorkflowJson = this.quartzScheduler.getJobDetail(jobKey).getJobDataMap().getString(QuartzSchedulerUtils.WORKFLOW_INFO);
            WorkflowInfo storedWorkflow = (WorkflowInfo) QuartzSchedulerUtils.fromJson(storedWorkflowJson, WorkflowInfo.class);
            WorkflowInfo updatedWorkflow =
                    new WorkflowInfo(
                            storedWorkflow.getMaterializedTableIdentifier(),
                            map,
                            storedWorkflow.getInitConfig(),
                            storedWorkflow.getExecutionConfig(),
                            storedWorkflow.getRestEndpointUrl());
            TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup());
            String cronExpression = ((CronTrigger) this.quartzScheduler.getTrigger(triggerKey)).getCronExpression();

            this.quartzScheduler.deleteJob(jobKey);
            // createScheduleWorkflow takes the same write lock again; the lock is reentrant.
            createScheduleWorkflow(updatedWorkflow, cronExpression);
        } catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to resume quartz schedule job: {}.", jobKey, e);
            throw new SchedulerException(String.format("Failed to resume quartz schedule job: %s.", jobKey), e);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Deletes the Quartz job identified by the given name and group, under the write lock.
     *
     * @param str the job name
     * @param str2 the job group
     * @throws SchedulerException if Quartz fails to delete the job
     */
    public void deleteScheduleWorkflow(String str, String str2) throws SchedulerException {
        final JobKey jobKey = JobKey.jobKey(str, str2);
        lock.writeLock().lock();
        try {
            quartzScheduler.deleteJob(jobKey);
        } catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to delete quartz schedule job: {}.", jobKey, e);
            throw new SchedulerException(String.format("Failed to delete quartz schedule job: %s.", jobKey), e);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Verifies that a job with the given key is registered in the scheduler.
     *
     * @param jobKey the key to look up
     * @param str the message to log and to throw when the job is missing
     * @throws org.apache.flink.table.shaded.org.quartz.SchedulerException if the lookup itself fails
     * @throws SchedulerException if no job exists for the key
     */
    private void checkJobExists(JobKey jobKey, String str) throws org.apache.flink.table.shaded.org.quartz.SchedulerException {
        boolean jobRegistered = quartzScheduler.checkExists(jobKey);
        if (!jobRegistered) {
            LOG.error(str);
            throw new SchedulerException(str);
        }
    }

    /**
     * Exposes the underlying Quartz scheduler for tests.
     *
     * @return the current scheduler reference; {@code null} until {@link #start()} has assigned one
     */
    @VisibleForTesting
    public Scheduler getQuartzScheduler() {
        return quartzScheduler;
    }
}
