/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.common.network;

import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.logging.Logger;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.ISelectHandler;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.common.network.IncomingPacket;
import org.apache.heron.common.network.OutgoingPacket;

/**
 * Batches packet reads from and writes to a single {@link SocketChannel}, and toggles the
 * channel's read/write registration on an {@link NIOLooper}.
 *
 * <p>Outgoing packets are queued by {@link #sendPacket(OutgoingPacket)} and drained by
 * {@link #write()}; incoming packets are assembled by {@link #read()}. Both operations are bounded
 * per call by a time budget and a byte budget taken from {@link HeronSocketOptions}.
 *
 * <p>NOTE(review): no synchronization is visible in this class; presumably it is only used from
 * the looper's thread — confirm against callers.
 */
public class SocketChannelHelper {
    private static final Logger LOG = Logger.getLogger(SocketChannelHelper.class.getName());
    private final NIOLooper looper;
    private final ISelectHandler selectHandler;
    private final SocketChannel socketChannel;
    // Packets accepted by sendPacket() that have not been completely written yet (FIFO order).
    private final Queue<OutgoingPacket> outgoingPacketsToWrite;
    private final ByteAmount writeBatchSize;
    private final Duration writeBatchTime;
    private final ByteAmount readBatchSize;
    private final Duration readReadBatchTime;
    // Upper bound handed to IncomingPacket.readFromChannel() on every read.
    private final ByteAmount maximumPacketSize;
    // Packet currently being assembled; kept across read() calls until it completes.
    private IncomingPacket incomingPacket;
    private long totalPacketsRead;
    private long totalPacketsWritten;
    private long totalBytesRead;
    private long totalBytesWritten;

    /**
     * Creates a helper for {@code socketChannel} and immediately registers the channel for reading.
     *
     * @param looper the looper on which read/write interest is registered
     * @param selectHandler the handler registered with the looper and notified via
     *     {@code handleError} when an operation on the channel fails
     * @param socketChannel the channel to read from and write to
     * @param options source of the read/write batch sizes, batch times and maximum packet size
     */
    public SocketChannelHelper(NIOLooper looper, ISelectHandler selectHandler, SocketChannel socketChannel, HeronSocketOptions options) {
        this.looper = looper;
        this.selectHandler = selectHandler;
        this.socketChannel = socketChannel;
        this.outgoingPacketsToWrite = new LinkedList<>();
        this.incomingPacket = new IncomingPacket();
        this.writeBatchSize = options.getNetworkWriteBatchSize();
        this.writeBatchTime = options.getNetworkWriteBatchTime();
        this.readBatchSize = options.getNetworkReadBatchSize();
        this.readReadBatchTime = options.getNetworkReadBatchTime();
        this.maximumPacketSize = options.getMaximumPacketSize();
        this.enableReading();
    }

    /** Drops every queued outgoing packet without writing it. */
    public void clear() {
        this.outgoingPacketsToWrite.clear();
    }

    /**
     * Queues a packet for sending and makes sure the channel is registered for writing.
     *
     * @param outgoingPacket the packet to append to the outgoing queue
     * @return always {@code true}
     */
    public boolean sendPacket(OutgoingPacket outgoingPacket) {
        this.outgoingPacketsToWrite.add(outgoingPacket);
        this.enableWriting();
        return true;
    }

    /**
     * Reads as many complete packets as the read budgets allow.
     *
     * <p>Reading stops when the read batch time has elapsed, when at least the read batch size has
     * been consumed, when {@code readFromChannel} reports a positive state (the packet is not
     * complete yet and is kept for the next call), or when it reports a negative state (error).
     *
     * @return the packets completed during this call; empty if none completed or if an error
     *     occurred (in which case {@code handleError} has been invoked on the select handler)
     */
    public List<IncomingPacket> read() {
        long startOfCycle = System.nanoTime();
        long bytesRead = 0L;
        long nPacketsRead = 0L;
        List<IncomingPacket> ret = new ArrayList<IncomingPacket>();

        // The elapsed-time test is written as a subtraction so it stays correct if nanoTime() wraps.
        while (System.nanoTime() - startOfCycle - this.readReadBatchTime.toNanos() < 0L
                && bytesRead < this.readBatchSize.asBytes()) {
            int readState = this.incomingPacket.readFromChannel(this.socketChannel, this.maximumPacketSize.asBytes());
            if (readState > 0) {
                // Packet not complete yet: keep it in this.incomingPacket and resume on the next call.
                break;
            }
            if (readState < 0) {
                LOG.severe("Something bad happened while reading from channel: " + this.socketChannel.socket().getRemoteSocketAddress());
                this.selectHandler.handleError(this.socketChannel);
                // Do not hand out packets gathered before the failure.
                ret.clear();
                break;
            }

            // readState == 0: a whole packet is available.
            ++nPacketsRead;
            // Account for the packet's size BEFORE swapping in a fresh packet; measuring after the
            // swap would count the new, empty packet and the byte budget would never be reached.
            bytesRead += (long) this.incomingPacket.size();
            ret.add(this.incomingPacket);
            this.incomingPacket = new IncomingPacket();
        }

        this.totalPacketsRead += nPacketsRead;
        this.totalBytesRead += bytesRead;
        return ret;
    }

