/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.instance;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.heron.api.Config;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.utils.metrics.ComponentMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.instance.OutgoingTupleCollection;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.Message;

/**
 * Shared plumbing for output collectors: serializes emitted tuple values, hands the
 * resulting tuples to an {@link OutgoingTupleCollection}, and keeps running counters of
 * how many tuples and serialized bytes have been emitted through this collector.
 *
 * <p>NOTE(review): no synchronization is performed in this class itself; {@link #lock} is
 * only created here and passed to the {@link OutgoingTupleCollection}. Confirm the intended
 * threading model against the callers.
 */
public class AbstractOutputCollector {
    /** Topology config key holding the name of a {@code Config.TopologyReliabilityMode} constant. */
    private static final String RELIABILITY_MODE_KEY = "topology.reliability.mode";
    /** Topology config key holding a boolean-like acking flag; consulted only when the mode key is unset. */
    private static final String ACKING_KEY = "topology.acking";

    /** Serializer applied to every value of an emitted tuple. */
    protected final IPluggableSerializer serializer;
    /** Destination for built tuples and state; most public methods delegate to it. */
    protected final OutgoingTupleCollection outputter;
    /** Metrics sink notified of serialization latency and of each emitted tuple. */
    protected final ComponentMetrics metrics;
    /** Whether acking is enabled, resolved once from the topology config at construction. */
    protected final boolean ackEnabled;
    /** Number of tuples passed through {@link #sendTuple}. */
    private long totalTuplesEmitted;
    /** Sum of serialized value sizes of all tuples passed through {@link #sendTuple}. */
    private long totalBytesEmitted;
    /** Current physical plan helper; replaced by {@link #updatePhysicalPlanHelper}. */
    private PhysicalPlanHelper helper;
    /** Lock instance handed to the {@link OutgoingTupleCollection} created in the constructor. */
    public final ReentrantLock lock = new ReentrantLock();

