package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.operators.collect.utils.CollectTestUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.class */
public class CollectSinkOperatorCoordinatorTest {
    private static final int SOCKET_TIMEOUT_MILLIS = 1000;
    private static final TypeSerializer<Row> serializer = new RowTypeInfo(new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}).createSerializer(new ExecutionConfig());

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest$ServerThread.class */
    private static class ServerThread extends Thread {
        private final LinkedList<List<Row>> data;
        private final int closeRequestNum;
        private final ServerSocket server;
        private boolean running;

        private ServerThread(List<List<Row>> list, int i) throws IOException {
            this.data = new LinkedList<>(list);
            this.closeRequestNum = i;
            this.server = new ServerSocket(0);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.running = true;
            int i = 0;
            Socket socket = null;
            DataInputViewStreamWrapper dataInputViewStreamWrapper = null;
            DataOutputViewStreamWrapper dataOutputViewStreamWrapper = null;
            while (true) {
                try {
                    if (!this.running) {
                        break;
                    }
                    if (socket == null) {
                        socket = NetUtils.acceptWithoutTimeout(this.server);
                        dataInputViewStreamWrapper = new DataInputViewStreamWrapper(socket.getInputStream());
                        dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(socket.getOutputStream());
                    }
                    CollectCoordinationRequest collectCoordinationRequest = new CollectCoordinationRequest(dataInputViewStreamWrapper);
                    i++;
                    if (i >= this.closeRequestNum) {
                        this.running = false;
                        break;
                    }
                    new CollectCoordinationResponse(collectCoordinationRequest.getVersion(), 0L, CollectTestUtils.toBytesList(this.data.removeFirst(), CollectSinkOperatorCoordinatorTest.serializer)).serialize(dataOutputViewStreamWrapper);
                } catch (IOException e) {
                    return;
                }
            }
            socket.close();
            this.server.close();
        }

        public void close() {
            this.running = false;
        }

        public InetSocketAddress getServerAddress() {
            return new InetSocketAddress(InetAddress.getLoopbackAddress(), this.server.getLocalPort());
        }
    }

    @Test
    public void testNoAddress() throws Exception {
        CollectSinkOperatorCoordinator collectSinkOperatorCoordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
        collectSinkOperatorCoordinator.start();
        CollectCoordinationRequest collectCoordinationRequest = new CollectCoordinationRequest("version", 123L);
        assertResponseEquals(collectCoordinationRequest, (CollectCoordinationResponse) collectSinkOperatorCoordinator.handleCoordinationRequest(collectCoordinationRequest).get(), -1L, Collections.emptyList());
        collectSinkOperatorCoordinator.close();
    }

    @Test
    public void testServerFailure() throws Exception {
        CollectSinkOperatorCoordinator collectSinkOperatorCoordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
        collectSinkOperatorCoordinator.start();
        List asList = Arrays.asList(Arrays.asList(Row.of(new Object[]{1, "aaa"}), Row.of(new Object[]{2, "bbb"})), Arrays.asList(Row.of(new Object[]{3, "ccc"}), Row.of(new Object[]{4, "ddd"}), Row.of(new Object[]{5, "eee"})));
        ServerThread serverThread = new ServerThread(asList, 3);
        serverThread.start();
        collectSinkOperatorCoordinator.handleEventFromOperator(0, new CollectSinkAddressEvent(serverThread.getServerAddress()));
        CollectCoordinationRequest collectCoordinationRequest = new CollectCoordinationRequest("version1", 123L);
        assertResponseEquals(collectCoordinationRequest, (CollectCoordinationResponse) collectSinkOperatorCoordinator.handleCoordinationRequest(collectCoordinationRequest).get(), 0L, (List) asList.get(0));
        CollectCoordinationRequest collectCoordinationRequest2 = new CollectCoordinationRequest("version2", 456L);
        assertResponseEquals(collectCoordinationRequest2, (CollectCoordinationResponse) collectSinkOperatorCoordinator.handleCoordinationRequest(collectCoordinationRequest2).get(), 0L, (List) asList.get(1));
        CollectCoordinationRequest collectCoordinationRequest3 = new CollectCoordinationRequest("version3", 789L);
        CompletableFuture handleCoordinationRequest = collectSinkOperatorCoordinator.handleCoordinationRequest(collectCoordinationRequest3);
        collectSinkOperatorCoordinator.subtaskFailed(0, (Throwable) null);
        List singletonList = Collections.singletonList(Arrays.asList(Row.of(new Object[]{6, "fff"}), Row.of(new Object[]{7, "ggg"})));
        ServerThread serverThread2 = new ServerThread(singletonList, 2);
        serverThread2.start();
        collectSinkOperatorCoordinator.handleEventFromOperator(0, new CollectSinkAddressEvent(serverThread2.getServerAddress()));
        assertResponseEquals(collectCoordinationRequest3, (CollectCoordinationResponse) handleCoordinationRequest.get(), -1L, Collections.emptyList());
        CollectCoordinationRequest collectCoordinationRequest4 = new CollectCoordinationRequest("version4", 101112L);
        assertResponseEquals(collectCoordinationRequest4, (CollectCoordinationResponse) collectSinkOperatorCoordinator.handleCoordinationRequest(collectCoordinationRequest4).get(), 0L, (List) singletonList.get(0));
        serverThread2.close();
        collectSinkOperatorCoordinator.close();
    }

    private void assertResponseEquals(CollectCoordinationRequest collectCoordinationRequest, CollectCoordinationResponse collectCoordinationResponse, long j, List<Row> list) throws Exception {
        Assert.assertEquals(collectCoordinationRequest.getVersion(), collectCoordinationResponse.getVersion());
        Assert.assertEquals(j, collectCoordinationResponse.getLastCheckpointedOffset());
        List results = collectCoordinationResponse.getResults(serializer);
        Assert.assertEquals(list.size(), results.size());
        for (int i = 0; i < results.size(); i++) {
            Row row = list.get(i);
            Row row2 = (Row) results.get(i);
            Assert.assertEquals(row.getArity(), row2.getArity());
            for (int i2 = 0; i2 < row2.getArity(); i2++) {
                Assert.assertEquals(row.getField(i2), row2.getField(i2));
            }
        }
    }
}
