/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.server;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import java.util.zip.GZIPInputStream;
import javax.net.ssl.SSLSocket;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.controller.queue.clustered.server.TransactionAbortedException;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardLoadBalanceProtocol
implements LoadBalanceProtocol {
    private static final Logger logger = LoggerFactory.getLogger(StandardLoadBalanceProtocol.class);

    /** Protocol-version result used when the peer closed the stream before sending a version byte. */
    private static final int SOCKET_CLOSED = -1;
    /** Protocol-version result used when the very first read timed out, i.e. the peer sent nothing. */
    private static final int NO_DATA_AVAILABLE = 0;

    private final FlowFileRepository flowFileRepository;
    private final ContentRepository contentRepository;
    private final ProvenanceRepository provenanceRepository;
    private final FlowController flowController;
    private final LoadBalanceAuthorizer authorizer;

    /** Per-thread scratch buffer; allocated lazily by {@code getDataBuffer()}. */
    private final ThreadLocal<byte[]> dataBuffer = new ThreadLocal<>();
    /** Counter handed out as the lineage start index of every FlowFile built by this instance. */
    private final AtomicLong lineageStartIndex = new AtomicLong(0L);

    /**
     * Creates a protocol handler backed by the given repositories and controller.
     *
     * @param flowFileRepository repository updated with the FlowFiles that are received
     * @param contentRepository repository that stores the received content
     * @param provenanceRepository repository that receives RECEIVE / DROP provenance events
     * @param flowController used to look up the destination Connection by identifier
     * @param authorizer used to authorize peers that connect over an {@link SSLSocket}
     */
    public StandardLoadBalanceProtocol(FlowFileRepository flowFileRepository, ContentRepository contentRepository, ProvenanceRepository provenanceRepository, FlowController flowController, LoadBalanceAuthorizer authorizer) {
        this.authorizer = authorizer;
        this.flowController = flowController;
        this.provenanceRepository = provenanceRepository;
        this.contentRepository = contentRepository;
        this.flowFileRepository = flowFileRepository;
    }

    /**
     * Entry point for a newly accepted connection: authorizes the peer when the socket is secure,
     * negotiates the protocol version and, if a version was agreed upon, receives one transaction.
     *
     * @param socket the accepted socket; closed here only when the peer hit end-of-stream during negotiation
     * @param in stream to read the peer's data from
     * @param out stream to write responses to
     * @throws IOException if communication with the peer fails
     */
    @Override
    public void receiveFlowFiles(Socket socket, InputStream in, OutputStream out) throws IOException {
        String peer = socket.getInetAddress().getHostName();
        String channel = socket.getLocalSocketAddress() + "::" + socket.getRemoteSocketAddress();

        if (socket instanceof SSLSocket) {
            logger.debug("Connection received from peer {}", peer);
            // The authorizer's return value replaces the host name as the peer description.
            peer = this.authorizer.authorize((SSLSocket) socket);
            channel = peer + "::" + channel;
            logger.debug("Client Identities are authorized to load balance data for peer {}", peer);
        }

        final int negotiatedVersion = this.negotiateProtocolVersion(in, out, peer, channel);
        switch (negotiatedVersion) {
            case SOCKET_CLOSED:
                socket.close();
                return;
            case NO_DATA_AVAILABLE:
                logger.debug("No data is available from {}", peer);
                return;
            default:
                this.receiveFlowFiles(in, out, peer, negotiatedVersion);
        }
    }

    /**
     * Negotiates the protocol version with the peer. The peer proposes a version byte; this side either
     * accepts it, proposes its preferred alternative and waits for another proposal, or aborts.
     *
     * @param in stream the proposed version bytes are read from
     * @param out stream the accept / counter-proposal / abort responses are written to
     * @param peerDescription description of the peer, used in log and exception messages
     * @param channelDescription description of the channel, used in log and exception messages
     * @return the accepted version, {@code 0} if the very first read timed out, or {@code -1} on end-of-stream
     * @throws IOException if no acceptable version exists, or a read times out after the first proposal
     */
    protected int negotiateProtocolVersion(InputStream in, OutputStream out, String peerDescription, String channelDescription) throws IOException {
        StandardVersionNegotiator versionNegotiator = new StandardVersionNegotiator(new int[]{1});

        for (int round = 0; ; round++) {
            int proposed;
            try {
                proposed = in.read();
            } catch (SocketTimeoutException timeout) {
                // A timeout is tolerated only before the peer has sent anything at all.
                if (round != 0) {
                    throw timeout;
                }
                logger.debug("SocketTimeoutException thrown when trying to negotiate Protocol Version");
                return 0;
            }

            if (proposed < 0) {
                logger.debug("Encountered End-of-File when receiving the the recommended Protocol Version. Returning -1 for the protocol version");
                return -1;
            }

            if (versionNegotiator.isVersionSupported(proposed)) {
                logger.debug("Peer {} requested version {} of the Load Balance Protocol over Channel {}. Accepting version.", peerDescription, proposed, channelDescription);
                // 16: response byte sent when the proposed version is accepted.
                out.write(16);
                out.flush();
                return proposed;
            }

            Integer counterProposal = versionNegotiator.getPreferredVersion(proposed);
            if (counterProposal != null) {
                logger.debug("Peer {} requested version {} of the Load Balance Protocol over Channel {}. Requesting that peer change to version {} instead.", peerDescription, proposed, channelDescription, counterProposal);
                // 17 followed by the version: asks the peer to propose that version instead.
                out.write(17);
                out.write(counterProposal);
                out.flush();
                continue;
            }

            logger.debug("Peer {} requested version {} of the Load Balance Protocol over Channel {}. This version is not acceptable. Aborting communications.", peerDescription, proposed, channelDescription);
            // 18: response byte sent when negotiation is aborted.
            out.write(18);
            out.flush();
            throw new IOException("Peer " + peerDescription + " requested that we use version " + proposed + " of the Load Balance Protocol over Channel " + channelDescription + ", but this version is unacceptable. Aborted communications.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Receives a single Load Balance transaction after the protocol version has been negotiated.
     * <p>
     * Sequence: read the Connection ID, validate that the Connection exists and is load balanced,
     * answer the optional space check, read FlowFiles into one shared content claim until the peer
     * signals that there are no more, verify the checksum, then complete the transaction.
     * On any exception the claimant counts taken for the received FlowFiles are released, the
     * content claim is removed and the exception is rethrown.
     *
     * @param in raw input stream from the peer
     * @param out stream used to send responses to the peer
     * @param peerDescription description of the peer, used in log and exception messages
     * @param protocolVersion the negotiated protocol version, passed through to the per-FlowFile readers
     * @throws IOException on communication failure or protocol violation; the method also raises
     *         TransactionAbortedException (declared elsewhere in the project) for aborted transactions
     */
    protected void receiveFlowFiles(InputStream in, OutputStream out, String peerDescription, int protocolVersion) throws IOException {
        logger.debug("Receiving FlowFiles from {}", (Object)peerDescription);
        // Captured up front so the provenance events can report the duration of the whole transfer.
        long startTimestamp = System.currentTimeMillis();
        // Everything read through dataIn is folded into the CRC32. The checksum value itself and the
        // completion indicator are later read from the raw 'in', so they are not part of the CRC.
        CRC32 checksum = new CRC32();
        CheckedInputStream checkedInput = new CheckedInputStream(in, checksum);
        DataInputStream dataIn = new DataInputStream(checkedInput);
        String connectionId = this.getConnectionID(dataIn, peerDescription);
        if (connectionId == null) {
            // getConnectionID returns null when the stream ended before a Connection ID arrived.
            logger.debug("Received no Connection ID from Peer {}. Will consider receipt of FlowFiles complete", (Object)peerDescription);
            return;
        }
        Connection connection = this.flowController.getFlowManager().getConnection(connectionId);
        if (connection == null) {
            logger.error("Attempted to receive FlowFiles from Peer {} for Connection with ID {} but no connection exists with that ID", (Object)peerDescription, (Object)connectionId);
            throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but no Connection exists with that ID");
        }
        FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
        if (!(flowFileQueue instanceof LoadBalancedFlowFileQueue)) {
            throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but the Connection with that ID is not configured to allow for Load Balancing");
        }
        LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = (LoadBalancedFlowFileQueue)flowFileQueue;
        // Space-check byte: 97 is a 'Check Space' request that must be answered; 98 is accepted
        // without any response; every other value aborts the transaction.
        int spaceCheck = dataIn.read();
        if (spaceCheck < 0) {
            throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription);
        }
        if (spaceCheck == 97) {
            if (loadBalancedFlowFileQueue.isLocalPartitionFull()) {
                logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", (Object)peerDescription, (Object)connectionId);
                // 102: QUEUE_FULL response; nothing further is read for this transaction.
                out.write(102);
                out.flush();
                return;
            }
            logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with SPACE_AVAILABLE", (Object)peerDescription, (Object)connectionId);
            // 101: SPACE_AVAILABLE response; the peer is expected to continue with FlowFiles.
            out.write(101);
            out.flush();
        } else if (spaceCheck != 98) {
            throw new TransactionAbortedException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription + " but instead received value " + spaceCheck);
        }
        LoadBalanceCompression compression = connection.getFlowFileQueue().getLoadBalanceCompression();
        logger.debug("Receiving FlowFiles from Peer {} for Connection {}; Compression = {}", new Object[]{peerDescription, connectionId, compression});
        ContentClaim contentClaim = null;
        ArrayList<RemoteFlowFileRecord> flowFilesReceived = new ArrayList<RemoteFlowFileRecord>();
        OutputStream contentClaimOut = null;
        // Running offset of the next FlowFile's content within the shared content claim.
        long claimOffset = 0L;
        try {
            try {
                while (this.isMoreFlowFiles(dataIn, protocolVersion)) {
                    if (contentClaim == null) {
                        // One content claim is created lazily and shared by every FlowFile in the transaction.
                        // NOTE(review): the meaning of the 'false' argument is not visible in this file.
                        contentClaim = this.contentRepository.create(false);
                        contentClaimOut = this.contentRepository.write(contentClaim);
                    }
                    RemoteFlowFileRecord flowFile = this.receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
                    // One claimant per received FlowFile. The FlowFile's claim is null when the peer
                    // sent no data frame for it.
                    this.contentRepository.incrementClaimaintCount(flowFile.getFlowFile().getContentClaim());
                    flowFilesReceived.add(flowFile);
                    claimOffset += flowFile.getFlowFile().getSize();
                }
            }
            finally {
                // Always close the claim's output stream, whether or not the loop finished normally.
                if (contentClaimOut != null) {
                    contentClaimOut.close();
                }
            }
            // NOTE(review): presumably releases the claimant held since create(); confirm against the
            // ContentRepository contract. contentClaim is still null here when the peer sent zero
            // FlowFiles, so this relies on the repository tolerating a null claim.
            int count = this.contentRepository.decrementClaimantCount(contentClaim);
            this.verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size());
            this.completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, (LoadBalancedFlowFileQueue)flowFileQueue);
            if (count == 0) {
                // No FlowFile references the claim, so it can be removed.
                this.contentRepository.remove(contentClaim);
            }
        }
        catch (Exception e) {
            // Undo the per-FlowFile claimant increments and discard the claim before rethrowing.
            // NOTE(review): this also runs if completeTransaction fails after the FlowFiles were already
            // handed to the queue (e.g. the final confirmation write throws) - confirm that is intended.
            for (RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) {
                this.contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim());
            }
            this.contentRepository.remove(contentClaim);
            throw e;
        }
        logger.debug("Successfully received {} FlowFiles from Peer {} to Load Balance for Connection {}", new Object[]{flowFilesReceived.size(), peerDescription, connectionId});
    }

    /**
     * Reads the peer's transaction completion indicator and, when the peer asks to complete, records
     * provenance events, updates the FlowFile Repository and transfers the FlowFiles to the queue.
     * If the queue rejects the transfer, an abort byte is sent and the repositories are cleaned up
     * on a best-effort basis; otherwise a confirmation byte is sent.
     *
     * @param in stream the completion indicator is read from
     * @param out stream the abort / confirmation byte is written to
     * @param peerDescription description of the peer, used in log and exception messages
     * @param flowFilesReceived the FlowFiles received in this transaction
     * @param connectionId identifier of the destination Connection
     * @param startTimestamp time at which the transaction started, in epoch milliseconds
     * @param flowFileQueue the queue that the FlowFiles are transferred to
     * @throws IOException on end-of-stream, on an unexpected indicator, or if the peer aborted
     */
    private void completeTransaction(InputStream in, OutputStream out, String peerDescription, List<RemoteFlowFileRecord> flowFilesReceived, String connectionId, long startTimestamp, LoadBalancedFlowFileQueue flowFileQueue) throws IOException {
        final int indicator = in.read();
        if (indicator < 0) {
            throw new EOFException("Expected to receive a Transaction Completion Indicator from Peer " + peerDescription + " but encountered EOF");
        }

        switch (indicator) {
            case 35:
                // Peer asks to complete the transaction; continue below.
                break;
            case 36:
                throw new TransactionAbortedException("Peer " + peerDescription + " chose to Abort Load Balance Transaction");
            default:
                logger.debug("Expected to receive Transaction Completion Indicator from Peer {} but instead received a value of {}. Sending back an Abort Transaction Flag.", peerDescription, indicator);
                out.write(36);
                out.flush();
                throw new IOException("Expected to receive Transaction Completion Indicator from Peer " + peerDescription + " but instead received a value of " + indicator);
        }

        logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription);
        this.registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, connectionId, startTimestamp);
        this.updateFlowFileRepository(flowFilesReceived, flowFileQueue);

        try {
            this.transferFlowFilesToQueue(flowFilesReceived, flowFileQueue);
        } catch (IllegalClusterStateException rejected) {
            logger.error("Failed to transferred received data into FlowFile Queue {}", flowFileQueue, rejected);
            // Tell the peer the transaction was aborted before attempting local cleanup.
            out.write(36);
            out.flush();

            final String dropDetails = "Rejected transfer due to " + rejected.getMessage();
            try {
                this.cleanupRepositoriesOnTransferFailure(flowFilesReceived, flowFileQueue, dropDetails);
            } catch (Exception cleanupFailure) {
                // Best effort only: a cleanup failure is logged and not propagated.
                logger.error("Failed to update FlowFile/Provenance Repositories to denote that the data that could not be received should no longer be present on this node", cleanupFailure);
            }
            return;
        }

        // 37: confirmation byte sent once the FlowFiles are in the queue.
        out.write(37);
        out.flush();
    }

    /**
     * Undoes the repository bookkeeping for FlowFiles that could not be transferred to the queue:
     * first marks them deleted in the FlowFile Repository, then registers DROP provenance events.
     *
     * @param flowFilesReceived the FlowFiles whose transfer failed
     * @param flowFileQueue the queue the FlowFiles were destined for
     * @param details explanation stored on each DROP event
     * @throws IOException if the FlowFile Repository cannot be updated
     */
    private void cleanupRepositoriesOnTransferFailure(List<RemoteFlowFileRecord> flowFilesReceived, FlowFileQueue flowFileQueue, String details) throws IOException {
        dropFlowFilesFromRepository(flowFilesReceived, flowFileQueue);

        final String queueId = flowFileQueue.getIdentifier();
        reportDropEvents(flowFilesReceived, queueId, details);
    }

    /**
     * Marks every given FlowFile as deleted in the FlowFile Repository.
     * <p>
     * One repository record is created per FlowFile, with the given queue as both the original
     * queue and the destination, and flagged for deletion.
     *
     * @param flowFiles the FlowFiles to drop
     * @param flowFileQueue the queue the FlowFiles were destined for
     * @throws IOException if the FlowFile Repository cannot be updated
     */
    private void dropFlowFilesFromRepository(List<RemoteFlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
        this.flowFileRepository.updateRepository(flowFiles.stream().map(remoteFlowFile -> {
            StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile());
            repoRecord.setDestination(flowFileQueue);
            repoRecord.markForDelete();
            return repoRecord;
        }).collect(Collectors.toList()));

        // Log the number of dropped FlowFiles (one record per FlowFile), not the record list itself.
        logger.debug("Updated FlowFile Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue", flowFiles.size());
    }

    /**
     * Registers one DROP provenance event per given FlowFile.
     *
     * @param flowFilesReceived the FlowFiles that were dropped
     * @param connectionId identifier of the Connection, recorded as the event's component ID
     * @param details explanation stored on each event
     */
    private void reportDropEvents(List<RemoteFlowFileRecord> flowFilesReceived, String connectionId, String details) {
        List<ProvenanceEventRecord> events = new ArrayList<>(flowFilesReceived.size());
        for (RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) {
            FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile();
            ProvenanceEventBuilder provenanceEventBuilder = new StandardProvenanceEventRecord.Builder()
                .fromFlowFile(flowFileRecord)
                .setEventType(ProvenanceEventType.DROP)
                .setComponentId(connectionId)
                .setComponentType("Load Balanced Connection")
                .setDetails(details);

            // Content location is only recorded for FlowFiles that actually have content.
            ContentClaim contentClaim = flowFileRecord.getContentClaim();
            if (contentClaim != null) {
                ResourceClaim resourceClaim = contentClaim.getResourceClaim();
                provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
                    Long.valueOf(contentClaim.getOffset() + flowFileRecord.getContentClaimOffset()), flowFileRecord.getSize());
            }
            events.add(provenanceEventBuilder.build());
        }

        this.provenanceRepository.registerEvents(events);
        // Logged only after registration so the message is not emitted when registerEvents throws.
        logger.debug("Updated Provenance Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue", events.size());
    }

    /**
     * Registers one RECEIVE provenance event per received FlowFile. Every event carries the same
     * duration (time elapsed since {@code startTimestamp}) and the same transit URI.
     *
     * @param flowFiles the FlowFiles that were received
     * @param nodeName name of the sending peer, embedded in the transit URI
     * @param connectionId identifier of the Connection, recorded as the event's component ID
     * @param startTimestamp time at which the transaction started, in epoch milliseconds
     */
    private void registerReceiveProvenanceEvents(List<RemoteFlowFileRecord> flowFiles, String nodeName, String connectionId, long startTimestamp) {
        final long elapsedMillis = System.currentTimeMillis() - startTimestamp;
        final String transitUri = "nifi://" + nodeName + "/loadbalance/" + connectionId;

        final List<ProvenanceEventRecord> receiveEvents = new ArrayList<>(flowFiles.size());
        for (final RemoteFlowFileRecord received : flowFiles) {
            final FlowFileRecord localFlowFile = received.getFlowFile();
            final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
                .fromFlowFile(localFlowFile)
                .setEventType(ProvenanceEventType.RECEIVE)
                .setTransitUri(transitUri)
                .setSourceSystemFlowFileIdentifier(received.getRemoteUuid())
                .setEventDuration(elapsedMillis)
                .setComponentId(connectionId)
                .setComponentType("Load Balanced Connection");

            // Content location is only recorded for FlowFiles that actually have content.
            final ContentClaim claim = localFlowFile.getContentClaim();
            if (claim != null) {
                final ResourceClaim resource = claim.getResourceClaim();
                final long absoluteOffset = claim.getOffset() + localFlowFile.getContentClaimOffset();
                builder.setCurrentContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), Long.valueOf(absoluteOffset), localFlowFile.getSize());
            }

            receiveEvents.add(builder.build());
        }

        this.provenanceRepository.registerEvents(receiveEvents);
    }

    /**
     * Persists the received FlowFiles to the FlowFile Repository, one repository record per FlowFile
     * with the given queue as both the original queue and the destination.
     *
     * @param flowFiles the FlowFiles that were received
     * @param flowFileQueue the queue the FlowFiles are destined for
     * @throws IOException if the FlowFile Repository cannot be updated
     */
    private void updateFlowFileRepository(List<RemoteFlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
        this.flowFileRepository.updateRepository(
            flowFiles.stream()
                .map(RemoteFlowFileRecord::getFlowFile)
                .map(flowFile -> {
                    StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(flowFileQueue, flowFile);
                    repoRecord.setDestination(flowFileQueue);
                    return repoRecord;
                })
                .collect(Collectors.toList()));
    }

    /**
     * Hands the local FlowFile records of the received FlowFiles to the load-balanced queue.
     *
     * @param remoteFlowFiles the FlowFiles that were received
     * @param flowFileQueue the queue that receives them
     * @throws IllegalClusterStateException if the queue rejects the transfer
     */
    private void transferFlowFilesToQueue(List<RemoteFlowFileRecord> remoteFlowFiles, LoadBalancedFlowFileQueue flowFileQueue) throws IllegalClusterStateException {
        final List<FlowFileRecord> localRecords = new ArrayList<>(remoteFlowFiles.size());
        for (final RemoteFlowFileRecord received : remoteFlowFiles) {
            localRecords.add(received.getFlowFile());
        }
        flowFileQueue.receiveFromPeer(localRecords);
    }

    /**
     * Reads the checksum reported by the peer and compares it with the locally calculated one.
     * Writes a confirmation byte on a match; on a mismatch writes a rejection byte and aborts.
     *
     * @param checksum the checksum calculated over the data received so far
     * @param in stream the peer's checksum is read from
     * @param out stream the confirmation / rejection byte is written to
     * @param peerDescription description of the peer, used in log and exception messages
     * @param flowFileCount number of FlowFiles received, used for logging only
     * @throws IOException if the checksum cannot be read or does not match
     */
    private void verifyChecksum(Checksum checksum, InputStream in, OutputStream out, String peerDescription, int flowFileCount) throws IOException {
        final long reported = this.readChecksum(in);
        final long calculated = checksum.getValue();

        if (calculated == reported) {
            logger.debug("Checksum from Peer {} matched the checksum that was calculated. Writing confirmation.", peerDescription);
            // 33: checksum confirmed.
            out.write(33);
            out.flush();
            return;
        }

        logger.error("Received {} FlowFiles from peer {} but the Checksum reported by the peer ({}) did not match the checksum that was calculated ({}). Will reject the transaction.", flowFileCount, peerDescription, reported, calculated);
        // 34: checksum rejected.
        out.write(34);
        out.flush();
        throw new TransactionAbortedException("Transaction with Peer " + peerDescription + " was aborted because the calculated checksum did not match the checksum provided by peer.");
    }

    /**
     * Reads an 8-byte big-endian checksum value from the given stream.
     *
     * @param in stream to read from
     * @return the checksum as a {@code long}
     * @throws IOException if the bytes cannot be read
     */
    private long readChecksum(InputStream in) throws IOException {
        final byte[] scratch = getDataBuffer();
        StreamUtils.read(in, scratch, Long.BYTES);

        final ByteBuffer checksumBytes = ByteBuffer.wrap(scratch, 0, Long.BYTES);
        return checksumBytes.getLong();
    }

    /**
     * Returns the calling thread's scratch buffer, allocating it on first use.
     *
     * @return a 69632-byte buffer owned by the current thread
     */
    private byte[] getDataBuffer() {
        final byte[] existing = this.dataBuffer.get();
        if (existing != null) {
            return existing;
        }

        final byte[] created = new byte[69632];
        this.dataBuffer.set(created);
        return created;
    }

    /**
     * Reads the Connection ID that the peer sends at the start of a transaction.
     *
     * @param in stream to read the modified-UTF-8 encoded identifier from
     * @param peerDescription description of the peer, used for logging
     * @return the Connection ID, or {@code null} if the stream ended before one was received
     * @throws IOException if reading fails for a reason other than end-of-stream
     */
    private String getConnectionID(DataInputStream in, String peerDescription) throws IOException {
        final String connectionId;
        try {
            connectionId = in.readUTF();
        } catch (EOFException endOfStream) {
            logger.debug("Encountered EOFException when trying to receive Connection ID from Peer {}. Returning null for Connection ID", peerDescription);
            return null;
        }
        return connectionId;
    }

    /**
     * Reads the indicator byte that tells whether another FlowFile follows in the transaction.
     *
     * @param in stream to read the indicator from
     * @param protocolVersion the negotiated protocol version (not consulted by this method)
     * @return {@code true} for indicator 49, {@code false} for indicator 50
     * @throws EOFException if the stream has ended
     * @throws IOException if any other value is received
     */
    private boolean isMoreFlowFiles(DataInputStream in, int protocolVersion) throws IOException {
        final int marker = in.read();
        switch (marker) {
            case 49:
                logger.debug("Peer indicates that there is another FlowFile in transaction");
                return true;
            case 50:
                logger.debug("Peer indicates that there are no more FlowFiles in transaction");
                return false;
            default:
                if (marker < 0) {
                    throw new EOFException();
                }
                throw new IOException("Expected to receive 'More FlowFiles' indicator (49) or 'No More FlowFiles' indicator (50) but received invalid value of " + marker);
        }
    }

    /**
     * Receives a single FlowFile: a length-prefixed metadata section (attributes followed by lineage
     * start date, entry date and penalty expiration), then the content frames. The local FlowFile
     * gets a fresh UUID; the UUID sent by the peer is kept alongside it as the remote UUID.
     *
     * @param dis stream to read the FlowFile from
     * @param out stream the FlowFile content is written to
     * @param contentClaim content claim that the content is being written to
     * @param claimOffset offset within the content claim at which this FlowFile's content starts
     * @param protocolVersion the negotiated protocol version (not consulted by this method)
     * @param peerDescription description of the peer, used in log and exception messages
     * @param compression compression setting that determines whether metadata and content are gzipped
     * @return the received FlowFile paired with the UUID it had on the sending node
     * @throws IOException if the FlowFile cannot be read
     */
    private RemoteFlowFileRecord receiveFlowFile(DataInputStream dis, OutputStream out, ContentClaim contentClaim, long claimOffset, int protocolVersion, String peerDescription, LoadBalanceCompression compression) throws IOException {
        final int metadataLength = dis.readInt();

        // The metadata section is bounded by its declared length and gunzipped unless compression is off.
        final DataInputStream boundedIn = new DataInputStream(new LimitingInputStream(dis, metadataLength));
        final DataInputStream metadataIn = compression == LoadBalanceCompression.DO_NOT_COMPRESS
            ? boundedIn
            : new DataInputStream(new GZIPInputStream(boundedIn));

        final Map<String, String> attributes = this.readAttributes(metadataIn);
        final String remoteUuid = attributes.get(CoreAttributes.UUID.key());
        logger.debug("Received Attributes {} from Peer {}", attributes, peerDescription);

        final long lineageStartDate = metadataIn.readLong();
        final long entryDate = metadataIn.readLong();
        final long penaltyExpirationMillis = metadataIn.readLong();

        final boolean contentCompressed = compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT;
        final ContentClaimTriple content = this.consumeContent(dis, out, contentClaim, claimOffset, peerDescription, contentCompressed);

        // addAttribute follows addAttributes so that the new UUID replaces the one sent by the peer.
        final FlowFileRecord localFlowFile = new StandardFlowFileRecord.Builder()
            .id(this.flowFileRepository.getNextFlowFileSequence())
            .addAttributes(attributes)
            .addAttribute(CoreAttributes.UUID.key(), UUID.randomUUID().toString())
            .contentClaim(content.getContentClaim())
            .contentClaimOffset(content.getClaimOffset())
            .size(content.getContentLength())
            .entryDate(entryDate)
            .lineageStart(lineageStartDate, this.lineageStartIndex.getAndIncrement())
            .penaltyExpirationTime(penaltyExpirationMillis)
            .build();

        logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", localFlowFile, attributes.size(), content.getContentLength());
        return new RemoteFlowFileRecord(remoteUuid, localFlowFile);
    }

    /**
     * Reads an attribute map: a 4-byte count followed by that many key/value pairs, each encoded as
     * a length-prefixed UTF-8 string.
     *
     * @param in stream to read from
     * @return the attributes read; empty if the count is zero or negative
     * @throws IOException if the attributes cannot be read
     */
    private Map<String, String> readAttributes(DataInputStream in) throws IOException {
        final Map<String, String> result = new HashMap<>();

        int remaining = in.readInt();
        while (remaining-- > 0) {
            final String name = readLongString(in);
            final String val = readLongString(in);
            logger.trace("Received attribute '{}' = '{}'", name, val);
            result.put(name, val);
        }

        return result;
    }

    /**
     * Reads a string encoded as a 4-byte length followed by that many UTF-8 bytes.
     *
     * @param in stream to read from
     * @return the decoded string
     * @throws IOException if the declared length is negative or the bytes cannot be read
     */
    private String readLongString(DataInputStream in) throws IOException {
        int stringLength = in.readInt();
        // The length comes from the peer. Reject a negative value explicitly rather than letting the
        // array allocation fail with an unchecked NegativeArraySizeException.
        if (stringLength < 0) {
            throw new IOException("Received invalid String length of " + stringLength + "; length must not be negative");
        }
        byte[] bytes = new byte[stringLength];
        StreamUtils.fillBuffer(in, bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Reads the content of one FlowFile from the peer and appends it to the content claim's stream.
     * <p>
     * The content arrives as a sequence of data frames. Each frame is introduced by an indicator
     * byte: 66 means a frame follows (a 4-byte length, then the payload), 64 means there are no
     * (more) frames, and 36 means the peer aborts the transaction. Any other value is an error.
     *
     * @param in stream to read the frames from
     * @param out stream the (decompressed) content is written to
     * @param contentClaim content claim that {@code out} writes to
     * @param claimOffset offset within the claim at which this FlowFile's content starts
     * @param peerDescription description of the peer, used in log and exception messages
     * @param compressed whether each frame's payload is gzip-compressed
     * @return the claim, offset and number of bytes written; a null claim with zero offset and length
     *         when the peer sent no data frame at all
     * @throws IOException on end-of-stream, a short frame, an unexpected indicator or a peer abort
     */
    private ContentClaimTriple consumeContent(DataInputStream in, OutputStream out, ContentClaim contentClaim, long claimOffset, String peerDescription, boolean compressed) throws IOException {
        logger.debug("Consuming content from Peer {}", (Object)peerDescription);
        int dataFrameIndicator = in.read();
        if (dataFrameIndicator < 0) {
            throw new EOFException("Encountered End-of-File when expecting to read Data Frame Indicator from Peer " + peerDescription);
        }
        if (dataFrameIndicator == 64) {
            // No content for this FlowFile: report a null claim so no content is associated with it.
            logger.debug("Peer {} indicates that there is no Data Frame for the FlowFile", (Object)peerDescription);
            return new ContentClaimTriple(null, 0L, 0L);
        }
        if (dataFrameIndicator == 36) {
            throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted");
        }
        if (dataFrameIndicator != 66) {
            throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
        }
        int dataFrameLength = in.readInt();
        logger.trace("Received Data Frame Length of {} for {}", (Object)dataFrameLength, (Object)peerDescription);
        byte[] buffer = this.getDataBuffer();
        // Total number of (decompressed) bytes written to the claim for this FlowFile.
        long claimLength = 0L;
        while (true) {
            // Bound the read to this frame and count the raw bytes actually taken from the peer.
            LimitedInputStream limitedIn = new LimitedInputStream((InputStream)in, (long)dataFrameLength);
            ByteCountingInputStream bcis = new ByteCountingInputStream((InputStream)limitedIn);
            // NOTE(review): the GZIPInputStream is never closed - presumably so the underlying peer
            // stream stays open; confirm that the per-frame inflater is released some other way.
            Object contentIn = compressed ? new GZIPInputStream((InputStream)bcis) : bcis;
            // NOTE(review): only one fillBuffer call is made per frame, so this appears to assume a
            // frame's (decompressed) payload fits in the scratch buffer - confirm against the sender.
            // The final 'false' argument presumably allows a partially filled buffer; verify.
            int decompressedSize = StreamUtils.fillBuffer((InputStream)contentIn, (byte[])buffer, (boolean)false);
            // Fewer raw bytes consumed than the declared frame length is treated as a truncated frame.
            if (bcis.getBytesRead() < (long)dataFrameLength) {
                throw new EOFException("Expected to receive a Data Frame of length " + dataFrameLength + " bytes but received only " + bcis.getBytesRead() + " bytes");
            }
            out.write(buffer, 0, decompressedSize);
            claimLength += (long)decompressedSize;
            // Next indicator: another frame (66), end of content (64) or abort (36).
            dataFrameIndicator = in.read();
            if (dataFrameIndicator < 0) {
                throw new EOFException("Encountered End-of-File when expecting to receive a Data Frame Indicator");
            }
            if (dataFrameIndicator == 64) break;
            if (dataFrameIndicator == 36) {
                // The value logged here is the length of the previously received frame.
                logger.debug("Peer {} requested that transaction be aborted by sending Data Frame Length of {}", (Object)peerDescription, (Object)dataFrameLength);
                throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted");
            }
            if (dataFrameIndicator != 66) {
                throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
            }
            dataFrameLength = in.readInt();
            logger.trace("Received Data Frame Length of {} for {}", (Object)dataFrameLength, (Object)peerDescription);
        }
        logger.debug("Peer {} indicated that no more data frames are available", (Object)peerDescription);
        return new ContentClaimTriple(contentClaim, claimOffset, claimLength);
    }

    /**
     * Immutable pairing of a locally created FlowFile record with the UUID the FlowFile carried
     * on the node that sent it.
     */
    private static final class RemoteFlowFileRecord {
        private final String remoteUuid;
        private final FlowFileRecord flowFile;

        /**
         * @param remoteUuid UUID of the FlowFile on the sending node
         * @param flowFile the FlowFile record created on this node
         */
        public RemoteFlowFileRecord(String remoteUuid, FlowFileRecord flowFile) {
            this.flowFile = flowFile;
            this.remoteUuid = remoteUuid;
        }

        /** @return UUID of the FlowFile on the sending node */
        public String getRemoteUuid() {
            return remoteUuid;
        }

        /** @return the FlowFile record created on this node */
        public FlowFileRecord getFlowFile() {
            return flowFile;
        }
    }

    /**
     * Immutable description of where a FlowFile's content lives: the content claim, the offset of
     * the content within that claim, and the content's length in bytes.
     */
    private static final class ContentClaimTriple {
        private final ContentClaim contentClaim;
        private final long claimOffset;
        private final long contentLength;

        /**
         * @param contentClaim the claim holding the content; may be null when there is no content
         * @param claimOffset offset of the content within the claim
         * @param contentLength length of the content in bytes
         */
        public ContentClaimTriple(ContentClaim contentClaim, long claimOffset, long contentLength) {
            this.contentLength = contentLength;
            this.claimOffset = claimOffset;
            this.contentClaim = contentClaim;
        }

        /** @return the claim holding the content, possibly null */
        public ContentClaim getContentClaim() {
            return contentClaim;
        }

        /** @return offset of the content within the claim */
        public long getClaimOffset() {
            return claimOffset;
        }

        /** @return length of the content in bytes */
        public long getContentLength() {
            return contentLength;
        }
    }
}

