package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.joda.time.DateTime;

/* loaded from: input_file:io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.class */
public class FiniteAppenderatorDriver implements Closeable {
    private static final Logger log = new Logger(FiniteAppenderatorDriver.class);
    private final Appenderator appenderator;
    private final SegmentAllocator segmentAllocator;
    private final SegmentHandoffNotifier handoffNotifier;
    private final UsedSegmentChecker usedSegmentChecker;
    private final ObjectMapper objectMapper;
    private final int maxRowsPerSegment;
    private final long handoffConditionTimeout;
    private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap();
    private final Map<String, String> lastSegmentIds = Maps.newHashMap();
    private final Object handoffMonitor = new Object();

    public FiniteAppenderatorDriver(Appenderator appenderator, SegmentAllocator segmentAllocator, SegmentHandoffNotifierFactory segmentHandoffNotifierFactory, UsedSegmentChecker usedSegmentChecker, ObjectMapper objectMapper, int i, long j) {
        this.appenderator = (Appenderator) Preconditions.checkNotNull(appenderator, "appenderator");
        this.segmentAllocator = (SegmentAllocator) Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
        this.handoffNotifier = ((SegmentHandoffNotifierFactory) Preconditions.checkNotNull(segmentHandoffNotifierFactory, "handoffNotifierFactory")).createSegmentHandoffNotifier(appenderator.getDataSource());
        this.usedSegmentChecker = (UsedSegmentChecker) Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
        this.objectMapper = (ObjectMapper) Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.maxRowsPerSegment = i;
        this.handoffConditionTimeout = j;
    }

    public Object startJob() {
        this.handoffNotifier.start();
        FiniteAppenderatorDriverMetadata finiteAppenderatorDriverMetadata = (FiniteAppenderatorDriverMetadata) this.objectMapper.convertValue(this.appenderator.startJob(), FiniteAppenderatorDriverMetadata.class);
        log.info("Restored metadata[%s].", new Object[]{finiteAppenderatorDriverMetadata});
        if (finiteAppenderatorDriverMetadata == null) {
            return null;
        }
        synchronized (this.activeSegments) {
            for (Map.Entry<String, List<SegmentIdentifier>> entry : finiteAppenderatorDriverMetadata.getActiveSegments().entrySet()) {
                String key = entry.getKey();
                TreeMap newTreeMap = Maps.newTreeMap();
                this.lastSegmentIds.put(key, finiteAppenderatorDriverMetadata.getLastSegmentIds().get(key));
                this.activeSegments.put(key, newTreeMap);
                for (SegmentIdentifier segmentIdentifier : entry.getValue()) {
                    newTreeMap.put(Long.valueOf(segmentIdentifier.getInterval().getStartMillis()), segmentIdentifier);
                }
            }
        }
        return finiteAppenderatorDriverMetadata.getCallerMetadata();
    }

    public void clear() throws InterruptedException {
        synchronized (this.activeSegments) {
            this.activeSegments.clear();
        }
        this.appenderator.clear();
    }

    public SegmentIdentifier add(InputRow inputRow, String str, Supplier<Committer> supplier) throws IOException {
        Preconditions.checkNotNull(inputRow, "row");
        Preconditions.checkNotNull(str, "sequenceName");
        Preconditions.checkNotNull(supplier, "committerSupplier");
        SegmentIdentifier segment = getSegment(inputRow.getTimestamp(), str);
        if (segment != null) {
            try {
                if (this.appenderator.add(segment, inputRow, wrapCommitterSupplier(supplier)) >= this.maxRowsPerSegment) {
                    moveSegmentOut(str, ImmutableList.of(segment));
                }
            } catch (SegmentNotWritableException e) {
                throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", new Object[]{segment});
            }
        }
        return segment;
    }

