package org.apache.heron.instance.spout;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.spout.ISpoutOutputCollector;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.utils.metrics.ComponentMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.TupleKeyGenerator;
import org.apache.heron.instance.AbstractOutputCollector;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/instance/spout/SpoutOutputCollectorImpl.class */
/**
 * Output collector handed to a spout; it forwards emitted tuples through the
 * {@link AbstractOutputCollector} machinery and keeps track of tuples that were
 * emitted together with a non-null message object.
 *
 * <p>Tracking depends on the inherited {@code ackEnabled} flag:
 * <ul>
 *   <li>enabled: every tracked tuple gets a freshly generated key, is tagged with a
 *       {@code RootId} (task id + key) and is remembered in an insertion-ordered map
 *       until it is retired via {@link #retireInFlight(long)} or
 *       {@link #retireExpired(Duration)};</li>
 *   <li>disabled: the tuple info is simply appended to the immediate-ack queue, which
 *       the owner can drain through {@link #getImmediateAcks()}.</li>
 * </ul>
 *
 * <p>No synchronization is performed in this class.
 */
public class SpoutOutputCollectorImpl extends AbstractOutputCollector implements ISpoutOutputCollector {
    private static final Logger LOG = Logger.getLogger(SpoutOutputCollectorImpl.class.getName());

    /** Tracked tuples awaiting retirement, keyed by generated root key, in insertion order. */
    private final LinkedHashMap<Long, RootTupleInfo> inFlightTuples;

    /** Source of the keys placed into {@link #inFlightTuples} and into emitted root ids. */
    private final TupleKeyGenerator keyGenerator;

    /** Tuples queued when acking is disabled; {@code null} when acking is enabled. */
    private final Queue<RootTupleInfo> immediateAcks;

    /**
     * Creates the collector for the spout described by {@code physicalPlanHelper}.
     *
     * <p>All four arguments are passed straight to the superclass constructor.
     *
     * @param iPluggableSerializer serializer forwarded to the superclass
     * @param physicalPlanHelper plan helper; must describe a spout
     * @param communicator outbound message channel forwarded to the superclass
     * @param componentMetrics metrics sink forwarded to the superclass
     * @throws RuntimeException if {@code physicalPlanHelper.getMySpout()} is {@code null}
     */
    public SpoutOutputCollectorImpl(IPluggableSerializer iPluggableSerializer, PhysicalPlanHelper physicalPlanHelper, Communicator<Message> communicator, ComponentMetrics componentMetrics) {
        super(iPluggableSerializer, physicalPlanHelper, communicator, componentMetrics);
        if (physicalPlanHelper.getMySpout() == null) {
            throw new RuntimeException(physicalPlanHelper.getMyTaskId() + " is not a spout ");
        }
        this.keyGenerator = new TupleKeyGenerator();
        this.inFlightTuples = new LinkedHashMap<>();
        // The immediate-ack queue is only ever used when acking is disabled, so it is
        // not allocated otherwise.
        this.immediateAcks = this.ackEnabled ? null : new ArrayDeque<>();
    }

    /**
     * Emits a tuple without a direct-task target.
     *
     * @param str stream identifier handed to the tuple builder and to the tuple info
     * @param list tuple values
     * @param obj message object; when non-null the tuple is tracked (see class docs)
     * @return always {@code null} in this implementation
     */
    @Override // org.apache.heron.api.spout.ISpoutOutputCollector
    public List<Integer> emit(String str, List<Object> list, Object obj) {
        return admitSpoutTuple(str, list, obj, null);
    }

    /**
     * Emits a tuple, passing {@code i} to the tuple builder as the direct-task target.
     *
     * @param i value forwarded (boxed) as the direct task id
     * @param str stream identifier handed to the tuple builder and to the tuple info
     * @param list tuple values
     * @param obj message object; when non-null the tuple is tracked (see class docs)
     */
    @Override // org.apache.heron.api.spout.ISpoutOutputCollector
    public void emitDirect(int i, String str, List<Object> list, Object obj) {
        admitSpoutTuple(str, list, obj, i);
    }

    /**
     * Logs the given throwable at {@link Level#SEVERE}; nothing else is done with it.
     *
     * @param th the error to report
     */
    @Override // org.apache.heron.api.spout.ISpoutOutputCollector
    public void reportError(Throwable th) {
        LOG.log(Level.SEVERE, "Reporting an error in topology code ", th);
    }

