package com.alibaba.jstorm.drpc;

import backtype.storm.Config;
import backtype.storm.daemon.Shutdownable;
import backtype.storm.generated.DRPCExecutionException;
import backtype.storm.generated.DRPCRequest;
import backtype.storm.generated.DistributedRPC;
import backtype.storm.generated.DistributedRPCInvocations;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.utils.DefaultUncaughtExceptionHandler;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/drpc/Drpc.class */
public class Drpc implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
    private static final Logger LOG = LoggerFactory.getLogger(Drpc.class);
    private Map conf;
    private THsHaServer handlerServer;
    private THsHaServer invokeServer;
    private AsyncLoopThread clearThread;
    private AtomicBoolean shutdown = new AtomicBoolean(false);
    private AtomicInteger ctr = new AtomicInteger(0);
    private ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, String> idtoFunction = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, DRPCRequest> idtoRequest = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<>();

    public static void main(String[] strArr) throws Exception {
        LOG.info("Begin to start Drpc server");
        Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler());
        new Drpc().init();
    }

    private THsHaServer initHandlerServer(Map map, Drpc drpc) throws Exception {
        int intValue = JStormUtils.parseInt(map.get(Config.DRPC_PORT)).intValue();
        int intValue2 = JStormUtils.parseInt(map.get(Config.DRPC_WORKER_THREADS)).intValue();
        int intValue3 = JStormUtils.parseInt(map.get(Config.DRPC_QUEUE_SIZE)).intValue();
        LOG.info("Begin to init Handler Server " + intValue);
        THsHaServer.Args args = new THsHaServer.Args(new TNonblockingServerSocket(intValue));
        args.workerThreads(64);
        args.protocolFactory(new TBinaryProtocol.Factory());
        args.processor(new DistributedRPC.Processor(drpc));
        args.executorService(new ThreadPoolExecutor(intValue2, intValue2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(intValue3)));
        THsHaServer tHsHaServer = new THsHaServer(args);
        LOG.info("Successfully init Handler Server " + intValue);
        return tHsHaServer;
    }

    private THsHaServer initInvokeServer(Map map, Drpc drpc) throws Exception {
        int intValue = JStormUtils.parseInt(map.get(Config.DRPC_INVOCATIONS_PORT)).intValue();
        LOG.info("Begin to init Invoke Server " + intValue);
        THsHaServer.Args args = new THsHaServer.Args(new TNonblockingServerSocket(intValue));
        args.workerThreads(64);
        args.protocolFactory(new TBinaryProtocol.Factory());
        args.processor(new DistributedRPCInvocations.Processor(drpc));
        THsHaServer tHsHaServer = new THsHaServer(args);
        LOG.info("Successfully init Invoke Server " + intValue);
        return tHsHaServer;
    }

    private void initThrift() throws Exception {
        this.handlerServer = initHandlerServer(this.conf, this);
        this.invokeServer = initInvokeServer(this.conf, this);
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.alibaba.jstorm.drpc.Drpc.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Drpc.this.shutdown();
                Drpc.this.handlerServer.stop();
                Drpc.this.invokeServer.stop();
            }
        });
        LOG.info("Starting Distributed RPC servers...");
        new Thread(new Runnable() { // from class: com.alibaba.jstorm.drpc.Drpc.2
            @Override // java.lang.Runnable
            public void run() {
                Drpc.this.invokeServer.serve();
            }
        }).start();
        this.handlerServer.serve();
    }

    private void initClearThread() {
        this.clearThread = new AsyncLoopThread(new ClearThread(this));
        LOG.info("Successfully start clear thread");
    }

    private void createPid(Map map) throws Exception {
        JStormServerUtils.createPid(StormConfig.drpcPids(map));
    }

    public void init() throws Exception {
        this.conf = StormConfig.read_storm_config();
        LOG.info("Configuration is \n" + this.conf);
        createPid(this.conf);
        initClearThread();
        initThrift();
    }

    @Override // backtype.storm.daemon.Shutdownable
    public void shutdown() {
        if (this.shutdown.getAndSet(true)) {
            LOG.info("Notify to quit drpc");
            return;
        }
        LOG.info("Begin to shutdown drpc");
        AsyncLoopRunnable.getShutdown().set(true);
        this.clearThread.interrupt();
        try {
            this.clearThread.join();
        } catch (InterruptedException e) {
        }
        LOG.info("Successfully cleanup clear thread");
        this.invokeServer.stop();
        LOG.info("Successfully stop invokeServer");
        this.handlerServer.stop();
        LOG.info("Successfully stop handlerServer");
    }

    public void cleanup(String str) {
        LOG.info("clean id " + str + " @ " + System.currentTimeMillis());
        this.idtoSem.remove(str);
        this.idtoResult.remove(str);
        this.idtoStart.remove(str);
        this.idtoFunction.remove(str);
        this.idtoRequest.remove(str);
    }

    @Override // backtype.storm.generated.DistributedRPC.Iface
    public String execute(String str, String str2) throws DRPCExecutionException, TException {
        LOG.info("Received DRPC request for " + str + " " + str2 + " at " + System.currentTimeMillis());
        int incrementAndGet = this.ctr.incrementAndGet();
        int i = incrementAndGet % 1000000000;
        if (incrementAndGet != i) {
            this.ctr.compareAndSet(incrementAndGet, i);
        }
        String valueOf = String.valueOf(i);
        Semaphore semaphore = new Semaphore(0);
        DRPCRequest dRPCRequest = new DRPCRequest(str2, valueOf);
        this.idtoStart.put(valueOf, Integer.valueOf(TimeUtils.current_time_secs()));
        this.idtoSem.put(valueOf, semaphore);
        this.idtoFunction.put(valueOf, str);
        this.idtoRequest.put(valueOf, dRPCRequest);
        acquireQueue(str).add(dRPCRequest);
        LOG.info("Waiting for DRPC request for " + str + " " + str2 + " at " + System.currentTimeMillis());
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            LOG.error("acquire fail ", e);
        }
        LOG.info("Acquired for DRPC request for " + str + " " + str2 + " at " + System.currentTimeMillis());
        Object obj = this.idtoResult.get(valueOf);
        if (!this.idtoResult.containsKey(valueOf)) {
            obj = new DRPCExecutionException("Request timed out");
        }
        LOG.info("Returning for DRPC request for " + str + " " + str2 + " at " + System.currentTimeMillis());
        cleanup(valueOf);
        if (obj instanceof DRPCExecutionException) {
            throw ((DRPCExecutionException) obj);
        }
        return String.valueOf(obj);
    }

    @Override // backtype.storm.generated.DistributedRPCInvocations.Iface
    public void result(String str, String str2) throws TException {
        Semaphore semaphore = this.idtoSem.get(str);
        LOG.info("Received result " + str2 + " for id " + str + " at " + System.currentTimeMillis());
        if (semaphore != null) {
            this.idtoResult.put(str, str2);
            semaphore.release();
        }
    }

    @Override // backtype.storm.generated.DistributedRPCInvocations.Iface
    public DRPCRequest fetchRequest(String str) throws TException {
        DRPCRequest poll = acquireQueue(str).poll();
        if (poll == null) {
            return new DRPCRequest("", "");
        }
        LOG.info("Fetched request for " + str + " at " + System.currentTimeMillis());
        return poll;
    }

    @Override // backtype.storm.generated.DistributedRPCInvocations.Iface
    public void failRequest(String str) throws TException {
        Semaphore semaphore = this.idtoSem.get(str);
        LOG.info("failRequest result  for id " + str + " at " + System.currentTimeMillis());
        if (semaphore != null) {
            this.idtoResult.put(str, new DRPCExecutionException("Request failed"));
            semaphore.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String str) {
        ConcurrentLinkedQueue<DRPCRequest> concurrentLinkedQueue = this.requestQueues.get(str);
        if (concurrentLinkedQueue == null) {
            concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
            this.requestQueues.put(str, concurrentLinkedQueue);
        }
        return concurrentLinkedQueue;
    }

    public ConcurrentHashMap<String, Semaphore> getIdtoSem() {
        return this.idtoSem;
    }

    public ConcurrentHashMap<String, Object> getIdtoResult() {
        return this.idtoResult;
    }

    public ConcurrentHashMap<String, Integer> getIdtoStart() {
        return this.idtoStart;
    }

    public ConcurrentHashMap<String, String> getIdtoFunction() {
        return this.idtoFunction;
    }

    public ConcurrentHashMap<String, DRPCRequest> getIdtoRequest() {
        return this.idtoRequest;
    }

    public AtomicBoolean isShutdown() {
        return this.shutdown;
    }

    public Map getConf() {
        return this.conf;
    }
}
