/*
 * Decompiled with CFR 0.152.
 */
package com.higherfrequencytrading.chronicle.tcp;

import com.higherfrequencytrading.chronicle.Chronicle;
import com.higherfrequencytrading.chronicle.Excerpt;
import com.higherfrequencytrading.chronicle.impl.IndexedChronicle;
import com.higherfrequencytrading.chronicle.tcp.NamedThreadFactory;
import com.higherfrequencytrading.chronicle.tcp.TcpUtil;
import com.higherfrequencytrading.chronicle.tools.IOTools;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jetbrains.annotations.NotNull;

public class ChronicleSource<C extends Chronicle>
implements Closeable {
    @NotNull
    private final C chronicle;
    private final ServerSocketChannel server;
    private final int delayNS;
    @NotNull
    private final String name;
    @NotNull
    private final ExecutorService service;
    private final Logger logger;
    private volatile boolean closed = false;

    public ChronicleSource(@NotNull C chronicle, int port, int delayNS) throws IOException {
        this.chronicle = chronicle;
        this.delayNS = delayNS;
        this.server = ServerSocketChannel.open();
        this.server.socket().bind(new InetSocketAddress(port));
        this.name = chronicle.name() + "@" + port;
        this.logger = Logger.getLogger(this.getClass().getName() + "." + this.name);
        this.service = Executors.newCachedThreadPool(new NamedThreadFactory(this.name));
        this.service.execute(new Acceptor());
    }

    public static void main(String ... args) throws IOException {
        if (args.length < 2) {
            System.err.println("Usage: java " + ChronicleSource.class.getName() + " {chronicle-base-path} {port} [delayNS]");
            System.exit(-1);
        }
        int dataBitsHintSize = Integer.getInteger("dataBitsHintSize", 27);
        String def = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN ? "Big" : "Little";
        ByteOrder byteOrder = System.getProperty("byteOrder", def).equalsIgnoreCase("Big") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN;
        String basePath = args[0];
        int port = Integer.parseInt(args[1]);
        int delayNS = 5000000;
        if (args.length > 2) {
            delayNS = Integer.parseInt(args[2]);
        }
        IndexedChronicle ic = new IndexedChronicle(basePath, dataBitsHintSize, byteOrder);
        new ChronicleSource<IndexedChronicle>(ic, port, delayNS);
    }

    protected void pause(int delayNS) {
        if (delayNS < 1) {
            return;
        }
        long start = System.nanoTime();
        if (delayNS >= 1000000) {
            LockSupport.parkNanos(delayNS - 1000000);
        }
        while (System.nanoTime() - start < (long)delayNS) {
            Thread.yield();
        }
    }

    @Override
    public void close() throws IOException {
        this.closed = true;
        this.service.shutdown();
        try {
            this.service.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.chronicle.close();
    }

    class Handler
    implements Runnable {
        private final SocketChannel socket;

        public Handler(SocketChannel socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            block7: {
                try {
                    long index = this.readIndex(this.socket);
                    Excerpt excerpt = ChronicleSource.this.chronicle.createExcerpt();
                    ByteBuffer bb = TcpUtil.createBuffer(1, ChronicleSource.this.chronicle.byteOrder());
                    if (ChronicleSource.this.closed) {
                        return;
                    }
                    while (true) {
                        if (!excerpt.index(index)) {
                            ChronicleSource.this.pause(ChronicleSource.this.delayNS);
                            continue;
                        }
                        int size = excerpt.capacity();
                        bb.clear();
                        bb.putLong(index);
                        bb.putInt(size);
                        for (int remaining = size + 12; remaining > 0; remaining -= bb.remaining()) {
                            int size2 = Math.min(remaining, bb.capacity());
                            bb.limit(size2);
                            excerpt.read(bb);
                            bb.flip();
                            IOTools.writeAll(this.socket, bb);
                        }
                        if (bb.remaining() > 0) {
                            throw new EOFException("Failed to send index=" + index);
                        }
                        ++index;
                        if (ChronicleSource.this.closed) break;
                    }
                }
                catch (IOException e) {
                    if (ChronicleSource.this.closed) break block7;
                    ChronicleSource.this.logger.log(Level.INFO, "Connect " + this.socket + " died", e);
                }
            }
        }

        private long readIndex(SocketChannel socket) throws IOException {
            ByteBuffer bb = ByteBuffer.allocate(8);
            IOTools.readFullyOrEOF(socket, bb);
            return bb.getLong(0);
        }
    }

    class Acceptor
    implements Runnable {
        Acceptor() {
        }

        @Override
        public void run() {
            block3: {
                Thread.currentThread().setName(ChronicleSource.this.name + "-acceptor");
                try {
                    while (!ChronicleSource.this.closed) {
                        SocketChannel socket = ChronicleSource.this.server.accept();
                        ChronicleSource.this.service.execute(new Handler(socket));
                    }
                }
                catch (IOException e) {
                    if (ChronicleSource.this.closed) break block3;
                    ChronicleSource.this.logger.log(Level.SEVERE, "Acceptor dying", e);
                }
            }
        }
    }
}

