package org.apache.flink.streaming.api.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.class */
public class InternalTimerServiceImplTest {
    private final int maxParallelism;
    private final KeyGroupRange testKeyGroupRange;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest$TestKeyContext.class */
    public static class TestKeyContext implements KeyContext {
        private Object key;

        private TestKeyContext() {
        }

        public void setCurrentKey(Object obj) {
            this.key = obj;
        }

        public Object getCurrentKey() {
            return this.key;
        }
    }

    private static InternalTimer<Integer, String> anyInternalTimer() {
        return (InternalTimer) Mockito.any();
    }

    public InternalTimerServiceImplTest(int i, int i2, int i3) {
        this.testKeyGroupRange = new KeyGroupRange(i, i2);
        this.maxParallelism = i3;
    }

    @Test
    public void testKeyGroupStartIndexSetting() {
        Assert.assertEquals(7, createInternalTimerService(new KeyGroupRange(7, 21), new TestKeyContext(), new TestProcessingTimeService(), IntSerializer.INSTANCE, StringSerializer.INSTANCE, createQueueFactory()).getLocalKeyGroupRangeStartIdx());
    }

    @Test
    public void testTimerAssignmentToKeyGroups() {
        HashSet[] hashSetArr = new HashSet[100];
        TestKeyContext testKeyContext = new TestKeyContext();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 100 - 1);
        InternalTimerServiceImpl createInternalTimerService = createInternalTimerService(keyGroupRange, testKeyContext, new TestProcessingTimeService(), IntSerializer.INSTANCE, StringSerializer.INSTANCE, createQueueFactory(keyGroupRange, 100));
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, (Triggerable) Mockito.mock(Triggerable.class));
        for (int i = 0; i < 100; i++) {
            TimerHeapInternalTimer timerHeapInternalTimer = new TimerHeapInternalTimer(10 + i, Integer.valueOf(i), "hello_world_" + i);
            int assignToKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(timerHeapInternalTimer.getKey(), 100);
            HashSet hashSet = hashSetArr[assignToKeyGroup];
            if (hashSet == null) {
                hashSet = new HashSet();
                hashSetArr[assignToKeyGroup] = hashSet;
            }
            hashSet.add(timerHeapInternalTimer);
            testKeyContext.setCurrentKey(timerHeapInternalTimer.getKey());
            createInternalTimerService.registerEventTimeTimer(timerHeapInternalTimer.getNamespace(), timerHeapInternalTimer.getTimestamp());
            createInternalTimerService.registerProcessingTimeTimer(timerHeapInternalTimer.getNamespace(), timerHeapInternalTimer.getTimestamp());
        }
        List eventTimeTimersPerKeyGroup = createInternalTimerService.getEventTimeTimersPerKeyGroup();
        List processingTimeTimersPerKeyGroup = createInternalTimerService.getProcessingTimeTimersPerKeyGroup();
        for (int i2 = 0; i2 < hashSetArr.length; i2++) {
            HashSet hashSet2 = hashSetArr[i2];
            Set set = (Set) eventTimeTimersPerKeyGroup.get(i2);
            Set set2 = (Set) processingTimeTimersPerKeyGroup.get(i2);
            if (hashSet2 == null) {
                Assert.assertTrue(set.isEmpty());
                Assert.assertTrue(set2.isEmpty());
            } else {
                Assert.assertEquals(hashSet2, set);
                Assert.assertEquals(hashSet2, set2);
            }
        }
    }

    @Test
    public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, new HeapPriorityQueueSetFactory(this.testKeyGroupRange, this.maxParallelism, 128));
        testKeyContext.setCurrentKey(Integer.valueOf(getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism)));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 20L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 30L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 20L);
        Assert.assertEquals(5L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(3L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{10L}));
        testProcessingTimeService.setCurrentTime(10L);
        Assert.assertEquals(3L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{20L}));
        testProcessingTimeService.setCurrentTime(20L);
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(0L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{30L}));
        testProcessingTimeService.setCurrentTime(30L);
        Assert.assertEquals(0L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(0L, testProcessingTimeService.getNumActiveTimers());
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 40L);
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
    }

    @Test
    public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testKeyContext.setCurrentKey(Integer.valueOf(getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism)));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 20L);
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{20L}));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{10L}));
    }

    @Test
    public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        final InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testKeyContext.setCurrentKey(Integer.valueOf(getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism)));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{10L}));
        ((Triggerable) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.streaming.api.operators.InternalTimerServiceImplTest.1
            public Object answer(InvocationOnMock invocationOnMock) throws Exception {
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 20L);
                return null;
            }
        }).when(triggerable)).onProcessingTime(anyInternalTimer());
        testProcessingTimeService.setCurrentTime(10L);
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{20L}));
        ((Triggerable) Mockito.doAnswer(new Answer<Object>() { // from class: org.apache.flink.streaming.api.operators.InternalTimerServiceImplTest.2
            public Object answer(InvocationOnMock invocationOnMock) throws Exception {
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 30L);
                return null;
            }
        }).when(triggerable)).onProcessingTime(anyInternalTimer());
        testProcessingTimeService.setCurrentTime(20L);
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, testProcessingTimeService.getNumActiveTimers());
        Assert.assertThat(testProcessingTimeService.getActiveTimerTimestamps(), Matchers.containsInAnyOrder(new Long[]{30L}));
    }

    @Test
    public void testCurrentProcessingTime() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testProcessingTimeService.setCurrentTime(17L);
        Assert.assertEquals(17L, createAndStartInternalTimerService.currentProcessingTime());
        testProcessingTimeService.setCurrentTime(42L);
        Assert.assertEquals(42L, createAndStartInternalTimerService.currentProcessingTime());
    }

    @Test
    public void testCurrentEventTime() throws Exception {
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService((Triggerable) Mockito.mock(Triggerable.class), new TestKeyContext(), new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        createAndStartInternalTimerService.advanceWatermark(17L);
        Assert.assertEquals(17L, createAndStartInternalTimerService.currentWatermark());
        createAndStartInternalTimerService.advanceWatermark(42L);
        Assert.assertEquals(42L, createAndStartInternalTimerService.currentWatermark());
    }

    @Test
    public void testSetAndFireEventTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
                Assert.assertEquals(4L, createAndStartInternalTimerService.numEventTimeTimers());
                Assert.assertEquals(2L, createAndStartInternalTimerService.numEventTimeTimers("hello"));
                Assert.assertEquals(2L, createAndStartInternalTimerService.numEventTimeTimers("ciao"));
                createAndStartInternalTimerService.advanceWatermark(10L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(4))).onEventTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assert.assertEquals(0L, createAndStartInternalTimerService.numEventTimeTimers());
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @Test
    public void testSetAndFireProcessingTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                Assert.assertEquals(4L, createAndStartInternalTimerService.numProcessingTimeTimers());
                Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
                Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
                testProcessingTimeService.setCurrentTime(10L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(4))).onProcessingTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assert.assertEquals(0L, createAndStartInternalTimerService.numProcessingTimeTimers());
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @Test
    public void testDeleteEventTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
                Assert.assertEquals(4L, createAndStartInternalTimerService.numEventTimeTimers());
                Assert.assertEquals(2L, createAndStartInternalTimerService.numEventTimeTimers("hello"));
                Assert.assertEquals(2L, createAndStartInternalTimerService.numEventTimeTimers("ciao"));
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.deleteEventTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.deleteEventTimeTimer("ciao", 10L);
                Assert.assertEquals(2L, createAndStartInternalTimerService.numEventTimeTimers());
                Assert.assertEquals(1L, createAndStartInternalTimerService.numEventTimeTimers("hello"));
                Assert.assertEquals(1L, createAndStartInternalTimerService.numEventTimeTimers("ciao"));
                createAndStartInternalTimerService.advanceWatermark(10L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(2))).onEventTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assert.assertEquals(0L, createAndStartInternalTimerService.numEventTimeTimers());
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @Test
    public void testDeleteProcessingTimeTimers() throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            int i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
                createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
                Assert.assertEquals(4L, createAndStartInternalTimerService.numProcessingTimeTimers());
                Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
                Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
                testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
                createAndStartInternalTimerService.deleteProcessingTimeTimer("hello", 10L);
                testKeyContext.setCurrentKey(Integer.valueOf(i));
                createAndStartInternalTimerService.deleteProcessingTimeTimer("ciao", 10L);
                Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers());
                Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
                Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
                testProcessingTimeService.setCurrentTime(10L);
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(2))).onProcessingTime(anyInternalTimer());
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(0))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "ciao")));
                ((Triggerable) Mockito.verify(triggerable, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i), "hello")));
                Assert.assertEquals(0L, createAndStartInternalTimerService.numEventTimeTimers());
                return;
            }
            keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }
    }

    @Test
    public void testForEachEventTimeTimers() throws Exception {
        int i;
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                break;
            } else {
                keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
            }
        }
        HashSet<Tuple3> hashSet = new HashSet();
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "hello", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "hello", 10L));
        for (Tuple3 tuple3 : hashSet) {
            testKeyContext.setCurrentKey(tuple3.f0);
            createAndStartInternalTimerService.registerEventTimeTimer(tuple3.f1, ((Long) tuple3.f2).longValue());
        }
        HashSet hashSet2 = new HashSet();
        createAndStartInternalTimerService.forEachEventTimeTimer((str, l) -> {
            hashSet2.add(Tuple3.of((Integer) testKeyContext.getCurrentKey(), str, l));
        });
        Assert.assertEquals(hashSet, hashSet2);
    }

    @Test
    public void testForEachProcessingTimeTimers() throws Exception {
        int i;
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            i = keyInKeyGroupRange2;
            if (i != keyInKeyGroupRange) {
                break;
            } else {
                keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
            }
        }
        HashSet<Tuple3> hashSet = new HashSet();
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(keyInKeyGroupRange), "hello", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "ciao", 10L));
        hashSet.add(Tuple3.of(Integer.valueOf(i), "hello", 10L));
        for (Tuple3 tuple3 : hashSet) {
            testKeyContext.setCurrentKey(tuple3.f0);
            createAndStartInternalTimerService.registerProcessingTimeTimer(tuple3.f1, ((Long) tuple3.f2).longValue());
        }
        HashSet hashSet2 = new HashSet();
        createAndStartInternalTimerService.forEachProcessingTimeTimer((str, l) -> {
            hashSet2.add(Tuple3.of((Integer) testKeyContext.getCurrentKey(), str, l));
        });
        Assert.assertEquals(hashSet, hashSet2);
    }

    @Test
    public void testSnapshotAndRestore() throws Exception {
        testSnapshotAndRestore(2);
    }

    @Test
    public void testSnapshotAndRebalancingRestore() throws Exception {
        testSnapshotAndRebalancingRestore(2);
    }

    private void testSnapshotAndRestore(int i) throws Exception {
        int i2;
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, new TestProcessingTimeService(), this.testKeyGroupRange, createQueueFactory());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (true) {
            i2 = keyInKeyGroupRange2;
            if (i2 != keyInKeyGroupRange) {
                break;
            } else {
                keyInKeyGroupRange2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
            }
        }
        testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
        testKeyContext.setCurrentKey(Integer.valueOf(i2));
        createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
        Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(2L, createAndStartInternalTimerService.numEventTimeTimers());
        Assert.assertEquals(1L, createAndStartInternalTimerService.numEventTimeTimers("hello"));
        Assert.assertEquals(1L, createAndStartInternalTimerService.numEventTimeTimers("ciao"));
        HashMap hashMap = new HashMap();
        Iterator it = this.testKeyGroupRange.iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            try {
                try {
                    InternalTimersSnapshotReaderWriters.getWriterForVersion(i, createAndStartInternalTimerService.snapshotTimersForKeyGroup(num.intValue()), createAndStartInternalTimerService.getKeySerializer(), createAndStartInternalTimerService.getNamespaceSerializer()).writeTimersSnapshot(new DataOutputViewStreamWrapper(byteArrayOutputStream));
                    hashMap.put(num, byteArrayOutputStream.toByteArray());
                    if (byteArrayOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayOutputStream.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (byteArrayOutputStream != null) {
                    if (th != null) {
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        byteArrayOutputStream.close();
                    }
                }
                throw th3;
            }
        }
        Triggerable triggerable2 = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext2 = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> restoreTimerService = restoreTimerService(hashMap, i, triggerable2, testKeyContext2, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory());
        testProcessingTimeService.setCurrentTime(10L);
        restoreTimerService.advanceWatermark(10L);
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(2))).onProcessingTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i2), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(2))).onEventTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(i2), "ciao")));
        Assert.assertEquals(0L, restoreTimerService.numEventTimeTimers());
    }

    private void testSnapshotAndRebalancingRestore(int i) throws Exception {
        Triggerable triggerable = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
        PriorityQueueSetFactory createQueueFactory = createQueueFactory();
        InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService = createAndStartInternalTimerService(triggerable, testKeyContext, testProcessingTimeService, this.testKeyGroupRange, createQueueFactory);
        int startKeyGroup = this.testKeyGroupRange.getStartKeyGroup() + ((this.testKeyGroupRange.getEndKeyGroup() - this.testKeyGroupRange.getStartKeyGroup()) / 2);
        KeyGroupRange keyGroupRange = new KeyGroupRange(this.testKeyGroupRange.getStartKeyGroup(), startKeyGroup);
        KeyGroupRange keyGroupRange2 = new KeyGroupRange(startKeyGroup + 1, this.testKeyGroupRange.getEndKeyGroup());
        int keyInKeyGroupRange = getKeyInKeyGroupRange(keyGroupRange, this.maxParallelism);
        int keyInKeyGroupRange2 = getKeyInKeyGroupRange(keyGroupRange2, this.maxParallelism);
        testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange));
        createAndStartInternalTimerService.registerProcessingTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerEventTimeTimer("hello", 10L);
        testKeyContext.setCurrentKey(Integer.valueOf(keyInKeyGroupRange2));
        createAndStartInternalTimerService.registerEventTimeTimer("ciao", 10L);
        createAndStartInternalTimerService.registerProcessingTimeTimer("hello", 10L);
        Assert.assertEquals(2L, createAndStartInternalTimerService.numProcessingTimeTimers());
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("hello"));
        Assert.assertEquals(1L, createAndStartInternalTimerService.numProcessingTimeTimers("ciao"));
        Assert.assertEquals(2L, createAndStartInternalTimerService.numEventTimeTimers());
        Assert.assertEquals(1L, createAndStartInternalTimerService.numEventTimeTimers("hello"));
        Assert.assertEquals(1L, createAndStartInternalTimerService.numEventTimeTimers("ciao"));
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        Iterator it = this.testKeyGroupRange.iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            try {
                try {
                    InternalTimersSnapshotReaderWriters.getWriterForVersion(i, createAndStartInternalTimerService.snapshotTimersForKeyGroup(num.intValue()), createAndStartInternalTimerService.getKeySerializer(), createAndStartInternalTimerService.getNamespaceSerializer()).writeTimersSnapshot(new DataOutputViewStreamWrapper(byteArrayOutputStream));
                    if (keyGroupRange.contains(num.intValue())) {
                        hashMap.put(num, byteArrayOutputStream.toByteArray());
                    } else {
                        if (!keyGroupRange2.contains(num.intValue())) {
                            throw new IllegalStateException("Key-Group index doesn't belong to any sub range.");
                        }
                        hashMap2.put(num, byteArrayOutputStream.toByteArray());
                    }
                    if (byteArrayOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayOutputStream.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (byteArrayOutputStream != null) {
                    if (th != null) {
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        byteArrayOutputStream.close();
                    }
                }
                throw th3;
            }
        }
        Triggerable triggerable2 = (Triggerable) Mockito.mock(Triggerable.class);
        Triggerable triggerable3 = (Triggerable) Mockito.mock(Triggerable.class);
        TestKeyContext testKeyContext2 = new TestKeyContext();
        TestKeyContext testKeyContext3 = new TestKeyContext();
        TestProcessingTimeService testProcessingTimeService2 = new TestProcessingTimeService();
        TestProcessingTimeService testProcessingTimeService3 = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> restoreTimerService = restoreTimerService(hashMap, i, triggerable2, testKeyContext2, testProcessingTimeService2, keyGroupRange, createQueueFactory);
        InternalTimerServiceImpl<Integer, String> restoreTimerService2 = restoreTimerService(hashMap2, i, triggerable3, testKeyContext3, testProcessingTimeService3, keyGroupRange2, createQueueFactory);
        testProcessingTimeService2.setCurrentTime(10L);
        restoreTimerService.advanceWatermark(10L);
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.never())).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable2, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
        ((Triggerable) Mockito.verify(triggerable2, Mockito.never())).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "ciao")));
        Assert.assertEquals(0L, restoreTimerService.numEventTimeTimers());
        testProcessingTimeService3.setCurrentTime(10L);
        restoreTimerService2.advanceWatermark(10L);
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onProcessingTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable3, Mockito.never())).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "ciao")));
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onProcessingTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "hello")));
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onEventTime(anyInternalTimer());
        ((Triggerable) Mockito.verify(triggerable3, Mockito.never())).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange), "hello")));
        ((Triggerable) Mockito.verify(triggerable3, Mockito.times(1))).onEventTime((InternalTimer) Mockito.eq(new TimerHeapInternalTimer(10L, Integer.valueOf(keyInKeyGroupRange2), "ciao")));
        Assert.assertEquals(0L, restoreTimerService2.numEventTimeTimers());
    }

    private static int getKeyInKeyGroup(int i, int i2) {
        Random random = new Random(System.currentTimeMillis());
        int nextInt = random.nextInt();
        while (true) {
            int i3 = nextInt;
            if (KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i3), i2) == i) {
                return i3;
            }
            nextInt = random.nextInt();
        }
    }

    private static int getKeyInKeyGroupRange(KeyGroupRange keyGroupRange, int i) {
        Random random = new Random(System.currentTimeMillis());
        int nextInt = random.nextInt();
        while (true) {
            int i2 = nextInt;
            if (keyGroupRange.contains(KeyGroupRangeAssignment.assignToKeyGroup(Integer.valueOf(i2), i))) {
                return i2;
            }
            nextInt = random.nextInt();
        }
    }

    private static InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService(Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupRange keyGroupRange, PriorityQueueSetFactory priorityQueueSetFactory) {
        InternalTimerServiceImpl<Integer, String> createInternalTimerService = createInternalTimerService(keyGroupRange, keyContext, processingTimeService, IntSerializer.INSTANCE, StringSerializer.INSTANCE, priorityQueueSetFactory);
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return createInternalTimerService;
    }

    private static InternalTimerServiceImpl<Integer, String> restoreTimerService(Map<Integer, byte[]> map, int i, Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupRange keyGroupRange, PriorityQueueSetFactory priorityQueueSetFactory) throws Exception {
        InternalTimerServiceImpl<Integer, String> createInternalTimerService = createInternalTimerService(keyGroupRange, keyContext, processingTimeService, IntSerializer.INSTANCE, StringSerializer.INSTANCE, priorityQueueSetFactory);
        Iterator it = keyGroupRange.iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            if (map.containsKey(num)) {
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(map.get(num));
                Throwable th = null;
                try {
                    try {
                        createInternalTimerService.restoreTimersForKeyGroup(InternalTimersSnapshotReaderWriters.getReaderForVersion(i, InternalTimerServiceImplTest.class.getClassLoader()).readTimersSnapshot(new DataInputViewStreamWrapper(byteArrayInputStream)), num.intValue());
                        if (byteArrayInputStream != null) {
                            if (0 != 0) {
                                try {
                                    byteArrayInputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                byteArrayInputStream.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (byteArrayInputStream != null) {
                        if (th != null) {
                            try {
                                byteArrayInputStream.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            byteArrayInputStream.close();
                        }
                    }
                    throw th3;
                }
            }
        }
        createInternalTimerService.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return createInternalTimerService;
    }

    private PriorityQueueSetFactory createQueueFactory() {
        return createQueueFactory(this.testKeyGroupRange, this.maxParallelism);
    }

    protected PriorityQueueSetFactory createQueueFactory(KeyGroupRange keyGroupRange, int i) {
        return new HeapPriorityQueueSetFactory(keyGroupRange, i, 128);
    }

    @Parameterized.Parameters(name = "start = {0}, end = {1}, max = {2}")
    public static Collection<Object[]> keyRanges() {
        return Arrays.asList(new Object[]{0, 32766, Short.MAX_VALUE}, new Object[]{0, 10, Short.MAX_VALUE}, new Object[]{0, 10, 10}, new Object[]{10, 32766, Short.MAX_VALUE}, new Object[]{2, 5, 100}, new Object[]{2, 5, 6});
    }

    private static <K, N> InternalTimerServiceImpl<K, N> createInternalTimerService(KeyGroupRange keyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, PriorityQueueSetFactory priorityQueueSetFactory) {
        TimerSerializer timerSerializer = new TimerSerializer(typeSerializer, typeSerializer2);
        return new InternalTimerServiceImpl<>(keyGroupRange, keyContext, processingTimeService, createTimerQueue("__test_processing_timers", timerSerializer, priorityQueueSetFactory), createTimerQueue("__test_event_timers", timerSerializer, priorityQueueSetFactory), StreamTaskCancellationContext.alwaysRunning());
    }

    private static <K, N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerQueue(String str, TimerSerializer<K, N> timerSerializer, PriorityQueueSetFactory priorityQueueSetFactory) {
        return priorityQueueSetFactory.create(str, timerSerializer);
    }
}
