package io.druid.segment.realtime;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.guice.annotations.Processing;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import io.druid.query.UnionQueryRunner;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Sink;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;

/* loaded from: input_file:io/druid/segment/realtime/RealtimeManager.class */
public class RealtimeManager implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(RealtimeManager.class);
    private final List<FireDepartment> fireDepartments;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private ExecutorService executorService;
    private final Map<String, List<FireChief>> chiefs = Maps.newHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/segment/realtime/RealtimeManager$FireChief.class */
    public class FireChief extends Thread implements Closeable {
        private final FireDepartment fireDepartment;
        private final FireDepartmentMetrics metrics;
        private volatile RealtimeTuningConfig config = null;
        private volatile Firehose firehose = null;
        private volatile Plumber plumber = null;
        private volatile boolean normalExit = true;

        public FireChief(FireDepartment fireDepartment) {
            this.fireDepartment = fireDepartment;
            this.metrics = fireDepartment.getMetrics();
        }

        public void init() throws IOException {
            this.config = this.fireDepartment.getTuningConfig();
            synchronized (this) {
                try {
                    RealtimeManager.log.info("Calling the FireDepartment and getting a Firehose.", new Object[0]);
                    this.firehose = this.fireDepartment.connect();
                    RealtimeManager.log.info("Firehose acquired!", new Object[0]);
                    RealtimeManager.log.info("Someone get us a plumber!", new Object[0]);
                    this.plumber = this.fireDepartment.findPlumber();
                    RealtimeManager.log.info("We have our plumber!", new Object[0]);
                } catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }
        }

        public FireDepartmentMetrics getMetrics() {
            return this.metrics;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            boolean z;
            boolean z2;
            verifyState();
            Period intermediatePersistPeriod = this.config.getIntermediatePersistPeriod();
            try {
                try {
                    try {
                        this.plumber.startJob();
                        long millis = new DateTime().plus(intermediatePersistPeriod).getMillis();
                        while (this.firehose.hasMore()) {
                            InputRow inputRow = null;
                            try {
                                try {
                                    inputRow = this.firehose.nextRow();
                                    z = false;
                                    z2 = false;
                                    try {
                                        z = this.plumber.add(inputRow) == -1;
                                    } catch (IndexSizeExceededException e) {
                                        RealtimeManager.log.info("Index limit exceeded: %s", new Object[]{e.getMessage()});
                                        z2 = true;
                                    }
                                } catch (ParseException e2) {
                                    if (inputRow != null) {
                                        RealtimeManager.log.error(e2, "unparseable line: %s", new Object[]{inputRow});
                                    }
                                    this.metrics.incrementUnparseable();
                                }
                            } catch (Exception e3) {
                                RealtimeManager.log.debug(e3, "thrown away line due to exception, considering unparseable", new Object[0]);
                                this.metrics.incrementUnparseable();
                            }
                            if (z2 || z) {
                                this.metrics.incrementThrownAway();
                                RealtimeManager.log.debug("Throwing away event[%s]", new Object[]{inputRow});
                                if (z2 || System.currentTimeMillis() > millis) {
                                    this.plumber.persist(this.firehose.commit());
                                    millis = new DateTime().plus(intermediatePersistPeriod).getMillis();
                                }
                            } else {
                                Sink sink = this.plumber.getSink(inputRow.getTimestampFromEpoch());
                                if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > millis) {
                                    this.plumber.persist(this.firehose.commit());
                                    millis = new DateTime().plus(intermediatePersistPeriod).getMillis();
                                }
                                this.metrics.incrementProcessed();
                            }
                        }
                    } catch (RuntimeException e4) {
                        RealtimeManager.log.makeAlert(e4, "RuntimeException aborted realtime processing[%s]", new Object[]{this.fireDepartment.getDataSchema().getDataSource()}).emit();
                        this.normalExit = false;
                        throw e4;
                    }
                } catch (Error e5) {
                    RealtimeManager.log.makeAlert(e5, "Exception aborted realtime processing[%s]", new Object[]{this.fireDepartment.getDataSchema().getDataSource()}).emit();
                    this.normalExit = false;
                    throw e5;
                }
            } finally {
                CloseQuietly.close(this.firehose);
                if (this.normalExit) {
                    this.plumber.finishJob();
                    this.plumber = null;
                    this.firehose = null;
                }
            }
        }

        private void verifyState() {
            Preconditions.checkNotNull(this.config, "config is null, init() must be called first.");
            Preconditions.checkNotNull(this.firehose, "firehose is null, init() must be called first.");
            Preconditions.checkNotNull(this.plumber, "plumber is null, init() must be called first.");
            RealtimeManager.log.info("FireChief[%s] state ok.", new Object[]{this.fireDepartment.getDataSchema().getDataSource()});
        }

        public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
            return new FinalizeResultsQueryRunner(this.plumber.getQueryRunner(query), RealtimeManager.this.conglomerate.findFactory(query).getToolchest());
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            synchronized (this) {
                if (this.firehose != null) {
                    this.normalExit = false;
                    this.firehose.close();
                }
            }
        }
    }

    @Inject
    public RealtimeManager(List<FireDepartment> list, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @Processing @JacksonInject ExecutorService executorService) {
        this.fireDepartments = list;
        this.conglomerate = queryRunnerFactoryConglomerate;
        this.executorService = executorService;
    }

    @LifecycleStart
    public void start() throws IOException {
        for (FireDepartment fireDepartment : this.fireDepartments) {
            DataSchema dataSchema = fireDepartment.getDataSchema();
            FireChief fireChief = new FireChief(fireDepartment);
            List<FireChief> list = this.chiefs.get(dataSchema.getDataSource());
            if (list == null) {
                list = new ArrayList();
                this.chiefs.put(dataSchema.getDataSource(), list);
            }
            list.add(fireChief);
            fireChief.setName(String.format("chief-%s", dataSchema.getDataSource()));
            fireChief.setDaemon(true);
            fireChief.init();
            fireChief.start();
        }
    }

    @LifecycleStop
    public void stop() {
        Iterator<List<FireChief>> it = this.chiefs.values().iterator();
        while (it.hasNext()) {
            Iterator<T> it2 = it.next().iterator();
            while (it2.hasNext()) {
                CloseQuietly.close((FireChief) it2.next());
            }
        }
    }

    public FireDepartmentMetrics getMetrics(String str) {
        List<FireChief> list = this.chiefs.get(str);
        if (list == null) {
            return null;
        }
        FireDepartmentMetrics fireDepartmentMetrics = null;
        for (FireChief fireChief : list) {
            if (fireDepartmentMetrics == null) {
                fireDepartmentMetrics = fireChief.getMetrics().snapshot();
            } else {
                fireDepartmentMetrics.merge(fireChief.getMetrics());
            }
        }
        return fireDepartmentMetrics;
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> iterable) {
        return getQueryRunnerForSegments(query, null);
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, Iterable<SegmentDescriptor> iterable) {
        final QueryRunnerFactory findFactory = this.conglomerate.findFactory(query);
        return new UnionQueryRunner(Iterables.transform(query.getDataSource().getNames(), new Function<String, QueryRunner>() { // from class: io.druid.segment.realtime.RealtimeManager.1
            public QueryRunner<T> apply(String str) {
                Iterable iterable2 = (Iterable) RealtimeManager.this.chiefs.get(str);
                return iterable2 == null ? new NoopQueryRunner() : findFactory.getToolchest().mergeResults(findFactory.mergeRunners(RealtimeManager.this.executorService, Iterables.transform(iterable2, new Function<FireChief, QueryRunner<T>>() { // from class: io.druid.segment.realtime.RealtimeManager.1.1
                    public QueryRunner<T> apply(FireChief fireChief) {
                        return fireChief.getQueryRunner(query);
                    }
                })));
            }
        }), this.conglomerate.findFactory(query).getToolchest());
    }
}