    public Object persist(Committer committer) throws InterruptedException {
        try {
            log.info("Persisting data.", new Object[0]);
            long currentTimeMillis = System.currentTimeMillis();
            Object obj = this.appenderator.persistAll(wrapCommitter(committer)).get();
            log.info("Persisted pending data in %,dms.", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
            return obj;
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        }
    }

    public SegmentsAndMetadata finish(TransactionalSegmentPublisher transactionalSegmentPublisher, Committer committer) throws InterruptedException {
        SegmentsAndMetadata publishAll = publishAll(transactionalSegmentPublisher, wrapCommitter(committer));
        if (publishAll == null) {
            return null;
        }
        long currentTimeMillis = this.handoffConditionTimeout > 0 ? System.currentTimeMillis() + this.handoffConditionTimeout : 0L;
        log.info("Awaiting handoff of segments: [%s]", new Object[]{Joiner.on(", ").join(this.appenderator.getSegments())});
        synchronized (this.handoffMonitor) {
            while (!this.appenderator.getSegments().isEmpty()) {
                if (currentTimeMillis == 0) {
                    this.handoffMonitor.wait();
                } else {
                    long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                    if (currentTimeMillis2 <= 0) {
                        throw new ISE("Segment handoff wait timeout. Segments not yet handed off: [%s]", new Object[]{Joiner.on(", ").join(this.appenderator.getSegments())});
                    }
                    this.handoffMonitor.wait(currentTimeMillis2);
                }
            }
        }
        log.info("All segments handed off.", new Object[0]);
        return new SegmentsAndMetadata(publishAll.getSegments(), ((FiniteAppenderatorDriverMetadata) publishAll.getCommitMetadata()).getCallerMetadata());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.handoffNotifier.close();
    }

    private SegmentIdentifier getActiveSegment(DateTime dateTime, String str) {
        synchronized (this.activeSegments) {
            NavigableMap<Long, SegmentIdentifier> navigableMap = this.activeSegments.get(str);
            if (navigableMap == null) {
                return null;
            }
            Map.Entry<Long, SegmentIdentifier> floorEntry = navigableMap.floorEntry(Long.valueOf(dateTime.getMillis()));
            if (floorEntry == null || !floorEntry.getValue().getInterval().contains(dateTime)) {
                return null;
            }
            return floorEntry.getValue();
        }
    }

    private SegmentIdentifier getSegment(DateTime dateTime, String str) throws IOException {
        synchronized (this.activeSegments) {
            SegmentIdentifier activeSegment = getActiveSegment(dateTime, str);
            if (activeSegment != null) {
                return activeSegment;
            }
            NavigableMap<Long, SegmentIdentifier> navigableMap = this.activeSegments.get(str);
            SegmentIdentifier allocate = this.segmentAllocator.allocate(dateTime, str, this.lastSegmentIds.get(str));
            if (allocate != null) {
                Long valueOf = Long.valueOf(allocate.getInterval().getStartMillis());
                for (SegmentIdentifier segmentIdentifier : this.appenderator.getSegments()) {
                    if (segmentIdentifier.equals(allocate)) {
                        throw new ISE("WTF?! Allocated segment[%s] which conflicts with existing segment[%s].", new Object[]{allocate, segmentIdentifier});
                    }
                }
                log.info("New segment[%s] for sequenceName[%s].", new Object[]{allocate, str});
                if (navigableMap == null) {
                    this.activeSegments.put(str, Maps.newTreeMap());
                }
                this.activeSegments.get(str).put(valueOf, allocate);
                this.lastSegmentIds.put(str, allocate.getIdentifierAsString());
            } else {
                log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", new Object[]{dateTime, str});
            }
            return allocate;
        }
    }

    private void moveSegmentOut(String str, List<SegmentIdentifier> list) {
        synchronized (this.activeSegments) {
            NavigableMap<Long, SegmentIdentifier> navigableMap = this.activeSegments.get(str);
            if (navigableMap == null) {
                throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", new Object[]{str});
            }
            for (SegmentIdentifier segmentIdentifier : list) {
                log.info("Moving segment[%s] out of active list.", new Object[]{segmentIdentifier});
                if (navigableMap.remove(Long.valueOf(segmentIdentifier.getInterval().getStartMillis())) != segmentIdentifier) {
                    throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", new Object[]{segmentIdentifier});
                }
            }
        }
    }

    /* JADX WARN: Type inference failed for: r3v2, types: [java.lang.Object[]] */
    private SegmentsAndMetadata publishAll(TransactionalSegmentPublisher transactionalSegmentPublisher, Committer committer) throws InterruptedException {
        SegmentsAndMetadata segmentsAndMetadata;
        List<SegmentIdentifier> copyOf = ImmutableList.copyOf(this.appenderator.getSegments());
        long j = 0;
        while (true) {
            try {
                log.info("Pushing segments: [%s]", new Object[]{Joiner.on(", ").join(copyOf)});
                segmentsAndMetadata = (SegmentsAndMetadata) this.appenderator.push(copyOf, committer).get();
                break;
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                long j2 = j + 1;
                j = r3;
                long computeNextRetrySleep = computeNextRetrySleep(j2);
                Logger logger = log;
                ?? r3 = {Long.valueOf(j), Long.valueOf(computeNextRetrySleep)};
                logger.warn(e2, "Failed publishAll (try %d), retrying in %,dms.", (Object[]) r3);
                Thread.sleep(computeNextRetrySleep);
            }
        }
        if (!segmentsToIdentifiers(segmentsAndMetadata.getSegments()).equals(Sets.newHashSet(copyOf))) {
            throw new ISE("WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", new Object[]{Joiner.on(", ").join(identifiersToStrings(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))), Joiner.on(", ").join(identifiersToStrings(copyOf))});
        }
        log.info("Publishing segments with commitMetadata[%s]: [%s]", new Object[]{segmentsAndMetadata.getCommitMetadata(), Joiner.on(", ").join(segmentsAndMetadata.getSegments())});
        if (segmentsAndMetadata.getSegments().isEmpty()) {
            log.info("Nothing to publish, skipping publish step.", new Object[0]);
        } else if (transactionalSegmentPublisher.publishSegments(ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata())) {
            log.info("Published segments, awaiting handoff.", new Object[0]);
        } else {
            log.info("Transaction failure while publishing segments, checking if someone else beat us to it.", new Object[0]);
            if (!this.usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments())).equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
                log.warn("Our segments don't exist, giving up.", new Object[0]);
                return null;
            }
            log.info("Our segments really do exist, awaiting handoff.", new Object[0]);
        }
        for (final DataSegment dataSegment : segmentsAndMetadata.getSegments()) {
            this.handoffNotifier.registerSegmentHandoffCallback(new SegmentDescriptor(dataSegment.getInterval(), dataSegment.getVersion(), dataSegment.getShardSpec().getPartitionNum()), MoreExecutors.sameThreadExecutor(), new Runnable() { // from class: io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.1
                @Override // java.lang.Runnable
                public void run() {
                    SegmentIdentifier fromDataSegment = SegmentIdentifier.fromDataSegment(dataSegment);
                    FiniteAppenderatorDriver.log.info("Segment[%s] successfully handed off, dropping.", new Object[]{fromDataSegment});
                    Futures.addCallback(FiniteAppenderatorDriver.this.appenderator.drop(fromDataSegment), new FutureCallback<Object>() { // from class: io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.1.1
                        public void onSuccess(Object obj) {
                            synchronized (FiniteAppenderatorDriver.this.handoffMonitor) {
                                FiniteAppenderatorDriver.this.handoffMonitor.notifyAll();
                            }
                        }

                        public void onFailure(Throwable th) {
                            FiniteAppenderatorDriver.log.warn(th, "Failed to drop segment[%s]?!", new Object[0]);
                            synchronized (FiniteAppenderatorDriver.this.handoffMonitor) {
                                FiniteAppenderatorDriver.this.handoffMonitor.notifyAll();
                            }
                        }
                    });
                }
            });
        }
        return segmentsAndMetadata;
    }

    private Supplier<Committer> wrapCommitterSupplier(final Supplier<Committer> supplier) {
        return new Supplier<Committer>() { // from class: io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Committer m128get() {
                return FiniteAppenderatorDriver.this.wrapCommitter((Committer) supplier.get());
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Committer wrapCommitter(final Committer committer) {
        Committer committer2;
        synchronized (this.activeSegments) {
            final FiniteAppenderatorDriverMetadata finiteAppenderatorDriverMetadata = new FiniteAppenderatorDriverMetadata(ImmutableMap.copyOf(Maps.transformValues(this.activeSegments, new Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>>() { // from class: io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.3
                public List<SegmentIdentifier> apply(NavigableMap<Long, SegmentIdentifier> navigableMap) {
                    return ImmutableList.copyOf(navigableMap.values());
                }
            })), ImmutableMap.copyOf(this.lastSegmentIds), committer.getMetadata());
            committer2 = new Committer() { // from class: io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.4
                public Object getMetadata() {
                    return finiteAppenderatorDriverMetadata;
                }

                public void run() {
                    committer.run();
                }
            };
        }
        return committer2;
    }

    private static long computeNextRetrySleep(long j) {
        return (long) (Math.min(60000.0d, 1000.0d * Math.pow(2.0d, j)) * Math.min(Math.max(1.0d + (0.2d * new Random().nextGaussian()), 0.0d), 2.0d));
    }

    private static Set<SegmentIdentifier> segmentsToIdentifiers(Iterable<DataSegment> iterable) {
        return FluentIterable.from(iterable).transform(new Function<DataSegment, SegmentIdentifier>() { // from class: io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.5
            public SegmentIdentifier apply(DataSegment dataSegment) {
                return SegmentIdentifier.fromDataSegment(dataSegment);
            }
        }).toSet();
    }

    private static Iterable<String> identifiersToStrings(Iterable<SegmentIdentifier> iterable) {
        return FluentIterable.from(iterable).transform(new Function<SegmentIdentifier, String>() { // from class: io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.6
            public String apply(SegmentIdentifier segmentIdentifier) {
                return segmentIdentifier.getIdentifierAsString();
            }
        });
    }
}
