package com.northpool.tiledispatch.consumer.abstractclass;

import com.northpool.resources.datatable.dao.IScroll;
import com.northpool.service.manager.task.exception.TaskCancelException;
import com.northpool.service.manager.task.log.ITaskLogger;
import com.northpool.tiledispatch.base.AbstractBaseComponent;
import com.northpool.tiledispatch.consumer.ITileConsumer;
import com.northpool.tiledispatch.consumer.handler.abstractclass.AbstractTileHandlerStream;
import com.northpool.tiledispatch.exception.ConsumeException;
import com.northpool.tiledispatch.exception.TileCutConsumeException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/northpool/tiledispatch/consumer/abstractclass/AbstractDataConsumer.class */
public class AbstractDataConsumer extends AbstractBaseComponent implements ITileConsumer {
    protected AbstractTileHandlerStream handler;
    protected ScheduledExecutorService logPool;
    public static final int FEATURE_NUM_THRESHOLD = 2000000;
    IScroll scroll;

    public AbstractDataConsumer(IScroll iScroll, AbstractTileHandlerStream abstractTileHandlerStream) {
        this.handler = abstractTileHandlerStream;
        this.scroll = iScroll;
    }

    @Override // com.northpool.tiledispatch.consumer.ITileConsumer
    public void consume() throws ConsumeException {
        ScheduledFuture scheduledFuture = null;
        ScheduledFuture scheduledFuture2 = null;
        try {
            try {
                AtomicLong atomicLong = new AtomicLong(0L);
                ScheduledFuture<?> scheduleWithFixedDelay = this.logPool.scheduleWithFixedDelay(() -> {
                    if (this.error) {
                        this.logPool.shutdown();
                    } else {
                        log("正在生产一组瓦片，进度：" + atomicLong);
                    }
                }, 2L, 5L, TimeUnit.SECONDS);
                while (this.scroll.hasNext() && !this.handler.error && !this.cancel) {
                    Object[] objArr = (Object[]) this.scroll.next();
                    if (objArr != null) {
                        this.handler.handle(objArr);
                    }
                    atomicLong.addAndGet(1L);
                    if (atomicLong.intValue() % FEATURE_NUM_THRESHOLD == 0) {
                        ExecutorService executor = this.handler.getExecutor();
                        executor.shutdown();
                        executor.awaitTermination(1L, TimeUnit.HOURS);
                        this.handler.save();
                    }
                }
                this.scroll.close();
                if (this.handler.isError()) {
                    throw new TileCutConsumeException(this.handler.getException().getMessage(), this.handler.getException());
                }
                if (this.cancel) {
                    throw new TaskCancelException("任务被取消");
                }
                this.handler.awaitTermination(1L, TimeUnit.HOURS);
                this.handler.flush();
                if (scheduleWithFixedDelay != null) {
                    scheduleWithFixedDelay.cancel(true);
                }
                if (0 != 0) {
                    scheduledFuture2.cancel(false);
                }
                release();
            } catch (Exception e) {
                throw new TileCutConsumeException(e);
            }
        } catch (Throwable th) {
            if (0 != 0) {
                scheduledFuture.cancel(true);
            }
            if (0 != 0) {
                scheduledFuture2.cancel(false);
            }
            release();
            throw th;
        }
    }

    @Override // com.northpool.tiledispatch.base.IBaseComponent
    public void cancel() {
        this.cancel = true;
        this.handler.cancel();
    }

    @Override // com.northpool.tiledispatch.consumer.ITileConsumer
    public void release() {
    }

    @Override // com.northpool.tiledispatch.base.AbstractBaseComponent, com.northpool.tiledispatch.base.IBaseComponent
    public void setLogger(ITaskLogger iTaskLogger) {
        this.cutLogger = iTaskLogger;
    }

    @Override // com.northpool.tiledispatch.consumer.ITileConsumer
    public void setLogPool(ScheduledExecutorService scheduledExecutorService) {
        this.logPool = scheduledExecutorService;
    }

    @Override // com.northpool.tiledispatch.base.IBaseComponent
    public void init() {
        this.handler.init();
    }
}
