/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.drpc;

import backtype.storm.daemon.Shutdownable;
import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.generated.DRPCRequest;
import backtype.storm.generated.DistributedRPC;
import backtype.storm.generated.DistributedRPCInvocations;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.drpc.ClearThread;
import com.alibaba.jstorm.utils.DefaultUncaughtExceptionHandler;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DRPC daemon: exposes the client-facing {@code DistributedRPC} service and the
 * worker-facing {@code DistributedRPCInvocations} service from one object.
 *
 * <p>A client call to {@link #execute(String, String)} registers the request under a
 * generated id, enqueues it on the per-function queue and blocks on a semaphore.
 * Workers pull requests with {@link #fetchRequest(String)} and answer through
 * {@link #result(String, String)} or {@link #failRequest(String)}, which store the
 * outcome and release the semaphore.
 *
 * <p>The per-request maps are exposed through getters; presumably {@code ClearThread}
 * uses them to expire stale requests (not visible in this file, confirm there).
 */
public class Drpc
implements DistributedRPC.Iface,
DistributedRPCInvocations.Iface,
Shutdownable {
    private static final Logger LOG = LoggerFactory.getLogger(Drpc.class);
    /** Upper bound (exclusive) for generated request ids; the counter wraps at this value. */
    private static final int MAX_REQUEST_ID = 1000000000;

    private Map conf;
    private THsHaServer handlerServer;
    private THsHaServer invokeServer;
    private AsyncLoopThread clearThread;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final AtomicInteger ctr = new AtomicInteger(0);
    // All maps below are keyed by request id, except requestQueues which is keyed by function name.
    private final ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<String, Semaphore>();
    private final ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<String, Object>();
    private final ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<String, Integer>();
    private final ConcurrentHashMap<String, String> idtoFunction = new ConcurrentHashMap<String, String>();
    private final ConcurrentHashMap<String, DRPCRequest> idtoRequest = new ConcurrentHashMap<String, DRPCRequest>();
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();

    /**
     * Process entry point: installs the default uncaught-exception handler, then
     * creates and initializes a {@code Drpc} instance.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if initialization fails
     */
    public static void main(String[] args) throws Exception {
        LOG.info("Begin to start Drpc server");
        Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());
        Drpc service = new Drpc();
        service.init();
    }

    /**
     * Builds the client-facing Thrift server from {@code drpc.port},
     * {@code drpc.worker.threads} and {@code drpc.queue.size}.
     *
     * @param conf    configuration map holding the keys above
     * @param service handler that receives the {@code DistributedRPC} calls
     * @return the configured server; this method does not call {@code serve()}
     * @throws Exception if the server socket cannot be created
     */
    private THsHaServer initHandlerServer(Map conf, Drpc service) throws Exception {
        int port = JStormUtils.parseInt(conf.get("drpc.port"));
        int workerThreadNum = JStormUtils.parseInt(conf.get("drpc.worker.threads"));
        int queueSize = JStormUtils.parseInt(conf.get("drpc.queue.size"));
        LOG.info("Begin to init Handler Server " + port);
        TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
        THsHaServer.Args targs = new THsHaServer.Args((TNonblockingServerTransport)socket);
        // Keep the worker-thread setting in agreement with the explicit executor below
        // instead of a hard-coded 64 that contradicted the configured pool size.
        targs.workerThreads(workerThreadNum);
        targs.protocolFactory((TProtocolFactory)new TBinaryProtocol.Factory());
        targs.processor(new DistributedRPC.Processor<Drpc>(service));
        // Fixed-size pool with a bounded queue, both sized from configuration.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(workerThreadNum, workerThreadNum, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
        targs.executorService((ExecutorService)executor);
        THsHaServer handlerServer = new THsHaServer(targs);
        LOG.info("Successfully init Handler Server " + port);
        return handlerServer;
    }

    /**
     * Builds the worker-facing Thrift server on {@code drpc.invocations.port}.
     *
     * @param conf    configuration map holding the port
     * @param service handler that receives the {@code DistributedRPCInvocations} calls
     * @return the configured server; this method does not call {@code serve()}
     * @throws Exception if the server socket cannot be created
     */
    private THsHaServer initInvokeServer(Map conf, Drpc service) throws Exception {
        int port = JStormUtils.parseInt(conf.get("drpc.invocations.port"));
        LOG.info("Begin to init Invoke Server " + port);
        TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
        THsHaServer.Args targsInvoke = new THsHaServer.Args((TNonblockingServerTransport)socket);
        targsInvoke.workerThreads(64);
        targsInvoke.protocolFactory((TProtocolFactory)new TBinaryProtocol.Factory());
        targsInvoke.processor(new DistributedRPCInvocations.Processor<Drpc>(service));
        THsHaServer invokeServer = new THsHaServer(targsInvoke);
        LOG.info("Successfully init Invoke Server " + port);
        return invokeServer;
    }

    /**
     * Creates both Thrift servers, registers a JVM shutdown hook and starts serving.
     * The invocation server runs on its own thread; the handler server is served on
     * the calling thread.
     *
     * @throws Exception if either server cannot be created
     */
    private void initThrift() throws Exception {
        this.handlerServer = this.initHandlerServer(this.conf, this);
        this.invokeServer = this.initInvokeServer(this.conf, this);
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                // shutdown() returns early when a shutdown was already requested,
                // so the servers are also stopped explicitly here.
                Drpc.this.shutdown();
                Drpc.this.handlerServer.stop();
                Drpc.this.invokeServer.stop();
            }
        });
        LOG.info("Starting Distributed RPC servers...");
        new Thread(new Runnable(){

            @Override
            public void run() {
                Drpc.this.invokeServer.serve();
            }
        }).start();
        this.handlerServer.serve();
    }

    /**
     * Creates the background loop thread wrapping a {@code ClearThread} for this
     * instance. NOTE(review): presumably {@code AsyncLoopThread} starts itself on
     * construction, as the log message suggests; confirm in that class.
     */
    private void initClearThread() {
        this.clearThread = new AsyncLoopThread(new ClearThread(this));
        LOG.info("Successfully start clear thread");
    }

    /**
     * Creates the pid entry for this process under the DRPC pid directory.
     *
     * @param conf configuration used to resolve the pid directory
     * @throws Exception if the pid entry cannot be created
     */
    private void createPid(Map conf) throws Exception {
        String pidDir = StormConfig.drpcPids(conf);
        JStormServerUtils.createPid(pidDir);
    }

    /**
     * Reads the storm configuration, writes the pid entry, creates the clear thread
     * and starts the Thrift servers (the handler server is served on this thread).
     *
     * @throws Exception if any initialization step fails
     */
    public void init() throws Exception {
        this.conf = StormConfig.read_storm_config();
        LOG.info("Configuration is \n" + this.conf);
        this.createPid(this.conf);
        this.initClearThread();
        this.initThrift();
    }

    /**
     * Stops the clear thread and both Thrift servers. Only the first call performs
     * the shutdown; later calls log and return. Components that were never created
     * (for example when called before {@link #init()}) are skipped.
     */
    @Override
    public void shutdown() {
        if (this.shutdown.getAndSet(true)) {
            LOG.info("Notify to quit drpc");
            return;
        }
        LOG.info("Begin to shutdown drpc");
        AsyncLoopRunnable.getShutdown().set(true);
        if (this.clearThread != null) {
            this.clearThread.interrupt();
            try {
                this.clearThread.join();
            }
            catch (InterruptedException e) {
                // Keep shutting down, but preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            }
            LOG.info("Successfully cleanup clear thread");
        }
        if (this.invokeServer != null) {
            this.invokeServer.stop();
            LOG.info("Successfully stop invokeServer");
        }
        if (this.handlerServer != null) {
            this.handlerServer.stop();
            LOG.info("Successfully stop handlerServer");
        }
    }

    /**
     * Removes every per-request entry stored under {@code id}. The request itself is
     * not removed from its function queue.
     *
     * @param id request id to forget
     */
    public void cleanup(String id) {
        LOG.info("clean id " + id + " @ " + System.currentTimeMillis());
        this.idtoSem.remove(id);
        this.idtoResult.remove(id);
        this.idtoStart.remove(id);
        this.idtoFunction.remove(id);
        this.idtoRequest.remove(id);
    }

    /**
     * Submits a DRPC request and blocks until its semaphore is released.
     *
     * @param function name of the DRPC function; selects the request queue
     * @param args     argument string passed to the function
     * @return the result stored by {@link #result(String, String)}
     * @throws DRPCExecutionException if the request was failed via
     *         {@link #failRequest(String)}, or no result is present once the wait
     *         ends (reported as "Request timed out")
     * @throws TException declared by the Thrift interface
     */
    @Override
    public String execute(String function, String args) throws DRPCExecutionException, TException {
        LOG.info("Received DRPC request for " + function + " " + args + " at " + System.currentTimeMillis());
        int idinc = this.ctr.incrementAndGet();
        int newid = idinc % MAX_REQUEST_ID;
        if (idinc != newid) {
            // Wrap the shared counter; a lost CAS only means another thread already advanced it.
            this.ctr.compareAndSet(idinc, newid);
        }
        String strid = String.valueOf(newid);
        Semaphore sem = new Semaphore(0);
        DRPCRequest req = new DRPCRequest(args, strid);
        this.idtoStart.put(strid, TimeUtils.current_time_secs());
        this.idtoSem.put(strid, sem);
        this.idtoFunction.put(strid, function);
        this.idtoRequest.put(strid, req);
        ConcurrentLinkedQueue<DRPCRequest> queue = this.acquireQueue(function);
        queue.add(req);
        LOG.info("Waiting for DRPC request for " + function + " " + args + " at " + System.currentTimeMillis());
        try {
            sem.acquire();
        }
        catch (InterruptedException e) {
            LOG.error("acquire fail ", (Throwable)e);
            // Restore the flag so the calling thread can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
        LOG.info("Acquired for DRPC request for " + function + " " + args + " at " + System.currentTimeMillis());
        // Single read: a separate containsKey() check could observe a result stored
        // after the get() and end up returning the string "null".
        Object result2 = this.idtoResult.get(strid);
        if (result2 == null) {
            result2 = new DRPCExecutionException("Request timed out");
        }
        LOG.info("Returning for DRPC request for " + function + " " + args + " at " + System.currentTimeMillis());
        this.cleanup(strid);
        if (result2 instanceof DRPCExecutionException) {
            throw (DRPCExecutionException)result2;
        }
        return String.valueOf(result2);
    }

    /**
     * Stores the result for a pending request and wakes the waiting
     * {@link #execute(String, String)} call. Ignored when the id is not pending.
     *
     * @param id      request id
     * @param result2 result string to return to the client
     * @throws TException declared by the Thrift interface
     */
    @Override
    public void result(String id, String result2) throws TException {
        Semaphore sem = this.idtoSem.get(id);
        LOG.info("Received result " + result2 + " for id " + id + " at " + System.currentTimeMillis());
        if (sem != null) {
            // Publish the result before releasing so the waiter sees it.
            this.idtoResult.put(id, result2);
            sem.release();
        }
    }

    /**
     * Polls the next queued request for a function.
     *
     * @param functionName DRPC function whose queue is polled
     * @return the next request, or a request with empty id and arguments when the
     *         queue is empty
     * @throws TException declared by the Thrift interface
     */
    @Override
    public DRPCRequest fetchRequest(String functionName) throws TException {
        ConcurrentLinkedQueue<DRPCRequest> queue = this.acquireQueue(functionName);
        DRPCRequest req = queue.poll();
        if (req != null) {
            LOG.info("Fetched request for " + functionName + " at " + System.currentTimeMillis());
            return req;
        }
        return new DRPCRequest("", "");
    }

    /**
     * Marks a pending request as failed and wakes the waiting
     * {@link #execute(String, String)} call, which then throws. Ignored when the id
     * is not pending.
     *
     * @param id request id
     * @throws TException declared by the Thrift interface
     */
    @Override
    public void failRequest(String id) throws TException {
        Semaphore sem = this.idtoSem.get(id);
        LOG.info("failRequest result  for id " + id + " at " + System.currentTimeMillis());
        if (sem != null) {
            this.idtoResult.put(id, new DRPCExecutionException("Request failed"));
            sem.release();
        }
    }

    /**
     * Returns the request queue for a function, creating it on first use.
     * Creation is atomic, so concurrent callers always share one queue per function.
     *
     * @param function DRPC function name
     * @return the queue registered for {@code function}
     */
    protected ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
        ConcurrentLinkedQueue<DRPCRequest> reqQueue = this.requestQueues.get(function);
        if (reqQueue == null) {
            ConcurrentLinkedQueue<DRPCRequest> created = new ConcurrentLinkedQueue<DRPCRequest>();
            // putIfAbsent returns the queue another thread registered first, if any.
            reqQueue = this.requestQueues.putIfAbsent(function, created);
            if (reqQueue == null) {
                reqQueue = created;
            }
        }
        return reqQueue;
    }

    /** @return the live map from request id to the semaphore its caller waits on */
    public ConcurrentHashMap<String, Semaphore> getIdtoSem() {
        return this.idtoSem;
    }

    /** @return the live map from request id to its result string or failure exception */
    public ConcurrentHashMap<String, Object> getIdtoResult() {
        return this.idtoResult;
    }

    /** @return the live map from request id to its start time in seconds */
    public ConcurrentHashMap<String, Integer> getIdtoStart() {
        return this.idtoStart;
    }

    /** @return the live map from request id to the requested function name */
    public ConcurrentHashMap<String, String> getIdtoFunction() {
        return this.idtoFunction;
    }

    /** @return the live map from request id to the request object */
    public ConcurrentHashMap<String, DRPCRequest> getIdtoRequest() {
        return this.idtoRequest;
    }

    /** @return the shared flag that becomes {@code true} once shutdown is requested */
    public AtomicBoolean isShutdown() {
        return this.shutdown;
    }

    /** @return the configuration read by {@link #init()}, or {@code null} before it ran */
    public Map getConf() {
        return this.conf;
    }
}

