package org.apache.flink.connector.file.table.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Streaming writer that extends {@link AbstractStreamingWriter} with partition bookkeeping.
 *
 * <p>It records the partitions reported through {@link #partitionCreated(String)} and {@link
 * #partitionInactive(String)}, groups newly created partitions by the checkpoint in which they
 * were snapshotted, and emits one {@link PartitionCommitInfo} on the operator output every time
 * {@link #commitUpToCheckpoint(long)} is invoked.
 *
 * @param <IN> type of the records handled by this writer
 */
@Internal
public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, PartitionCommitInfo> {
    private static final long serialVersionUID = 2;

    /** Partition key names; an empty list disables the partition commit trigger. */
    private final List<String> partitionKeys;

    /** Configuration consulted for the commit policy and handed to the commit predicate. */
    private final Configuration conf;

    /** Partitions reported as created since the last call to {@link #snapshotState}. */
    private transient Set<String> currentNewPartitions;

    /** Checkpoint id mapped to the partitions that were new when that checkpoint was snapshotted. */
    private transient TreeMap<Long, Set<String>> newPartitions;

    /** Partitions reported as inactive that have not yet been emitted in a commit info. */
    private transient Set<String> committablePartitions;

    /** Partition mapped to the processing time at which it was first reported as created. */
    private transient Map<String, Long> inProgressPartitions;

    /** Commit predicate; stays {@code null} when the partition commit trigger is disabled. */
    private transient PartitionCommitPredicate partitionCommitPredicate;

    /**
     * Creates the writer.
     *
     * @param j forwarded unchanged to the {@link AbstractStreamingWriter} constructor
     * @param bucketsBuilder forwarded unchanged to the {@link AbstractStreamingWriter} constructor
     * @param list partition key names
     * @param configuration configuration used to decide whether partition commit is enabled and to
     *     build the commit predicate
     */
    public StreamingFileWriter(long j, StreamingFileSink.BucketsBuilder<IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder, List<String> list, Configuration configuration) {
        super(j, bucketsBuilder);
        this.partitionKeys = list;
        this.conf = configuration;
    }

    /**
     * Builds the commit predicate (when enabled) and the transient bookkeeping collections, then
     * delegates to the superclass.
     *
     * @param stateInitializationContext context passed through to the superclass
     * @throws Exception if the superclass initialization fails
     */
    @Override // org.apache.flink.connector.file.table.stream.AbstractStreamingWriter
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        if (isPartitionCommitTriggerEnabled()) {
            this.partitionCommitPredicate = PartitionCommitPredicate.create(this.conf, getUserCodeClassloader(), this.partitionKeys);
        }
        // The collections are created before delegating to the superclass.
        // NOTE(review): presumably the superclass may invoke the partition callbacks while
        // initializing - confirm before reordering.
        this.currentNewPartitions = new HashSet<>();
        this.newPartitions = new TreeMap<>();
        this.committablePartitions = new HashSet<>();
        this.inProgressPartitions = new HashMap<>();
        super.initializeState(stateInitializationContext);
    }

    /**
     * Records a newly created partition and, if it is not tracked yet, the current processing time
     * as its creation time.
     *
     * @param str the partition that was created
     */
    @Override // org.apache.flink.connector.file.table.stream.AbstractStreamingWriter
    protected void partitionCreated(String str) {
        this.currentNewPartitions.add(str);
        // putIfAbsent keeps the earliest recorded creation time for a partition.
        this.inProgressPartitions.putIfAbsent(str, getProcessingTimeService().getCurrentProcessingTime());
    }

    /**
     * Marks a partition as committable and stops tracking it as in progress.
     *
     * @param str the partition that became inactive
     */
    @Override // org.apache.flink.connector.file.table.stream.AbstractStreamingWriter
    protected void partitionInactive(String str) {
        this.committablePartitions.add(str);
        this.inProgressPartitions.remove(str);
    }

    /**
     * Intentionally does nothing; this writer keeps no per-part-file information.
     *
     * @param str the partition the part file belongs to
     * @param path the path of the opened part file
     */
    @Override // org.apache.flink.connector.file.table.stream.AbstractStreamingWriter
    protected void onPartFileOpened(String str, Path path) {
    }

    /**
     * Closes part files of partitions the predicate considers committable, snapshots the
     * superclass state, and then files the partitions created since the previous snapshot under
     * this checkpoint's id.
     *
     * @param stateSnapshotContext context providing the checkpoint id
     * @throws Exception if closing part files or the superclass snapshot fails
     */
    @Override // org.apache.flink.connector.file.table.stream.AbstractStreamingWriter
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        closePartFileForPartitions();
        super.snapshotState(stateSnapshotContext);
        // Store a copy, because currentNewPartitions is cleared and reused right away.
        this.newPartitions.put(stateSnapshotContext.getCheckpointId(), new HashSet<>(this.currentNewPartitions));
        this.currentNewPartitions.clear();
    }

    /** Returns whether partition keys exist and a partition commit policy kind is configured. */
    private boolean isPartitionCommitTriggerEnabled() {
        return !this.partitionKeys.isEmpty() && this.conf.contains(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND);
    }

    /**
     * Closes the part file of every in-progress partition that the commit predicate reports as
     * committable and stops tracking that partition. Does nothing when no predicate was created.
     *
     * @throws Exception if closing a part file fails
     */
    private void closePartFileForPartitions() throws Exception {
        if (this.partitionCommitPredicate == null) {
            return;
        }
        Iterator<Map.Entry<String, Long>> it = this.inProgressPartitions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String partition = entry.getKey();
            long creationTime = entry.getValue();
            if (this.partitionCommitPredicate.isPartitionCommittable(PartitionCommitPredicate.createPredicateContext(partition, creationTime, this.processingTimeService.getCurrentProcessingTime(), this.currentWatermark))) {
                this.buckets.closePartFileForBucket(partition);
                // Remove through the iterator to avoid a ConcurrentModificationException.
                it.remove();
            }
        }
    }

    /**
     * Commits through the superclass and then emits a {@link PartitionCommitInfo} that contains
     * every partition reported inactive so far plus every partition recorded as new for a
     * checkpoint id less than or equal to {@code j}. The emitted partitions are removed from the
     * bookkeeping afterwards.
     *
     * <p>NOTE(review): a decompiler note on this method stated that its access modifier was
     * changed from {@code protected}; the visibility is left as found.
     *
     * @param j id of the checkpoint up to which (inclusive) to commit
     * @throws Exception if the superclass commit fails
     */
    @Override // org.apache.flink.connector.file.table.stream.AbstractStreamingWriter
    public void commitUpToCheckpoint(long j) throws Exception {
        super.commitUpToCheckpoint(j);
        // headMap is a live view: clearing it below removes the entries from newPartitions.
        NavigableMap<Long, Set<String>> headMap = this.newPartitions.headMap(j, true);
        Set<String> partitions = new HashSet<>(this.committablePartitions);
        this.committablePartitions.clear();
        headMap.values().forEach(partitions::addAll);
        headMap.clear();
        this.output.collect(new StreamRecord<>(new PartitionCommitInfo(j, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), new ArrayList<>(partitions))));
    }
}
