package org.apache.heron.common.network;

import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.logging.Logger;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.ISelectHandler;
import org.apache.heron.common.basics.NIOLooper;

/* loaded from: input_file:org/apache/heron/common/network/SocketChannelHelper.class */
public class SocketChannelHelper {
    private static final Logger LOG = Logger.getLogger(SocketChannelHelper.class.getName());
    private final NIOLooper looper;
    private final ISelectHandler selectHandler;
    private final SocketChannel socketChannel;
    private final ByteAmount writeBatchSize;
    private final Duration writeBatchTime;
    private final ByteAmount readBatchSize;
    private final Duration readReadBatchTime;
    private long totalPacketsRead;
    private long totalPacketsWritten;
    private long totalBytesRead;
    private long totalBytesWritten;
    private ByteAmount maximumPacketSize;
    private final Queue<OutgoingPacket> outgoingPacketsToWrite = new LinkedList();
    private IncomingPacket incomingPacket = new IncomingPacket();

    public SocketChannelHelper(NIOLooper nIOLooper, ISelectHandler iSelectHandler, SocketChannel socketChannel, HeronSocketOptions heronSocketOptions) {
        this.looper = nIOLooper;
        this.selectHandler = iSelectHandler;
        this.socketChannel = socketChannel;
        this.writeBatchSize = heronSocketOptions.getNetworkWriteBatchSize();
        this.writeBatchTime = heronSocketOptions.getNetworkWriteBatchTime();
        this.readBatchSize = heronSocketOptions.getNetworkReadBatchSize();
        this.readReadBatchTime = heronSocketOptions.getNetworkReadBatchTime();
        this.maximumPacketSize = heronSocketOptions.getMaximumPacketSize();
        enableReading();
    }

    public void clear() {
        this.outgoingPacketsToWrite.clear();
    }

    public boolean sendPacket(OutgoingPacket outgoingPacket) {
        this.outgoingPacketsToWrite.add(outgoingPacket);
        enableWriting();
        return true;
    }

    public List<IncomingPacket> read() {
        int readFromChannel;
        long nanoTime = System.nanoTime();
        long j = 0;
        long j2 = 0;
        ArrayList arrayList = new ArrayList();
        while (true) {
            if ((System.nanoTime() - nanoTime) - this.readReadBatchTime.toNanos() >= 0 || j >= this.readBatchSize.asBytes() || (readFromChannel = this.incomingPacket.readFromChannel(this.socketChannel, this.maximumPacketSize.asBytes())) > 0) {
                break;
            }
            if (readFromChannel < 0) {
                LOG.severe("Something bad happened while reading from channel: " + this.socketChannel.socket().getRemoteSocketAddress());
                this.selectHandler.handleError(this.socketChannel);
                arrayList.clear();
                break;
            }
            j2++;
            j += this.incomingPacket.size();
            arrayList.add(this.incomingPacket);
            this.incomingPacket = new IncomingPacket();
        }
        this.totalPacketsRead += j2;
        this.totalBytesRead += j;
        return arrayList;
    }

    public void write() {
        OutgoingPacket peek;
        int writeToChannel;
        long nanoTime = System.nanoTime();
        long j = 0;
        long j2 = 0;
        while ((System.nanoTime() - nanoTime) - this.writeBatchTime.toNanos() < 0 && j < this.writeBatchSize.asBytes() && (peek = this.outgoingPacketsToWrite.peek()) != null && (writeToChannel = peek.writeToChannel(this.socketChannel)) <= 0) {
            if (writeToChannel < 0) {
                LOG.severe("Something bad happened while writing to channel");
                this.selectHandler.handleError(this.socketChannel);
                return;
            } else {
                j += peek.size();
                j2++;
                this.outgoingPacketsToWrite.remove();
            }
        }
        this.totalPacketsWritten += j2;
        this.totalBytesWritten += j;
        if (getOutstandingPackets() == 0) {
            disableWriting();
        }
    }

    public void forceFlushWithBestEffort() {
        LOG.info("Forcing to flush data to socket with best effort.");
        while (!this.outgoingPacketsToWrite.isEmpty()) {
            if (this.outgoingPacketsToWrite.poll().writeToChannel(this.socketChannel) != 0) {
                LOG.info("Failed to write more to Socket. Clear and finish the flush.");
                clear();
                return;
            }
        }
    }

    public void enableReading() {
        if (this.looper.isReadRegistered(this.socketChannel)) {
            return;
        }
        try {
            this.looper.registerRead(this.socketChannel, this.selectHandler);
        } catch (ClosedChannelException e) {
            this.selectHandler.handleError(this.socketChannel);
        }
    }

    public void disableReading() {
        if (this.looper.isReadRegistered(this.socketChannel)) {
            this.looper.unregisterRead(this.socketChannel);
        }
    }

    public void enableWriting() {
        if (this.looper.isWriteRegistered(this.socketChannel)) {
            return;
        }
        try {
            this.looper.registerWrite(this.socketChannel, this.selectHandler);
        } catch (ClosedChannelException e) {
            this.selectHandler.handleError(this.socketChannel);
        }
    }

    public void disableWriting() {
        if (this.looper.isWriteRegistered(this.socketChannel)) {
            this.looper.unregisterWrite(this.socketChannel);
        }
    }

    public int getOutstandingPackets() {
        return this.outgoingPacketsToWrite.size();
    }

    public boolean hasPacketsToSend() {
        return this.outgoingPacketsToWrite.size() > 0;
    }

    public long getTotalPacketsWritten() {
        return this.totalPacketsWritten;
    }

    public long getTotalPacketsRead() {
        return this.totalPacketsRead;
    }

    public long getTotalBytesRead() {
        return this.totalBytesRead;
    }

    public long getTotalBytesWritten() {
        return this.totalBytesWritten;
    }
}
