package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.class */
/**
 * Looks up partitions through the {@link PartitionMetadataDao}, marks them as scheduled and emits
 * them to the output receiver, grouped and ordered by their creation timestamp.
 *
 * <p>Each invocation of {@link #run} either processes the partitions created after the start of
 * the current restriction and asks to be resumed after the configured delay, or, when the DAO
 * reports no unfinished minimum watermark, claims the end of the restriction and stops.
 */
public class DetectNewPartitionsAction {
    private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class);

    private final PartitionMetadataDao dao;
    private final PartitionMetadataMapper mapper;
    private final ChangeStreamMetrics metrics;
    private final Duration resumeDuration;

    /**
     * Creates the action.
     *
     * @param partitionMetadataDao DAO used to query partitions and update them to scheduled
     * @param partitionMetadataMapper maps result set rows into {@link PartitionMetadata}
     * @param changeStreamMetrics metrics sink updated for every emitted partition
     * @param duration delay requested before the next invocation when the action resumes
     */
    public DetectNewPartitionsAction(PartitionMetadataDao partitionMetadataDao, PartitionMetadataMapper partitionMetadataMapper, ChangeStreamMetrics changeStreamMetrics, Duration duration) {
        this.dao = partitionMetadataDao;
        this.mapper = partitionMetadataMapper;
        this.metrics = changeStreamMetrics;
        this.resumeDuration = duration;
    }

    /**
     * Runs one detection pass.
     *
     * @param restrictionTracker tracker over the timestamp range being processed
     * @param outputReceiver receiver the detected partitions are emitted to
     * @param manualWatermarkEstimator estimator updated with the unfinished minimum watermark
     * @return {@code stop()} when there is no unfinished minimum watermark or a claim is rejected,
     *     otherwise {@code resume()} with the configured resume delay
     */
    public DoFn.ProcessContinuation run(RestrictionTracker<TimestampRange, Timestamp> restrictionTracker, DoFn.OutputReceiver<PartitionMetadata> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator) {
        Timestamp from = restrictionTracker.currentRestriction().getFrom();
        Timestamp unfinishedMinWatermark = this.dao.getUnfinishedMinWatermark();
        if (unfinishedMinWatermark == null) {
            return terminate(restrictionTracker);
        }
        return processPartitions(restrictionTracker, outputReceiver, manualWatermarkEstimator, unfinishedMinWatermark, from);
    }

    /**
     * Advances the watermark to {@code minWatermark} and schedules every partition created after
     * {@code readTimestamp}.
     */
    private DoFn.ProcessContinuation processPartitions(RestrictionTracker<TimestampRange, Timestamp> restrictionTracker, DoFn.OutputReceiver<PartitionMetadata> outputReceiver, ManualWatermarkEstimator<Instant> manualWatermarkEstimator, Timestamp minWatermark, Timestamp readTimestamp) {
        manualWatermarkEstimator.setWatermark(new Instant(minWatermark.toSqlTimestamp()));
        List<PartitionMetadata> partitions = getAllPartitionsCreatedAfter(readTimestamp);
        return schedulePartitions(restrictionTracker, outputReceiver, minWatermark, batchByCreatedAt(partitions));
    }

    /**
     * Reads all rows returned by the DAO for {@code readTimestamp} and maps them to
     * {@link PartitionMetadata}.
     *
     * <p>The result set is closed on every exit path, including when advancing the cursor or
     * mapping a row throws.
     */
    private List<PartitionMetadata> getAllPartitionsCreatedAfter(Timestamp readTimestamp) {
        List<PartitionMetadata> partitions = new ArrayList<>();
        try (ResultSet resultSet = this.dao.getAllPartitionsCreatedAfter(readTimestamp)) {
            while (resultSet.next()) {
                partitions.add(this.mapper.from(resultSet.getCurrentRowAsStruct()));
            }
        }
        LOG.info("Found {} to be scheduled (readTimestamp = {})", partitions.size(), readTimestamp);
        return partitions;
    }

    /**
     * Groups the partitions by creation timestamp; the returned map iterates in ascending
     * timestamp order.
     */
    private TreeMap<Timestamp, List<PartitionMetadata>> batchByCreatedAt(List<PartitionMetadata> list) {
        return list.stream().collect(Collectors.groupingBy(PartitionMetadata::getCreatedAt, TreeMap::new, Collectors.toList()));
    }

    /**
     * Processes the batches in ascending creation-timestamp order: updates each batch to scheduled,
     * claims the batch timestamp and emits the batch.
     *
     * @return {@code stop()} as soon as a claim is rejected, otherwise {@code resume()} with the
     *     configured delay once all batches were emitted
     */
    private DoFn.ProcessContinuation schedulePartitions(RestrictionTracker<TimestampRange, Timestamp> restrictionTracker, DoFn.OutputReceiver<PartitionMetadata> outputReceiver, Timestamp minWatermark, TreeMap<Timestamp, List<PartitionMetadata>> batches) {
        for (Map.Entry<Timestamp, List<PartitionMetadata>> batch : batches.entrySet()) {
            Timestamp batchCreatedAt = batch.getKey();
            List<PartitionMetadata> batchPartitions = batch.getValue();
            // NOTE(review): the batch is updated to scheduled BEFORE the claim is attempted, so a
            // rejected claim leaves the batch updated but not emitted by this call. Order kept
            // as-is; confirm this is intended.
            Timestamp scheduledAt = updateBatchToScheduled(batchPartitions);
            if (!restrictionTracker.tryClaim(batchCreatedAt)) {
                return DoFn.ProcessContinuation.stop();
            }
            outputBatch(outputReceiver, minWatermark, batchPartitions, scheduledAt);
        }
        return DoFn.ProcessContinuation.resume().withResumeDelay(this.resumeDuration);
    }

    /**
     * Passes the partition tokens of the batch to the DAO's {@code updateToScheduled} and returns
     * the timestamp it reports.
     */
    private Timestamp updateBatchToScheduled(List<PartitionMetadata> list) {
        List<String> partitionTokens = list.stream().map(PartitionMetadata::getPartitionToken).collect(Collectors.toList());
        return this.dao.updateToScheduled(partitionTokens);
    }

    /**
     * Emits every partition of the batch with {@code minWatermark} as the element timestamp and
     * records the per-partition metrics.
     */
    private void outputBatch(DoFn.OutputReceiver<PartitionMetadata> outputReceiver, Timestamp minWatermark, List<PartitionMetadata> batchPartitions, Timestamp scheduledAt) {
        for (PartitionMetadata partition : batchPartitions) {
            Timestamp createdAt = partition.getCreatedAt();
            // Copy carrying scheduledAt; only used for the log line below.
            PartitionMetadata updatedPartition = partition.toBuilder().setScheduledAt(scheduledAt).build();
            LOG.info("[{}] Scheduled partition at {} with start time {} and end time {}", updatedPartition.getPartitionToken(), updatedPartition.getScheduledAt(), updatedPartition.getStartTimestamp(), updatedPartition.getEndTimestamp());
            // NOTE(review): the element emitted is the original partition, i.e. without the
            // scheduledAt value set above. Kept as-is; confirm downstream does not expect it.
            outputReceiver.outputWithTimestamp(partition, new Instant(minWatermark.toSqlTimestamp()));
            this.metrics.incPartitionRecordCount();
            // Elapsed milliseconds between partition creation and scheduling.
            this.metrics.updatePartitionCreatedToScheduled(new Duration(createdAt.toSqlTimestamp().getTime(), scheduledAt.toSqlTimestamp().getTime()));
        }
    }

    /**
     * Attempts to claim the end of the current restriction and stops processing. The outcome of
     * the claim is not inspected.
     */
    private DoFn.ProcessContinuation terminate(RestrictionTracker<TimestampRange, Timestamp> restrictionTracker) {
        restrictionTracker.tryClaim(restrictionTracker.currentRestriction().getTo());
        LOG.info("All partitions have been processed, stopping");
        return DoFn.ProcessContinuation.stop();
    }
}
