/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.instance;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.ComponentMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.Message;

public class OutgoingTupleCollection {
    protected PhysicalPlanHelper helper;
    private final Communicator<Message> outQueue;
    private final ByteAmount maxDataTupleSize;
    private final int dataTupleSetCapacity;
    private final int controlTupleSetCapacity;
    private final IPluggableSerializer serializer;
    private HeronTuples.HeronDataTupleSet.Builder currentDataTuple;
    private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
    private AtomicLong totalDataEmittedInBytes = new AtomicLong();
    private long currentDataTupleSizeInBytes;
    private final ReentrantLock lock;
    protected final ComponentMetrics metrics;

    public OutgoingTupleCollection(PhysicalPlanHelper helper, Communicator<Message> outQueue, ReentrantLock lock, ComponentMetrics metrics) {
        this.outQueue = outQueue;
        this.helper = helper;
        this.metrics = metrics;
        SystemConfig systemConfig = (SystemConfig)SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.serializer = SerializeDeSerializeHelper.getSerializer(helper.getTopologyContext().getTopologyConfig());
        this.totalDataEmittedInBytes.set(0L);
        this.currentDataTupleSizeInBytes = 0L;
        this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
        this.maxDataTupleSize = systemConfig.getInstanceSetDataTupleSize();
        this.controlTupleSetCapacity = systemConfig.getInstanceSetControlTupleCapacity();
        this.lock = lock;
    }

    public void sendOutTuples() {
        this.lock.lock();
        try {
            this.flushRemaining();
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendOutState(State<Serializable, Serializable> state, String checkpointId, boolean spillState, String location) {
        this.lock.lock();
        try {
            this.flushRemaining();
            byte[] serializedState = this.serializer.serialize(state);
            CheckpointManager.InstanceStateCheckpoint.Builder instanceStateBuilder = CheckpointManager.InstanceStateCheckpoint.newBuilder();
            instanceStateBuilder.setCheckpointId(checkpointId);
            if (spillState) {
                FileUtils.cleanDir(location);
                String stateLocation = location + checkpointId + "-" + UUID.randomUUID();
                if (!FileUtils.writeToFile(stateLocation, serializedState, true)) {
                    throw new RuntimeException("failed to spill state. Bailing out...");
                }
                instanceStateBuilder.setStateLocation(stateLocation);
            } else {
                instanceStateBuilder.setState(ByteString.copyFrom(serializedState));
            }
            CheckpointManager.StoreInstanceStateCheckpoint storeRequest = CheckpointManager.StoreInstanceStateCheckpoint.newBuilder().setState(instanceStateBuilder.build()).build();
            this.outQueue.offer(storeRequest);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addDataTuple(String streamId, HeronTuples.HeronDataTuple.Builder newTuple, long tupleSizeInBytes) {
        this.lock.lock();
        try {
            if (tupleSizeInBytes > this.maxDataTupleSize.asBytes()) {
                throw new RuntimeException(String.format("Data tuple (stream id: %s) is too large: %d bytes", streamId, tupleSizeInBytes));
            }
            if (this.currentDataTuple == null || !this.currentDataTuple.getStream().getId().equals(streamId) || this.currentDataTuple.getTuplesCount() >= this.dataTupleSetCapacity || this.currentDataTupleSizeInBytes >= this.maxDataTupleSize.asBytes()) {
                this.initNewDataTuple(streamId);
            }
            this.currentDataTuple.addTuples(newTuple);
            this.currentDataTupleSizeInBytes += tupleSizeInBytes;
            this.totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
        this.lock.lock();
        try {
            if (this.currentControlTuple == null || this.currentControlTuple.getFailsCount() > 0 || this.currentControlTuple.getAcksCount() >= this.controlTupleSetCapacity) {
                this.initNewControlTuple();
            }
            this.currentControlTuple.addAcks(newTuple);
            this.totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
        this.lock.lock();
        try {
            if (this.currentControlTuple == null || this.currentControlTuple.getAcksCount() > 0 || this.currentControlTuple.getFailsCount() >= this.controlTupleSetCapacity) {
                this.initNewControlTuple();
            }
            this.currentControlTuple.addFails(newTuple);
            this.totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void initNewDataTuple(String streamId) {
        this.flushRemaining();
        this.currentDataTupleSizeInBytes = 0L;
        TopologyAPI.StreamId.Builder sbldr = TopologyAPI.StreamId.newBuilder();
        sbldr.setId(streamId);
        sbldr.setComponentName(this.helper.getMyComponent());
        this.currentDataTuple = HeronTuples.HeronDataTupleSet.newBuilder();
        this.currentDataTuple.setStream(sbldr);
    }

    private void initNewControlTuple() {
        this.flushRemaining();
        this.currentControlTuple = HeronTuples.HeronControlTupleSet.newBuilder();
    }

    private void flushRemaining() {
        HeronTuples.HeronTupleSet.Builder bldr;
        if (this.currentDataTuple != null) {
            bldr = HeronTuples.HeronTupleSet.newBuilder();
            bldr.setSrcTaskId(this.helper.getMyTaskId());
            bldr.setData(this.currentDataTuple);
            this.pushTupleToQueue(bldr, this.outQueue);
            this.metrics.addTupleToQueue(this.currentDataTuple.getTuplesCount());
            this.currentDataTuple = null;
        }
        if (this.currentControlTuple != null) {
            bldr = HeronTuples.HeronTupleSet.newBuilder();
            bldr.setSrcTaskId(this.helper.getMyTaskId());
            bldr.setControl(this.currentControlTuple);
            this.pushTupleToQueue(bldr, this.outQueue);
            this.currentControlTuple = null;
        }
    }

    private void pushTupleToQueue(HeronTuples.HeronTupleSet.Builder bldr, Communicator<Message> out) {
        out.offer(bldr.build());
    }

    public boolean isOutQueuesAvailable() {
        return this.outQueue.size() < this.outQueue.getExpectedAvailableCapacity();
    }

    public long getTotalDataEmittedInBytes() {
        return this.totalDataEmittedInBytes.get();
    }

    public void clear() {
        this.lock.lock();
        try {
            this.currentControlTuple = null;
            this.currentDataTuple = null;
            this.outQueue.clear();
        }
        finally {
            this.lock.unlock();
        }
    }

    public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
        this.lock.lock();
        try {
            this.helper = physicalPlanHelper;
        }
        finally {
            this.lock.unlock();
        }
    }
}

