package org.apache.flink.cdc.runtime.partitioning;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.class */
public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent> implements OneInputStreamOperator<Event, PartitioningEvent> {
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
    private final OperatorID schemaOperatorId;
    private final int downstreamParallelism;
    private transient SchemaEvolutionClient schemaEvolutionClient;
    private transient LoadingCache<TableId, HashFunction> cachedHashFunctions;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator$HashFunction.class */
    public static class HashFunction implements Function<DataChangeEvent, Integer> {
        private final List<RecordData.FieldGetter> primaryKeyGetters;

        public HashFunction(Schema schema) {
            this.primaryKeyGetters = createFieldGetters(schema);
        }

        @Override // java.util.function.Function
        public Integer apply(DataChangeEvent dataChangeEvent) {
            ArrayList arrayList = new ArrayList();
            TableId tableId = dataChangeEvent.tableId();
            Optional ofNullable = Optional.ofNullable(tableId.getNamespace());
            arrayList.getClass();
            ofNullable.ifPresent((v1) -> {
                r1.add(v1);
            });
            Optional ofNullable2 = Optional.ofNullable(tableId.getSchemaName());
            arrayList.getClass();
            ofNullable2.ifPresent((v1) -> {
                r1.add(v1);
            });
            arrayList.add(tableId.getTableName());
            RecordData before = dataChangeEvent.op().equals(OperationType.DELETE) ? dataChangeEvent.before() : dataChangeEvent.after();
            Iterator<RecordData.FieldGetter> it = this.primaryKeyGetters.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().getFieldOrNull(before));
            }
            return Integer.valueOf((Objects.hash(arrayList.toArray()) * 31) & Integer.MAX_VALUE);
        }

        private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
            ArrayList arrayList = new ArrayList(schema.primaryKeys().size());
            for (int i : schema.primaryKeys().stream().mapToInt(str -> {
                int i2 = 0;
                while (!((Column) schema.getColumns().get(i2)).getName().equals(str)) {
                    i2++;
                }
                if (i2 >= schema.getColumnCount()) {
                    throw new IllegalStateException(String.format("Unable to find column \"%s\" which is defined as primary key", str));
                }
                return i2;
            }).toArray()) {
                arrayList.add(RecordData.createFieldGetter(((Column) schema.getColumns().get(i)).getType(), i));
            }
            return arrayList;
        }
    }

    public PrePartitionOperator(OperatorID operatorID, int i) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.schemaOperatorId = operatorID;
        this.downstreamParallelism = i;
    }

    public void open() throws Exception {
        super.open();
        this.schemaEvolutionClient = new SchemaEvolutionClient(getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway(), this.schemaOperatorId);
        this.cachedHashFunctions = createCache();
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        SchemaChangeEvent schemaChangeEvent = (Event) streamRecord.getValue();
        if (schemaChangeEvent instanceof SchemaChangeEvent) {
            TableId tableId = schemaChangeEvent.tableId();
            this.cachedHashFunctions.put(tableId, recreateHashFunction(tableId));
            broadcastEvent(schemaChangeEvent);
        } else if (schemaChangeEvent instanceof FlushEvent) {
            broadcastEvent(schemaChangeEvent);
        } else if (schemaChangeEvent instanceof DataChangeEvent) {
            partitionBy((DataChangeEvent) schemaChangeEvent);
        }
    }

    private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
        this.output.collect(new StreamRecord(new PartitioningEvent(dataChangeEvent, ((HashFunction) this.cachedHashFunctions.get(dataChangeEvent.tableId())).apply(dataChangeEvent).intValue() % this.downstreamParallelism)));
    }

    private void broadcastEvent(Event event) {
        for (int i = 0; i < this.downstreamParallelism; i++) {
            this.output.collect(new StreamRecord(new PartitioningEvent(event, i)));
        }
    }

    private Schema loadLatestSchemaFromRegistry(TableId tableId) {
        try {
            Optional<Schema> latestSchema = this.schemaEvolutionClient.getLatestSchema(tableId);
            if (latestSchema.isPresent()) {
                return latestSchema.get();
            }
            throw new IllegalStateException(String.format("Schema is never registered or outdated for table \"%s\"", tableId));
        } catch (Exception e) {
            throw new RuntimeException(String.format("Failed to request latest schema for table \"%s\"", tableId), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public HashFunction recreateHashFunction(TableId tableId) {
        return new HashFunction(loadLatestSchemaFromRegistry(tableId));
    }

    private LoadingCache<TableId, HashFunction> createCache() {
        return CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build(new CacheLoader<TableId, HashFunction>() { // from class: org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.1
            public HashFunction load(TableId tableId) {
                return PrePartitionOperator.this.recreateHashFunction(tableId);
            }
        });
    }
}
