package net.openhft.chronicle.tcp;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.tcp.ChronicleTcp;
import net.openhft.lang.model.constraints.NotNull;
import org.slf4j.Logger;

/* loaded from: input_file:net/openhft/chronicle/tcp/ChronicleSourceSocketHandler.class */
/**
 * Base class for a per-connection handler that serves one client socket of a
 * {@link ChronicleSource}.
 *
 * <p>The handler owns a non-blocking {@link SocketChannel}, a private {@link Selector} and an
 * {@link ExcerptTailer} created from the source's chronicle. {@link #run()} drives a select loop
 * that dispatches readable keys to {@link #onRead(SelectionKey)} and writable keys to
 * {@link #onWrite(SelectionKey)}; subclasses supply {@link #publishData()} and may override the
 * {@code handle*} hooks.
 */
public abstract class ChronicleSourceSocketHandler implements Runnable, Closeable {
    private final Logger logger;
    private final Chronicle chronicle;
    private final ChronicleSource source;
    private final ChronicleSourceConfig config;
    protected final SocketChannel socket;
    protected final Selector selector;
    /** Tailer over the source chronicle; created in {@link #run()} and released in {@link #close()}. */
    protected ExcerptTailer tailer = null;
    protected final ByteBuffer buffer = ChronicleTcp.createBuffer(1, ByteOrder.nativeOrder());
    protected ChronicleTcp.Command command = new ChronicleTcp.Command();
    /**
     * Despite the name this holds a deadline: the time (epoch millis) at which the next heartbeat
     * becomes due, i.e. "time of last activity + heartbeat interval".
     */
    protected long lastHeartbeat = 0;

    /**
     * Configures the socket (non-blocking, send buffer size from the source configuration,
     * TCP_NODELAY) and opens the selector used by {@link #run()}.
     *
     * <p>Declared {@code public} here; the decompiler reported that the original access modifier
     * was {@code protected}.
     *
     * @param chronicleSource the source this handler serves; provides the chronicle and the config
     * @param socketChannel   the accepted client channel
     * @param logger          logger used for connection life-cycle messages
     * @throws IOException if the socket cannot be configured or the selector cannot be opened
     */
    public ChronicleSourceSocketHandler(@NotNull ChronicleSource chronicleSource, @NotNull SocketChannel socketChannel, @NotNull Logger logger) throws IOException {
        this.logger = logger;
        this.source = chronicleSource;
        this.chronicle = this.source.chronicle();
        this.config = this.source.config();
        this.socket = socketChannel;
        this.socket.configureBlocking(false);
        this.socket.socket().setSendBufferSize(this.config.minBufferSize());
        this.socket.socket().setTcpNoDelay(true);
        this.selector = Selector.open();
    }

    /**
     * Releases the tailer, the selector and the socket.
     *
     * <p>Each resource is closed in its own {@code finally} step so that a failure while closing
     * one of them does not leave the others open. Closing an already closed selector has no
     * effect, so this is safe after the EOF path of {@link #onRead(SelectionKey)}.
     *
     * @throws IOException if closing any of the resources fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.tailer != null) {
                this.tailer.close();
                this.tailer = null;
            }
        } finally {
            try {
                // The selector is opened by the constructor and must not outlive the handler.
                this.selector.close();
            } finally {
                if (this.socket.isOpen()) {
                    this.socket.close();
                }
            }
        }
    }

    /**
     * Runs the select loop until the source is closed, the thread is interrupted or the
     * connection fails, then closes this handler.
     */
    @Override
    public void run() {
        try {
            this.tailer = this.chronicle.createTailer();
            this.socket.register(this.selector, SelectionKey.OP_READ);
            while (!this.source.closed() && !Thread.currentThread().isInterrupted()) {
                if (this.selector.select(this.config.selectTimeout()) > 0) {
                    dispatchSelectedKeys(this.selector.selectedKeys());
                }
            }
        } catch (EOFException e) {
            if (!this.source.closed()) {
                this.logger.info("Connection {} died", this.socket);
            }
        } catch (Exception e) {
            if (!this.source.closed()) {
                String message = e.getMessage();
                boolean closedByPeer = message != null
                        && (message.contains("reset by peer")
                        || message.contains("Broken pipe")
                        || message.contains("was aborted by"));
                if (closedByPeer) {
                    // Two placeholders: without the second one the peer's message is dropped.
                    this.logger.info("Connection {} closed from the other end: {}", this.socket, message);
                } else {
                    this.logger.info("Connection {} died", this.socket, e);
                }
            }
        } finally {
            // Always release resources, including when an Error escapes the loop.
            try {
                close();
            } catch (IOException e) {
                this.logger.warn("", e);
            }
        }
    }

