/*
 * Decompiled with CFR 0.152.
 */
package com.northpool.tiledispatch.consumer.abstractclass;

import com.northpool.service.manager.task.log.ITaskLogger;
import com.northpool.tiledispatch.base.AbstractBaseComponent;
import com.northpool.tiledispatch.consumer.ITileConsumer;
import com.northpool.tiledispatch.consumer.abstractclass.SubDataConsumer;
import com.northpool.tiledispatch.consumer.buffer.IDataBuffer;
import com.northpool.tiledispatch.consumer.handler.abstractclass.AbstractTileHandlerStream;
import com.northpool.tiledispatch.exception.ConsumeException;
import com.northpool.tiledispatch.exception.TileCutConsumeException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbstractDataConsumer
extends AbstractBaseComponent
implements ITileConsumer {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    protected AbstractTileHandlerStream handler;
    protected ScheduledExecutorService logPool;
    protected IDataBuffer dataBuffer;
    protected ExecutorService subConsumeThread;
    public static final int FEATURE_NUM_THRESHOLD = 5000000;
    public static final int FEATURE_GC_NUM_THRESHOLD = 20000;
    List<SubDataConsumer> subConsumers;

    public AbstractDataConsumer(List<SubDataConsumer> subConsumers, IDataBuffer dataBuffer) {
        this.subConsumers = subConsumers;
        this.dataBuffer = dataBuffer;
        this.subConsumeThread = Executors.newFixedThreadPool(subConsumers.size());
    }

    @Override
    public void consume() throws ConsumeException {
        ScheduledFuture<?> future = null;
        try {
            Object subConsumer;
            AtomicLong index = new AtomicLong(0L);
            future = this.logPool.scheduleWithFixedDelay(() -> {
                if (this.error) {
                    this.logPool.shutdown();
                    return;
                }
                String logMsg = "\u6b63\u5728\u751f\u4ea7\u4e00\u7ec4\u74e6\u7247\uff0c\u8fdb\u5ea6\uff1a" + index;
                this.logger.info(logMsg);
                this.log(logMsg);
            }, 2L, 60L, TimeUnit.SECONDS);
            CompletableFuture[] promiseArr = new CompletableFuture[this.subConsumers.size()];
            int size = this.subConsumers.size();
            for (int i = 0; i < size; ++i) {
                CompletableFuture completableFuture;
                subConsumer = this.subConsumers.get(i);
                ((SubDataConsumer)subConsumer).setIndex(index);
                promiseArr[i] = completableFuture = CompletableFuture.supplyAsync(subConsumer, this.subConsumeThread);
            }
            CompletableFuture<Void> allConsumeFuture = CompletableFuture.allOf(promiseArr);
            try {
                subConsumer = allConsumeFuture.get();
            }
            catch (Exception e) {
                throw new TileCutConsumeException(e.getMessage(), e);
            }
            this.dataBuffer.awaitTermination(24L, TimeUnit.HOURS);
            this.dataBuffer.flushAll();
        }
        catch (Exception e) {
            throw new TileCutConsumeException(e);
        }
        finally {
            if (future != null) {
                future.cancel(true);
            }
            this.release();
        }
    }

    @Override
    public void cancel() {
        this.cancel = true;
        for (SubDataConsumer subConsumer : this.subConsumers) {
            subConsumer.cancel();
        }
        if (this.handler != null) {
            this.handler.cancel();
        }
    }

    @Override
    public void release() {
    }

    @Override
    public void setLogger(ITaskLogger cutLogger) {
        this.cutLogger = cutLogger;
    }

    @Override
    public void setLogPool(ScheduledExecutorService logPool) {
        this.logPool = logPool;
    }

    @Override
    public void init() {
    }
}

