/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.test.state.BackendSwitchSpecs;
import org.apache.flink.util.InstantiationUtil;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public abstract class SavepointStateBackendSwitchTestBase {
    private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1);
    private static final int NUM_KEY_GROUPS = KEY_GROUP_RANGE.getNumberOfKeyGroups();
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    private final BackendSwitchSpecs.BackendSwitchSpec fromBackend;
    private final BackendSwitchSpecs.BackendSwitchSpec toBackend;

    protected SavepointStateBackendSwitchTestBase(BackendSwitchSpecs.BackendSwitchSpec fromBackend, BackendSwitchSpecs.BackendSwitchSpec toBackend) {
        this.fromBackend = fromBackend;
        this.toBackend = toBackend;
    }

    @Test
    public void switchStateBackend() throws Exception {
        SnapshotResult stateHandles;
        File pathToWrite = tempFolder.newFile();
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("my-map-state", Long.class, Long.class);
        mapStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("my-value-state", Long.class);
        valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("my-list-state", Long.class);
        listStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        Integer namespace1 = 1;
        Integer namespace2 = 2;
        Integer namespace3 = 3;
        Integer namespace4 = 4;
        try (CheckpointableKeyedStateBackend<String> keyedBackend = this.fromBackend.createBackend(KEY_GROUP_RANGE, NUM_KEY_GROUPS, Collections.emptyList());){
            this.takeSavepoint(keyedBackend, pathToWrite, (MapStateDescriptor<Long, Long>)mapStateDescriptor, (ValueStateDescriptor<Long>)valueStateDescriptor, (ListStateDescriptor<Long>)listStateDescriptor, namespace1, namespace2, namespace3, namespace4);
        }
        try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(pathToWrite));){
            stateHandles = (SnapshotResult)InstantiationUtil.deserializeObject((InputStream)bis, (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        KeyedStateHandle stateHandle = (KeyedStateHandle)stateHandles.getJobManagerOwnedSnapshot();
        try (CheckpointableKeyedStateBackend<String> keyedBackend = this.toBackend.createBackend(KEY_GROUP_RANGE, NUM_KEY_GROUPS, (Collection<KeyedStateHandle>)StateObjectCollection.singleton((StateObject)stateHandle));){
            this.verifyRestoredState((MapStateDescriptor<Long, Long>)mapStateDescriptor, (ValueStateDescriptor<Long>)valueStateDescriptor, (ListStateDescriptor<Long>)listStateDescriptor, namespace1, namespace2, namespace3, namespace4, keyedBackend);
        }
    }

    private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
        int i = 0;
        Iterator itt = mapState.iterator();
        while (itt.hasNext()) {
            ++i;
            itt.next();
        }
        return i;
    }

    private void takeSavepoint(CheckpointableKeyedStateBackend<String> keyedBackend, File pathToWrite, MapStateDescriptor<Long, Long> stateDescr, ValueStateDescriptor<Long> valueStateDescriptor, ListStateDescriptor<Long> listStateDescriptor, Integer namespace1, Integer namespace2, Integer namespace3, Integer namespace4) throws Exception {
        InternalMapState mapState = (InternalMapState)keyedBackend.createInternalState((TypeSerializer)IntSerializer.INSTANCE, stateDescr);
        InternalValueState valueState = (InternalValueState)keyedBackend.createInternalState((TypeSerializer)IntSerializer.INSTANCE, valueStateDescriptor);
        InternalListState listState = (InternalListState)keyedBackend.createInternalState((TypeSerializer)IntSerializer.INSTANCE, listStateDescriptor);
        keyedBackend.setCurrentKey((Object)"abc");
        mapState.setCurrentNamespace((Object)namespace1);
        mapState.put((Object)33L, (Object)33L);
        mapState.put((Object)55L, (Object)55L);
        mapState.setCurrentNamespace((Object)namespace2);
        mapState.put((Object)22L, (Object)22L);
        mapState.put((Object)11L, (Object)11L);
        listState.setCurrentNamespace((Object)namespace2);
        listState.add((Object)4L);
        listState.add((Object)5L);
        listState.add((Object)6L);
        mapState.setCurrentNamespace((Object)namespace3);
        mapState.put((Object)44L, (Object)44L);
        keyedBackend.setCurrentKey((Object)"mno");
        mapState.setCurrentNamespace((Object)namespace3);
        mapState.put((Object)11L, (Object)11L);
        mapState.put((Object)22L, (Object)22L);
        mapState.put((Object)33L, (Object)33L);
        mapState.put((Object)44L, (Object)44L);
        mapState.put((Object)55L, (Object)55L);
        valueState.setCurrentNamespace((Object)namespace3);
        valueState.update((Object)1239L);
        listState.setCurrentNamespace((Object)namespace3);
        listState.add((Object)1L);
        listState.add((Object)2L);
        listState.add((Object)3L);
        mapState.setCurrentNamespace((Object)namespace4);
        mapState.put((Object)1L, (Object)1L);
        Iterator iterator = mapState.iterator();
        while (iterator.hasNext()) {
            iterator.next();
            iterator.remove();
        }
        KeyGroupedInternalPriorityQueue priorityQueue = keyedBackend.create("event-time", (TypeSerializer)new TimerSerializer(keyedBackend.getKeySerializer(), (TypeSerializer)IntSerializer.INSTANCE));
        priorityQueue.add((Object)new TimerHeapInternalTimer(1234L, (Object)"mno", (Object)namespace3));
        priorityQueue.add((Object)new TimerHeapInternalTimer(2345L, (Object)"mno", (Object)namespace2));
        priorityQueue.add((Object)new TimerHeapInternalTimer(3456L, (Object)"mno", (Object)namespace3));
        SnapshotStrategyRunner savepointRunner = StreamOperatorStateHandler.prepareCanonicalSavepoint(keyedBackend, (CloseableRegistry)new CloseableRegistry());
        RunnableFuture snapshot = savepointRunner.snapshot(0L, 0L, (CheckpointStreamFactory)new MemCheckpointStreamFactory(0x400000), new CheckpointOptions((SnapshotType)SavepointType.savepoint((SavepointFormatType)SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
        snapshot.run();
        try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite));){
            InstantiationUtil.serializeObject((OutputStream)bis, snapshot.get());
        }
    }

    private void verifyRestoredState(MapStateDescriptor<Long, Long> mapStateDescriptor, ValueStateDescriptor<Long> valueStateDescriptor, ListStateDescriptor<Long> listStateDescriptor, Integer namespace1, Integer namespace2, Integer namespace3, Integer namespace4, CheckpointableKeyedStateBackend<String> keyedBackend) throws Exception {
        InternalMapState mapState = (InternalMapState)keyedBackend.createInternalState((TypeSerializer)IntSerializer.INSTANCE, mapStateDescriptor);
        InternalValueState valueState = (InternalValueState)keyedBackend.createInternalState((TypeSerializer)IntSerializer.INSTANCE, valueStateDescriptor);
        InternalListState listState = (InternalListState)keyedBackend.createInternalState((TypeSerializer)IntSerializer.INSTANCE, listStateDescriptor);
        keyedBackend.setCurrentKey((Object)"abc");
        mapState.setCurrentNamespace((Object)namespace1);
        Assert.assertEquals((long)33L, (long)((Long)mapState.get((Object)33L)));
        Assert.assertEquals((long)55L, (long)((Long)mapState.get((Object)55L)));
        Assert.assertEquals((long)2L, (long)this.getStateSize(mapState));
        mapState.setCurrentNamespace((Object)namespace2);
        Assert.assertEquals((long)22L, (long)((Long)mapState.get((Object)22L)));
        Assert.assertEquals((long)11L, (long)((Long)mapState.get((Object)11L)));
        Assert.assertEquals((long)2L, (long)this.getStateSize(mapState));
        listState.setCurrentNamespace((Object)namespace2);
        Assert.assertThat((Object)listState.get(), (Matcher)Matchers.contains((Object[])new Long[]{4L, 5L, 6L}));
        mapState.setCurrentNamespace((Object)namespace3);
        Assert.assertEquals((long)44L, (long)((Long)mapState.get((Object)44L)));
        Assert.assertEquals((long)1L, (long)this.getStateSize(mapState));
        keyedBackend.setCurrentKey((Object)"mno");
        mapState.setCurrentNamespace((Object)namespace3);
        Assert.assertEquals((long)11L, (long)((Long)mapState.get((Object)11L)));
        Assert.assertEquals((long)22L, (long)((Long)mapState.get((Object)22L)));
        Assert.assertEquals((long)33L, (long)((Long)mapState.get((Object)33L)));
        Assert.assertEquals((long)44L, (long)((Long)mapState.get((Object)44L)));
        Assert.assertEquals((long)55L, (long)((Long)mapState.get((Object)55L)));
        Assert.assertEquals((long)5L, (long)this.getStateSize(mapState));
        valueState.setCurrentNamespace((Object)namespace3);
        Assert.assertEquals((long)1239L, (long)((Long)valueState.value()));
        listState.setCurrentNamespace((Object)namespace3);
        Assert.assertThat((Object)listState.get(), (Matcher)Matchers.contains((Object[])new Long[]{1L, 2L, 3L}));
        mapState.setCurrentNamespace((Object)namespace4);
        Assert.assertThat((Object)mapState.isEmpty(), (Matcher)CoreMatchers.is((Object)true));
        KeyGroupedInternalPriorityQueue priorityQueue = keyedBackend.create("event-time", (TypeSerializer)new TimerSerializer(keyedBackend.getKeySerializer(), (TypeSerializer)IntSerializer.INSTANCE));
        Assert.assertThat((Object)priorityQueue.size(), (Matcher)CoreMatchers.equalTo((Object)3));
        Assert.assertThat((Object)priorityQueue.poll(), (Matcher)CoreMatchers.equalTo((Object)new TimerHeapInternalTimer(1234L, (Object)"mno", (Object)namespace3)));
        Assert.assertThat((Object)priorityQueue.poll(), (Matcher)CoreMatchers.equalTo((Object)new TimerHeapInternalTimer(2345L, (Object)"mno", (Object)namespace2)));
        Assert.assertThat((Object)priorityQueue.poll(), (Matcher)CoreMatchers.equalTo((Object)new TimerHeapInternalTimer(3456L, (Object)"mno", (Object)namespace3)));
    }
}