    /**
     * Writes queued packets to the channel until a write budget is exhausted, the queue is empty,
     * or the channel cannot take a whole packet.
     *
     * <p>A packet is removed from the queue only when {@code writeToChannel} returns 0. A positive
     * state leaves the packet at the head of the queue for the next call. A negative state is
     * reported through {@code handleError} and the method returns immediately, without updating
     * the totals or the write registration.
     */
    public void write() {
        long startOfCycle = System.nanoTime();
        long bytesWritten = 0L;
        long nPacketsWritten = 0L;

        while (System.nanoTime() - startOfCycle - this.writeBatchTime.toNanos() < 0L
                && bytesWritten < this.writeBatchSize.asBytes()) {
            OutgoingPacket outgoingPacket = this.outgoingPacketsToWrite.peek();
            if (outgoingPacket == null) {
                break;
            }
            int writeState = outgoingPacket.writeToChannel(this.socketChannel);
            if (writeState > 0) {
                // Not fully written: leave the packet queued and try again on the next call.
                break;
            }
            if (writeState < 0) {
                LOG.severe("Something bad happened while writing to channel");
                this.selectHandler.handleError(this.socketChannel);
                return;
            }

            // writeState == 0: the head packet was written completely.
            bytesWritten += (long) outgoingPacket.size();
            ++nPacketsWritten;
            this.outgoingPacketsToWrite.remove();
        }

        this.totalPacketsWritten += nPacketsWritten;
        this.totalBytesWritten += bytesWritten;
        // Nothing left to send: stop asking the looper for write events.
        if (this.getOutstandingPackets() == 0) {
            this.disableWriting();
        }
    }

    /**
     * Tries to write every queued packet right away, ignoring the batch budgets.
     *
     * <p>Each packet is removed from the queue before it is written. On the first non-zero write
     * state the remaining queue is cleared and the method returns. The written totals are not
     * updated by this method.
     */
    public void forceFlushWithBestEffort() {
        LOG.info("Forcing to flush data to socket with best effort.");
        while (!this.outgoingPacketsToWrite.isEmpty()) {
            int writeState = this.outgoingPacketsToWrite.poll().writeToChannel(this.socketChannel);
            if (writeState != 0) {
                LOG.info("Failed to write more to Socket. Clear and finish the flush.");
                this.clear();
                return;
            }
        }
    }

    /**
     * Registers the channel for read events unless it is already registered. A closed channel is
     * reported through {@code handleError} on the select handler.
     */
    public void enableReading() {
        if (!this.looper.isReadRegistered(this.socketChannel)) {
            try {
                this.looper.registerRead(this.socketChannel, this.selectHandler);
            }
            catch (ClosedChannelException e) {
                this.selectHandler.handleError(this.socketChannel);
            }
        }
    }

    /** Unregisters the channel from read events if it is currently registered. */
    public void disableReading() {
        if (this.looper.isReadRegistered(this.socketChannel)) {
            this.looper.unregisterRead(this.socketChannel);
        }
    }

    /**
     * Registers the channel for write events unless it is already registered. A closed channel is
     * reported through {@code handleError} on the select handler.
     */
    public void enableWriting() {
        if (!this.looper.isWriteRegistered(this.socketChannel)) {
            try {
                this.looper.registerWrite(this.socketChannel, this.selectHandler);
            }
            catch (ClosedChannelException e) {
                this.selectHandler.handleError(this.socketChannel);
            }
        }
    }

    /** Unregisters the channel from write events if it is currently registered. */
    public void disableWriting() {
        if (this.looper.isWriteRegistered(this.socketChannel)) {
            this.looper.unregisterWrite(this.socketChannel);
        }
    }

    /** @return the number of packets still waiting in the outgoing queue */
    public int getOutstandingPackets() {
        return this.outgoingPacketsToWrite.size();
    }

    /** @return {@code true} if at least one packet is waiting in the outgoing queue */
    public boolean hasPacketsToSend() {
        return !this.outgoingPacketsToWrite.isEmpty();
    }

    /** @return the number of packets completely written by {@link #write()} so far */
    public long getTotalPacketsWritten() {
        return this.totalPacketsWritten;
    }

    /** @return the number of complete packets counted by {@link #read()} so far */
    public long getTotalPacketsRead() {
        return this.totalPacketsRead;
    }

    /** @return the summed size of the complete packets counted by {@link #read()} so far */
    public long getTotalBytesRead() {
        return this.totalBytesRead;
    }

    /** @return the summed size of the packets completely written by {@link #write()} so far */
    public long getTotalBytesWritten() {
        return this.totalBytesWritten;
    }
}

