/*
 * Decompiled with CFR 0.152.
 */
package org.gridkit.zerormi;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.gridkit.zerormi.DuplexStream;
import org.gridkit.zerormi.IOHelper;
import org.gridkit.zerormi.RemoteMessage;
import org.gridkit.zerormi.RmiChannel;
import org.gridkit.zerormi.RmiChannel1;
import org.gridkit.zerormi.RmiMarshaler;
import org.gridkit.zerormi.SmartRmiMarshaler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RmiGateway {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmiGateway.class);
    private final RmiChannel channel;
    private final ExecutorService executor;
    private boolean connected = false;
    private boolean terminated = false;
    private String name;
    private DuplexStream socket;
    private RmiObjectInputStream in;
    private RmiObjectOutputStream out;
    private ExecutorService service;
    private CounterAgent remote;
    private Thread readerThread;
    private StreamErrorHandler streamErrorHandler = new StreamErrorHandler(){

        @Override
        public void streamError(DuplexStream socket, Object stream, Exception error) {
            RmiGateway.this.shutdown();
        }

        @Override
        public void streamClosed(DuplexStream socket, Object stream) {
            RmiGateway.this.shutdown();
        }
    };

    public RmiGateway(String name) {
        this(name, new SmartRmiMarshaler());
    }

    private ExecutorService createRmiExecutor() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){
            int counter = 1;

            @Override
            public synchronized Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("RMI[" + RmiGateway.this.name + "]-worker-" + this.counter++);
                t.setDaemon(true);
                return t;
            }
        });
    }

    public RmiGateway(String name, RmiMarshaler marshaler) {
        this.executor = this.createRmiExecutor();
        this.channel = new RmiChannel1(new MessageOut(), this.executor, marshaler);
        this.service = new RemoteExecutionService();
        this.name = name;
    }

    public ExecutorService getRemoteExecutorService() {
        return this.service;
    }

    public void setStreamErrorHandler(StreamErrorHandler errorHandler) {
        this.streamErrorHandler = errorHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void disconnect() {
        Thread readerThread = null;
        RmiGateway rmiGateway = this;
        synchronized (rmiGateway) {
            if (this.connected) {
                LOGGER.info("RMI gateway [" + this.name + "] disconneted.");
                readerThread = this.readerThread;
                try {
                    this.out.writeObject("close");
                }
                catch (Exception e) {
                    // empty catch block
                }
                try {
                    this.in.close();
                }
                catch (Exception e) {
                    // empty catch block
                }
                try {
                    this.out.close();
                }
                catch (Exception e) {
                    // empty catch block
                }
                try {
                    this.socket.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                this.in = null;
                this.out = null;
                this.socket = null;
                this.connected = false;
            }
        }
        if (readerThread != null) {
            readerThread.interrupt();
            try {
                readerThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public synchronized boolean isConnected() {
        return this.connected && !this.terminated && !this.socket.isClosed();
    }

    public synchronized void shutdown() {
        if (this.terminated) {
            return;
        }
        LOGGER.info("RMI gateway [" + this.name + "] terminated.");
        this.terminated = true;
        try {
            this.out.writeObject("close");
        }
        catch (Exception e) {
            // empty catch block
        }
        try {
            this.out.close();
        }
        catch (Exception e) {
            // empty catch block
        }
        try {
            this.in.close();
        }
        catch (Exception e) {
            // empty catch block
        }
        try {
            this.socket.close();
        }
        catch (Exception e) {
            // empty catch block
        }
        try {
            this.service.shutdown();
        }
        catch (Exception e) {
            // empty catch block
        }
        try {
            this.channel.close();
        }
        catch (Exception e) {
            // empty catch block
        }
        try {
            this.executor.shutdown();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void connect(DuplexStream socket) throws IOException {
        if (this.socket != null) {
            throw new IllegalStateException("Already connected");
        }
        try {
            this.socket = socket;
            this.out = new RmiObjectOutputStream(socket.getOutput());
            LocalAgent localAgent = new LocalAgent();
            this.channel.exportObject(CounterAgent.class, localAgent);
            RmiObjectOutputStream rmiObjectOutputStream = this.out;
            synchronized (rmiObjectOutputStream) {
                this.out.writeUnshared(localAgent);
                this.out.reset();
                this.out.flush();
            }
            this.in = new RmiObjectInputStream(socket.getInput());
            this.remote = (CounterAgent)this.in.readObject();
            this.readerThread = new SocketReader();
            this.readerThread.setName("RMI-Receiver: " + socket);
            this.readerThread.start();
            this.connected = true;
        }
        catch (Exception e) {
            try {
                if (this.in != null) {
                    this.in.close();
                }
            }
            catch (IOException e1) {
                // empty catch block
            }
            try {
                if (this.out != null) {
                    this.out.close();
                }
            }
            catch (IOException e1) {
                // empty catch block
            }
            try {
                if (this.socket != null) {
                    this.socket.close();
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.in = null;
            this.out = null;
            this.socket = null;
            if (e instanceof IOException) {
                throw (IOException)e;
            }
            throw new RuntimeException(e);
        }
    }

    private class LocalAgent
    implements CounterAgent {
        private LocalAgent() {
        }

        @Override
        public <T> T remoteCall(Callable<T> callable) throws Exception {
            return callable.call();
        }
    }

    public static interface CounterAgent
    extends Remote {
        public <T> T remoteCall(Callable<T> var1) throws RemoteException, Exception;
    }

    public static class CallableRunnableWrapper<T>
    implements Callable<T>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private Runnable runnable;
        private T result;

        public CallableRunnableWrapper() {
        }

        public CallableRunnableWrapper(Runnable runnable, T result) {
            this.runnable = runnable;
            this.result = result;
        }

        @Override
        public T call() throws Exception {
            this.runnable.run();
            return this.result;
        }
    }

    private class RemoteExecutionService
    extends AbstractExecutorService {
        private final ExecutorService threadPool;

        private RemoteExecutionService() {
            this.threadPool = RmiGateway.this.executor;
        }

        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return this.submit(new CallableRunnableWrapper<T>(task, result));
        }

        @Override
        public Future<?> submit(Runnable task) {
            return this.submit(new CallableRunnableWrapper<Object>(task, null));
        }

        @Override
        public <T> Future<T> submit(Callable<T> task) {
            task = this.wrap(task);
            return this.threadPool.submit(task);
        }

        @Override
        public void execute(Runnable command) {
            this.submit(new CallableRunnableWrapper<Object>(command, null));
        }

        private <T> Callable<T> wrap(final Callable<T> task) {
            return new Callable<T>(){

                @Override
                public T call() throws Exception {
                    return RmiGateway.this.remote.remoteCall(task);
                }
            };
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isShutdown() {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isTerminated() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void shutdown() {
            RmiGateway.this.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            throw new UnsupportedOperationException();
        }
    }

    public static interface StreamErrorHandler {
        public void streamError(DuplexStream var1, Object var2, Exception var3);

        public void streamClosed(DuplexStream var1, Object var2);
    }

    private class MessageOut
    implements RmiChannel1.OutputChannel {
        private MessageOut() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void send(RemoteMessage message) throws IOException {
            try {
                RmiObjectOutputStream rmiObjectOutputStream = RmiGateway.this.out;
                synchronized (rmiObjectOutputStream) {
                    RmiGateway.this.out.writeUnshared(message);
                    RmiGateway.this.out.reset();
                }
            }
            catch (NullPointerException e) {
                if (RmiGateway.this.out == null) {
                    throw new IOException("RMI gatway [" + RmiGateway.this.name + "] channel is not connected");
                }
                throw e;
            }
            catch (IOException e) {
                DuplexStream socket = RmiGateway.this.socket;
                RmiObjectOutputStream out = RmiGateway.this.out;
                RmiGateway.this.disconnect();
                RmiGateway.this.streamErrorHandler.streamError(socket, out, e);
                throw e;
            }
        }
    }

    private class RmiObjectOutputStream
    extends ObjectOutputStream {
        public RmiObjectOutputStream(OutputStream in) throws IOException {
            super(in);
            this.enableReplaceObject(true);
        }

        @Override
        protected Object replaceObject(Object obj) throws IOException {
            Object r = RmiGateway.this.channel.streamReplaceObject(obj);
            return r;
        }
    }

    private class RmiObjectInputStream
    extends ObjectInputStream {
        public RmiObjectInputStream(InputStream in) throws IOException {
            super(in);
            this.enableResolveObject(true);
        }

        @Override
        protected Object resolveObject(Object obj) throws IOException {
            Object r = RmiGateway.this.channel.streamResolveObject(obj);
            return r;
        }

        public String toString() {
            return "RmiObjectInputStream[" + RmiGateway.this.name + "]";
        }
    }

    private final class SocketReader
    extends Thread
    implements Closeable {
        private SocketReader() {
        }

        @Override
        public void interrupt() {
            super.interrupt();
            this.close();
        }

        @Override
        public void close() {
            try {
                if (RmiGateway.this.in != null) {
                    RmiGateway.this.in.close();
                }
            }
            catch (IOException e) {
                // empty catch block
            }
            try {
                if (RmiGateway.this.socket != null) {
                    RmiGateway.this.socket.close();
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }

        @Override
        public void run() {
            RmiObjectInputStream chin = RmiGateway.this.in;
            try {
                while (true) {
                    Object message;
                    if ((message = chin.readObject()) == null) {
                        continue;
                    }
                    if ("close".equals(message)) {
                        LOGGER.info("RMI gateway [" + RmiGateway.this.name + "], remote side has requested termination");
                        RmiGateway.this.shutdown();
                        continue;
                    }
                    RmiGateway.this.channel.handleMessage((RemoteMessage)message);
                }
            }
            catch (Exception e) {
                if (IOHelper.isSocketTerminationException(e)) {
                    LOGGER.debug("RMI stream, socket has been discontinued [" + RmiGateway.this.socket + "] - " + e.toString());
                } else {
                    LOGGER.error("RMI stream read exception [" + RmiGateway.this.socket + "]", (Throwable)e);
                }
                DuplexStream socket = RmiGateway.this.socket;
                RmiObjectInputStream in = RmiGateway.this.in;
                RmiGateway.this.readerThread = null;
                LOGGER.debug("disconnecting");
                RmiGateway.this.disconnect();
                if (IOHelper.isSocketTerminationException(e)) {
                    RmiGateway.this.streamErrorHandler.streamClosed(socket, in);
                } else {
                    RmiGateway.this.streamErrorHandler.streamError(socket, in, e);
                }
                return;
            }
        }
    }
}

