package org.apache.iceberg.spark.actions;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.FileRewriter;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.class */
/**
 * Spark implementation of {@link RewriteDataFiles}.
 *
 * <p>The action plans the data files of the table's current snapshot into file groups per partition.
 * It rewrites each group with the selected {@link FileRewriter}: bin-pack by default, or sort /
 * z-order. It then commits the rewritten groups, either all together or (with partial progress
 * enabled) through a commit service in several smaller commits.
 */
public class RewriteDataFilesSparkAction extends BaseSnapshotUpdateSparkAction<RewriteDataFilesSparkAction> implements RewriteDataFiles {
    private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesSparkAction.class);

    /** Options understood by the action itself; the selected rewriter adds its own valid options on top. */
    private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
            MAX_CONCURRENT_FILE_GROUP_REWRITES,
            MAX_FILE_GROUP_SIZE_BYTES,
            PARTIAL_PROGRESS_ENABLED,
            PARTIAL_PROGRESS_MAX_COMMITS,
            TARGET_FILE_SIZE_BYTES,
            USE_STARTING_SEQUENCE_NUMBER,
            REWRITE_JOB_ORDER);

    /** Result returned when there is no snapshot or nothing to rewrite. */
    private static final RewriteDataFiles.Result EMPTY_RESULT =
            ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();

    private final Table table;
    private Expression filter;
    private int maxConcurrentFileGroupRewrites;
    private int maxCommits;
    private boolean partialProgressEnabled;
    private boolean useStartingSequenceNumber;
    private RewriteJobOrder rewriteJobOrder;
    private FileRewriter<FileScanTask, DataFile> rewriter;

    /**
     * Per-execution bookkeeping used to number file groups and describe Spark jobs.
     *
     * <p>Global and per-partition indices are handed out through an {@link AtomicInteger} and a concurrent
     * map. As a result, the index counters can be safely advanced from multiple threads.
     */
    @VisibleForTesting
    public static class RewriteExecutionContext {
        private final StructLikeMap<Integer> numGroupsByPartition;
        private final int totalGroupCount;
        private final Map<StructLike, Integer> partitionIndexMap = Maps.newConcurrentMap();
        private final AtomicInteger groupIndex = new AtomicInteger(1);

        /**
         * @param fileGroupsByPartition planned file groups keyed by partition; only the number of groups
         *     per partition is retained
         */
        RewriteExecutionContext(StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition) {
            this.numGroupsByPartition = fileGroupsByPartition.transformValues(List::size);
            this.totalGroupCount = numGroupsByPartition.values().stream().mapToInt(Integer::intValue).sum();
        }

        /** Returns the next 1-based global group index and advances the counter. */
        public int currentGlobalIndex() {
            return groupIndex.getAndIncrement();
        }

        /** Returns the next 1-based group index within {@code partition} and advances that partition's counter. */
        public int currentPartitionIndex(StructLike partition) {
            return partitionIndexMap.merge(partition, 1, Integer::sum);
        }

        /** Returns the number of planned groups in {@code partition}; the partition must have been planned. */
        public int groupsInPartition(StructLike partition) {
            return numGroupsByPartition.get(partition);
        }

        /** Returns the total number of planned groups across all partitions. */
        public int totalGroupCount() {
            return totalGroupCount;
        }
    }

    /**
     * Creates the action on a clone of {@code sparkSession}. Configuration changes, such as disabling
     * adaptive query execution, are applied to the clone rather than to the caller's session.
     *
     * @param sparkSession session to clone for the rewrite jobs
     * @param table table whose data files are rewritten
     */
    public RewriteDataFilesSparkAction(SparkSession sparkSession, Table table) {
        super(sparkSession.cloneSession());
        this.filter = Expressions.alwaysTrue();
        this.rewriter = null;
        // NOTE(review): AQE is disabled on the cloned session, presumably so Spark does not re-plan the
        // rewrite jobs' partitioning at runtime — confirm intent.
        spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
        this.table = table;
    }

    @Override
    public RewriteDataFilesSparkAction self() {
        return this;
    }

    /** Selects the bin-pack rewriter. At most one rewriter type may be selected. */
    @Override
    public RewriteDataFilesSparkAction binPack() {
        checkNoRewriterSelected();
        this.rewriter = new SparkBinPackDataRewriter(spark(), table);
        return this;
    }

    /** Selects the sort rewriter with an explicit {@code sortOrder}. At most one rewriter type may be selected. */
    @Override
    public RewriteDataFilesSparkAction sort(SortOrder sortOrder) {
        checkNoRewriterSelected();
        this.rewriter = new SparkSortDataRewriter(spark(), table, sortOrder);
        return this;
    }

    /** Selects the sort rewriter without an explicit sort order. At most one rewriter type may be selected. */
    @Override
    public RewriteDataFilesSparkAction sort() {
        checkNoRewriterSelected();
        this.rewriter = new SparkSortDataRewriter(spark(), table);
        return this;
    }

    /** Selects the z-order rewriter on the given columns. At most one rewriter type may be selected. */
    @Override
    public RewriteDataFilesSparkAction zOrder(String... columnNames) {
        checkNoRewriterSelected();
        this.rewriter = new SparkZOrderDataRewriter(spark(), table, Arrays.asList(columnNames));
        return this;
    }

    /** Fails if a rewriter type has already been chosen for this action. */
    private void checkNoRewriterSelected() {
        Preconditions.checkArgument(rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
    }

    /** ANDs {@code expression} into the filter used to select files for rewriting. */
    @Override
    public RewriteDataFilesSparkAction filter(Expression expression) {
        this.filter = Expressions.and(filter, expression);
        return this;
    }

    /**
     * Plans, rewrites, and commits file groups for the table's current snapshot.
     *
     * @return the rewrite results, or an empty result when the table has no snapshot or nothing qualifies
     * @throws IllegalArgumentException if unsupported or invalid options were supplied
     */
    @Override
    public RewriteDataFiles.Result execute() {
        if (table.currentSnapshot() == null) {
            return EMPTY_RESULT;
        }

        long startingSnapshotId = table.currentSnapshot().snapshotId();

        // Bin-packing is the default strategy when none was selected explicitly.
        if (rewriter == null) {
            rewriter = new SparkBinPackDataRewriter(spark(), table);
        }

        validateAndInitOptions();

        StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
        RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);

        if (ctx.totalGroupCount() == 0) {
            LOG.info("Nothing found to rewrite in {}", table.name());
            return EMPTY_RESULT;
        }

        Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
        RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId);
        if (partialProgressEnabled) {
            return doExecuteWithPartialProgress(ctx, groupStream, commitManager);
        } else {
            return doExecute(ctx, groupStream, commitManager);
        }
    }

    /**
     * Scans {@code startingSnapshotId} with the configured filter and splits the matching files into
     * rewrite groups per partition. The scan iterable is always closed; a failure to close it is logged,
     * not thrown.
     */
    StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
        CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
                .useSnapshot(startingSnapshotId)
                .filter(filter)
                .ignoreResiduals()
                .planFiles();

        try {
            StructLikeMap<List<FileScanTask>> filesByPartition =
                    groupByPartition(table.spec().partitionType(), fileScanTasks);
            return fileGroupsByPartition(filesByPartition);
        } finally {
            try {
                fileScanTasks.close();
            } catch (IOException io) {
                LOG.error("Cannot properly close file iterable while planning for rewrite", io);
            }
        }
    }

    /**
     * Buckets scan tasks by partition of the current spec.
     *
     * <p>Tasks whose file was written with a different spec id than the current spec are all placed under
     * a single empty partition struct. Presumably this is because their partition values do not map onto
     * the current spec.
     */
    private StructLikeMap<List<FileScanTask>> groupByPartition(Types.StructType partitionType, Iterable<FileScanTask> tasks) {
        StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
        StructLike emptyStruct = GenericRecord.create(partitionType);
        int currentSpecId = table.spec().specId();

        for (FileScanTask task : tasks) {
            StructLike taskPartition = task.file().specId() == currentSpecId ? task.file().partition() : emptyStruct;

            List<FileScanTask> files = filesByPartition.get(taskPartition);
            if (files == null) {
                files = Lists.newArrayList();
            }

            files.add(task);
            filesByPartition.put(taskPartition, files);
        }

        return filesByPartition;
    }

    /** Applies the rewriter's group planning to each partition's task list. */
    private StructLikeMap<List<List<FileScanTask>>> fileGroupsByPartition(StructLikeMap<List<FileScanTask>> filesByPartition) {
        return filesByPartition.transformValues(this::planFileGroups);
    }

    /** Asks the rewriter to split one partition's tasks into groups and materializes them as an immutable list. */
    private List<List<FileScanTask>> planFileGroups(List<FileScanTask> tasks) {
        return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
    }

    /**
     * Rewrites one file group inside a named Spark job group and records the produced files on it.
     *
     * @return {@code fileGroup}, now carrying its output files
     */
    @VisibleForTesting
    RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup fileGroup) {
        String desc = jobDesc(fileGroup, ctx);
        Set<DataFile> addedFiles = withJobGroupInfo(
                newJobGroupInfo("REWRITE-DATA-FILES", desc),
                () -> rewriter.rewrite(fileGroup.fileScans()));

        fileGroup.setOutputFiles(addedFiles);
        LOG.info("Rewrite Files Ready to be Committed - {}", desc);
        return fileGroup;
    }

    /**
     * Builds a fixed-size pool of {@code maxConcurrentFileGroupRewrites} threads named
     * {@code Rewrite-Service-%d}, wrapped by Guava's exiting executor service.
     */
    private ExecutorService rewriteService() {
        return MoreExecutors.getExitingExecutorService(
                (ThreadPoolExecutor) Executors.newFixedThreadPool(
                        maxConcurrentFileGroupRewrites,
                        new ThreadFactoryBuilder().setNameFormat("Rewrite-Service-%d").build()));
    }

    /** Creates the commit manager for the snapshot the rewrite was planned against. */
    @VisibleForTesting
    RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
        return new RewriteDataFilesCommitManager(table, startingSnapshotId, useStartingSequenceNumber);
    }

    /**
     * Rewrites all groups, stopping at the first failure, then commits every group in one commit.
     *
     * <p>The rewrite and commit phases are handled separately. If a rewrite fails, the already-written
     * groups are aborted and the failure is rethrown. If the commit fails with a conflict, the failure is
     * wrapped in a {@link RuntimeException} that explains partial progress. The rewrite-failure cleanup
     * is not run for commit failures.
     */
    private RewriteDataFiles.Result doExecute(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream, RewriteDataFilesCommitManager commitManager) {
        ExecutorService rewriteService = rewriteService();
        ConcurrentLinkedQueue<RewriteFileGroup> rewrittenGroups = Queues.newConcurrentLinkedQueue();

        try {
            Tasks.foreach(groupStream)
                    .executeWith(rewriteService)
                    .stopOnFailure()
                    .noRetry()
                    .onFailure((fileGroup, exception) ->
                            LOG.warn("Failure during rewrite process for group {}", fileGroup.info(), exception))
                    .run(fileGroup -> rewrittenGroups.add(rewriteFiles(ctx, fileGroup)));
        } catch (Exception e) {
            // At least one group failed to be written: discard the groups that did finish.
            LOG.error("Cannot complete rewrite, {} is not enabled and one of the file set groups failed to be rewritten. This error occurred during the writing of new files, not during the commit process. This indicates something is wrong that doesn't involve conflicts with other Iceberg operations. Enabling {} may help in this case but the root cause should be investigated. Cleaning up {} groups which finished being written.",
                    PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_ENABLED, rewrittenGroups.size(), e);
            Tasks.foreach(rewrittenGroups)
                    .suppressFailureWhenFinished()
                    .run(commitManager::abortFileGroup);
            throw e;
        } finally {
            rewriteService.shutdown();
        }

        try {
            commitManager.commitOrClean(Sets.newHashSet(rewrittenGroups));
        } catch (CommitFailedException | ValidationException e) {
            String errorMessage = String.format("Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of conflicts, set %s which will break up the rewrite into multiple smaller commits controlled by %s. Separate smaller rewrite commits can succeed independently while any commits that conflict with another Iceberg operation will be ignored. This mode will create additional snapshots in the table history, one for each commit.",
                    PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
            throw new RuntimeException(errorMessage, e);
        }

        List<RewriteDataFiles.FileGroupRewriteResult> rewriteResults = rewrittenGroups.stream()
                .map(RewriteFileGroup::asResult)
                .collect(Collectors.toList());
        return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
    }

    /**
     * Rewrites groups while a commit service commits them in batches of
     * {@code ceil(totalGroupCount / maxCommits)} groups.
     *
     * <p>Individual rewrite failures are logged and reported as failure results instead of aborting the
     * action. Only groups that the commit service reports as committed appear in the rewrite results.
     */
    private RewriteDataFiles.Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream, RewriteDataFilesCommitManager commitManager) {
        ExecutorService rewriteService = rewriteService();

        int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING);
        RewriteDataFilesCommitManager.CommitService commitService = commitManager.service(groupsPerCommit);
        commitService.start();

        ConcurrentLinkedQueue<RewriteDataFiles.FileGroupFailureResult> rewriteFailures = new ConcurrentLinkedQueue<>();
        try {
            Tasks.foreach(groupStream)
                    .suppressFailureWhenFinished()
                    .executeWith(rewriteService)
                    .noRetry()
                    .onFailure((fileGroup, exception) -> {
                        LOG.error("Failure during rewrite group {}", fileGroup.info(), exception);
                        rewriteFailures.add(ImmutableRewriteDataFiles.FileGroupFailureResult.builder()
                                .info(fileGroup.info())
                                .dataFilesCount(fileGroup.numFiles())
                                .build());
                    })
                    .run(fileGroup -> commitService.offer(rewriteFiles(ctx, fileGroup)));
        } finally {
            // Release the rewrite threads even if task submission itself throws.
            rewriteService.shutdown();
        }

        commitService.close();
        List<RewriteFileGroup> commitResults = commitService.results();
        if (commitResults.isEmpty()) {
            LOG.error("{} is true but no rewrite commits succeeded. Check the logs to determine why the individual commits failed. If this is persistent it may help to increase {} which will break the rewrite operation into smaller commits.",
                    PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_MAX_COMMITS);
        }

        List<RewriteDataFiles.FileGroupRewriteResult> rewriteResults = commitResults.stream()
                .map(RewriteFileGroup::asResult)
                .collect(Collectors.toList());
        return ImmutableRewriteDataFiles.Result.builder()
                .rewriteResults(rewriteResults)
                .rewriteFailures(rewriteFailures)
                .build();
    }

    /**
     * Flattens the planned groups into {@link RewriteFileGroup}s ordered by the configured job order.
     *
     * <p>Indices are assigned in map iteration order, before sorting, because {@code sorted} consumes the
     * whole upstream first.
     */
    Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
        return groupsByPartition.entrySet().stream()
                .filter(entry -> !entry.getValue().isEmpty())
                .flatMap(entry -> {
                    StructLike partition = entry.getKey();
                    return entry.getValue().stream().map(tasks -> newRewriteGroup(ctx, partition, tasks));
                })
                .sorted(RewriteFileGroup.comparator(rewriteJobOrder));
    }

    /** Wraps {@code tasks} in a group stamped with the next global and per-partition indices. */
    private RewriteFileGroup newRewriteGroup(RewriteExecutionContext ctx, StructLike partition, List<FileScanTask> tasks) {
        int globalIndex = ctx.currentGlobalIndex();
        int partitionIndex = ctx.currentPartitionIndex(partition);
        RewriteDataFiles.FileGroupInfo info = ImmutableRewriteDataFiles.FileGroupInfo.builder()
                .globalIndex(globalIndex)
                .partitionIndex(partitionIndex)
                .partition(partition)
                .build();
        return new RewriteFileGroup(info, tasks);
    }

    /**
     * Rejects unknown options, initializes the rewriter, and reads the action's own settings.
     *
     * @throws IllegalArgumentException for unsupported option keys, a non-positive concurrency, or a
     *     non-positive max-commits while partial progress is enabled
     */
    void validateAndInitOptions() {
        Set<String> validOptions = Sets.newHashSet(rewriter.validOptions());
        validOptions.addAll(VALID_OPTIONS);

        Set<String> invalidKeys = Sets.newHashSet(options().keySet());
        invalidKeys.removeAll(validOptions);

        Preconditions.checkArgument(invalidKeys.isEmpty(),
                "Cannot use options %s, they are not supported by the action or the rewriter %s",
                invalidKeys, rewriter.description());

        rewriter.init(options());

        maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(
                options(), MAX_CONCURRENT_FILE_GROUP_REWRITES, MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
        maxCommits = PropertyUtil.propertyAsInt(
                options(), PARTIAL_PROGRESS_MAX_COMMITS, PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT);
        partialProgressEnabled = PropertyUtil.propertyAsBoolean(
                options(), PARTIAL_PROGRESS_ENABLED, PARTIAL_PROGRESS_ENABLED_DEFAULT);
        useStartingSequenceNumber = PropertyUtil.propertyAsBoolean(
                options(), USE_STARTING_SEQUENCE_NUMBER, USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
        rewriteJobOrder = RewriteJobOrder.fromName(
                PropertyUtil.propertyAsString(options(), REWRITE_JOB_ORDER, REWRITE_JOB_ORDER_DEFAULT));

        Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
                "Cannot set %s to %s, the value must be positive.",
                MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);

        Preconditions.checkArgument(!partialProgressEnabled || maxCommits > 0,
                "Cannot set %s to %s, the value must be positive when %s is true",
                PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED);
    }

    /**
     * Builds the Spark job description for a group. Partitioned groups also show the partition and the
     * group's position within it.
     */
    private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) {
        StructLike partition = group.info().partition();
        int fileCount = group.rewrittenFiles().size();
        if (partition.size() > 0) {
            return String.format("Rewriting %d files (%s, file group %d/%d, %s (%d/%d)) in %s",
                    fileCount,
                    rewriter.description(),
                    group.info().globalIndex(),
                    ctx.totalGroupCount(),
                    partition,
                    group.info().partitionIndex(),
                    ctx.groupsInPartition(partition),
                    table.name());
        } else {
            return String.format("Rewriting %d files (%s, file group %d/%d) in %s",
                    fileCount,
                    rewriter.description(),
                    group.info().globalIndex(),
                    ctx.totalGroupCount(),
                    table.name());
        }
    }
}
