package org.apache.heron.instance;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.ComponentMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.Message;
import org.apache.heron.shaded.org.apache.commons.cli.HelpFormatter;

/* loaded from: input_file:org/apache/heron/instance/OutgoingTupleCollection.class */
public class OutgoingTupleCollection {
    protected PhysicalPlanHelper helper;
    private final Communicator<Message> outQueue;
    private final ByteAmount maxDataTupleSize;
    private final int dataTupleSetCapacity;
    private final int controlTupleSetCapacity;
    private final IPluggableSerializer serializer;
    private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;
    private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
    private AtomicLong totalDataEmittedInBytes = new AtomicLong();
    private long currentDataTupleSizeInBytes;
    private final ReentrantLock lock;
    protected final ComponentMetrics metrics;

    public OutgoingTupleCollection(PhysicalPlanHelper physicalPlanHelper, Communicator<Message> communicator, ReentrantLock reentrantLock, ComponentMetrics componentMetrics) {
        this.outQueue = communicator;
        this.helper = physicalPlanHelper;
        this.metrics = componentMetrics;
        SystemConfig systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.serializer = SerializeDeSerializeHelper.getSerializer(physicalPlanHelper.getTopologyContext().getTopologyConfig());
        this.totalDataEmittedInBytes.set(0L);
        this.currentDataTupleSizeInBytes = 0L;
        this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
        this.maxDataTupleSize = systemConfig.getInstanceSetDataTupleSize();
        this.controlTupleSetCapacity = systemConfig.getInstanceSetControlTupleCapacity();
        this.lock = reentrantLock;
    }

    public void sendOutTuples() {
        this.lock.lock();
        try {
            flushRemaining();
        } finally {
            this.lock.unlock();
        }
    }

    public void sendOutState(State<Serializable, Serializable> state, String str, boolean z, String str2) {
        this.lock.lock();
        try {
            flushRemaining();
            byte[] serialize = this.serializer.serialize(state);
            CheckpointManager.InstanceStateCheckpoint.Builder newBuilder = CheckpointManager.InstanceStateCheckpoint.newBuilder();
            newBuilder.setCheckpointId(str);
            if (z) {
                FileUtils.cleanDir(str2);
                String str3 = str2 + str + HelpFormatter.DEFAULT_OPT_PREFIX + UUID.randomUUID();
                if (!FileUtils.writeToFile(str3, serialize, true)) {
                    throw new RuntimeException("failed to spill state. Bailing out...");
                }
                newBuilder.setStateLocation(str3);
            } else {
                newBuilder.setState(ByteString.copyFrom(serialize));
            }
            this.outQueue.offer(CheckpointManager.StoreInstanceStateCheckpoint.newBuilder().setState(newBuilder.build()).build());
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public void addDataTuple(String str, HeronTuples.HeronDataTuple.Builder builder, long j) {
        this.lock.lock();
        try {
            if (j > this.maxDataTupleSize.asBytes()) {
                throw new RuntimeException(String.format("Data tuple (stream id: %s) is too large: %d bytes", str, Long.valueOf(j)));
            }
            if (this.currentDataTuple == null || !this.currentDataTuple.getStream().getId().equals(str) || this.currentDataTuple.getTuplesCount() >= this.dataTupleSetCapacity || this.currentDataTupleSizeInBytes >= this.maxDataTupleSize.asBytes()) {
                initNewDataTuple(str);
            }
            this.currentDataTuple.addTuples(builder);
            this.currentDataTupleSizeInBytes += j;
            this.totalDataEmittedInBytes.getAndAdd(j);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public void addAckTuple(HeronTuples.AckTuple.Builder builder, long j) {
        this.lock.lock();
        try {
            if (this.currentControlTuple == null || this.currentControlTuple.getFailsCount() > 0 || this.currentControlTuple.getAcksCount() >= this.controlTupleSetCapacity) {
                initNewControlTuple();
            }
            this.currentControlTuple.addAcks(builder);
            this.totalDataEmittedInBytes.getAndAdd(j);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public void addFailTuple(HeronTuples.AckTuple.Builder builder, long j) {
        this.lock.lock();
        try {
            if (this.currentControlTuple == null || this.currentControlTuple.getAcksCount() > 0 || this.currentControlTuple.getFailsCount() >= this.controlTupleSetCapacity) {
                initNewControlTuple();
            }
            this.currentControlTuple.addFails(builder);
            this.totalDataEmittedInBytes.getAndAdd(j);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    private void initNewDataTuple(String str) {
        flushRemaining();
        this.currentDataTupleSizeInBytes = 0L;
        TopologyAPI.StreamId.Builder newBuilder = TopologyAPI.StreamId.newBuilder();
        newBuilder.setId(str);
        newBuilder.setComponentName(this.helper.getMyComponent());
        this.currentDataTuple = HeronTuples.HeronDataTupleSet.newBuilder();
        this.currentDataTuple.setStream(newBuilder);
    }

    private void initNewControlTuple() {
        flushRemaining();
        this.currentControlTuple = HeronTuples.HeronControlTupleSet.newBuilder();
    }

    private void flushRemaining() {
        if (this.currentDataTuple != null) {
            HeronTuples.HeronTupleSet.Builder newBuilder = HeronTuples.HeronTupleSet.newBuilder();
            newBuilder.setSrcTaskId(this.helper.getMyTaskId());
            newBuilder.setData(this.currentDataTuple);
            pushTupleToQueue(newBuilder, this.outQueue);
            this.metrics.addTupleToQueue(this.currentDataTuple.getTuplesCount());
            this.currentDataTuple = null;
        }
        if (this.currentControlTuple != null) {
            HeronTuples.HeronTupleSet.Builder newBuilder2 = HeronTuples.HeronTupleSet.newBuilder();
            newBuilder2.setSrcTaskId(this.helper.getMyTaskId());
            newBuilder2.setControl(this.currentControlTuple);
            pushTupleToQueue(newBuilder2, this.outQueue);
            this.currentControlTuple = null;
        }
    }

    private void pushTupleToQueue(HeronTuples.HeronTupleSet.Builder builder, Communicator<Message> communicator) {
        communicator.offer(builder.build());
    }

    public boolean isOutQueuesAvailable() {
        return this.outQueue.size() < this.outQueue.getExpectedAvailableCapacity();
    }

    public long getTotalDataEmittedInBytes() {
        return this.totalDataEmittedInBytes.get();
    }

    public void clear() {
        this.lock.lock();
        try {
            this.currentControlTuple = null;
            this.currentDataTuple = null;
            this.outQueue.clear();
        } finally {
            this.lock.unlock();
        }
    }

    public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
        this.lock.lock();
        try {
            this.helper = physicalPlanHelper;
        } finally {
            this.lock.unlock();
        }
    }
}
