/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.drpc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.drpc.DRPCInvocationsClient;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.DistributedRPCInvocations;
import org.apache.storm.shade.org.json.simple.JSONValue;
import org.apache.storm.shade.org.json.simple.parser.ParseException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.thrift.TException;
import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServiceRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReturnResults
extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
    static final long serialVersionUID = -774882142710631591L;
    OutputCollector _collector;
    boolean local;
    Map<String, Object> _conf;
    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this._conf = topoConf;
        this._collector = collector;
        this.local = topoConf.get("storm.cluster.mode").equals("local");
    }

    @Override
    public void execute(Tuple input) {
        String result2 = (String)input.getValue(0);
        String returnInfo = (String)input.getValue(1);
        if (returnInfo != null) {
            DistributedRPCInvocations.Iface client;
            Map retMap;
            try {
                retMap = (Map)JSONValue.parseWithException((String)returnInfo);
            }
            catch (ParseException e) {
                LOG.error("Parseing returnInfo failed", (Throwable)e);
                this._collector.fail(input);
                return;
            }
            final String host = (String)retMap.get("host");
            final int port = ObjectReader.getInt(retMap.get("port"));
            String id = (String)retMap.get("id");
            if (this.local) {
                client = (DistributedRPCInvocations.Iface)ServiceRegistry.getService(host);
            } else {
                ArrayList server = new ArrayList(){
                    {
                        this.add(host);
                        this.add(port);
                    }
                };
                if (!this._clients.containsKey(server)) {
                    try {
                        this._clients.put(server, new DRPCInvocationsClient(this._conf, host, port));
                    }
                    catch (TTransportException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                client = this._clients.get(server);
            }
            int retryCnt = 0;
            int maxRetries = 3;
            while (retryCnt < maxRetries) {
                ++retryCnt;
                try {
                    client.result(id, result2);
                    this._collector.ack(input);
                    break;
                }
                catch (AuthorizationException aze) {
                    LOG.error("Not authorized to return results to DRPC server", (Throwable)((Object)aze));
                    this._collector.fail(input);
                    throw new RuntimeException((Throwable)((Object)aze));
                }
                catch (TException tex) {
                    if (retryCnt >= maxRetries) {
                        LOG.error("Failed to return results to DRPC server", (Throwable)tex);
                        this._collector.fail(input);
                    }
                    this.reconnectClient((DRPCInvocationsClient)client);
                }
            }
        }
    }

    private void reconnectClient(DRPCInvocationsClient client) {
        if (client instanceof DRPCInvocationsClient) {
            try {
                LOG.info("reconnecting... ");
                client.reconnectClient();
            }
            catch (TException e2) {
                LOG.error("Failed to connect to DRPC server", (Throwable)e2);
            }
        }
    }

    @Override
    public void cleanup() {
        for (DRPCInvocationsClient c : this._clients.values()) {
            c.close();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

