package com.northpool.tiledispatch.consumer.abstractclass;

import com.northpool.resources.Constants;
import com.northpool.resources.command.QueryFilter;
import com.northpool.resources.datasource.db.DbDataSource;
import com.northpool.resources.datatable.ITable;
import com.northpool.resources.datatable.dao.IScroll;
import com.northpool.resources.sql.jdbc.Transactions;
import com.northpool.service.manager.task.exception.TaskCancelException;
import com.northpool.tiledispatch.base.AbstractBaseComponent;
import com.northpool.tiledispatch.consumer.ITileConsumer;
import com.northpool.tiledispatch.consumer.handler.abstractclass.AbstractTileHandlerStream;
import com.northpool.tiledispatch.exception.ConsumeException;
import com.northpool.tiledispatch.exception.TileCutConsumeException;
import com.northpool.type.Type;
import java.sql.SQLException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/northpool/tiledispatch/consumer/abstractclass/SubDataConsumer.class */
public class SubDataConsumer extends AbstractBaseComponent implements ITileConsumer, Supplier {
    Logger log = LoggerFactory.getLogger(getClass());
    protected AbstractTileHandlerStream handler;
    AtomicLong index;
    ITable table;
    QueryFilter filter;

    public SubDataConsumer(ITable iTable, AbstractTileHandlerStream abstractTileHandlerStream, QueryFilter queryFilter, AtomicLong atomicLong) {
        this.table = iTable;
        this.handler = abstractTileHandlerStream;
        this.index = atomicLong;
        this.filter = queryFilter;
    }

    @Override // com.northpool.tiledispatch.consumer.ITileConsumer
    public void consume() throws ConsumeException {
        DbDataSource dataSource = this.table.getDataSource();
        if (dataSource.getDataSourceType().equals(Constants.DATA_SOURCE_TYPE.oracle)) {
            Transactions.useSingleConnectionInThread(dataSource);
        } else {
            try {
                Transactions.begin(dataSource);
                dataSource.genericDao().doExecuteSql(dataSource.getDataSourceType().equals(Constants.DATA_SOURCE_TYPE.kingbase) ? "begin transaction ISOLATION LEVEL repeatable READ" : "set transaction ISOLATION LEVEL repeatable READ", (Object[]) null, (Type[]) null);
            } catch (SQLException e) {
                throw new ConsumeException(e);
            }
        }
        IScroll scrollArray = this.table.mapDao().scrollArray(this.filter, 2000);
        while (scrollArray.hasNext() && !this.handler.isError() && !this.handler.isCancel()) {
            try {
                try {
                    Object[] objArr = (Object[]) scrollArray.next();
                    if (objArr != null) {
                        this.handler.handle(objArr);
                    }
                    long addAndGet = this.index.addAndGet(1L);
                    if (addAndGet % 20000 == 0) {
                        System.gc();
                        this.log.info("达到阈值{}, 主动调用GC,清理垃圾内存", Integer.valueOf(AbstractDataConsumer.FEATURE_GC_NUM_THRESHOLD));
                    }
                    if (addAndGet % 5000000 == 0) {
                        this.log.info("达到阈值{}, 写入数据", Integer.valueOf(AbstractDataConsumer.FEATURE_NUM_THRESHOLD));
                        this.handler.save();
                    }
                } catch (Exception e2) {
                    throw new TileCutConsumeException(e2);
                }
            } catch (Throwable th) {
                if (dataSource == null) {
                    return;
                }
                if (dataSource.getDataSourceType().equals(Constants.DATA_SOURCE_TYPE.oracle)) {
                    Transactions.connectionInThreadRelease(dataSource);
                } else {
                    try {
                        Transactions.commit(dataSource);
                    } catch (SQLException e3) {
                        throw new ConsumeException(e3);
                    }
                }
                throw th;
            }
        }
        scrollArray.close();
        if (this.handler.isError()) {
            throw new TileCutConsumeException(this.handler.getException().getMessage(), this.handler.getException());
        }
        if (this.handler.isCancel()) {
            throw new TaskCancelException("任务被取消");
        }
        this.handler.awaitTermination(1L, TimeUnit.HOURS);
        this.handler.flush();
        if (dataSource == null) {
            return;
        }
        if (dataSource.getDataSourceType().equals(Constants.DATA_SOURCE_TYPE.oracle)) {
            Transactions.connectionInThreadRelease(dataSource);
            return;
        }
        try {
            Transactions.commit(dataSource);
        } catch (SQLException e4) {
            throw new ConsumeException(e4);
        }
    }

    public void setIndex(AtomicLong atomicLong) {
        this.index = atomicLong;
    }

    @Override // com.northpool.tiledispatch.consumer.ITileConsumer
    public void release() {
    }

    @Override // com.northpool.tiledispatch.consumer.ITileConsumer
    public void setLogPool(ScheduledExecutorService scheduledExecutorService) {
    }

    @Override // java.util.function.Supplier
    public Object get() {
        try {
            consume();
            return null;
        } catch (ConsumeException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // com.northpool.tiledispatch.base.IBaseComponent
    public void init() {
        if (this.init) {
            return;
        }
        this.init = true;
        this.handler.init();
    }

    @Override // com.northpool.tiledispatch.base.IBaseComponent
    public void cancel() {
        this.cancel = true;
        this.handler.cancel();
    }
}
