/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.client.async.nio;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
import org.apache.nifi.controller.queue.clustered.TransactionThreshold;
import org.apache.nifi.controller.queue.clustered.client.LoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.nio.PeerChannel;
import org.apache.nifi.controller.queue.clustered.client.async.nio.RegisteredPartition;
import org.apache.nifi.controller.queue.clustered.server.TransactionAbortedException;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadBalanceSession {
    private static final Logger logger = LoggerFactory.getLogger(LoadBalanceSession.class);
    static final int MAX_DATA_FRAME_SIZE = 65535;
    private static final long PENALTY_MILLIS = TimeUnit.SECONDS.toMillis(2L);
    private final RegisteredPartition partition;
    private final Supplier<FlowFileRecord> flowFileSupplier;
    private final FlowFileContentAccess flowFileContentAccess;
    private final LoadBalanceFlowFileCodec flowFileCodec;
    private final PeerChannel channel;
    private final int timeoutMillis;
    private final String peerDescription;
    private final String connectionId;
    private final TransactionThreshold transactionThreshold;
    private volatile boolean canceled = false;
    final VersionNegotiator negotiator = new StandardVersionNegotiator(new int[]{1});
    private int protocolVersion = 1;
    private final Checksum checksum = new CRC32();
    private ByteBuffer preparedFrame;
    private FlowFileRecord currentFlowFile;
    private List<FlowFileRecord> flowFilesSent = new ArrayList<FlowFileRecord>();
    private TransactionPhase phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
    private InputStream flowFileInputStream;
    private byte[] byteBuffer = new byte[65535];
    private boolean complete = false;
    private long readTimeout;
    private long penaltyExpiration = -1L;

    public LoadBalanceSession(RegisteredPartition partition, FlowFileContentAccess contentAccess, LoadBalanceFlowFileCodec flowFileCodec, PeerChannel peerChannel, int timeoutMillis, TransactionThreshold transactionThreshold) {
        this.partition = partition;
        this.flowFileSupplier = partition.getFlowFileRecordSupplier();
        this.connectionId = partition.getConnectionId();
        this.flowFileContentAccess = contentAccess;
        this.flowFileCodec = flowFileCodec;
        this.channel = peerChannel;
        this.peerDescription = peerChannel.getPeerDescription();
        if (timeoutMillis < 1) {
            throw new IllegalArgumentException();
        }
        this.timeoutMillis = timeoutMillis;
        this.transactionThreshold = transactionThreshold;
    }

    public RegisteredPartition getPartition() {
        return this.partition;
    }

    public synchronized int getDesiredReadinessFlag() {
        return this.phase.getRequiredSelectionKey();
    }

    public synchronized List<FlowFileRecord> getFlowFilesSent() {
        return Collections.unmodifiableList(this.flowFilesSent);
    }

    public synchronized boolean isComplete() {
        return this.complete;
    }

    public synchronized boolean communicate() throws IOException {
        if (this.isComplete()) {
            return false;
        }
        if (this.isPenalized()) {
            logger.debug("Will not communicate with Peer {} for Connection {} because session is penalized", (Object)this.peerDescription, (Object)this.connectionId);
            return false;
        }
        try {
            if (this.preparedFrame != null && this.preparedFrame.hasRemaining()) {
                logger.trace("Current Frame is already available. Will continue writing current frame to channel");
                int bytesWritten = this.channel.write(this.preparedFrame);
                return bytesWritten > 0;
            }
            switch (this.phase) {
                case RECEIVE_SPACE_RESPONSE: {
                    return this.receiveSpaceAvailableResponse();
                }
                case VERIFY_CHECKSUM: {
                    return this.verifyChecksum();
                }
                case CONFIRM_TRANSACTION_COMPLETE: {
                    return this.confirmTransactionComplete();
                }
                case RECEIVE_PROTOCOL_VERSION_ACKNOWLEDGMENT: {
                    return this.receiveProtocolVersionAcknowledgment();
                }
                case RECEIVE_RECOMMENDED_PROTOCOL_VERSION: {
                    return this.receiveRecommendedProtocolVersion();
                }
            }
            ByteBuffer byteBuffer = this.getDataFrame();
            this.preparedFrame = this.channel.prepareForWrite(byteBuffer);
            int bytesWritten = this.channel.write(this.preparedFrame);
            return bytesWritten > 0;
        }
        catch (Exception e) {
            this.complete = true;
            throw e;
        }
    }

    public synchronized boolean cancel() {
        if (this.complete) {
            return false;
        }
        this.complete = true;
        this.canceled = true;
        return true;
    }

    public boolean isCanceled() {
        return this.canceled;
    }

    private boolean confirmTransactionComplete() throws IOException {
        logger.debug("Confirming Transaction Complete for Peer {}", (Object)this.peerDescription);
        OptionalInt transactionResponse = this.channel.read();
        if (!transactionResponse.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to confirm the transaction is complete");
            }
            return false;
        }
        int response = transactionResponse.getAsInt();
        if (response < 0) {
            throw new EOFException("Confirmed checksum when writing data to Peer " + this.peerDescription + " but encountered End-of-File when expecting a Transaction Complete confirmation");
        }
        if (response == 36) {
            throw new TransactionAbortedException("Confirmed checksum when writing data to Peer " + this.peerDescription + " but Peer aborted transaction instead of completing it");
        }
        if (response != 37) {
            throw new IOException("Expected a CONFIRM_COMPLETE_TRANSACTION response from Peer " + this.peerDescription + " but received a value of " + response);
        }
        this.complete = true;
        logger.debug("Successfully completed Transaction to send {} FlowFiles to Peer {} for Connection {}", new Object[]{this.flowFilesSent.size(), this.peerDescription, this.connectionId});
        return true;
    }

    private boolean verifyChecksum() throws IOException {
        logger.debug("Verifying Checksum for Peer {}", (Object)this.peerDescription);
        OptionalInt checksumResponse = this.channel.read();
        if (!checksumResponse.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to verify the checksum");
            }
            return false;
        }
        int response = checksumResponse.getAsInt();
        if (response < 0) {
            throw new EOFException("Encountered End-of-File when trying to verify Checksum with Peer " + this.peerDescription);
        }
        if (response == 34) {
            throw new TransactionAbortedException("After transferring FlowFiles to Peer " + this.peerDescription + " received a REJECT_CHECKSUM response. Aborting transaction.");
        }
        if (response != 33) {
            throw new TransactionAbortedException("After transferring FlowFiles to Peer " + this.peerDescription + " received an unexpected response code " + response + ". Aborting transaction.");
        }
        logger.debug("Checksum confirmed. Writing COMPLETE_TRANSACTION flag");
        this.phase = TransactionPhase.SEND_TRANSACTION_COMPLETE;
        return true;
    }

    private ByteBuffer getDataFrame() throws IOException {
        switch (this.phase) {
            case RECOMMEND_PROTOCOL_VERSION: {
                return this.recommendProtocolVersion();
            }
            case ABORT_PROTOCOL_NEGOTIATION: {
                return this.abortProtocolNegotiation();
            }
            case SEND_CONNECTION_ID: {
                return this.getConnectionId();
            }
            case CHECK_SPACE: {
                return this.checkSpace();
            }
            case GET_NEXT_FLOWFILE: {
                return this.getNextFlowFile();
            }
            case SEND_FLOWFILE_DEFINITION: 
            case SEND_FLOWFILE_CONTENTS: {
                return this.getFlowFileContent();
            }
            case SEND_CHECKSUM: {
                return this.getChecksum();
            }
            case SEND_TRANSACTION_COMPLETE: {
                return this.getTransactionComplete();
            }
        }
        logger.debug("Phase of {}, returning null ByteBuffer", (Object)this.phase);
        return null;
    }

    private ByteBuffer getTransactionComplete() {
        logger.debug("Sending Transaction Complete Indicator to Peer {}", (Object)this.peerDescription);
        ByteBuffer buffer = ByteBuffer.allocate(1);
        buffer.put((byte)35);
        buffer.rewind();
        this.readTimeout = System.currentTimeMillis() + (long)this.timeoutMillis;
        this.phase = TransactionPhase.CONFIRM_TRANSACTION_COMPLETE;
        return buffer;
    }

    private ByteBuffer getChecksum() {
        logger.debug("Sending Checksum of {} to Peer {}", (Object)this.checksum.getValue(), (Object)this.peerDescription);
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putLong(this.checksum.getValue());
        this.readTimeout = System.currentTimeMillis() + (long)this.timeoutMillis;
        this.phase = TransactionPhase.VERIFY_CHECKSUM;
        buffer.rewind();
        return buffer;
    }

    private ByteBuffer getFlowFileContent() throws IOException {
        try {
            ByteBuffer buffer;
            int bytesRead;
            if (this.flowFileInputStream == null) {
                this.flowFileInputStream = this.flowFileContentAccess.read(this.currentFlowFile);
            }
            if ((bytesRead = StreamUtils.fillBuffer((InputStream)this.flowFileInputStream, (byte[])this.byteBuffer, (boolean)false)) < 1) {
                this.flowFileInputStream.close();
                this.flowFileInputStream = null;
                this.phase = TransactionPhase.GET_NEXT_FLOWFILE;
                ByteBuffer buffer2 = ByteBuffer.allocate(1);
                buffer2.put((byte)64);
                buffer2.rewind();
                this.checksum.update(64);
                logger.debug("Sending NO_DATA_FRAME indicator to Peer {}", (Object)this.peerDescription);
                return buffer2;
            }
            logger.trace("Sending Data Frame that is {} bytes long to Peer {}", (Object)bytesRead, (Object)this.peerDescription);
            if (this.partition.getCompression() == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT) {
                byte[] compressed = this.compressDataFrame(this.byteBuffer, bytesRead);
                int compressedMaxLen = compressed.length;
                buffer = ByteBuffer.allocate(5 + compressedMaxLen);
                buffer.put((byte)66);
                buffer.putInt(compressedMaxLen);
                buffer.put(compressed, 0, compressedMaxLen);
            } else {
                buffer = ByteBuffer.allocate(5 + bytesRead);
                buffer.put((byte)66);
                buffer.putInt(bytesRead);
                buffer.put(this.byteBuffer, 0, bytesRead);
            }
            byte[] frameArray = buffer.array();
            this.checksum.update(frameArray, 0, frameArray.length);
            this.phase = TransactionPhase.SEND_FLOWFILE_CONTENTS;
            buffer.rewind();
            return buffer;
        }
        catch (ContentNotFoundException cnfe) {
            throw new ContentNotFoundException(this.currentFlowFile, cnfe.getMissingClaim(), cnfe.getMessage());
        }
    }

    /*
     * Exception decompiling
     */
    private byte[] compressDataFrame(byte[] uncompressed, int byteCount) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private ByteBuffer getNextFlowFile() throws IOException {
        byte[] flowFileEncoded;
        if (this.transactionThreshold.isThresholdMet()) {
            this.currentFlowFile = null;
            logger.debug("Transaction Threshold reached sending to Peer {}; Transitioning phase to SEND_CHECKSUM", (Object)this.peerDescription);
        } else {
            this.currentFlowFile = this.flowFileSupplier.get();
            if (this.currentFlowFile == null) {
                logger.debug("No more FlowFiles to send to Peer {}; Transitioning phase to SEND_CHECKSUM", (Object)this.peerDescription);
            }
        }
        if (this.currentFlowFile == null) {
            this.phase = TransactionPhase.SEND_CHECKSUM;
            return this.noMoreFlowFiles();
        }
        this.transactionThreshold.adjust(1, this.currentFlowFile.getSize());
        logger.debug("Next FlowFile to send to Peer {} is {}", (Object)this.peerDescription, (Object)this.currentFlowFile);
        this.flowFilesSent.add(this.currentFlowFile);
        LoadBalanceCompression compression = this.partition.getCompression();
        boolean compressAttributes = compression != LoadBalanceCompression.DO_NOT_COMPRESS;
        logger.debug("Compression to use for sending to Peer {} is {}", (Object)this.peerDescription, (Object)compression);
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
            block41: {
                if (compressAttributes) {
                    try (GZIPOutputStream gzipOut = new GZIPOutputStream((OutputStream)baos, 1);
                         ByteCountingOutputStream out = new ByteCountingOutputStream((OutputStream)gzipOut);){
                        this.flowFileCodec.encode(this.currentFlowFile, (OutputStream)out);
                        break block41;
                    }
                }
                this.flowFileCodec.encode(this.currentFlowFile, baos);
            }
            flowFileEncoded = baos.toByteArray();
        }
        int metadataLength = flowFileEncoded.length;
        ByteBuffer buffer = ByteBuffer.allocate(flowFileEncoded.length + 5);
        buffer.put((byte)49);
        this.checksum.update(49);
        buffer.putInt(metadataLength);
        this.checksum.update(metadataLength >> 24 & 0xFF);
        this.checksum.update(metadataLength >> 16 & 0xFF);
        this.checksum.update(metadataLength >> 8 & 0xFF);
        this.checksum.update(metadataLength & 0xFF);
        buffer.put(flowFileEncoded);
        this.checksum.update(flowFileEncoded, 0, flowFileEncoded.length);
        this.phase = TransactionPhase.SEND_FLOWFILE_DEFINITION;
        buffer.rewind();
        return buffer;
    }

    private ByteBuffer recommendProtocolVersion() {
        logger.debug("Recommending to Peer {} that Protocol Version {} be used", (Object)this.peerDescription, (Object)this.protocolVersion);
        ByteBuffer buffer = ByteBuffer.allocate(1);
        buffer.put((byte)this.protocolVersion);
        buffer.rewind();
        this.readTimeout = System.currentTimeMillis() + (long)this.timeoutMillis;
        this.phase = TransactionPhase.RECEIVE_PROTOCOL_VERSION_ACKNOWLEDGMENT;
        return buffer;
    }

    private boolean receiveProtocolVersionAcknowledgment() throws IOException {
        logger.debug("Confirming Transaction Complete for Peer {}", (Object)this.peerDescription);
        OptionalInt ackResponse = this.channel.read();
        if (!ackResponse.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to acknowledge Protocol Version");
            }
            return false;
        }
        int response = ackResponse.getAsInt();
        if (response < 0) {
            throw new EOFException("Encounter End-of-File with Peer " + this.peerDescription + " when expecting a Protocol Version Acknowledgment");
        }
        if (response == 16) {
            logger.debug("Peer {} accepted Protocol Version {}", (Object)this.peerDescription, (Object)this.protocolVersion);
            this.phase = TransactionPhase.SEND_CONNECTION_ID;
            return true;
        }
        if (response == 17) {
            logger.debug("Recommended using Protocol Version of {} with Peer {} but received REQUEST_DIFFERENT_VERSION response", (Object)this.protocolVersion, (Object)this.peerDescription);
            this.readTimeout = System.currentTimeMillis() + (long)this.timeoutMillis;
            this.phase = TransactionPhase.RECEIVE_RECOMMENDED_PROTOCOL_VERSION;
            return true;
        }
        throw new IOException("Failed to negotiate Protocol Version with Peer " + this.peerDescription + ". Recommended version " + this.protocolVersion + " but instead of an ACCEPT or REJECT response got back a response of " + response);
    }

    private boolean receiveRecommendedProtocolVersion() throws IOException {
        logger.debug("Receiving Protocol Version from Peer {}", (Object)this.peerDescription);
        OptionalInt recommendationResponse = this.channel.read();
        if (!recommendationResponse.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to recommend Protocol Version");
            }
            return false;
        }
        int requestedVersion = recommendationResponse.getAsInt();
        if (requestedVersion < 0) {
            throw new EOFException("Encounter End-of-File with Peer " + this.peerDescription + " when expecting a Protocol Version Recommendation");
        }
        if (this.negotiator.isVersionSupported(requestedVersion)) {
            this.protocolVersion = requestedVersion;
            this.phase = TransactionPhase.SEND_CONNECTION_ID;
            logger.debug("Peer {} recommended Protocol Version of {}. Accepting version.", (Object)this.peerDescription, (Object)requestedVersion);
            return true;
        }
        Integer preferred = this.negotiator.getPreferredVersion(requestedVersion);
        if (preferred == null) {
            logger.debug("Peer {} requested version {} of the Load Balance Protocol. This version is not acceptable. Aborting communications.", (Object)this.peerDescription, (Object)requestedVersion);
            this.phase = TransactionPhase.ABORT_PROTOCOL_NEGOTIATION;
            return true;
        }
        logger.debug("Peer {} requested version {} of the Protocol. Recommending version {} instead", new Object[]{this.peerDescription, requestedVersion, preferred});
        this.protocolVersion = preferred;
        this.phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
        return true;
    }

    private ByteBuffer noMoreFlowFiles() {
        ByteBuffer buffer = ByteBuffer.allocate(1);
        buffer.put((byte)50);
        buffer.rewind();
        this.checksum.update(50);
        return buffer;
    }

    private ByteBuffer abortProtocolNegotiation() {
        ByteBuffer buffer = ByteBuffer.allocate(1);
        buffer.put((byte)18);
        buffer.rewind();
        return buffer;
    }

    private ByteBuffer getConnectionId() {
        logger.debug("Sending Connection ID {} to Peer {}", (Object)this.connectionId, (Object)this.peerDescription);
        ByteBuffer buffer = ByteBuffer.allocate(this.connectionId.length() + 2);
        buffer.putShort((short)this.connectionId.length());
        buffer.put(this.connectionId.getBytes(StandardCharsets.UTF_8));
        buffer.rewind();
        byte[] frameBytes = buffer.array();
        this.checksum.update(frameBytes, 0, frameBytes.length);
        this.phase = TransactionPhase.CHECK_SPACE;
        return buffer;
    }

    private ByteBuffer checkSpace() {
        logger.debug("Sending a 'Check Space' request to Peer {} to determine if there is space in the queue for more FlowFiles", (Object)this.peerDescription);
        ByteBuffer buffer = ByteBuffer.allocate(1);
        if (this.partition.isHonorBackpressure()) {
            buffer.put((byte)97);
            this.checksum.update(97);
            this.readTimeout = System.currentTimeMillis() + (long)this.timeoutMillis;
            this.phase = TransactionPhase.RECEIVE_SPACE_RESPONSE;
        } else {
            buffer.put((byte)98);
            this.checksum.update(98);
            this.phase = TransactionPhase.GET_NEXT_FLOWFILE;
        }
        buffer.rewind();
        return buffer;
    }

    private boolean receiveSpaceAvailableResponse() throws IOException {
        logger.debug("Receiving response from Peer {} to determine whether or not space is available in queue {}", (Object)this.peerDescription, (Object)this.connectionId);
        OptionalInt spaceAvailableResponse = this.channel.read();
        if (!spaceAvailableResponse.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to verify whether or not space is available for Connection " + this.connectionId);
            }
            return false;
        }
        int response = spaceAvailableResponse.getAsInt();
        if (response < 0) {
            throw new EOFException("Encountered End-of-File when trying to verify with Peer " + this.peerDescription + " whether or not space is available in Connection " + this.connectionId);
        }
        if (response == 101) {
            logger.debug("Peer {} has confirmed that space is available in Connection {}", (Object)this.peerDescription, (Object)this.connectionId);
            this.phase = TransactionPhase.GET_NEXT_FLOWFILE;
        } else if (response == 102) {
            logger.debug("Peer {} has confirmed that the queue is full for Connection {}", (Object)this.peerDescription, (Object)this.connectionId);
            this.phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
            this.checksum.reset();
            this.complete = true;
            this.partition.penalize(1000L);
        } else {
            throw new TransactionAbortedException("After requesting to know whether or not Peer " + this.peerDescription + " has space available in Connection " + this.connectionId + ", received unexpected response of " + response + ". Aborting transaction.");
        }
        return true;
    }

    private void penalize() {
        this.penaltyExpiration = System.currentTimeMillis() + PENALTY_MILLIS;
    }

    private boolean isPenalized() {
        return this.penaltyExpiration > -1L && System.currentTimeMillis() < this.penaltyExpiration;
    }

    private static enum TransactionPhase {
        RECOMMEND_PROTOCOL_VERSION(4),
        RECEIVE_PROTOCOL_VERSION_ACKNOWLEDGMENT(1),
        RECEIVE_RECOMMENDED_PROTOCOL_VERSION(1),
        ABORT_PROTOCOL_NEGOTIATION(4),
        SEND_CONNECTION_ID(4),
        CHECK_SPACE(4),
        RECEIVE_SPACE_RESPONSE(1),
        SEND_FLOWFILE_DEFINITION(4),
        SEND_FLOWFILE_CONTENTS(4),
        GET_NEXT_FLOWFILE(4),
        SEND_CHECKSUM(4),
        VERIFY_CHECKSUM(1),
        SEND_TRANSACTION_COMPLETE(4),
        CONFIRM_TRANSACTION_COMPLETE(1);

        private final int requiredSelectionKey;

        private TransactionPhase(int requiredSelectionKey) {
            this.requiredSelectionKey = requiredSelectionKey;
        }

        public int getRequiredSelectionKey() {
            return this.requiredSelectionKey;
        }
    }
}