    /**
     * Dispatches every selected key to {@link #onRead} or {@link #onWrite}; if a handler returns
     * {@code false} the remaining selected keys are discarded and dispatching stops.
     */
    private void dispatchSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
        Iterator<SelectionKey> it = selectedKeys.iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            boolean keepGoing = true;
            if (key.isReadable()) {
                keepGoing = onRead(key);
            } else if (key.isWritable()) {
                keepGoing = onWrite(key);
            }
            if (!keepGoing) {
                selectedKeys.clear();
                return;
            }
            it.remove();
        }
    }

    /** Sets the next heartbeat deadline to "now + heartbeat interval". */
    protected void setLastHeartbeat() {
        this.lastHeartbeat = System.currentTimeMillis() + this.config.heartbeatInterval();
    }

    /**
     * Sets the next heartbeat deadline relative to the given time.
     *
     * @param j reference time in epoch millis; the configured heartbeat interval is added to it
     */
    protected void setLastHeartbeat(long j) {
        this.lastHeartbeat = j + this.config.heartbeatInterval();
    }

    /**
     * Writes a 12-byte frame (an {@code int} followed by a {@code long}) to the socket and
     * re-arms the heartbeat deadline.
     *
     * <p>Declared {@code public} here; the decompiler reported that the original access modifier
     * was {@code protected}.
     *
     * @param i the int written first (callers in this class pass a {@code ChronicleTcp} length marker)
     * @param j the long written second (callers in this class pass an index or {@code 0})
     * @throws IOException if the write fails
     */
    public void sendSizeAndIndex(int i, long j) throws IOException {
        this.buffer.clear();
        this.buffer.putInt(i);
        this.buffer.putLong(j);
        this.buffer.flip();
        ChronicleTcp.writeAll(this.socket, this.buffer);
        setLastHeartbeat();
    }

    /**
     * Hook invoked when the received command is a subscribe; the default does nothing.
     *
     * @param selectionKey the key that became readable
     * @return {@code true} to keep processing the remaining selected keys
     * @throws IOException if handling fails
     */
    protected boolean handleSubscribe(SelectionKey selectionKey) throws IOException {
        return true;
    }

    /**
     * Handles a query command: positions the tailer at the index carried by the command and
     * replies with the index of the following entry once one is available.
     *
     * <p>If the tailer cannot be positioned, or no further entry shows up within one heartbeat
     * interval, an {@code IN_SYNC_LEN} frame with index {@code 0} is sent instead.
     *
     * @param selectionKey the key that became readable
     * @return always {@code true}
     * @throws IOException if the reply cannot be written
     */
    protected boolean handleQuery(SelectionKey selectionKey) throws IOException {
        if (!this.tailer.index(this.command.data())) {
            sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
            return true;
        }
        setLastHeartbeat(System.currentTimeMillis());
        while (!this.tailer.nextIndex()) {
            // Re-read the clock on every pass: comparing against the time captured before the
            // loop can never reach the deadline, which would make this an unbounded spin.
            if (this.lastHeartbeat <= System.currentTimeMillis()) {
                sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
                return true;
            }
        }
        sendSizeAndIndex(ChronicleTcp.SYNC_IDX_LEN, this.tailer.index());
        return true;
    }

    /**
     * Reads the next command from the socket and routes it to {@link #handleSubscribe} or
     * {@link #handleQuery}.
     *
     * @param selectionKey the key that became readable
     * @return the result of the invoked handler
     * @throws EOFException if the command read hits end of stream; the key's selector is closed
     *                      before the exception is rethrown
     * @throws IOException  if the command is neither a subscribe nor a query, or on I/O failure
     */
    protected boolean onRead(SelectionKey selectionKey) throws IOException {
        try {
            this.command.read(this.socket);
            if (this.command.isSubscribe()) {
                return handleSubscribe(selectionKey);
            }
            if (this.command.isQuery()) {
                return handleQuery(selectionKey);
            }
            throw new IOException("Unknown action received (" + this.command.action() + ")");
        } catch (EOFException e) {
            selectionKey.selector().close();
            throw e;
        }
    }

    /**
     * Publishes pending data; when nothing was published and the heartbeat deadline has passed,
     * sends an {@code IN_SYNC_LEN} frame with index {@code 0}.
     *
     * @param selectionKey the key that became writable
     * @return always {@code true}
     * @throws IOException if publishing or writing the frame fails
     */
    protected boolean onWrite(SelectionKey selectionKey) throws IOException {
        // Captured before publishData() so the deadline check uses the time at entry.
        long now = System.currentTimeMillis();
        if (this.source.closed() || publishData() || this.lastHeartbeat > now) {
            return true;
        }
        sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
        return true;
    }

    /**
     * Sends whatever data is currently available to the client.
     *
     * @return {@code true} if data was published; {@link #onWrite(SelectionKey)} skips the
     *         heartbeat check in that case
     * @throws IOException if writing to the socket fails
     */
    protected abstract boolean publishData() throws IOException;
}
