/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.messaging.local;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.storm.grouping.Load;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-process {@link IContext} implementation: connections are plain Java objects that hand
 * messages directly to a registered callback instead of going over the network.
 *
 * <p>Servers are shared through a static registry keyed by {@code "<storm_id>-<port>"}, so a
 * {@link #bind} and a {@link #connect} with the same id and port reach the same
 * {@link LocalServer} regardless of which one is called first.
 */
public class Context implements IContext {
    private static final Logger LOG = LoggerFactory.getLogger(Context.class);

    /** Registry of local servers, keyed by {@code "<storm_id>-<port>"}; shared by all instances. */
    private static final ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();

    /**
     * Returns the server registered for the given id and port, creating and registering it
     * atomically if none exists yet.
     *
     * @param nodeId identifier forming the first half of the registry key
     * @param port   port forming the second half of the registry key
     * @return the single shared server for that key
     */
    private static LocalServer getLocalServer(String nodeId, int port) {
        String key = nodeId + "-" + port;
        // computeIfAbsent builds at most one server per key, even under concurrent callers.
        return _registry.computeIfAbsent(key, ignored -> new LocalServer(port));
    }

    /** No configuration is needed for the local context; this is a no-op. */
    @Override
    public void prepare(Map<String, Object> topoConf) {
    }

    /**
     * Returns the shared server-side connection for {@code storm_id} and {@code port}.
     */
    @Override
    public IConnection bind(String storm_id, int port) {
        return getLocalServer(storm_id, port);
    }

    /**
     * Creates a new client connection to the shared server for {@code storm_id} and {@code port}.
     * {@code host} and {@code remoteBpStatus} are not used by the local implementation.
     */
    @Override
    public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
        return new LocalClient(getLocalServer(storm_id, port));
    }

    /** No-op: registered servers are left in the static registry. */
    @Override
    public void term() {
    }

    /**
     * Client side of a local connection. Messages sent before the server has a receive callback
     * are parked in a queue and delivered later, either by the next {@link #send} or by a
     * background flusher that runs every five seconds.
     */
    private static class LocalClient implements IConnection {
        private final LocalServer _server;
        /** Messages sent while the server had no registered callback. */
        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
        private final ScheduledExecutorService _pendingFlusher;

        /**
         * Creates the client and starts its single-threaded daemon flusher, which first runs
         * after five seconds and then every five seconds.
         *
         * @param server the server this client delivers to
         */
        public LocalClient(LocalServer server) {
            this._server = server;
            this._pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();

            ThreadFactory flusherThreads = runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName("LocalClientFlusher-" + thread.getId());
                thread.setDaemon(true);
                return thread;
            };
            this._pendingFlusher = Executors.newScheduledThreadPool(1, flusherThreads);

            // A lambda (unlike an anonymous Runnable) resolves flushPending() against this
            // LocalClient, which is the method that must be called here.
            this._pendingFlusher.scheduleAtFixedRate(() -> {
                try {
                    // Ensure parked messages are delivered even if no further send() happens.
                    flushPending();
                } catch (Throwable t) {
                    LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
                    // NOTE: an exception escaping a scheduleAtFixedRate task suppresses all
                    // subsequent executions of that task.
                    throw new RuntimeException(t);
                }
            }, 5L, 5L, TimeUnit.SECONDS);
        }

        /**
         * Not supported on the client side.
         *
         * @throws IllegalArgumentException always
         */
        @Override
        public void registerRecv(IConnectionCallback cb) {
            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
        }

        /**
         * Not supported on the client side.
         *
         * @throws IllegalArgumentException always
         */
        @Override
        public void registerNewConnectionResponse(Supplier<Object> cb) {
            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
        }

        /**
         * Delivers all parked messages to the server callback, if a callback is registered and
         * the queue is non-empty; otherwise does nothing.
         */
        private void flushPending() {
            IConnectionCallback serverCb = _server._cb;
            if (serverCb == null || _pendingDueToUnregisteredServer.isEmpty()) {
                return;
            }
            ArrayList<TaskMessage> parked = new ArrayList<>();
            _pendingDueToUnregisteredServer.drainTo(parked);
            serverCb.recv(parked);
        }

        /**
         * Sends messages to the server. If the server has a callback, previously parked
         * messages are flushed first and then {@code msgs} is delivered as one batch (the
         * callback is invoked even when {@code msgs} is empty). If no callback is registered
         * yet, the messages are parked for later delivery.
         *
         * @param msgs messages to deliver; fully consumed by this call
         */
        @Override
        public void send(Iterator<TaskMessage> msgs) {
            IConnectionCallback serverCb = _server._cb;
            if (serverCb == null) {
                msgs.forEachRemaining(_pendingDueToUnregisteredServer::add);
                return;
            }
            // Deliver older parked messages before the new batch.
            flushPending();
            ArrayList<TaskMessage> batch = new ArrayList<>();
            msgs.forEachRemaining(batch::add);
            serverCb.recv(batch);
        }

        /** Delegates to the server's load table. */
        @Override
        public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
            return _server.getLoad(tasks);
        }

        /** Delegates to the server's load table. */
        @Override
        public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
            _server.sendLoadMetrics(taskToLoad);
        }

        /**
         * Not supported on the client side.
         *
         * @throws RuntimeException always
         */
        @Override
        public void sendBackPressureStatus(BackPressureStatus bpStatus) {
            throw new RuntimeException("Local Client connection should not send BackPressure status");
        }

        /** Returns the port of the server this client is connected to. */
        @Override
        public int getPort() {
            return _server.getPort();
        }

        /**
         * Shuts down the background flusher and waits up to five seconds for it to finish.
         *
         * @throws RuntimeException if the calling thread is interrupted while waiting; the
         *                          thread's interrupt flag is restored before throwing
         */
        @Override
        public void close() {
            _pendingFlusher.shutdown();
            try {
                if (!_pendingFlusher.awaitTermination(5L, TimeUnit.SECONDS)) {
                    LOG.warn("Pending message flusher did not terminate within 5 seconds");
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while awaiting flusher shutdown", e);
            }
        }
    }

    /**
     * Server side of a local connection: holds the receive callback that clients deliver to
     * and a table of the most recently reported load per task.
     */
    private static class LocalServer implements IConnection {
        /** Last reported load value per task id. */
        final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
        final int port;
        /** Receive callback; {@code null} until {@link #registerRecv} is called. Read by clients. */
        volatile IConnectionCallback _cb;

        public LocalServer(int port) {
            this.port = port;
        }

        /** Stores the callback that clients will deliver messages to. */
        @Override
        public void registerRecv(IConnectionCallback cb) {
            this._cb = cb;
        }

        /** No-op for the local server. */
        @Override
        public void registerNewConnectionResponse(Supplier<Object> cb) {
        }

        /**
         * Not supported on the server side.
         *
         * @throws IllegalArgumentException always
         */
        @Override
        public void send(Iterator<TaskMessage> msgs) {
            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
        }

        /**
         * Looks up the recorded load for each requested task.
         *
         * @param tasks task ids to look up
         * @return a new map containing an entry only for tasks that have a recorded load
         */
        @Override
        public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
            HashMap<Integer, Load> ret = new HashMap<>();
            for (Integer task : tasks) {
                Double found = _load.get(task);
                if (found != null) {
                    ret.put(task, new Load(true, found, 0.0));
                }
            }
            return ret;
        }

        /** Records the given per-task load values, overwriting earlier values for the same tasks. */
        @Override
        public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
            _load.putAll(taskToLoad);
        }

        /**
         * Not supported on the server side.
         *
         * @throws RuntimeException always
         */
        @Override
        public void sendBackPressureStatus(BackPressureStatus bpStatus) {
            throw new RuntimeException("Local Server connection should not send BackPressure status");
        }

        @Override
        public int getPort() {
            return port;
        }

        /** No-op: the local server holds no resources to release. */
        @Override
        public void close() {
        }
    }
}

