package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/CommitRetrierTest.class */
/**
 * Tests for {@link CommitRetrier}, driven through a stub handler whose number of outstanding
 * retries is controlled directly by each test.
 */
class CommitRetrierTest {

    /**
     * Checks the return value of {@code retry(long)} and its effect on the handler's pending-retry
     * counter for a handler that starts with two pending retries.
     */
    @Test
    void testRetry() throws Exception {
        TestProcessingTimeService timeService = new TestProcessingTimeService();
        CommitterHandlerWithRetries handler = new CommitterHandlerWithRetries();
        CommitRetrier retrier = new CommitRetrier(timeService, handler);

        // A fresh handler has nothing to retry.
        assertNeedsRetry(handler, false);

        handler.addRetries(2);
        assertNeedsRetry(handler, true);
        assertPendingRetries(handler, 2);

        // retry(0) leaves the counter untouched and reports that work is still outstanding.
        // NOTE(review): the argument presumably bounds the number of attempts — confirm
        // against CommitRetrier.
        MatcherAssert.assertThat(retrier.retry(0L), CoreMatchers.equalTo(true));
        assertPendingRetries(handler, 2);

        // Each retry(1) consumes one pending retry; the result is false once none are left.
        MatcherAssert.assertThat(retrier.retry(1L), CoreMatchers.equalTo(true));
        assertPendingRetries(handler, 1);

        MatcherAssert.assertThat(retrier.retry(1L), CoreMatchers.equalTo(false));
        assertPendingRetries(handler, 0);
        assertNeedsRetry(handler, false);
    }

    /** Checks that {@code retryIndefinitely()} leaves the handler with no pending retries. */
    @Test
    void testInfiniteRetry() throws Exception {
        TestProcessingTimeService timeService = new TestProcessingTimeService();
        CommitterHandlerWithRetries handler = new CommitterHandlerWithRetries();
        CommitRetrier retrier = new CommitRetrier(timeService, handler);

        assertNeedsRetry(handler, false);

        handler.addRetries(2);
        assertNeedsRetry(handler, true);
        assertPendingRetries(handler, 2);

        retrier.retryIndefinitely();

        assertPendingRetries(handler, 0);
        assertNeedsRetry(handler, false);
    }

    /**
     * Checks that after {@code retryWithDelay()} the pending retries are only consumed as the
     * processing time advances, one per step, and that further steps change nothing once the
     * counter has reached zero.
     */
    @Test
    void testTimedRetry() throws Exception {
        TestProcessingTimeService timeService = new TestProcessingTimeService();
        ManualClock clock = new ManualClock();
        // Start the processing-time service at the manual clock's current time.
        timeService.setCurrentTime(clock.absoluteTimeMillis());
        CommitterHandlerWithRetries handler = new CommitterHandlerWithRetries();
        CommitRetrier retrier = new CommitRetrier(timeService, handler, clock);

        assertNeedsRetry(handler, false);

        handler.addRetries(2);
        retrier.retryWithDelay();

        // Nothing has been retried yet: time has not moved since retryWithDelay().
        assertNeedsRetry(handler, true);
        assertPendingRetries(handler, 2);

        // NOTE(review): 1000 presumably matches the retrier's delay in ms — confirm.
        timeService.advance(1000L);
        assertPendingRetries(handler, 1);

        timeService.advance(1000L);
        assertPendingRetries(handler, 0);
        assertNeedsRetry(handler, false);

        // A further step must not push the counter below zero.
        timeService.advance(1000L);
        assertPendingRetries(handler, 0);
        assertNeedsRetry(handler, false);
    }

    /** Asserts that {@code handler.needsRetry()} equals {@code expected}. */
    private static void assertNeedsRetry(CommitterHandlerWithRetries handler, boolean expected) {
        MatcherAssert.assertThat(handler.needsRetry(), CoreMatchers.equalTo(expected));
    }

    /** Asserts that {@code handler.getPendingRetries()} equals {@code expected}. */
    private static void assertPendingRetries(CommitterHandlerWithRetries handler, int expected) {
        MatcherAssert.assertThat(handler.getPendingRetries(), CoreMatchers.equalTo(expected));
    }

    /**
     * Stub handler that keeps a counter of outstanding retries: it needs a retry while the
     * counter is positive, and every {@link #retry()} call decrements the counter by one.
     */
    private static class CommitterHandlerWithRetries extends ForwardCommittingHandler<String> {
        private final AtomicInteger retriesNeeded = new AtomicInteger(0);

        /** Adds {@code i} to the number of outstanding retries. */
        void addRetries(int i) {
            retriesNeeded.addAndGet(i);
        }

        /** Returns the current number of outstanding retries. */
        int getPendingRetries() {
            return retriesNeeded.get();
        }

        /** Returns {@code true} while the outstanding-retry counter is positive. */
        public boolean needsRetry() {
            return retriesNeeded.get() > 0;
        }

        /** Consumes one outstanding retry by decrementing the counter. */
        public void retry() throws IOException, InterruptedException {
            retriesNeeded.decrementAndGet();
        }
    }
}