    /**
     * @return the inherited {@code ackEnabled} flag
     */
    public boolean isAckEnabled() {
        return this.ackEnabled;
    }

    /**
     * @return the number of tracked tuples that have not been retired yet
     */
    public int numInFlight() {
        return this.inFlightTuples.size();
    }

    /**
     * Exposes the live immediate-ack queue (not a copy).
     *
     * @return the queue, or {@code null} when acking is enabled
     */
    public Queue<RootTupleInfo> getImmediateAcks() {
        return this.immediateAcks;
    }

    /**
     * Stops tracking the tuple registered under the given key.
     *
     * @param j the key previously generated for the tuple
     * @return the removed tuple info, or {@code null} if no tuple is tracked under {@code j}
     */
    public RootTupleInfo retireInFlight(long j) {
        return this.inFlightTuples.remove(j);
    }

    /**
     * Removes and returns the leading run of tracked tuples that report themselves as
     * expired for the given timeout.
     *
     * <p>The map is walked in insertion order and the walk stops at the first tuple that is
     * not expired; later entries are not inspected. NOTE(review): this presumes that
     * insertion order matches expiry order - confirm against {@code RootTupleInfo.isExpired}.
     *
     * @param duration timeout passed (in nanoseconds) to {@code RootTupleInfo.isExpired}
     * @return the retired tuple infos in insertion order; empty if none expired
     */
    public List<RootTupleInfo> retireExpired(Duration duration) {
        List<RootTupleInfo> expired = new ArrayList<>();
        if (this.inFlightTuples.isEmpty()) {
            // Nothing to inspect; like the original loop, do not touch 'duration' at all.
            return expired;
        }
        long now = System.nanoTime();
        // Loop-invariant: convert the timeout once instead of once per inspected tuple.
        long timeoutNanos = duration.toNanos();
        Iterator<RootTupleInfo> it = this.inFlightTuples.values().iterator();
        while (it.hasNext()) {
            RootTupleInfo candidate = it.next();
            if (!candidate.isExpired(now, timeoutNanos)) {
                break;
            }
            expired.add(candidate);
            it.remove();
        }
        return expired;
    }

    /**
     * Shared implementation behind {@link #emit} and {@link #emitDirect}.
     *
     * <p>Does nothing when the plan helper reports the component as terminated. Otherwise
     * builds the data tuple, registers tracking information when a message object was
     * supplied, and hands the tuple to {@code sendTuple}.
     *
     * @param streamId stream identifier
     * @param tuple tuple values
     * @param messageId message object, or {@code null} for an untracked tuple
     * @param emitDirectTaskId direct-task target, or {@code null} for a regular emit
     * @return always {@code null}
     */
    private List<Integer> admitSpoutTuple(String streamId, List<Object> tuple, Object messageId, Integer emitDirectTaskId) {
        if (getPhysicalPlanHelper().isTerminatedComponent()) {
            return null;
        }
        HeronTuples.HeronDataTuple.Builder tupleBuilder = initTupleBuilder(streamId, tuple, emitDirectTaskId);
        if (messageId != null) {
            RootTupleInfo tupleInfo = new RootTupleInfo(streamId, messageId);
            if (this.ackEnabled) {
                // Tag the tuple with a root id and remember it until it is retired.
                tupleBuilder.addRoots(establishRootId(tupleInfo));
            } else {
                this.immediateAcks.offer(tupleInfo);
            }
        }
        sendTuple(tupleBuilder, streamId, tuple);
        return null;
    }

    /**
     * Generates a new key, records {@code rootTupleInfo} under it in the in-flight map and
     * returns a root-id builder carrying this task's id and that key.
     *
     * @param rootTupleInfo tracking information for the tuple being emitted
     * @return builder for the root id to attach to the outgoing tuple
     */
    private HeronTuples.RootId.Builder establishRootId(RootTupleInfo rootTupleInfo) {
        long key = this.keyGenerator.next();
        HeronTuples.RootId.Builder rootIdBuilder = HeronTuples.RootId.newBuilder();
        rootIdBuilder.setTaskid(getPhysicalPlanHelper().getMyTaskId());
        rootIdBuilder.setKey(key);
        this.inFlightTuples.put(key, rootTupleInfo);
        return rootIdBuilder;
    }
}
