package org.apache.flink.connector.file.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriter.class */
public class FileWriter<IN> implements StatefulSinkWriter<IN, FileWriterBucketState>, CommittingSinkWriter<IN, FileSinkCommittable>, SinkWriter<IN>, ProcessingTimeService.ProcessingTimeCallback {
    private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
    private final Path basePath;
    private final FileWriterBucketFactory<IN> bucketFactory;
    private final BucketAssigner<IN, String> bucketAssigner;
    private final BucketWriter<IN, String> bucketWriter;
    private final RollingPolicy<IN, String> rollingPolicy;
    private final ProcessingTimeService processingTimeService;
    private final long bucketCheckInterval;
    private final OutputFileConfig outputFileConfig;
    private final Counter numRecordsOutCounter;
    private boolean endOfInput;
    private final Map<String, FileWriterBucket<IN>> activeBuckets = new HashMap();
    private final BucketerContext bucketerContext = new BucketerContext();

    /* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriter$BucketerContext.class */
    private static final class BucketerContext implements BucketAssigner.Context {

        @Nullable
        private Long elementTimestamp;
        private long currentWatermark;
        private long currentProcessingTime;

        private BucketerContext() {
            this.elementTimestamp = null;
            this.currentWatermark = Long.MIN_VALUE;
            this.currentProcessingTime = Long.MIN_VALUE;
        }

        void update(@Nullable Long l, long j, long j2) {
            this.elementTimestamp = l;
            this.currentWatermark = j;
            this.currentProcessingTime = j2;
        }

        public long currentProcessingTime() {
            return this.currentProcessingTime;
        }

        public long currentWatermark() {
            return this.currentWatermark;
        }

        @Nullable
        public Long timestamp() {
            return this.elementTimestamp;
        }
    }

    public FileWriter(Path path, SinkWriterMetricGroup sinkWriterMetricGroup, BucketAssigner<IN, String> bucketAssigner, FileWriterBucketFactory<IN> fileWriterBucketFactory, BucketWriter<IN, String> bucketWriter, RollingPolicy<IN, String> rollingPolicy, OutputFileConfig outputFileConfig, ProcessingTimeService processingTimeService, long j) {
        this.basePath = (Path) Preconditions.checkNotNull(path);
        this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
        this.bucketFactory = (FileWriterBucketFactory) Preconditions.checkNotNull(fileWriterBucketFactory);
        this.bucketWriter = (BucketWriter) Preconditions.checkNotNull(bucketWriter);
        this.rollingPolicy = (RollingPolicy) Preconditions.checkNotNull(rollingPolicy);
        this.outputFileConfig = (OutputFileConfig) Preconditions.checkNotNull(outputFileConfig);
        this.numRecordsOutCounter = ((SinkWriterMetricGroup) Preconditions.checkNotNull(sinkWriterMetricGroup)).getIOMetricGroup().getNumRecordsOutCounter();
        this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        Preconditions.checkArgument(j > 0, "Bucket checking interval for processing time should be positive.");
        this.bucketCheckInterval = j;
    }

    public void initializeState(Collection<FileWriterBucketState> collection) throws IOException {
        Preconditions.checkNotNull(collection, "The retrieved state was null.");
        for (FileWriterBucketState fileWriterBucketState : collection) {
            String bucketId = fileWriterBucketState.getBucketId();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Restoring: {}", fileWriterBucketState);
            }
            updateActiveBucketId(bucketId, this.bucketFactory.restoreBucket(this.bucketWriter, this.rollingPolicy, fileWriterBucketState, this.outputFileConfig));
        }
        registerNextBucketInspectionTimer();
    }

    private void updateActiveBucketId(String str, FileWriterBucket<IN> fileWriterBucket) throws IOException {
        FileWriterBucket<IN> fileWriterBucket2 = this.activeBuckets.get(str);
        if (fileWriterBucket2 != null) {
            fileWriterBucket2.merge(fileWriterBucket);
        } else {
            this.activeBuckets.put(str, fileWriterBucket);
        }
    }

    public void write(IN in, SinkWriter.Context context) throws IOException, InterruptedException {
        this.bucketerContext.update(context.timestamp(), context.currentWatermark(), this.processingTimeService.getCurrentProcessingTime());
        getOrCreateBucketForBucketId((String) this.bucketAssigner.getBucketId(in, this.bucketerContext)).write(in, this.processingTimeService.getCurrentProcessingTime());
        this.numRecordsOutCounter.inc();
    }

    public void flush(boolean z) throws IOException, InterruptedException {
        this.endOfInput = z;
    }

    public Collection<FileSinkCommittable> prepareCommit() throws IOException {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<String, FileWriterBucket<IN>>> it = this.activeBuckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, FileWriterBucket<IN>> next = it.next();
            if (next.getValue().isActive()) {
                arrayList.addAll(next.getValue().prepareCommit(this.endOfInput));
            } else {
                it.remove();
            }
        }
        return arrayList;
    }

    public List<FileWriterBucketState> snapshotState(long j) throws IOException {
        Preconditions.checkState(this.bucketWriter != null, "sink has not been initialized");
        ArrayList arrayList = new ArrayList();
        Iterator<FileWriterBucket<IN>> it = this.activeBuckets.values().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().snapshotState());
        }
        return arrayList;
    }

    private FileWriterBucket<IN> getOrCreateBucketForBucketId(String str) throws IOException {
        FileWriterBucket<IN> fileWriterBucket = this.activeBuckets.get(str);
        if (fileWriterBucket == null) {
            fileWriterBucket = this.bucketFactory.getNewBucket(str, assembleBucketPath(str), this.bucketWriter, this.rollingPolicy, this.outputFileConfig);
            this.activeBuckets.put(str, fileWriterBucket);
        }
        return fileWriterBucket;
    }

    public void close() {
        if (this.activeBuckets != null) {
            this.activeBuckets.values().forEach((v0) -> {
                v0.disposePartFile();
            });
        }
    }

    private Path assembleBucketPath(String str) {
        return "".equals(str) ? this.basePath : new Path(this.basePath, str);
    }

    public void onProcessingTime(long j) throws IOException {
        Iterator<FileWriterBucket<IN>> it = this.activeBuckets.values().iterator();
        while (it.hasNext()) {
            it.next().onProcessingTime(j);
        }
        registerNextBucketInspectionTimer();
    }

    private void registerNextBucketInspectionTimer() {
        this.processingTimeService.registerTimer(this.processingTimeService.getCurrentProcessingTime() + this.bucketCheckInterval, this);
    }

    @VisibleForTesting
    Map<String, FileWriterBucket<IN>> getActiveBuckets() {
        return this.activeBuckets;
    }
}
