/*
 * Decompiled with CFR 0.152.
 */
package com.northpool.tiledispatch.consumer.abstractclass;

import com.northpool.resources.Constants;
import com.northpool.resources.command.QueryFilter;
import com.northpool.resources.datasource.db.DbDataSource;
import com.northpool.resources.datatable.ITable;
import com.northpool.resources.datatable.dao.IScroll;
import com.northpool.resources.sql.jdbc.Transactions;
import com.northpool.service.manager.task.exception.TaskCancelException;
import com.northpool.tiledispatch.base.AbstractBaseComponent;
import com.northpool.tiledispatch.consumer.ITileConsumer;
import com.northpool.tiledispatch.consumer.handler.abstractclass.AbstractTileHandlerStream;
import com.northpool.tiledispatch.exception.ConsumeException;
import com.northpool.tiledispatch.exception.TileCutConsumeException;
import java.sql.SQLException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tile consumer that scrolls over the rows of a table matching a query filter and
 * passes each non-null row to an {@link AbstractTileHandlerStream}.
 *
 * <p>The scan runs inside a database transaction (or, for Oracle data sources, on a
 * single connection bound to the current thread). The transaction/connection and the
 * scroll are released on every exit path of {@link #consume()}.
 *
 * <p>The class also implements {@link Supplier}; {@link #get()} simply runs
 * {@link #consume()} and converts its checked failure into a {@link RuntimeException}.
 */
public class SubDataConsumer
extends AbstractBaseComponent
implements ITileConsumer,
Supplier {
    /** Second argument passed to {@code scrollArray}; presumably the fetch/batch size (TODO confirm). */
    private static final int SCROLL_BATCH_SIZE = 2000;
    /** An explicit {@code System.gc()} is requested each time the row counter hits a multiple of this. */
    private static final int GC_INTERVAL = 20000;
    /** {@code handler.save()} is invoked each time the row counter hits a multiple of this. */
    private static final int SAVE_INTERVAL = 5000000;

    Logger log = LoggerFactory.getLogger(this.getClass());
    /** Receives every non-null row read from the scroll. */
    protected AbstractTileHandlerStream handler;
    /** Row counter supplied by the caller; the GC/save thresholds are evaluated against its value. */
    AtomicLong index;
    /** Table whose rows are scanned. */
    ITable table;
    /** Filter handed to the scroll query. */
    QueryFilter filter;

    /**
     * Creates a consumer for the given table.
     *
     * @param table   table to scan; its data source is cast to {@link DbDataSource} in {@link #consume()}
     * @param handler handler that receives the rows
     * @param filter  filter passed to the scroll query
     * @param index   counter incremented once per row read
     */
    public SubDataConsumer(ITable table, AbstractTileHandlerStream handler, QueryFilter filter, AtomicLong index) {
        this.table = table;
        this.handler = handler;
        this.index = index;
        this.filter = filter;
    }

    /**
     * Scans the table and feeds every row to the handler.
     *
     * <p>For Oracle data sources a single connection is bound to the current thread; for
     * all others a transaction is begun and switched to repeatable-read isolation. Whatever
     * was opened is always committed/released afterwards, including when setting the
     * isolation level or opening the scroll fails.
     *
     * @throws ConsumeException if the transaction cannot be begun, if the scan fails
     *         (reported as {@link TileCutConsumeException}), or if the scan succeeded but
     *         the final commit fails
     */
    @Override
    public void consume() throws ConsumeException {
        DbDataSource dataSource = (DbDataSource)this.table.getDataSource();
        boolean oracle = dataSource.getDataSourceType().equals(Constants.DATA_SOURCE_TYPE.oracle);
        if (oracle) {
            Transactions.useSingleConnectionInThread(dataSource);
        } else {
            try {
                Transactions.begin(dataSource);
            }
            catch (SQLException ex) {
                throw new ConsumeException(ex);
            }
        }
        boolean succeeded = false;
        try {
            // Setup is inside the try so the finally below releases the transaction /
            // connection even when one of these calls throws. There is deliberately no
            // catch here: setup failures propagate with their original type.
            if (!oracle) {
                String sql = "set transaction ISOLATION LEVEL repeatable READ";
                if (dataSource.getDataSourceType().equals(Constants.DATA_SOURCE_TYPE.kingbase)) {
                    sql = "begin transaction ISOLATION LEVEL repeatable READ";
                }
                dataSource.genericDao().doExecuteSql(sql, null, null);
            }
            IScroll scroll = this.table.mapDao().scrollArray(this.filter, Integer.valueOf(SCROLL_BATCH_SIZE));
            this.drain(scroll);
            succeeded = true;
        }
        finally {
            this.endTransaction(dataSource, oracle, succeeded);
        }
    }

    /**
     * Reads the scroll until it is exhausted or the handler reports an error or a
     * cancellation, then closes the scroll, waits for the handler and flushes it.
     *
     * @param scroll open scroll to read; it is closed on every exit path
     * @throws TileCutConsumeException wrapping any exception raised while reading,
     *         handling, waiting or flushing, including the handler-error and
     *         task-cancelled conditions detected after the loop
     */
    private void drain(IScroll scroll) throws TileCutConsumeException {
        boolean closeAttempted = false;
        try {
            while (scroll.hasNext() && !this.handler.isError() && !this.handler.isCancel()) {
                Object[] row = (Object[])scroll.next();
                if (row != null) {
                    this.handler.handle(row);
                }
                long count = this.index.addAndGet(1L);
                if (count % GC_INTERVAL == 0L) {
                    // Explicit GC request kept from the original implementation.
                    System.gc();
                    this.log.info("\u8fbe\u5230\u9608\u503c{}, \u4e3b\u52a8\u8c03\u7528GC,\u6e05\u7406\u5783\u573e\u5185\u5b58", GC_INTERVAL);
                }
                if (count % SAVE_INTERVAL == 0L) {
                    this.log.info("\u8fbe\u5230\u9608\u503c{}, \u5199\u5165\u6570\u636e", SAVE_INTERVAL);
                    this.handler.save();
                }
            }
            // Close before waiting on the handler so the scroll is not held open during
            // the (up to one hour) wait. The flag is set first so that a failing close()
            // is not retried in the finally block.
            closeAttempted = true;
            scroll.close();
            if (this.handler.isError()) {
                throw new TileCutConsumeException(this.handler.getException().getMessage(), this.handler.getException());
            }
            if (this.handler.isCancel()) {
                throw new TaskCancelException("\u4efb\u52a1\u88ab\u53d6\u6d88");
            }
            this.handler.awaitTermination(1L, TimeUnit.HOURS);
            this.handler.flush();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new TileCutConsumeException(e);
        }
        finally {
            if (!closeAttempted) {
                this.closeQuietly(scroll);
            }
        }
    }

    /**
     * Closes the scroll on a failure path; a close failure is logged rather than thrown
     * so that it cannot replace the exception already propagating.
     */
    private void closeQuietly(IScroll scroll) {
        try {
            scroll.close();
        }
        catch (Exception closeError) {
            this.log.warn("Failed to close scroll after an earlier failure", closeError);
        }
    }

    /**
     * Releases the thread-bound connection (Oracle) or commits the transaction (all
     * other data sources).
     *
     * @param dataSource data source the transaction/connection was opened on
     * @param oracle     whether the Oracle single-connection mode was used
     * @param succeeded  whether the scan completed normally; when {@code false} a commit
     *                   failure is only logged so the original exception is not masked
     * @throws ConsumeException if the scan succeeded but the commit fails
     */
    private void endTransaction(DbDataSource dataSource, boolean oracle, boolean succeeded) throws ConsumeException {
        if (oracle) {
            Transactions.connectionInThreadRelease(dataSource);
            return;
        }
        try {
            Transactions.commit(dataSource);
        }
        catch (SQLException ex) {
            if (succeeded) {
                throw new ConsumeException(ex);
            }
            this.log.warn("Commit failed after an earlier failure; keeping the original exception", ex);
        }
    }

    /**
     * Replaces the row counter.
     *
     * @param index counter to increment once per row read
     */
    public void setIndex(AtomicLong index) {
        this.index = index;
    }

    /** No-op: this consumer holds nothing that outlives {@link #consume()}. */
    @Override
    public void release() {
    }

    /** No-op: the supplied executor is ignored. */
    @Override
    public void setLogPool(ScheduledExecutorService logPool) {
    }

    /**
     * {@link Supplier} entry point: runs {@link #consume()}.
     *
     * @return always {@code null}
     * @throws RuntimeException wrapping any {@link ConsumeException} thrown by {@link #consume()}
     */
    @Override
    public Object get() {
        try {
            this.consume();
            return null;
        }
        catch (ConsumeException ex) {
            throw new RuntimeException(ex);
        }
    }

    /** Initializes the handler once; subsequent calls return immediately. */
    @Override
    public void init() {
        if (this.init) {
            return;
        }
        this.init = true;
        this.handler.init();
    }

    /** Marks this consumer as cancelled and forwards the cancellation to the handler. */
    @Override
    public void cancel() {
        this.cancel = true;
        this.handler.cancel();
    }
}

