package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.class */
class IOManagerAsyncTest {
    private IOManagerAsync ioManager;

    /* loaded from: input_file:org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest$TestIOException.class */
    private static final class TestIOException extends IOException {
        private static final long serialVersionUID = -814705441998024472L;

        private TestIOException() {
        }
    }

    IOManagerAsyncTest() {
    }

    @BeforeEach
    void beforeTest() {
        this.ioManager = new IOManagerAsync();
    }

    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
    }

    @Test
    void channelReadWriteOneSegment() throws Exception {
        FileIOChannel.ID createChannel = this.ioManager.createChannel();
        BlockChannelWriter createBlockChannelWriter = this.ioManager.createBlockChannelWriter(createChannel);
        MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(32768);
        for (int i = 0; i < 1111; i++) {
            for (int i2 = 0; i2 < allocateUnpooledSegment.size(); i2 += 4) {
                allocateUnpooledSegment.putInt(i2, i);
            }
            createBlockChannelWriter.writeBlock(allocateUnpooledSegment);
            allocateUnpooledSegment = (MemorySegment) createBlockChannelWriter.getNextReturnedBlock();
        }
        createBlockChannelWriter.close();
        BlockChannelReader createBlockChannelReader = this.ioManager.createBlockChannelReader(createChannel);
        for (int i3 = 0; i3 < 1111; i3++) {
            createBlockChannelReader.readBlock(allocateUnpooledSegment);
            allocateUnpooledSegment = (MemorySegment) createBlockChannelReader.getNextReturnedBlock();
            for (int i4 = 0; i4 < allocateUnpooledSegment.size(); i4 += 4) {
                Assertions.assertThat(allocateUnpooledSegment.getInt(i4)).withFailMessage("Read memory segment contains invalid data.", new Object[0]).isEqualTo(i3);
            }
        }
        createBlockChannelReader.closeAndDelete();
    }

    @Test
    void channelReadWriteMultipleSegments() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 16; i++) {
            arrayList.add(MemorySegmentFactory.allocateUnpooledSegment(32768));
        }
        FileIOChannel.ID createChannel = this.ioManager.createChannel();
        BlockChannelWriter createBlockChannelWriter = this.ioManager.createBlockChannelWriter(createChannel);
        for (int i2 = 0; i2 < 1111; i2++) {
            MemorySegment memorySegment = arrayList.isEmpty() ? (MemorySegment) createBlockChannelWriter.getNextReturnedBlock() : (MemorySegment) arrayList.remove(arrayList.size() - 1);
            for (int i3 = 0; i3 < memorySegment.size(); i3 += 4) {
                memorySegment.putInt(i3, i2);
            }
            createBlockChannelWriter.writeBlock(memorySegment);
        }
        createBlockChannelWriter.close();
        while (arrayList.size() < 16) {
            arrayList.add(createBlockChannelWriter.getNextReturnedBlock());
        }
        BlockChannelReader createBlockChannelReader = this.ioManager.createBlockChannelReader(createChannel);
        while (!arrayList.isEmpty()) {
            createBlockChannelReader.readBlock(arrayList.remove(0));
        }
        for (int i4 = 0; i4 < 1111; i4++) {
            MemorySegment memorySegment2 = (MemorySegment) createBlockChannelReader.getNextReturnedBlock();
            for (int i5 = 0; i5 < memorySegment2.size(); i5 += 4) {
                Assertions.assertThat(memorySegment2.getInt(i5)).withFailMessage("Read memory segment contains invalid data.", new Object[0]).isEqualTo(i4);
            }
            createBlockChannelReader.readBlock(memorySegment2);
        }
        createBlockChannelReader.closeAndDelete();
        while (arrayList.size() < 16) {
            arrayList.add(createBlockChannelReader.getNextReturnedBlock());
        }
    }

    @Test
    void testExceptionPropagationReader() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        this.ioManager.getReadRequestQueue(this.ioManager.createChannel()).add(new ReadRequest() { // from class: org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncTest.1
            public void requestDone(IOException iOException) {
                if (iOException instanceof TestIOException) {
                    atomicBoolean2.set(true);
                }
                synchronized (atomicBoolean) {
                    atomicBoolean.set(true);
                    atomicBoolean.notifyAll();
                }
            }

            public void read() throws IOException {
                throw new TestIOException();
            }
        });
        synchronized (atomicBoolean) {
            while (!atomicBoolean.get()) {
                atomicBoolean.wait();
            }
        }
        Assertions.assertThat(atomicBoolean2).isTrue();
    }

    @Test
    void testExceptionPropagationWriter() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        this.ioManager.getWriteRequestQueue(this.ioManager.createChannel()).add(new WriteRequest() { // from class: org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncTest.2
            public void requestDone(IOException iOException) {
                if (iOException instanceof TestIOException) {
                    atomicBoolean2.set(true);
                }
                synchronized (atomicBoolean) {
                    atomicBoolean.set(true);
                    atomicBoolean.notifyAll();
                }
            }

            public void write() throws IOException {
                throw new TestIOException();
            }
        });
        synchronized (atomicBoolean) {
            while (!atomicBoolean.get()) {
                atomicBoolean.wait();
            }
        }
        Assertions.assertThat(atomicBoolean2).isTrue();
    }

    @Test
    void testExceptionInCallbackRead() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        ReadRequest readRequest = new ReadRequest() { // from class: org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncTest.3
            public void requestDone(IOException iOException) {
                synchronized (atomicBoolean) {
                    atomicBoolean.set(true);
                    atomicBoolean.notifyAll();
                }
            }

            public void read() {
            }
        };
        ReadRequest readRequest2 = new ReadRequest() { // from class: org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncTest.4
            public void requestDone(IOException iOException) {
                throw new RuntimeException();
            }

            public void read() {
            }
        };
        RequestQueue readRequestQueue = this.ioManager.getReadRequestQueue(this.ioManager.createChannel());
        readRequestQueue.add(readRequest2);
        readRequestQueue.add(readRequest);
        synchronized (atomicBoolean) {
            while (!atomicBoolean.get()) {
                atomicBoolean.wait();
            }
        }
    }

    @Test
    void testExceptionInCallbackWrite() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        WriteRequest writeRequest = new WriteRequest() { // from class: org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncTest.5
            public void requestDone(IOException iOException) {
                synchronized (atomicBoolean) {
                    atomicBoolean.set(true);
                    atomicBoolean.notifyAll();
                }
            }

            public void write() {
            }
        };
        WriteRequest writeRequest2 = new WriteRequest() { // from class: org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncTest.6
            public void requestDone(IOException iOException) {
                throw new RuntimeException();
            }

            public void write() {
            }
        };
        RequestQueue writeRequestQueue = this.ioManager.getWriteRequestQueue(this.ioManager.createChannel());
        writeRequestQueue.add(writeRequest2);
        writeRequestQueue.add(writeRequest);
        synchronized (atomicBoolean) {
            while (!atomicBoolean.get()) {
                atomicBoolean.wait();
            }
        }
    }
}
