package com.alibaba.schedulerx.worker.actor;

import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.alibaba.schedulerx.common.constants.CommonConstants;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.container.Container;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.container.ContainerPool;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.pull.PullManager;
import com.alibaba.schedulerx.worker.util.ContanerUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/alibaba/schedulerx/worker/actor/ContainerActor.class */
/**
 * Worker-side actor that reacts to container life-cycle messages sent by a master:
 * starting a single container, starting a batch of containers, killing a task or a
 * whole job instance, and destroying the per-instance container pool.
 *
 * <p>Single start, kill and destroy requests are processed on the actor thread. Batch
 * start requests are acknowledged immediately and executed asynchronously on the
 * static {@code containerStarter} pool.
 */
public class ContainerActor extends UntypedActor {
    private static final Logger LOGGER = LogFactory.getLogger(ContainerActor.class);

    /** Capacity handed to the {@link ReqQueue} created for each status-report handler. */
    private static final int STATUS_REQ_QUEUE_SIZE = 100000;

    /** Consumer number used when a start request does not carry one. */
    private static final int DEFAULT_CONSUMER_NUM = 5;

    /** Executes batch start requests so that the actor thread is not blocked by them. */
    private static final ThreadPoolExecutor containerStarter = new ThreadPoolExecutor(8, 8, 30, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            private final AtomicInteger nextId = new AtomicInteger(1);
            private final String namePrefix = "Schedulerx-Container-Starter-Thread-";

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, this.namePrefix + this.nextId.getAndIncrement());
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());

    static {
        // Let idle starter threads terminate after the keep-alive time instead of lingering.
        containerStarter.allowCoreThreadTimeOut(true);
    }

    private final ContainerPool containerPool = ContainerFactory.getContainerPool();
    private final ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    private final boolean enableShareContainerPool = ConfigUtil.getWorkerConfig().getBoolean(WorkerConstants.SHARE_CONTAINER_POOL, false);
    private final int batchSize = ConfigUtil.getWorkerConfig().getInt(WorkerConstants.WORKER_MAP_PAGE_SIZE, 1000);
    private final LogCollector logCollector = LogCollectorFactory.get();

    /**
     * Starts every container of one batch request, one after another. A container that
     * fails to start is reported to its instance master as {@code FAILED}; the remaining
     * containers of the batch are still started.
     */
    public class ContainerStartRunnable implements Runnable {
        private final Worker.MasterBatchStartContainersRequest request;

        ContainerStartRunnable(Worker.MasterBatchStartContainersRequest masterBatchStartContainersRequest) {
            this.request = masterBatchStartContainersRequest;
        }

        @Override
        public void run() {
            for (Worker.MasterStartContainerRequest startReq : this.request.getStartReqsList()) {
                try {
                    String uniqueId = ContainerActor.this.startContainer(startReq);
                    ContainerActor.LOGGER.debug("submit container to containerPool, uniqueId={}, cost={}ms", uniqueId, Long.valueOf(System.currentTimeMillis() - startReq.getScheduleTime()));
                } catch (Throwable th) {
                    try {
                        reportStartFailure(startReq, th);
                    } catch (Throwable reportError) {
                        // This runnable is handed to submit(), whose Future is discarded, so an
                        // escaping exception would vanish silently and skip the rest of the batch.
                        ContainerActor.LOGGER.error("report container start failure error.", reportError);
                    }
                }
            }
        }

        /**
         * Collects the start failure and sends a {@code FAILED} task status for the given
         * request to the instance master addressed by the request.
         */
        private void reportStartFailure(Worker.MasterStartContainerRequest startReq, Throwable cause) {
            // NOTE(review): the log id uses the batch-level jobInstanceId while the status report
            // below uses the per-request one; presumably they are equal - confirm with the sender.
            String uniqueId = IdUtil.getUniqueId(startReq.getJobId(), this.request.getJobInstanceId(), startReq.getTaskId());
            ContainerActor.this.logCollector.collect(uniqueId, ClientLoggerMessage.CONTAINER_START_FAIL, cause);

            Worker.ContainerReportTaskStatusRequest.Builder report = Worker.ContainerReportTaskStatusRequest.newBuilder();
            report.setJobId(startReq.getJobId());
            report.setJobInstanceId(startReq.getJobInstanceId());
            report.setTaskId(startReq.getTaskId());
            report.setStatus(TaskStatus.FAILED.getValue());
            Address workerAddress = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
            report.setWorkerAddr(workerAddress.host().get() + CommonConstants.ADDRESS_SEPARATOR + workerAddress.port().get());
            report.setWorkerId(WorkerIdGenerator.get());
            if (startReq.getTaskName() != null) {
                report.setTaskName(startReq.getTaskName());
            }
            // Sent from a plain pool thread, hence without a sender reference.
            SchedulerxWorker.actorSystem.actorSelection(startReq.getInstanceMasterAkkaPath()).tell(report.build(), null);
        }
    }

    /**
     * Creates the {@link Props} used to instantiate this actor.
     *
     * @return props for a {@code ContainerActor} with a no-argument constructor
     */
    public static Props props() {
        return Props.create((Class<?>) ContainerActor.class, new Object[0]);
    }

    /**
     * Dispatches an incoming message to the matching handler. Messages of any other type
     * are passed to {@code unhandled} instead of being dropped silently.
     *
     * @param obj the received message
     */
    @Override
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Worker.MasterStartContainerRequest) {
            handleStartContainer((Worker.MasterStartContainerRequest) obj);
        } else if (obj instanceof Worker.MasterBatchStartContainersRequest) {
            handleBatchStartContainers((Worker.MasterBatchStartContainersRequest) obj);
        } else if (obj instanceof Worker.MasterKillContainerRequest) {
            handleKillContainer((Worker.MasterKillContainerRequest) obj);
        } else if (obj instanceof Worker.MasterDestroyContainerPoolRequest) {
            handleDestroyContainerPool((Worker.MasterDestroyContainerPoolRequest) obj);
        } else {
            unhandled(obj);
        }
    }

    /**
     * Starts one container and replies to the sender with the outcome; a failure is
     * logged, collected and reported with {@code success=false} and the error message.
     */
    private void handleStartContainer(Worker.MasterStartContainerRequest masterStartContainerRequest) {
        Worker.MasterStartContainerResponse response;
        try {
            String startedId = startContainer(masterStartContainerRequest);
            response = Worker.MasterStartContainerResponse.newBuilder().setSuccess(true).build();
            LOGGER.debug("submit container to containerPool, uniqueId={}, cost={}ms", startedId, Long.valueOf(System.currentTimeMillis() - masterStartContainerRequest.getScheduleTime()));
        } catch (Throwable th) {
            String uniqueId = IdUtil.getUniqueId(masterStartContainerRequest.getJobId(), masterStartContainerRequest.getJobInstanceId(), masterStartContainerRequest.getTaskId());
            response = Worker.MasterStartContainerResponse.newBuilder().setSuccess(false).setMessage(ExceptionUtil.getMessage(th)).build();
            LOGGER.error("handleStartContainer error.", th);
            this.logCollector.collect(uniqueId, ClientLoggerMessage.CONTAINER_START_FAIL, th);
        }
        getSender().tell(response, getSelf());
    }

    /**
     * Hands the batch to the starter pool and acknowledges it right away. The reply only
     * confirms acceptance; per-container start failures are reported by
     * {@link ContainerStartRunnable}.
     */
    private void handleBatchStartContainers(Worker.MasterBatchStartContainersRequest masterBatchStartContainersRequest) {
        LOGGER.info("jobInstanceId={}, batch start containers, size:{}", Long.valueOf(masterBatchStartContainersRequest.getJobInstanceId()), Integer.valueOf(masterBatchStartContainersRequest.getStartReqsCount()));
        containerStarter.submit(new ContainerStartRunnable(masterBatchStartContainersRequest));
        getSender().tell(Worker.MasterBatchStartContainersResponse.newBuilder().setSuccess(true).build(), getSelf());
    }

    /**
     * Creates the container for the request, registers it in the container pool, makes
     * sure a status-report handler is running for its instance, and submits it.
     *
     * <p>When the factory returns no container, a warning is logged and nothing is
     * submitted; the unique id is still returned.
     *
     * @param masterStartContainerRequest the start request
     * @return the unique id built from job id, job instance id and task id
     * @throws Exception if converting the request or creating/submitting the container fails
     */
    public String startContainer(Worker.MasterStartContainerRequest masterStartContainerRequest) throws Exception {
        String uniqueId = IdUtil.getUniqueId(masterStartContainerRequest.getJobId(), masterStartContainerRequest.getJobInstanceId(), masterStartContainerRequest.getTaskId());
        LOGGER.debug("handleStartContainer, uniqueId={}, cost={}ms", uniqueId, Long.valueOf(System.currentTimeMillis() - masterStartContainerRequest.getScheduleTime()));
        JobContext jobContext = ContanerUtil.convert2JobContext(masterStartContainerRequest);
        Container container = ContainerFactory.create(jobContext);
        if (container == null) {
            LOGGER.warn("Container is null, uniqueId={}", uniqueId);
            return uniqueId;
        }
        // Same per-instance lock as handleDestroyContainerPool, so a start and a destroy of the
        // same instance do not interleave.
        synchronized (this.containerPool.getInstanceLock(masterStartContainerRequest.getJobInstanceId())) {
            this.containerPool.put(jobContext.getUniqueId(), container);
            // With a shared container pool all instances use the single handler keyed by 0.
            long handlerKey = this.enableShareContainerPool ? 0L : masterStartContainerRequest.getJobInstanceId();
            if (!this.statusReqBatchHandlerPool.contains(handlerKey)) {
                ReqQueue reqQueue = new ReqQueue(handlerKey, STATUS_REQ_QUEUE_SIZE);
                reqQueue.init();
                this.statusReqBatchHandlerPool.start(handlerKey, new ContainerStatusReqHandler<>(handlerKey, 1, 1, this.batchSize, reqQueue, masterStartContainerRequest.getInstanceMasterAkkaPath()));
            }
            int consumerNum = masterStartContainerRequest.hasConsumerNum() ? masterStartContainerRequest.getConsumerNum() : DEFAULT_CONSUMER_NUM;
            this.containerPool.submit(jobContext.getJobId(), jobContext.getJobInstanceId(), jobContext.getTaskId(), container, consumerNum);
        }
        return uniqueId;
    }

    /**
     * Kills either a single task container (when the request carries a task id) or every
     * container of the job instance, then replies to the sender with the outcome.
     */
    private void handleKillContainer(Worker.MasterKillContainerRequest masterKillContainerRequest) {
        Worker.MasterKillContainerResponse response;
        long jobId = masterKillContainerRequest.getJobId();
        long jobInstanceId = masterKillContainerRequest.getJobInstanceId();
        String uniqueId = "";
        try {
            if (masterKillContainerRequest.hasTaskId()) {
                // Assigned before killing so that a failure is collected under the right id.
                uniqueId = IdUtil.getUniqueId(jobId, jobInstanceId, masterKillContainerRequest.getTaskId());
                if (this.containerPool.contain(uniqueId)) {
                    // The entry may be removed by another thread between contain() and get().
                    Container container = this.containerPool.get(uniqueId);
                    if (container != null) {
                        container.kill();
                    }
                }
                LOGGER.info("kill task container success, uniqueId={}", uniqueId);
            } else {
                uniqueId = IdUtil.getUniqueIdWithoutTask(jobId, jobInstanceId);
                killInstance(jobId, jobInstanceId);
                LOGGER.info("kill instance success, uniqueId:{}", uniqueId);
            }
            response = Worker.MasterKillContainerResponse.newBuilder().setSuccess(true).build();
            this.logCollector.collect(uniqueId, ClientLoggerMessage.CONTAINER_KILL_SUCCESS);
        } catch (Throwable th) {
            LOGGER.error("kill container exception", th);
            this.logCollector.collect(uniqueId, ClientLoggerMessage.CONTAINER_KILL_FAIL, th);
            // Protobuf setters reject null, and getMessage() is null for many exceptions.
            String message = th.getMessage();
            response = Worker.MasterKillContainerResponse.newBuilder().setSuccess(false).setMessage(message != null ? message : th.toString()).build();
        }
        getSender().tell(response, getSelf());
    }

    /**
     * Tears down the containers, the status-report handler and the pull manager of one
     * job instance, unless the handler's latest status request carries a different serial
     * number than the destroy request, in which case the destroy is skipped.
     *
     * <p>Every reply sent here carries {@code success=true}, including after a failure.
     * NOTE(review): presumably so the master stops redelivering (see deliveryId) - confirm.
     * NOTE(review): with a shared container pool no reply is sent on the normal path, and
     * the skip path returns without releasing the instance lock; both are kept as found.
     */
    private void handleDestroyContainerPool(Worker.MasterDestroyContainerPoolRequest masterDestroyContainerPoolRequest) {
        long jobInstanceId = masterDestroyContainerPoolRequest.getJobInstanceId();
        try {
            if (!this.enableShareContainerPool) {
                try {
                    ContainerStatusReqHandler<Worker.ContainerReportTaskStatusRequest> handler = this.statusReqBatchHandlerPool.getHandlers().get(Long.valueOf(jobInstanceId));
                    if (handler != null) {
                        synchronized (this.containerPool.getInstanceLock(jobInstanceId)) {
                            // Read once: it may still be null, and the destroy branch below must
                            // not dereference it blindly.
                            Worker.ContainerReportTaskStatusRequest latest = handler.getLatestRequest();
                            if (latest != null && latest.getSerialNum() != masterDestroyContainerPoolRequest.getSerialNum()) {
                                LOGGER.info("skip handleDestroyContainerPool cycleId={}_{}, handler serialNum={}.", Long.valueOf(jobInstanceId), Long.valueOf(masterDestroyContainerPoolRequest.getSerialNum()), Long.valueOf(latest.getSerialNum()));
                                replyDestroyContainerPool(masterDestroyContainerPoolRequest);
                                return;
                            }
                            LOGGER.info("handleDestroyContainerPool from cycleId={}_{}, handler serialNum={}.", Long.valueOf(jobInstanceId), Long.valueOf(masterDestroyContainerPoolRequest.getSerialNum()), latest != null ? Long.valueOf(latest.getSerialNum()) : null);
                            this.containerPool.destroyByInstance(jobInstanceId);
                            this.statusReqBatchHandlerPool.stop(jobInstanceId);
                            PullManager.INSTANCE.stop(jobInstanceId);
                        }
                    }
                    replyDestroyContainerPool(masterDestroyContainerPoolRequest);
                } catch (Throwable th) {
                    LOGGER.error("cycleId={}_{} handleDestroyContainerPool failed.", Long.valueOf(jobInstanceId), Long.valueOf(masterDestroyContainerPoolRequest.getSerialNum()), th);
                    replyDestroyContainerPool(masterDestroyContainerPoolRequest);
                }
            }
            this.containerPool.releaseInstanceLock(jobInstanceId);
        } catch (Throwable th2) {
            replyDestroyContainerPool(masterDestroyContainerPoolRequest);
            throw th2;
        }
    }

    /** Replies to the sender with a successful destroy response echoing the request's delivery id. */
    private void replyDestroyContainerPool(Worker.MasterDestroyContainerPoolRequest masterDestroyContainerPoolRequest) {
        getSender().tell(Worker.MasterDestroyContainerPoolResponse.newBuilder().setSuccess(true).setDeliveryId(masterDestroyContainerPoolRequest.getDeliveryId()).build(), getSelf());
    }

    /**
     * Kills and removes every pooled container that belongs to the given job instance
     * and, unless the container pool is shared, destroys the instance's pool resources.
     *
     * @param jobId the job id
     * @param jobInstanceId the job instance id
     */
    private void killInstance(long jobId, long jobInstanceId) {
        String instanceKey = jobId + IdUtil.SPLITTER_TOKEN + jobInstanceId;
        // A bare startsWith(instanceKey) would also match longer instance ids that share the
        // same leading digits (1_12 vs 1_123_...), so require the splitter after the key.
        // Assumes task keys look like <instanceKey><SPLITTER_TOKEN><taskId> - TODO confirm.
        String taskKeyPrefix = instanceKey + IdUtil.SPLITTER_TOKEN;
        Iterator<Map.Entry<String, Container>> entries = this.containerPool.getContainerMap().entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, Container> entry = entries.next();
            String key = entry.getKey();
            if (key.equals(instanceKey) || key.startsWith(taskKeyPrefix)) {
                entry.getValue().kill();
                entries.remove();
            }
        }
        if (!this.enableShareContainerPool) {
            this.containerPool.destroyByInstance(jobInstanceId);
        }
    }
}
