/*
 * Decompiled with CFR 0.152.
 */
package backtype.storm.drpc;

import backtype.storm.drpc.DRPCInvocationsClient;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.DistributedRPCInvocations;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.ServiceRegistry;
import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReturnResults
extends BaseRichBolt {
    static final long serialVersionUID = -774882142710631591L;
    public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
    OutputCollector _collector;
    boolean local;
    Map _conf;
    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this._conf = stormConf;
        this._collector = collector;
        this.local = stormConf.get("storm.cluster.mode").equals("local");
    }

    @Override
    public void execute(Tuple input) {
        block13: {
            String result2 = (String)input.getValue(0);
            String returnInfo = (String)input.getValue(1);
            if (returnInfo != null) {
                DistributedRPCInvocations.Iface client;
                Map retMap = (Map)JSONValue.parse((String)returnInfo);
                final String host = (String)retMap.get("host");
                final int port = Utils.getInt(retMap.get("port"));
                String hostPort = host + ":" + port;
                String id = (String)retMap.get("id");
                if (this.local) {
                    client = (DistributedRPCInvocations.Iface)ServiceRegistry.getService(host);
                } else {
                    ArrayList server = new ArrayList(){
                        {
                            this.add(host);
                            this.add(port);
                        }
                    };
                    if (!this._clients.containsKey(server)) {
                        try {
                            this._clients.put(server, new DRPCInvocationsClient(this._conf, host, port));
                        }
                        catch (TTransportException ex) {
                            throw new RuntimeException(ex);
                        }
                    }
                    client = this._clients.get(server);
                }
                try {
                    client.result(id, result2);
                    this._collector.ack(input);
                }
                catch (AuthorizationException aze) {
                    LOG.error("Not authorized to return results to DRPC server " + hostPort, (Throwable)((Object)aze));
                    this._collector.fail(input);
                    if (!(client instanceof DRPCInvocationsClient)) break block13;
                    try {
                        LOG.info("reconnecting... ");
                        ((DRPCInvocationsClient)client).reconnectClient();
                    }
                    catch (TException e2) {
                        throw new RuntimeException(e2);
                    }
                }
                catch (TException e) {
                    LOG.error("Failed to return results to DRPC server " + hostPort, (Throwable)e);
                    this._collector.fail(input);
                    if (!(client instanceof DRPCInvocationsClient)) break block13;
                    try {
                        LOG.info("reconnecting... ");
                        ((DRPCInvocationsClient)client).reconnectClient();
                    }
                    catch (TException e2) {
                        throw new RuntimeException(e2);
                    }
                }
            }
        }
    }

    @Override
    public void cleanup() {
        for (DRPCInvocationsClient c : this._clients.values()) {
            c.close();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

