package org.apache.flink.cdc.runtime.operators.schema;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.class */
public class SchemaOperator extends AbstractStreamOperator<Event> implements OneInputStreamOperator<Event, Event> {
    private final List<Tuple2<String, TableId>> routingRules;
    private transient List<Tuple2<Selectors, TableId>> routes;
    private transient TaskOperatorEventGateway toCoordinator;
    private transient SchemaEvolutionClient schemaEvolutionClient;
    private transient LoadingCache<TableId, Schema> cachedSchemas;
    private final long rpcTimeOutInMillis;
    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
    private static final long serialVersionUID = 1;
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(serialVersionUID);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.cdc.runtime.operators.schema.SchemaOperator$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/SchemaOperator$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$cdc$common$event$OperationType = new int[OperationType.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$cdc$common$event$OperationType[OperationType.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$cdc$common$event$OperationType[OperationType.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$cdc$common$event$OperationType[OperationType.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$cdc$common$event$OperationType[OperationType.REPLACE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/SchemaOperator$NullFieldGetter.class */
    public static class NullFieldGetter implements RecordData.FieldGetter {
        private NullFieldGetter() {
        }

        @Nullable
        public Object getFieldOrNull(RecordData recordData) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/SchemaOperator$TypeCoercionFieldGetter.class */
    public static class TypeCoercionFieldGetter implements RecordData.FieldGetter {
        private final DataType destinationType;
        private final RecordData.FieldGetter originalFieldGetter;

        public TypeCoercionFieldGetter(DataType dataType, RecordData.FieldGetter fieldGetter) {
            this.destinationType = dataType;
            this.originalFieldGetter = fieldGetter;
        }

        @Nullable
        public Object getFieldOrNull(RecordData recordData) {
            Object fieldOrNull = this.originalFieldGetter.getFieldOrNull(recordData);
            if (fieldOrNull == null) {
                return null;
            }
            if (this.destinationType.is(DataTypeRoot.BIGINT)) {
                if (fieldOrNull instanceof Byte) {
                    return Long.valueOf(((Byte) fieldOrNull).longValue());
                }
                if (fieldOrNull instanceof Short) {
                    return Long.valueOf(((Short) fieldOrNull).longValue());
                }
                if (fieldOrNull instanceof Integer) {
                    return Long.valueOf(((Integer) fieldOrNull).longValue());
                }
                throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a BIGINT column. Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column", fieldOrNull.getClass()));
            }
            if (this.destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
                if (fieldOrNull instanceof Float) {
                    return Double.valueOf(((Float) fieldOrNull).doubleValue());
                }
                throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a DOUBLE column. Currently only FLOAT can be accepted by a DOUBLE column", fieldOrNull.getClass()));
            }
            if (!this.destinationType.is(DataTypeRoot.VARCHAR)) {
                throw new IllegalArgumentException(String.format("Column type \"%s\" doesn't support type coercion", this.destinationType));
            }
            if (fieldOrNull instanceof StringData) {
                return fieldOrNull;
            }
            throw new IllegalArgumentException(String.format("Cannot fit type \"%s\" into a STRING column. Currently only CHAR / VARCHAR can be accepted by a STRING column", fieldOrNull.getClass()));
        }
    }

    public SchemaOperator(List<Tuple2<String, TableId>> list) {
        this.routingRules = list;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeOutInMillis = PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
    }

    public SchemaOperator(List<Tuple2<String, TableId>> list, Duration duration) {
        this.routingRules = list;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.rpcTimeOutInMillis = duration.toMillis();
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Event>> output) {
        super.setup(streamTask, streamConfig, output);
        this.toCoordinator = streamTask.getEnvironment().getOperatorCoordinatorEventGateway();
        this.routes = (List) this.routingRules.stream().map(tuple2 -> {
            String str = (String) tuple2.f0;
            return new Tuple2(new Selectors.SelectorsBuilder().includeTables(str).build(), (TableId) tuple2.f1);
        }).collect(Collectors.toList());
        this.schemaEvolutionClient = new SchemaEvolutionClient(this.toCoordinator, getOperatorID());
        this.cachedSchemas = CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build(new CacheLoader<TableId, Schema>() { // from class: org.apache.flink.cdc.runtime.operators.schema.SchemaOperator.1
            public Schema load(TableId tableId) {
                return SchemaOperator.this.getLatestSchema(tableId);
            }
        });
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        if (stateInitializationContext.isRestored() && getRuntimeContext().getIndexOfThisSubtask() == 0) {
            sendRequestToCoordinator(new RefreshPendingListsRequest());
        }
    }

    public void processElement(StreamRecord<Event> streamRecord) throws InterruptedException, TimeoutException {
        SchemaChangeEvent schemaChangeEvent = (Event) streamRecord.getValue();
        if (schemaChangeEvent instanceof SchemaChangeEvent) {
            TableId tableId = schemaChangeEvent.tableId();
            LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId.toString());
            handleSchemaChangeEvent(tableId, schemaChangeEvent);
            this.cachedSchemas.put(tableId, getLatestSchema(tableId));
            getRoutedTable(tableId).ifPresent(tableId2 -> {
                this.cachedSchemas.put(tableId2, getLatestSchema(tableId2));
            });
            return;
        }
        DataChangeEvent dataChangeEvent = (DataChangeEvent) schemaChangeEvent;
        Optional<TableId> routedTable = getRoutedTable(dataChangeEvent.tableId());
        if (routedTable.isPresent()) {
            this.output.collect(new StreamRecord(maybeFillInNullForEmptyColumns(dataChangeEvent, routedTable.get())));
        } else {
            this.output.collect(streamRecord);
        }
    }

    private DataChangeEvent maybeFillInNullForEmptyColumns(DataChangeEvent dataChangeEvent, TableId tableId) {
        try {
            Schema schema = (Schema) this.cachedSchemas.get(dataChangeEvent.tableId());
            Schema schema2 = (Schema) this.cachedSchemas.get(tableId);
            if (schema.equals(schema2)) {
                return ChangeEventUtils.recreateDataChangeEvent(dataChangeEvent, tableId);
            }
            switch (AnonymousClass2.$SwitchMap$org$apache$flink$cdc$common$event$OperationType[dataChangeEvent.op().ordinal()]) {
                case 1:
                    return DataChangeEvent.insertEvent(tableId, regenerateRecordData(dataChangeEvent.after(), schema, schema2), dataChangeEvent.meta());
                case 2:
                    return DataChangeEvent.updateEvent(tableId, regenerateRecordData(dataChangeEvent.before(), schema, schema2), regenerateRecordData(dataChangeEvent.after(), schema, schema2), dataChangeEvent.meta());
                case 3:
                    return DataChangeEvent.deleteEvent(tableId, regenerateRecordData(dataChangeEvent.before(), schema, schema2), dataChangeEvent.meta());
                case 4:
                    return DataChangeEvent.replaceEvent(tableId, regenerateRecordData(dataChangeEvent.after(), schema, schema2), dataChangeEvent.meta());
                default:
                    throw new IllegalArgumentException(String.format("Unrecognized operation type \"%s\"", dataChangeEvent.op()));
            }
        } catch (Exception e) {
            throw new IllegalStateException("Unable to fill null for empty columns", e);
        }
    }

    private RecordData regenerateRecordData(RecordData recordData, Schema schema, Schema schema2) {
        ArrayList arrayList = new ArrayList();
        for (Column column : schema2.getColumns()) {
            String name = column.getName();
            int indexOf = schema.getColumnNames().indexOf(name);
            if (indexOf == -1) {
                arrayList.add(new NullFieldGetter());
            } else {
                RecordData.FieldGetter createFieldGetter = RecordData.createFieldGetter(((Column) schema.getColumn(name).get()).getType(), indexOf);
                if (((Column) schema.getColumn(name).get()).getType().equals(column.getType())) {
                    arrayList.add(createFieldGetter);
                } else {
                    arrayList.add(new TypeCoercionFieldGetter(column.getType(), createFieldGetter));
                }
            }
        }
        return new BinaryRecordDataGenerator((DataType[]) schema2.getColumnDataTypes().toArray(new DataType[0])).generate(arrayList.stream().map(fieldGetter -> {
            return fieldGetter.getFieldOrNull(recordData);
        }).toArray());
    }

    private Optional<TableId> getRoutedTable(TableId tableId) {
        for (Tuple2<Selectors, TableId> tuple2 : this.routes) {
            if (((Selectors) tuple2.f0).isMatch(tableId)) {
                return Optional.of(tuple2.f1);
            }
        }
        return Optional.empty();
    }

    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException {
        SchemaChangeResponse requestSchemaChange = requestSchemaChange(tableId, schemaChangeEvent);
        if (requestSchemaChange.getSchemaChangeEvents().isEmpty()) {
            return;
        }
        LOG.info("Sending the FlushEvent for table {} in subtask {}.", tableId, Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        this.output.collect(new StreamRecord(new FlushEvent(tableId)));
        requestSchemaChange.getSchemaChangeEvents().forEach(schemaChangeEvent2 -> {
            this.output.collect(new StreamRecord(schemaChangeEvent2));
        });
        requestReleaseUpstream();
    }

    private SchemaChangeResponse requestSchemaChange(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
        return (SchemaChangeResponse) sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
    }

    private void requestReleaseUpstream() throws InterruptedException, TimeoutException {
        CoordinationResponse sendRequestToCoordinator = sendRequestToCoordinator(new ReleaseUpstreamRequest());
        long currentTimeMillis = System.currentTimeMillis() + this.rpcTimeOutInMillis;
        while (sendRequestToCoordinator instanceof SchemaChangeProcessingResponse) {
            if (System.currentTimeMillis() >= currentTimeMillis) {
                throw new TimeoutException("TimeOut when requesting release upstream");
            }
            Thread.sleep(1000L);
            sendRequestToCoordinator = sendRequestToCoordinator(new SchemaChangeResultRequest());
        }
    }

    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse> RESPONSE sendRequestToCoordinator(REQUEST request) {
        try {
            return (RESPONSE) CoordinationResponseUtils.unwrap((CoordinationResponse) this.toCoordinator.sendRequestToCoordinator(getOperatorID(), new SerializedValue(request)).get());
        } catch (Exception e) {
            throw new IllegalStateException("Failed to send request to coordinator: " + request.toString(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Schema getLatestSchema(TableId tableId) {
        try {
            Optional<Schema> latestSchema = this.schemaEvolutionClient.getLatestSchema(tableId);
            if (latestSchema.isPresent()) {
                return latestSchema.get();
            }
            throw new IllegalStateException(String.format("Schema doesn't exist for table \"%s\"", tableId));
        } catch (Exception e) {
            throw new IllegalStateException(String.format("Unable to get latest schema for table \"%s\"", tableId));
        }
    }
}
