package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManagerTest.class */
class HsSubpartitionConsumerMemoryDataManagerTest {
    private static final int BUFFER_SIZE = 8;
    private static final int SUBPARTITION_ID = 0;

    HsSubpartitionConsumerMemoryDataManagerTest() {
    }

    @Test
    void testPeekNextToConsumeDataTypeNotMeetBufferIndexToConsume() throws Exception {
        HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager = createSubpartitionConsumerMemoryDataManager(TestingMemoryDataManagerOperation.builder().build());
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, false));
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.peekNextToConsumeDataType(1, new ArrayDeque())).isEqualTo(Buffer.DataType.NONE);
    }

    @Test
    void testPeekNextToConsumeDataTypeTrimHeadingReleasedBuffers() throws Exception {
        HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager = createSubpartitionConsumerMemoryDataManager(TestingMemoryDataManagerOperation.builder().build());
        HsBufferContext createBufferContext = createBufferContext(0, false);
        HsBufferContext createBufferContext2 = createBufferContext(1, false);
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext);
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext2);
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, true));
        createBufferContext.release();
        createBufferContext2.release();
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.peekNextToConsumeDataType(2, Collections.emptyList())).isEqualTo(Buffer.DataType.EVENT_BUFFER);
    }

    @Test
    void testConsumeBufferFirstUnConsumedBufferIndexNotMeetNextToConsume() throws Exception {
        HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager = createSubpartitionConsumerMemoryDataManager(TestingMemoryDataManagerOperation.builder().build());
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, false));
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.consumeBuffer(1, Collections.emptyList())).isNotPresent();
    }

    @Test
    void testConsumeBufferTrimHeadingReleasedBuffers() throws Exception {
        HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager = createSubpartitionConsumerMemoryDataManager(TestingMemoryDataManagerOperation.builder().build());
        HsBufferContext createBufferContext = createBufferContext(0, false);
        HsBufferContext createBufferContext2 = createBufferContext(1, false);
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext);
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext2);
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, true));
        createBufferContext.release();
        createBufferContext2.release();
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.consumeBuffer(2, Collections.emptyList())).isPresent();
    }

    @Test
    void testConsumeBufferReturnSlice() {
        HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager = createSubpartitionConsumerMemoryDataManager(TestingMemoryDataManagerOperation.builder().build());
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(0, false));
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.consumeBuffer(0, Collections.emptyList())).hasValueSatisfying(bufferAndBacklog -> {
            Assertions.assertThat(bufferAndBacklog.buffer()).isInstanceOf(ReadOnlySlicedNetworkBuffer.class);
        });
    }

    @Test
    void testAddBuffer() {
        HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager = createSubpartitionConsumerMemoryDataManager(TestingMemoryDataManagerOperation.builder().build());
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.add(createBufferContext(0, false));
        arrayDeque.add(createBufferContext(1, false));
        createSubpartitionConsumerMemoryDataManager.addInitialBuffers(arrayDeque);
        createSubpartitionConsumerMemoryDataManager.addBuffer(createBufferContext(2, true));
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.consumeBuffer(0, Collections.emptyList())).hasValueSatisfying(bufferAndBacklog -> {
            Assertions.assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0);
            Assertions.assertThat(bufferAndBacklog.buffer().getDataType()).isEqualTo(Buffer.DataType.DATA_BUFFER);
        });
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.consumeBuffer(1, Collections.emptyList())).hasValueSatisfying(bufferAndBacklog2 -> {
            Assertions.assertThat(bufferAndBacklog2.getSequenceNumber()).isEqualTo(1);
            Assertions.assertThat(bufferAndBacklog2.buffer().getDataType()).isEqualTo(Buffer.DataType.DATA_BUFFER);
        });
        Assertions.assertThat(createSubpartitionConsumerMemoryDataManager.consumeBuffer(2, Collections.emptyList())).hasValueSatisfying(bufferAndBacklog3 -> {
            Assertions.assertThat(bufferAndBacklog3.getSequenceNumber()).isEqualTo(2);
            Assertions.assertThat(bufferAndBacklog3.buffer().getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
        });
    }

    @Test
    void testRelease() {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingMemoryDataManagerOperation build = TestingMemoryDataManagerOperation.builder().setOnConsumerReleasedBiConsumer((num, hsConsumerId) -> {
            completableFuture.complete(hsConsumerId);
        }).build();
        HsConsumerId newId = HsConsumerId.newId((HsConsumerId) null);
        createSubpartitionConsumerMemoryDataManager(newId, build).releaseDataView();
        Assertions.assertThat(completableFuture).isCompletedWithValue(newId);
    }

    private static HsBufferContext createBufferContext(int i, boolean z) {
        return new HsBufferContext(HybridShuffleTestUtils.createBuffer(BUFFER_SIZE, z), i, 0);
    }

    private HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager(HsMemoryDataManagerOperation hsMemoryDataManagerOperation) {
        return createSubpartitionConsumerMemoryDataManager(HsConsumerId.DEFAULT, hsMemoryDataManagerOperation);
    }

    private HsSubpartitionConsumerMemoryDataManager createSubpartitionConsumerMemoryDataManager(HsConsumerId hsConsumerId, HsMemoryDataManagerOperation hsMemoryDataManagerOperation) {
        return new HsSubpartitionConsumerMemoryDataManager(new ReentrantLock(), new ReentrantLock(), 0, hsConsumerId, hsMemoryDataManagerOperation);
    }
}
