/*
 * Decompiled with CFR 0.152.
 */
package com.northpool.commons.filechannel;

import com.northpool.exception.UException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AsyncFileReadChannelPool {
    final Path filePath;
    final int readFileChannelNum;
    BlockingQueue<DataRequest> queue;
    final int queueSize;
    FileChannelHolder[] pool;
    Executor executor;
    boolean isClose = false;
    final ReentrantLock lock = new ReentrantLock();
    final Condition allFileChannelInUse = this.lock.newCondition();
    boolean useDirect = true;

    public AsyncFileReadChannelPool(Path filePath, int readFileChannelNum, int queueSize) {
        this.filePath = filePath;
        this.readFileChannelNum = readFileChannelNum;
        this.queueSize = queueSize;
    }

    public void init() throws IOException {
        this.createFileChannelPool();
        this.queue = new ArrayBlockingQueue<DataRequest>(this.queueSize);
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.execute(() -> {
            while (!this.isClose) {
                DataRequest dataRequest = null;
                try {
                    dataRequest = this.queue.take();
                }
                catch (InterruptedException e) {
                    UException.printStackTrace(e);
                }
                this.lock.lock();
                FileChannelHolder holder = this.getUnusedFileChannelPool();
                this.doRead(holder, dataRequest);
                this.lock.unlock();
            }
        });
    }

    private void doRead(final FileChannelHolder holder, final DataRequest dataRequest) {
        holder.using = true;
        ByteBuffer buffer = this.useDirect ? ByteBuffer.allocateDirect(dataRequest.lenght) : ByteBuffer.allocate(dataRequest.lenght);
        holder.channel.read(buffer, dataRequest.offset, buffer, new CompletionHandler<Integer, ByteBuffer>(){

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                AsyncFileReadChannelPool.this.lock.lock();
                dataRequest.buffer = attachment;
                dataRequest.future.complete(dataRequest);
                holder.using = false;
                AsyncFileReadChannelPool.this.allFileChannelInUse.signalAll();
                AsyncFileReadChannelPool.this.lock.unlock();
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                AsyncFileReadChannelPool.this.lock.lock();
                dataRequest.future.completeExceptionally(exc);
                holder.using = false;
                AsyncFileReadChannelPool.this.allFileChannelInUse.signalAll();
                AsyncFileReadChannelPool.this.lock.unlock();
            }
        });
    }

    private FileChannelHolder getUnusedFileChannelPool() {
        FileChannelHolder holderUnused = null;
        while (holderUnused == null) {
            for (FileChannelHolder holder : this.pool) {
                if (holder.using) continue;
                holderUnused = holder;
                return holder;
            }
            try {
                this.allFileChannelInUse.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return holderUnused;
    }

    void createFileChannelPool() throws IOException {
        this.pool = new FileChannelHolder[this.readFileChannelNum];
        for (int i = 0; i < this.readFileChannelNum; ++i) {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(this.filePath, StandardOpenOption.READ);
            this.pool[i] = FileChannelHolder.create(channel);
        }
    }

    public void close() {
        for (int i = 0; i < this.readFileChannelNum; ++i) {
            try {
                this.pool[i].channel.close();
                continue;
            }
            catch (IOException e) {
                UException.printStackTrace(e);
            }
        }
        this.isClose = true;
    }

    public CompletableFuture<DataRequest> read(String key, long offset, int lenght) throws InterruptedException {
        DataRequest dataRequest = DataRequest.build(key, offset, lenght);
        this.queue.put(dataRequest);
        return dataRequest.future;
    }

    static class FileChannelHolder {
        volatile boolean using = false;
        AsynchronousFileChannel channel;

        FileChannelHolder() {
        }

        static FileChannelHolder create(AsynchronousFileChannel channel) {
            FileChannelHolder holder = new FileChannelHolder();
            holder.channel = channel;
            return holder;
        }
    }

    public static class DataRequest {
        long offset;
        int lenght;
        ByteBuffer buffer;
        String key;
        CompletableFuture<DataRequest> future;

        static DataRequest build(String key, long offset, int lenght) {
            DataRequest dataRequest = new DataRequest();
            dataRequest.offset = offset;
            dataRequest.lenght = lenght;
            CompletableFuture future = new CompletableFuture();
            dataRequest.future = future;
            dataRequest.key = key;
            return dataRequest;
        }

        public long getOffset() {
            return this.offset;
        }

        public int getLenght() {
            return this.lenght;
        }

        public ByteBuffer getBuffer() {
            return this.buffer;
        }

        public String getKey() {
            return this.key;
        }
    }
}

