package net.openhft.chronicle.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleConfig;
import net.openhft.chronicle.Excerpt;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptCommon;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.VanillaChronicle;
import net.openhft.chronicle.tools.WrappedExcerpt;
import net.openhft.lang.model.constraints.NotNull;
import net.openhft.lang.thread.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/tcp/VanillaChronicleSource.class */
public class VanillaChronicleSource implements Chronicle {
    static final int IN_SYNC_LEN = -128;
    static final int PADDED_LEN = -127;
    private static final long HEARTBEAT_INTERVAL_MS = 2500;
    private static final int MAX_MESSAGE = 128;

    @NotNull
    private final VanillaChronicle chronicle;
    private final Selector selector;

    @NotNull
    private final String name;

    @NotNull
    private final ExecutorService service;
    private final Logger logger;
    private static final long busyWaitTimeNS = 100000;
    private final Object notifier = new Object();
    private volatile boolean closed = false;
    private long lastUnpausedNS = 0;
    private final ServerSocketChannel server = ServerSocketChannel.open();

    /* loaded from: input_file:net/openhft/chronicle/tcp/VanillaChronicleSource$Acceptor.class */
    private class Acceptor implements Runnable {
        private Acceptor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Thread.currentThread().setName(VanillaChronicleSource.this.name + "-acceptor");
            while (!VanillaChronicleSource.this.closed) {
                try {
                    try {
                        VanillaChronicleSource.this.selector.select();
                        Iterator<SelectionKey> it = VanillaChronicleSource.this.selector.keys().iterator();
                        while (it.hasNext()) {
                            if (it.next().isAcceptable()) {
                                SocketChannel accept = VanillaChronicleSource.this.server.accept();
                                accept.configureBlocking(true);
                                VanillaChronicleSource.this.service.execute(new Handler(accept));
                            }
                        }
                    } catch (IOException e) {
                        if (!VanillaChronicleSource.this.closed) {
                            VanillaChronicleSource.this.logger.warn("Acceptor dying", e);
                        }
                        VanillaChronicleSource.this.service.shutdown();
                        VanillaChronicleSource.this.logger.info("Acceptor loop ended");
                        return;
                    }
                } catch (Throwable th) {
                    VanillaChronicleSource.this.service.shutdown();
                    VanillaChronicleSource.this.logger.info("Acceptor loop ended");
                    throw th;
                }
            }
            VanillaChronicleSource.this.service.shutdown();
            VanillaChronicleSource.this.logger.info("Acceptor loop ended");
        }
    }

    /* loaded from: input_file:net/openhft/chronicle/tcp/VanillaChronicleSource$Handler.class */
    class Handler implements Runnable {

        @NotNull
        private final SocketChannel socket;

        public Handler(@NotNull SocketChannel socketChannel) throws SocketException {
            this.socket = socketChannel;
            socketChannel.socket().setSendBufferSize(262144);
            socketChannel.socket().setTcpNoDelay(true);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                long readIndex = readIndex(this.socket);
                ExcerptTailer createTailer = VanillaChronicleSource.this.chronicle.createTailer();
                ByteBuffer createBuffer = TcpUtil.createBuffer(1, ByteOrder.nativeOrder());
                long j = 0;
                createTailer.index(readIndex);
                loop0: while (!VanillaChronicleSource.this.closed) {
                    while (!createTailer.nextIndex()) {
                        long currentTimeMillis = System.currentTimeMillis();
                        if (j <= currentTimeMillis) {
                            createBuffer.clear();
                            createBuffer.putLong(-128L);
                            createBuffer.putInt(VanillaChronicleSource.IN_SYNC_LEN);
                            createBuffer.flip();
                            TcpUtil.writeAll(this.socket, createBuffer);
                            j = currentTimeMillis + VanillaChronicleSource.HEARTBEAT_INTERVAL_MS;
                        }
                        VanillaChronicleSource.this.pause();
                        if (VanillaChronicleSource.this.closed) {
                            break loop0;
                        }
                    }
                    VanillaChronicleSource.this.pauseReset();
                    long capacity = createTailer.capacity();
                    createBuffer.clear();
                    long j2 = capacity + 8 + 4;
                    createBuffer.putLong(createTailer.index());
                    createBuffer.putInt((int) capacity);
                    if (capacity > createBuffer.capacity() / 2) {
                        while (j2 > 0) {
                            createBuffer.limit((int) Math.min(j2, createBuffer.capacity()));
                            createTailer.read(createBuffer);
                            createBuffer.flip();
                            j2 -= createBuffer.remaining();
                            TcpUtil.writeAll(this.socket, createBuffer);
                        }
                    } else {
                        createBuffer.limit((int) j2);
                        createTailer.read(createBuffer);
                        int i = 1;
                        while (true) {
                            int i2 = i;
                            i++;
                            if (i2 >= VanillaChronicleSource.MAX_MESSAGE) {
                                break;
                            }
                            if (createTailer.nextIndex()) {
                                if (createTailer.wasPadding()) {
                                    throw new AssertionError("Entry should not be padding - remove");
                                }
                                if (createTailer.remaining() + 4 + 8 >= createBuffer.capacity() - createBuffer.position()) {
                                    break;
                                }
                                int capacity2 = (int) createTailer.capacity();
                                createBuffer.limit(createBuffer.position() + capacity2 + 4 + 8);
                                createBuffer.putLong(createTailer.index());
                                createBuffer.putInt(capacity2);
                                createTailer.read(createBuffer);
                            }
                        }
                        createBuffer.flip();
                        TcpUtil.writeAll(this.socket, createBuffer);
                    }
                    if (createBuffer.remaining() > 0) {
                        throw new EOFException("Failed to send index=" + createTailer.index());
                    }
                    j = 0;
                }
            } catch (Exception e) {
                if (VanillaChronicleSource.this.closed) {
                    return;
                }
                String message = e.getMessage();
                if (message == null || !(message.contains("reset by peer") || message.contains("Broken pipe") || message.contains("was aborted by"))) {
                    VanillaChronicleSource.this.logger.info("Connect {} died", this.socket, e);
                } else {
                    VanillaChronicleSource.this.logger.info("Connect {} closed from the other end ", this.socket, e);
                }
            }
        }

        private long readIndex(@NotNull SocketChannel socketChannel) throws IOException {
            ByteBuffer allocate = ByteBuffer.allocate(8);
            TcpUtil.readFullyOrEOF(socketChannel, allocate);
            return allocate.getLong(0);
        }
    }

    /* loaded from: input_file:net/openhft/chronicle/tcp/VanillaChronicleSource$SourceExcerpt.class */
    class SourceExcerpt extends WrappedExcerpt {
        public SourceExcerpt(ExcerptCommon excerptCommon) {
            super(excerptCommon);
        }

        @Override // net.openhft.chronicle.tools.WrappedExcerpt, net.openhft.chronicle.ExcerptCommon
        public void finish() {
            super.finish();
            VanillaChronicleSource.this.wakeSessionHandlers();
        }
    }

    public VanillaChronicleSource(@NotNull VanillaChronicle vanillaChronicle, int i) throws IOException {
        this.chronicle = vanillaChronicle;
        this.server.socket().setReuseAddress(true);
        this.server.socket().bind(new InetSocketAddress(i));
        this.server.configureBlocking(false);
        this.selector = Selector.open();
        this.server.register(this.selector, 16);
        this.name = vanillaChronicle.name() + "@" + i;
        this.logger = LoggerFactory.getLogger(getClass().getName() + "." + this.name);
        this.service = Executors.newCachedThreadPool(new NamedThreadFactory(this.name, true));
        this.service.execute(new Acceptor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void pauseReset() {
        this.lastUnpausedNS = System.nanoTime();
    }

    void pause() {
        if (this.lastUnpausedNS + busyWaitTimeNS > System.nanoTime()) {
            return;
        }
        try {
            synchronized (this.notifier) {
                this.notifier.wait(1250L);
            }
        } catch (InterruptedException e) {
            this.logger.warn("Interrupt ignored");
        }
    }

    void wakeSessionHandlers() {
        synchronized (this.notifier) {
            this.notifier.notifyAll();
        }
    }

    @Override // net.openhft.chronicle.Chronicle
    public String name() {
        return this.chronicle.name();
    }

    public int getLocalPort() {
        return this.server.socket().getLocalPort();
    }

    public void clear() {
        this.chronicle.clear();
    }

    @Override // net.openhft.chronicle.Chronicle
    @NotNull
    public Excerpt createExcerpt() throws IOException {
        return new SourceExcerpt(this.chronicle.createExcerpt());
    }

    @Override // net.openhft.chronicle.Chronicle
    @NotNull
    public ExcerptTailer createTailer() throws IOException {
        return new SourceExcerpt(this.chronicle.createTailer());
    }

    @Override // net.openhft.chronicle.Chronicle
    @NotNull
    public ExcerptAppender createAppender() throws IOException {
        return new SourceExcerpt(this.chronicle.createAppender());
    }

    @Override // net.openhft.chronicle.Chronicle
    public long lastWrittenIndex() {
        return this.chronicle.lastWrittenIndex();
    }

    @Override // net.openhft.chronicle.Chronicle
    public long size() {
        return this.chronicle.size();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        try {
            this.chronicle.close();
            this.server.close();
            this.service.shutdownNow();
            this.service.awaitTermination(10000L, TimeUnit.MILLISECONDS);
        } catch (IOException e) {
            this.logger.warn("Error closing server port", e);
        } catch (InterruptedException e2) {
            this.logger.warn("Error shutting down service threads", e2);
        }
    }

    public ChronicleConfig config() {
        throw new UnsupportedOperationException();
    }
}
