/*
 * Decompiled with CFR 0.152.
 */
package com.higherfrequencytrading.chronicle.tcp;

import com.higherfrequencytrading.chronicle.Chronicle;
import com.higherfrequencytrading.chronicle.Excerpt;
import com.higherfrequencytrading.chronicle.ExcerptListener;
import com.higherfrequencytrading.chronicle.impl.IndexedChronicle;
import com.higherfrequencytrading.chronicle.tcp.NamedThreadFactory;
import com.higherfrequencytrading.chronicle.tcp.NullExcerptListener;
import com.higherfrequencytrading.chronicle.tcp.TcpUtil;
import com.higherfrequencytrading.chronicle.tools.IOTools;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Pulls excerpts from a remote TCP endpoint and appends them to a local {@link Chronicle}.
 *
 * <p>Constructing a sink starts a single background thread that connects to the given
 * address, sends the local chronicle's current size as an 8-byte long, and then reads
 * framed excerpts (a 12-byte header holding a long index and an int size, followed by
 * {@code size} payload bytes). Each received excerpt is written to the chronicle and then
 * handed to the configured {@link ExcerptListener}. When the connection is lost the sink
 * reconnects until {@link #close()} is called.
 */
public class ChronicleSink
implements Closeable {
    @NotNull
    private final Chronicle chronicle;
    @NotNull
    private final SocketAddress address;
    private final ExcerptListener listener;
    @NotNull
    private final ExecutorService service;
    private final Logger logger;
    /** Set by {@link #close()}; polled by the background thread to stop its loops. */
    private volatile boolean closed = false;

    /**
     * Creates a sink that notifies {@link NullExcerptListener#INSTANCE} for each excerpt.
     *
     * @param chronicle local chronicle to append to
     * @param hostname  host to connect to
     * @param port      port to connect to
     */
    public ChronicleSink(@NotNull Chronicle chronicle, String hostname, int port) {
        this(chronicle, hostname, port, NullExcerptListener.INSTANCE);
    }

    /**
     * Creates a sink and immediately starts its background reader thread.
     *
     * @param chronicle local chronicle to append to
     * @param hostname  host to connect to
     * @param port      port to connect to
     * @param listener  called with the excerpt after each received entry has been written
     */
    public ChronicleSink(@NotNull Chronicle chronicle, String hostname, int port, ExcerptListener listener) {
        this.chronicle = chronicle;
        this.listener = listener;
        this.address = new InetSocketAddress(hostname, port);
        String name = chronicle.name() + '@' + hostname + ':' + port;
        // NOTE(review): the logger suffix is chronicle.toString(), not chronicle.name(); kept as is
        // so existing logging configuration keeps matching.
        this.logger = Logger.getLogger(this.getClass().getName() + '.' + chronicle);
        this.service = Executors.newSingleThreadExecutor(new NamedThreadFactory(name));
        this.service.execute(new Sink());
    }

    /**
     * Command-line entry point: {@code {chronicle-base-path} {hostname} {port}}.
     *
     * <p>Reads the system properties {@code dataBitsHintSize} (default 27) and
     * {@code byteOrder} ("Big" or "Little", default is the native order).
     *
     * @param args base path, hostname and port
     * @throws IOException declared for the chronicle constructor
     */
    public static void main(String ... args) throws IOException {
        if (args.length < 3) {
            System.err.println("Usage: java " + ChronicleSink.class.getName() + " {chronicle-base-path} {hostname} {port}");
            System.exit(-1);
        }
        int dataBitsHintSize = Integer.getInteger("dataBitsHintSize", 27);
        String def = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN ? "Big" : "Little";
        ByteOrder byteOrder = System.getProperty("byteOrder", def).equalsIgnoreCase("Big") ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN;
        String basePath = args[0];
        String hostname = args[1];
        int port = Integer.parseInt(args[2]);
        IndexedChronicle ic = new IndexedChronicle(basePath, dataBitsHintSize, byteOrder);
        new ChronicleSink(ic, hostname, port);
    }

    /**
     * Closes the given channel, logging (not propagating) any failure.
     *
     * @param sc channel to close; {@code null} is ignored
     */
    void closeSocket(@Nullable SocketChannel sc) {
        if (sc != null) {
            try {
                sc.close();
            }
            catch (IOException e) {
                this.logger.warning("Error closing socket " + e);
            }
        }
    }

    /**
     * Marks the sink closed, interrupts the background thread and closes the chronicle.
     *
     * @throws IOException declared by {@link Closeable}
     */
    @Override
    public void close() throws IOException {
        // Set the flag first so the reader's loops see it once shutdownNow() interrupts them.
        this.closed = true;
        this.service.shutdownNow();
        this.chronicle.close();
    }

    /** Background task: keeps a connection open and copies incoming excerpts locally. */
    class Sink
    implements Runnable {
        final Excerpt excerpt;

        Sink() {
            this.excerpt = ChronicleSink.this.chronicle.createExcerpt();
        }

        /**
         * Alternates between (re)connecting and reading until the sink is closed; the
         * current channel is always closed on exit, including on an unchecked exception.
         */
        @Override
        public void run() {
            SocketChannel sc = null;
            try {
                while (!closed) {
                    if (sc == null || !sc.isOpen()) {
                        sc = this.createConnection();
                    } else {
                        this.readNextExcerpt(sc);
                    }
                }
            }
            finally {
                closeSocket(sc);
            }
        }

        /**
         * Reads excerpts from {@code sc} until the sink is closed or an I/O error occurs.
         * On an I/O error the channel is closed so that {@link #run()} reconnects instead
         * of re-reading a dead or out-of-sync stream.
         *
         * @param sc connected channel to read from
         */
        private void readNextExcerpt(@NotNull SocketChannel sc) {
            ByteBuffer bb = TcpUtil.createBuffer(1, chronicle.byteOrder());
            try {
                while (!closed) {
                    this.readHeader(sc, bb);
                    long index = bb.getLong(0);
                    int size = bb.getInt(8);
                    long expected = chronicle.size();
                    if (index != expected) {
                        throw new StreamCorruptedException("Expected index " + expected + " but got " + index);
                    }
                    if (size < 0) {
                        throw new StreamCorruptedException("size was " + size);
                    }
                    this.excerpt.startExcerpt(size);
                    this.copyPayload(sc, bb, size);
                    this.excerpt.finish();
                    this.excerpt.index(index);
                    listener.onExcerpt(this.excerpt);
                }
            }
            catch (IOException e) {
                if (!closed) {
                    this.logRetry("Lost connection to ", e);
                }
                // Without this the channel stays "open" and run() would never reconnect.
                closeSocket(sc);
            }
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "Disconnected from " + address);
            }
        }

        /**
         * Copies exactly {@code size} bytes from the channel into the current excerpt,
         * one buffer-full at a time.
         */
        private void copyPayload(@NotNull SocketChannel sc, @NotNull ByteBuffer bb, int size) throws IOException {
            int remaining = size;
            while (remaining > 0) {
                // Reset each pass and count via read()'s return value, so the loop is
                // correct whether or not excerpt.write(bb) advances the buffer position.
                bb.clear();
                bb.limit(Math.min(bb.capacity(), remaining));
                int read = sc.read(bb);
                if (read < 0) {
                    throw new EOFException();
                }
                bb.flip();
                remaining -= read;
                this.excerpt.write(bb);
            }
        }

        /** Fills the first 12 bytes of {@code bb} with the next excerpt header. */
        private void readHeader(SocketChannel sc, @NotNull ByteBuffer bb) throws IOException {
            bb.position(0);
            bb.limit(12);
            IOTools.readFullyOrEOF(sc, bb);
        }

        /**
         * Connects to the remote address and sends the local chronicle size, retrying on
         * I/O failure until it succeeds or the sink is closed.
         *
         * @return the connected channel, or {@code null} if the sink was closed first
         */
        @Nullable
        private SocketChannel createConnection() {
            // NOTE(review): there is no back-off between attempts; a refused connection retries immediately.
            while (!closed) {
                SocketChannel sc = null;
                try {
                    sc = SocketChannel.open(address);
                    ByteBuffer bb = ByteBuffer.allocate(8);
                    bb.putLong(0, chronicle.size());
                    IOTools.writeAllOrEOF(sc, bb);
                    return sc;
                }
                catch (IOException e) {
                    // The channel may have opened before the handshake write failed.
                    closeSocket(sc);
                    this.logRetry("Failed to connect to ", e);
                }
            }
            return null;
        }

        /** Logs a retry notice: with stack trace at FINE, otherwise a one-line summary at INFO. */
        private void logRetry(String prefix, IOException e) {
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, prefix + address + " retrying", e);
            } else if (logger.isLoggable(Level.INFO)) {
                logger.log(Level.INFO, prefix + address + " retrying " + e);
            }
        }
    }
}