    /**
     * Creates a collector and its backing {@link OutgoingTupleCollection}.
     *
     * @param serializer serializer used for tuple values
     * @param helper physical plan helper; its topology config decides {@link #ackEnabled}
     * @param streamOutQueue queue passed through to the {@link OutgoingTupleCollection}
     * @param metrics metrics sink for this component
     * @throws IllegalArgumentException if {@code topology.reliability.mode} is set to a value
     *     that is not the name of a {@code Config.TopologyReliabilityMode} constant
     */
    public AbstractOutputCollector(IPluggableSerializer serializer, PhysicalPlanHelper helper, Communicator<Message> streamOutQueue, ComponentMetrics metrics) {
        this.serializer = serializer;
        this.metrics = metrics;
        this.totalTuplesEmitted = 0L;
        this.totalBytesEmitted = 0L;
        this.helper = helper;
        Map<String, Object> config = helper.getTopologyContext().getTopologyConfig();
        this.ackEnabled = resolveAckEnabled(config);
        this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, this.lock, metrics);
    }

    /**
     * Decides whether acking is enabled from the topology config.
     *
     * <p>A non-null {@code topology.reliability.mode} wins: acking is enabled exactly when it
     * names {@code ATLEAST_ONCE}. Otherwise a non-null {@code topology.acking} value is parsed
     * with {@link Boolean#parseBoolean(String)}. With neither key set, acking is disabled.
     *
     * @param config topology configuration map
     * @return {@code true} if acking should be enabled
     */
    private static boolean resolveAckEnabled(Map<String, Object> config) {
        Object reliabilityMode = config.get(RELIABILITY_MODE_KEY);
        if (reliabilityMode != null) {
            Config.TopologyReliabilityMode mode =
                Config.TopologyReliabilityMode.valueOf(reliabilityMode.toString());
            return mode == Config.TopologyReliabilityMode.ATLEAST_ONCE;
        }
        // Fallback key, used only when no reliability mode is configured
        // (presumably a legacy setting -- confirm against the Config documentation).
        Object acking = config.get(ACKING_KEY);
        return acking != null && Boolean.parseBoolean(acking.toString());
    }

    /**
     * Replaces the physical plan helper on this collector and on the underlying outputter.
     *
     * @param physicalPlanHelper the new helper
     */
    public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
        this.helper = physicalPlanHelper;
        this.outputter.updatePhysicalPlanHelper(physicalPlanHelper);
    }

    /**
     * @return the physical plan helper currently in use
     */
    public PhysicalPlanHelper getPhysicalPlanHelper() {
        return this.helper;
    }

    /**
     * @return whatever the underlying outputter reports for out-queue availability
     */
    public boolean isOutQueuesAvailable() {
        return this.outputter.isOutQueuesAvailable();
    }

    /**
     * Returns the byte total tracked by the underlying outputter. This is a separate counter
     * from {@link #getTotalBytesEmitted()}, which is maintained by this class.
     *
     * @return the outputter's total of emitted data in bytes
     */
    public long getTotalDataEmittedInBytes() {
        return this.outputter.getTotalDataEmittedInBytes();
    }

    /** Asks the underlying outputter to send out its tuples. */
    public void sendOutTuples() {
        this.outputter.sendOutTuples();
    }

    /**
     * Forwards a state snapshot to the underlying outputter; all arguments are passed through
     * unchanged.
     *
     * @param state the state to send
     * @param checkpointId identifier of the checkpoint
     * @param spillState passed through to the outputter
     * @param location passed through to the outputter
     */
    public void sendOutState(State<Serializable, Serializable> state, String checkpointId, boolean spillState, String location) {
        this.outputter.sendOutState(state, checkpointId, spillState, location);
    }

    /** Clears the underlying outputter. The emit counters of this class are not reset. */
    public void clear() {
        this.outputter.clear();
    }

    /**
     * @return number of tuples sent through {@link #sendTuple} so far
     */
    public long getTotalTuplesEmitted() {
        return this.totalTuplesEmitted;
    }

    /**
     * @return total serialized size, in bytes, of all values sent through {@link #sendTuple}
     */
    public long getTotalBytesEmitted() {
        return this.totalBytesEmitted;
    }

    /**
     * Starts a data tuple builder with key {@code 0} and, when applicable, explicit destination
     * task ids, then invokes the topology context's emit hook.
     *
     * <p>Destination ids are chosen as follows: a non-null {@code emitDirectTaskId} yields a
     * single-element list; otherwise, if the helper has custom groupings, the helper picks the
     * tasks; otherwise no destinations are set and the hook receives {@code null}.
     *
     * @param streamId stream the tuple is emitted on
     * @param tuple the tuple values
     * @param emitDirectTaskId task id for a direct emit, or {@code null}
     * @return the partially populated builder (values are added later by {@link #sendTuple})
     */
    protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String streamId, List<Object> tuple, Integer emitDirectTaskId) {
        HeronTuples.HeronDataTuple.Builder builder = HeronTuples.HeronDataTuple.newBuilder();
        builder.setKey(0L);

        // Declared as List (not ArrayList) so it can hold whatever list type the helper returns.
        List<Integer> destTaskIds = null;
        if (emitDirectTaskId != null) {
            destTaskIds = new ArrayList<Integer>();
            destTaskIds.add(emitDirectTaskId);
        } else if (!this.helper.isCustomGroupingEmpty()) {
            destTaskIds = this.helper.chooseTasksForCustomStreamGrouping(streamId, tuple);
        }

        if (destTaskIds != null) {
            builder.addAllDestTaskIds(destTaskIds);
        }
        this.helper.getTopologyContext().invokeHookEmit(tuple, streamId, destTaskIds);
        return builder;
    }

    /**
     * Serializes every value of {@code tuple} into {@code bldr}, hands the builder to the
     * outputter, and updates counters and metrics.
     *
     * <p>The time spent in the serialization loop, measured with {@link System#nanoTime()},
     * is reported through {@code metrics.serializeDataTuple}.
     *
     * @param bldr builder to append the serialized values to
     * @param streamId stream the tuple is emitted on
     * @param tuple the values to serialize, in order
     */
    protected void sendTuple(HeronTuples.HeronDataTuple.Builder bldr, String streamId, List<Object> tuple) {
        long serializedSize = 0L;
        long serializeStart = System.nanoTime();
        for (Object value : tuple) {
            byte[] bytes = this.serializer.serialize(value);
            bldr.addValues(ByteString.copyFrom(bytes));
            serializedSize += (long) bytes.length;
        }
        long serializeLatency = System.nanoTime() - serializeStart;
        this.metrics.serializeDataTuple(streamId, serializeLatency);

        this.outputter.addDataTuple(streamId, bldr, serializedSize);

        this.totalTuplesEmitted++;
        this.totalBytesEmitted += serializedSize;
        this.metrics.emittedTuple(streamId);
    }
}

