package com.alibaba.jstorm.daemon.worker;

import backtype.storm.Config;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.StreamInfo;
import backtype.storm.messaging.ControlMessage;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.IContext;
import backtype.storm.messaging.TransportFactory;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.serialization.KryoByteBufferSerializer;
import backtype.storm.serialization.KryoTupleDeserializer;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.ThriftTopologyUtils;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
import com.alibaba.jstorm.cluster.ClusterState;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.worker.UpdateListener;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormHealthReporter;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.LogUtils;
import com.alibaba.jstorm.zk.ZkTool;
import com.codahale.metrics.Gauge;
import com.esotericsoftware.kryo.Serializer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/daemon/worker/WorkerData.class */
/**
 * Shared runtime state of one worker process.
 *
 * <p>An instance bundles the worker's configuration, its cluster-state handles, the
 * task/component lookup maps, the transfer queues, the Kryo (de)serializers and a scheduled
 * thread pool. Most members are plain holders exposed through getters/setters; the
 * constructor performs the actual initialization.
 *
 * <p>The class makes no overall thread-safety guarantee: some members are concurrent
 * collections, atomics or {@code volatile}, the remaining fields are plain.
 */
public class WorkerData {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerData.class);

    /** Size of the scheduled thread pool created by the constructor. */
    public static final int THREAD_POOL_NUM = 4;

    /** Configuration map exactly as handed to the constructor. */
    private Map<Object, Object> conf;
    /** {@link #conf} overlaid with the result of {@code StormConfig.read_supervisor_topology_conf}. */
    private Map<Object, Object> stormConf;
    /** Messaging context; created through {@link TransportFactory} when the caller passes {@code null}. */
    private IContext context;
    private final String topologyId;
    private final String supervisorId;
    private final Integer port;
    private final String workerId;
    private ClusterState zkClusterstate;
    private StormClusterState zkCluster;
    /** Task ids returned by {@code Assignment.getCurrentWorkerTasks} for this supervisor/port. */
    private Set<Integer> taskids;
    private ConcurrentHashMap<WorkerSlot, IConnection> nodeportSocket;
    private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport;
    /** Worker slots of the current assignment; also used as the monitor for its own updates. */
    private ConcurrentSkipListSet<ResourceWorkerSlot> workerToResource;
    private volatile Set<Integer> outboundTasks;
    /** Only assigned through {@link #setLocalTasks(Set)}. */
    private Set<Integer> localTasks;
    /** Only assigned through {@link #setLocalNodeTasks(Set)}. */
    private Set<Integer> localNodeTasks;
    private ConcurrentHashMap<Integer, DisruptorQueue> innerTaskTransfer;
    private ConcurrentHashMap<Integer, DisruptorQueue> controlQueues;
    private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
    /** task id -&gt; component id, filled by {@link #updateTaskComponentMap()}. */
    private ConcurrentHashMap<Integer, String> tasksToComponent;
    /** component id -&gt; ascending task ids, filled by {@link #updateTaskComponentMap()}. */
    private ConcurrentHashMap<String, List<Integer>> componentToSortedTasks;
    private Map<String, Object> defaultResources;
    private Map<String, Object> userResources;
    private Map<String, Object> executorData;
    private Map registeredMetrics;
    private StormTopology rawTopology;
    /** Result of {@code Common.system_topology} applied to the raw topology. */
    private StormTopology sysTopology;
    private ContextMaker contextMaker;
    private DisruptorQueue transferCtrlQueue;
    private DisruptorQueue sendingQueue;
    /** Only assigned through {@link #setShutdownTasks(List)}; {@code null} until then. */
    private List<TaskShutdownDameon> shutdownTasks;
    /** task id -&gt; active flag for outbound tasks. */
    private ConcurrentHashMap<Integer, Boolean> outTaskStatus;
    private ScheduledExecutorService threadPool;
    private volatile Long assignmentTS;
    private volatile Assignment.AssignmentType assignmentType;
    private IConnection recvConnection;
    /** Only assigned through {@link #setMetricsReporter(JStormMetricsReporter)}; {@code null} until then. */
    private JStormMetricsReporter metricReporter;
    /** Created by the constructor unless {@code StormConfig.local_mode} is true. */
    private AsyncLoopThread healthReporterThread;
    private UpdateListener updateListener;
    private final AsyncLoopDefaultKill workHalt = new AsyncLoopDefaultKill();
    private AtomicReference<KryoTupleSerializer> atomKryoSerializer = new AtomicReference<>();
    private AtomicReference<KryoTupleDeserializer> atomKryoDeserializer = new AtomicReference<>();
    private AtomicBoolean shutdown = new AtomicBoolean(false);
    private AtomicBoolean monitorEnable = new AtomicBoolean(true);
    private StatusType topologyStatus = StatusType.active;
    private AtomicBoolean workerOldStatus = new AtomicBoolean(false);
    private AtomicBoolean workeInitConnectionStatus = new AtomicBoolean(false);

    /**
     * Builds the complete worker state.
     *
     * <p>Steps, in order: store the identifiers, create the pid file in distributed mode, open
     * the cluster state, merge the topology configuration into {@code stormConf}, configure the
     * static metrics settings and worker gauges, then (inside one guarded section) set up the
     * worker class loader, messaging context, disruptor queues, assignment data, topologies,
     * thread pool, assignment timestamp and Kryo serializers.
     *
     * @param map      daemon configuration; kept as-is and used as the base of the merged configuration
     * @param iContext messaging context, or {@code null} to create one from the merged configuration
     * @param str      topology id
     * @param str2     supervisor id
     * @param i        worker port
     * @param str3     worker id
     * @param str4     colon-separated jar paths for the worker class loader; may be {@code null}
     * @throws InvalidParameterException if anything in the guarded section fails; the original
     *                                   failure is attached as the cause
     * @throws Exception                 if one of the steps before the guarded section fails
     */
    public WorkerData(Map map, IContext iContext, String str, String str2, int i, String str3, String str4) throws Exception {
        this.conf = map;
        this.context = iContext;
        this.topologyId = str;
        this.supervisorId = str2;
        this.port = Integer.valueOf(i);
        this.workerId = str3;

        if (StormConfig.cluster_mode(map).equals("distributed")) {
            JStormServerUtils.createPid(StormConfig.worker_pids_root(map, str3));
        }

        zkClusterstate = ZkTool.mk_distributed_cluster_state(map);
        zkCluster = Cluster.mk_storm_cluster_state(zkClusterstate);

        // Topology-level settings are applied last so they override the daemon configuration.
        Map<? extends Object, ? extends Object> topologyConf = StormConfig.read_supervisor_topology_conf(map, str);
        stormConf = new HashMap<>();
        stormConf.putAll(map);
        stormConf.putAll(topologyConf);

        JStormDebugger.update(stormConf);
        registerUpdateListeners();

        JStormMetrics.setTopologyId(str);
        JStormMetrics.setPort(i);
        JStormMetrics.setDebug(ConfigExtension.isEnableMetricDebug(stormConf));
        JStormMetrics.enabled = ConfigExtension.isEnableMetrics(stormConf);
        JStormMetrics.enableStreamMetrics = ConfigExtension.isEnableStreamMetrics(stormConf);
        JStormMetrics.addDebugMetrics(ConfigExtension.getDebugMetricNames(stormConf));
        AsmMetric.setSampleRate(ConfigExtension.getMetricSampleRate(stormConf));

        ConfigExtension.setLocalSupervisorId(stormConf, supervisorId);
        ConfigExtension.setLocalWorkerId(stormConf, workerId);
        ConfigExtension.setLocalWorkerPort(stormConf, i);
        ControlMessage.setPort(i);

        registerWorkerGauges();

        LOG.info("Worker Configuration " + stormConf);
        try {
            boolean enableClassLoader = ConfigExtension.isEnableTopologyClassLoader(stormConf);
            boolean enableClassLoaderDebug = ConfigExtension.isEnableClassloaderDebug(stormConf);
            // Constant-first equals: a missing cluster-mode entry must not surface as an NPE.
            if (str4 == null && enableClassLoader && !"local".equals(map.get(Config.STORM_CLUSTER_MODE))) {
                LOG.error("enable classloader, but not app jar");
                throw new InvalidParameterException("enable classloader, but not app jar");
            }

            URL[] jarUrls = parseJarUrls(str4);
            WorkerClassLoader.mkInstance(jarUrls, ClassLoader.getSystemClassLoader(), ClassLoader.getSystemClassLoader().getParent(), enableClassLoader, enableClassLoaderDebug);
            Config.registerSerialization(stormConf, "java.nio.HeapByteBuffer", (Class<? extends Serializer>) KryoByteBufferSerializer.class);

            if (context == null) {
                context = TransportFactory.makeContext(stormConf);
            }

            boolean useSleep = ConfigExtension.isDisruptorUseSleep(stormConf);
            DisruptorQueue.setUseSleep(useSleep);
            boolean sizeLimited = ConfigExtension.getTopologyBufferSizeLimited(stormConf);
            DisruptorQueue.setLimited(sizeLimited);
            LOG.info("Disruptor use sleep:" + useSleep + ", limited size:" + sizeLimited);

            int ctrlBufferSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256).intValue();
            WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
            transferCtrlQueue = DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI, ctrlBufferSize, waitStrategy);
            JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.SEND_QUEUE, MetricType.GAUGE), new AsmGauge(new QueueGauge(transferCtrlQueue, MetricDef.SEND_QUEUE)));
            sendingQueue = DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI, Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE)).intValue(), waitStrategy);

            nodeportSocket = new ConcurrentHashMap<>();
            taskNodeport = new ConcurrentHashMap<>();
            workerToResource = new ConcurrentSkipListSet<>();
            innerTaskTransfer = new ConcurrentHashMap<>();
            controlQueues = new ConcurrentHashMap<>();
            deserializeQueues = new ConcurrentHashMap<>();
            tasksToComponent = new ConcurrentHashMap<>();
            componentToSortedTasks = new ConcurrentHashMap<>();

            Assignment assignment = zkCluster.assignment_info(topologyId, null);
            if (assignment == null) {
                String message = "Failed to get Assignment of " + topologyId;
                LOG.error(message);
                throw new RuntimeException(message);
            }
            workerToResource.addAll(assignment.getWorkers());
            taskids = assignment.getCurrentWorkerTasks(supervisorId, i);
            if (taskids.isEmpty()) {
                throw new RuntimeException("No tasks running current workers");
            }
            LOG.info("Current worker taskList:" + taskids);

            rawTopology = StormConfig.read_supervisor_topology_code(map, str);
            sysTopology = Common.system_topology(stormConf, rawTopology);
            generateMaps();

            contextMaker = new ContextMaker(this);
            outTaskStatus = new ConcurrentHashMap<>();
            threadPool = Executors.newScheduledThreadPool(THREAD_POOL_NUM);
            TimerTrigger.setScheduledExecutorService(threadPool);

            if (!StormConfig.local_mode(stormConf)) {
                healthReporterThread = new AsyncLoopThread(new JStormHealthReporter(this));
            }

            try {
                assignmentTS = Long.valueOf(StormConfig.read_supervisor_topology_timestamp(map, str));
            } catch (FileNotFoundException e) {
                // No timestamp file available: fall back to the current time.
                assignmentTS = Long.valueOf(System.currentTimeMillis());
            }

            outboundTasks = new HashSet<>();
            updateKryoSerializer();
            LOG.info("Successfully create WorkerData");
        } catch (Exception e2) {
            LOG.error("init jarClassLoader error!", e2);
            // Same exception type as before, but keep the message and the root cause for callers.
            InvalidParameterException failure = new InvalidParameterException("Failed to initialize WorkerData for topology " + str + ": " + e2);
            failure.initCause(e2);
            throw failure;
        }
    }

    /** Registers the CPU, heap-memory and memory usage gauges as worker topology metrics. */
    private static void registerWorkerGauges() {
        JStormMetrics.registerWorkerTopologyMetric(JStormMetrics.workerMetricName(MetricDef.CPU_USED_RATIO, MetricType.GAUGE), new AsmGauge(new Gauge<Double>() {
            @Override
            public Double getValue() {
                return JStormUtils.getCpuUsage();
            }
        }));
        JStormMetrics.registerWorkerTopologyMetric(JStormMetrics.workerMetricName(MetricDef.HEAP_MEMORY_USED, MetricType.GAUGE), new AsmGauge(new Gauge<Double>() {
            @Override
            public Double getValue() {
                return Double.valueOf(JStormUtils.getJVMHeapMemory());
            }
        }));
        JStormMetrics.registerWorkerTopologyMetric(JStormMetrics.workerMetricName(MetricDef.MEMORY_USED, MetricType.GAUGE), new AsmGauge(new Gauge<Double>() {
            @Override
            public Double getValue() {
                return JStormUtils.getMemUsage();
            }
        }));
    }

    /**
     * Converts a colon-separated list of paths into distinct {@code File:} URLs; blank entries
     * are skipped.
     *
     * @param jarPaths colon-separated paths, or {@code null}
     * @return the URLs (empty when {@code jarPaths} is {@code null})
     * @throws IOException if a path cannot be turned into a URL
     */
    private static URL[] parseJarUrls(String jarPaths) throws IOException {
        if (jarPaths == null) {
            return new URL[0];
        }
        Set<URL> urls = new HashSet<>();
        for (String path : jarPaths.split(":")) {
            if (!StringUtils.isBlank(path)) {
                urls.add(new URL("File:" + path));
            }
        }
        return urls.toArray(new URL[0]);
    }

    /**
     * Creates the {@link UpdateListener} and registers three updaters on it: metrics reporter
     * configuration, {@code JStormDebugger} and {@code LogUtils}.
     */
    private void registerUpdateListeners() {
        updateListener = new UpdateListener();
        updateListener.registerUpdater(new UpdateListener.IUpdater() {
            @Override
            public void update(Map map) {
                // The reporter is injected after construction; an earlier update must not crash.
                JStormMetricsReporter reporter = WorkerData.this.metricReporter;
                if (reporter == null) {
                    LOG.warn("Metrics reporter is not set yet, skip updating metric config");
                    return;
                }
                reporter.updateMetricConfig(map);
            }
        });
        updateListener.registerUpdater(new UpdateListener.IUpdater() {
            @Override
            public void update(Map map) {
                JStormDebugger.update(map);
            }
        });
        updateListener.registerUpdater(new UpdateListener.IUpdater() {
            @Override
            public void update(Map map) {
                LogUtils.update(map);
            }
        });
    }

    /** @return the listener created during construction */
    public UpdateListener getUpdateListener() {
        return updateListener;
    }

    /**
     * Builds a new Kryo deserializer/serializer pair from the current system topology and
     * task/component maps and publishes both through the atomic references.
     */
    public void updateKryoSerializer() {
        GeneralTopologyContext topologyContext = new GeneralTopologyContext(sysTopology, stormConf, tasksToComponent, componentToSortedTasks, generatecomponentToStreamToFields(sysTopology), topologyId);
        // Build both before publishing either, so a constructor failure leaves the old pair in place.
        KryoTupleDeserializer deserializer = new KryoTupleDeserializer(stormConf, topologyContext);
        KryoTupleSerializer serializer = new KryoTupleSerializer(stormConf, topologyContext);
        atomKryoDeserializer.set(deserializer);
        atomKryoSerializer.set(serializer);
    }

    /** Fills the task/component maps and creates the empty resource, executor and metric maps. */
    private void generateMaps() throws Exception {
        updateTaskComponentMap();
        defaultResources = new HashMap<>();
        userResources = new HashMap<>();
        executorData = new HashMap<>();
        registeredMetrics = new HashMap<>();
    }

    /** @return the configuration map exactly as passed to the constructor */
    public Map<Object, Object> getRawConf() {
        return conf;
    }

    /** @return the shutdown flag (initially {@code false}) */
    public AtomicBoolean getShutdown() {
        return shutdown;
    }

    /** @return the topology status (initially {@code StatusType.active}) */
    public StatusType getTopologyStatus() {
        return topologyStatus;
    }

    /** @param statusType the new topology status */
    public void setTopologyStatus(StatusType statusType) {
        this.topologyStatus = statusType;
    }

    /** @return the merged configuration; same map as {@link #getStormConf()} */
    public Map<Object, Object> getConf() {
        return stormConf;
    }

    /** @return the merged configuration; same map as {@link #getConf()} */
    public Map<Object, Object> getStormConf() {
        return stormConf;
    }

    /** @return the messaging context */
    public IContext getContext() {
        return context;
    }

    /** @return the topology id */
    public String getTopologyId() {
        return topologyId;
    }

    /** @return the supervisor id */
    public String getSupervisorId() {
        return supervisorId;
    }

    /** @return the worker port */
    public Integer getPort() {
        return port;
    }

    /** @return the worker id */
    public String getWorkerId() {
        return workerId;
    }

    /** @return the low-level cluster state created in the constructor */
    public ClusterState getZkClusterstate() {
        return zkClusterstate;
    }

    /** @return the storm cluster state built on top of {@link #getZkClusterstate()} */
    public StormClusterState getZkCluster() {
        return zkCluster;
    }

    /** @return the live set of task ids of this worker */
    public Set<Integer> getTaskids() {
        return taskids;
    }

    /** @return the live worker-slot to connection map */
    public ConcurrentHashMap<WorkerSlot, IConnection> getNodeportSocket() {
        return nodeportSocket;
    }

    /** @return the live task-id to worker-slot map */
    public ConcurrentHashMap<Integer, WorkerSlot> getTaskNodeport() {
        return taskNodeport;
    }

    /**
     * Returns the live worker-slot set. The reference is read while holding the same monitor
     * that {@link #updateWorkerToResource(Set)} uses.
     *
     * @return the live set (not a copy)
     */
    public ConcurrentSkipListSet<ResourceWorkerSlot> getWorkerToResource() {
        synchronized (workerToResource) {
            return workerToResource;
        }
    }

    /**
     * Merges {@code set} into the worker-slot set while holding the set's monitor.
     *
     * <p>If the current set holds at least one slot that is not matched by {@code set}, the
     * slots matched by {@code set} are removed first; in every case all elements of
     * {@code set} are then added.
     *
     * @param set worker slots to merge in
     */
    public void updateWorkerToResource(Set<ResourceWorkerSlot> set) {
        synchronized (workerToResource) {
            ConcurrentSkipListSet<ResourceWorkerSlot> unmatched = workerToResource.clone();
            unmatched.removeAll(set);
            // NOTE(review): the removal is conditional on unmatched slots existing, yet it removes
            // the matched ones - kept exactly as found; confirm the intent.
            if (!unmatched.isEmpty()) {
                workerToResource.removeAll(set);
            }
            workerToResource.addAll(set);
        }
    }

    /** @return the live task-id to inner transfer queue map */
    public ConcurrentHashMap<Integer, DisruptorQueue> getInnerTaskTransfer() {
        return innerTaskTransfer;
    }

    /** @return the live task-id to deserialize queue map */
    public ConcurrentHashMap<Integer, DisruptorQueue> getDeserializeQueues() {
        return deserializeQueues;
    }

    /** @return the live task-id to control queue map */
    public ConcurrentHashMap<Integer, DisruptorQueue> getControlQueues() {
        return controlQueues;
    }

    /** @return the live task-id to component-id map */
    public ConcurrentHashMap<Integer, String> getTasksToComponent() {
        return tasksToComponent;
    }

    /** @return the raw topology read from the supervisor's topology code */
    public StormTopology getRawTopology() {
        return rawTopology;
    }

    /** @return the system topology derived from the raw topology */
    public StormTopology getSysTopology() {
        return sysTopology;
    }

    /** @return the context maker created in the constructor */
    public ContextMaker getContextMaker() {
        return contextMaker;
    }

    /** @return the halt callback of this worker */
    public AsyncLoopDefaultKill getWorkHalt() {
        return workHalt;
    }

    /** @return the queue created under the name {@code "TotalTransfer"} */
    public DisruptorQueue getTransferCtrlQueue() {
        return transferCtrlQueue;
    }

    /** @return the queue created under the name {@code "TotalSending"} */
    public DisruptorQueue getSendingQueue() {
        return sendingQueue;
    }

    /** @return the live component-id to sorted task-id list map */
    public Map<String, List<Integer>> getComponentToSortedTasks() {
        return componentToSortedTasks;
    }

    /** @return the live default-resources map */
    public Map<String, Object> getDefaultResources() {
        return defaultResources;
    }

    /** @return the live user-resources map */
    public Map<String, Object> getUserResources() {
        return userResources;
    }

    /** @return the live executor-data map */
    public Map<String, Object> getExecutorData() {
        return executorData;
    }

    /** @return the live registered-metrics map */
    public Map getRegisteredMetrics() {
        return registeredMetrics;
    }

    /** @return the shutdown handles, or {@code null} if {@link #setShutdownTasks(List)} was never called */
    public List<TaskShutdownDameon> getShutdownTasks() {
        return shutdownTasks;
    }

    /** @param list the list that holds the task shutdown handles from now on */
    public void setShutdownTasks(List<TaskShutdownDameon> list) {
        this.shutdownTasks = list;
    }

    /**
     * Appends a shutdown handle. Requires {@link #setShutdownTasks(List)} to have been called
     * with a non-null list beforehand.
     *
     * @param taskShutdownDameon the handle to add
     */
    public void addShutdownTask(TaskShutdownDameon taskShutdownDameon) {
        shutdownTasks.add(taskShutdownDameon);
    }

    /**
     * Selects the shutdown handles whose task id is contained in {@code set}.
     *
     * @param set task ids to look for
     * @return a new list with the matching handles; empty when nothing matches or when no
     *         shutdown list has been set yet
     */
    public List<TaskShutdownDameon> getShutdownDaemonbyTaskIds(Set<Integer> set) {
        List<TaskShutdownDameon> matches = new ArrayList<>();
        List<TaskShutdownDameon> current = shutdownTasks;
        if (current == null) {
            return matches;
        }
        for (TaskShutdownDameon daemon : current) {
            if (set.contains(Integer.valueOf(daemon.getTaskId()))) {
                matches.add(daemon);
            }
        }
        return matches;
    }

    /** @return the "worker old status" flag (initially {@code false}) */
    public AtomicBoolean getWorkerOldStatus() {
        return workerOldStatus;
    }

    /** @return the "init connection status" flag (initially {@code false}) */
    public AtomicBoolean getWorkeInitConnectionStatus() {
        return workeInitConnectionStatus;
    }

    /** @return the local task ids, or {@code null} if never set */
    public Set<Integer> getLocalTasks() {
        return localTasks;
    }

    /** @param set the local task ids */
    public void setLocalTasks(Set<Integer> set) {
        this.localTasks = set;
    }

    /**
     * Marks every given task as inactive, overwriting any existing status.
     *
     * @param set outbound task ids
     */
    public void initOutboundTaskStatus(Set<Integer> set) {
        for (Integer taskId : set) {
            outTaskStatus.put(taskId, false);
        }
    }

    /** @return the live task-id to active-flag map */
    public Map<Integer, Boolean> getOutboundTaskStatus() {
        return outTaskStatus;
    }

    /** @param num task id to register as inactive unless it already has a status */
    public void addOutboundTaskStatusIfAbsent(Integer num) {
        outTaskStatus.putIfAbsent(num, false);
    }

    /** @param num task id whose status entry is removed */
    public void removeOutboundTaskStatus(Integer num) {
        outTaskStatus.remove(num);
    }

    /**
     * @param num task id
     * @param z   new active flag
     */
    public void updateOutboundTaskStatus(Integer num, boolean z) {
        outTaskStatus.put(num, Boolean.valueOf(z));
    }

    /**
     * @param num task id
     * @return the recorded active flag, or {@code false} when the task has no entry
     */
    public boolean isOutboundTaskActive(Integer num) {
        // Single lookup: the entry may be removed concurrently between two separate reads.
        Boolean active = outTaskStatus.get(num);
        return active != null && active.booleanValue();
    }

    /** @return the scheduled thread pool created in the constructor */
    public ScheduledExecutorService getThreadPool() {
        return threadPool;
    }

    /** @param l the new assignment timestamp */
    public void setAssignmentTs(Long l) {
        this.assignmentTS = l;
    }

    /** @return the assignment timestamp */
    public Long getAssignmentTs() {
        return assignmentTS;
    }

    /** @param assignmentType the new assignment type */
    public void setAssignmentType(Assignment.AssignmentType assignmentType) {
        this.assignmentType = assignmentType;
    }

    /** @return the assignment type, or {@code null} if never set */
    public Assignment.AssignmentType getAssignmentType() {
        return assignmentType;
    }

    /**
     * Refreshes the task ids, the task/component maps and both topologies from a new
     * assignment.
     *
     * @param assignment the assignment to apply
     * @throws Exception if refreshing the task/component maps fails
     */
    public void updateWorkerData(Assignment assignment) throws Exception {
        updateTaskIds(assignment);
        updateTaskComponentMap();
        updateStormTopology();
    }

    /**
     * Replaces the content of the task-id set (in place) with this worker's tasks from
     * {@code assignment}.
     *
     * @param assignment the assignment to read the tasks from
     */
    public void updateTaskIds(Assignment assignment) {
        taskids.clear();
        taskids.addAll(assignment.getCurrentWorkerTasks(supervisorId, port.intValue()));
    }

    /** @return the local-node task ids, or {@code null} if never set */
    public Set<Integer> getLocalNodeTasks() {
        return localNodeTasks;
    }

    /** @param set the local-node task ids */
    public void setLocalNodeTasks(Set<Integer> set) {
        this.localNodeTasks = set;
    }

    /** @param set the outbound task ids */
    public void setOutboundTasks(Set<Integer> set) {
        this.outboundTasks = set;
    }

    /** @return the outbound task ids */
    public Set<Integer> getOutboundTasks() {
        return outboundTasks;
    }

    /**
     * Reloads the task-to-component mapping from the cluster state and merges it into
     * {@link #tasksToComponent} and {@link #componentToSortedTasks}; every task list is then
     * sorted. Existing entries are overwritten, never removed.
     */
    private void updateTaskComponentMap() throws Exception {
        Map<Integer, String> latest = Common.getTaskToComponent(Cluster.get_all_taskInfo(zkCluster, topologyId));
        tasksToComponent.putAll(latest);
        LOG.info("Updated tasksToComponentMap:" + tasksToComponent);
        componentToSortedTasks.putAll(JStormUtils.reverse_map(latest));
        for (List<Integer> tasks : componentToSortedTasks.values()) {
            Collections.sort(tasks);
        }
    }

    /**
     * Re-reads the topology code and copies its bolts/spouts/state spouts into the existing
     * raw and system topology objects. Failures are logged and leave both topologies untouched.
     */
    private void updateStormTopology() {
        try {
            StormTopology freshRaw = StormConfig.read_supervisor_topology_code(conf, topologyId);
            // Derive the system topology from the freshly read code; using the previous
            // rawTopology here would leave sysTopology one update behind.
            StormTopology freshSys = Common.system_topology(stormConf, freshRaw);
            updateTopology(rawTopology, freshRaw);
            updateTopology(sysTopology, freshSys);
        } catch (InvalidTopologyException e) {
            LOG.error("Failed to update sysTopology for " + topologyId, e);
        } catch (IOException e2) {
            LOG.error("Failed to read supervisor topology code for " + topologyId, e2);
        }
    }

    /**
     * Copies bolts, spouts and state spouts from {@code stormTopology2} into
     * {@code stormTopology}.
     */
    private void updateTopology(StormTopology stormTopology, StormTopology stormTopology2) {
        stormTopology.set_bolts(stormTopology2.get_bolts());
        stormTopology.set_spouts(stormTopology2.get_spouts());
        stormTopology.set_state_spouts(stormTopology2.get_state_spouts());
    }

    /** @return the monitor-enable flag (initially {@code true}) */
    public AtomicBoolean getMonitorEnable() {
        return monitorEnable;
    }

    /** @return the receive connection, or {@code null} if never set */
    public IConnection getRecvConnection() {
        return recvConnection;
    }

    /** @param iConnection the receive connection */
    public void setRecvConnection(IConnection iConnection) {
        this.recvConnection = iConnection;
    }

    /** @return the metrics reporter, or {@code null} if never set */
    public JStormMetricsReporter getMetricsReporter() {
        return metricReporter;
    }

    /** @param jStormMetricsReporter the metrics reporter */
    public void setMetricsReporter(JStormMetricsReporter jStormMetricsReporter) {
        this.metricReporter = jStormMetricsReporter;
    }

    /**
     * Builds, for every component of {@code stormTopology}, a map from stream id to the
     * stream's output {@link Fields}.
     *
     * @param stormTopology the topology to inspect
     * @return component id -&gt; (stream id -&gt; output fields)
     */
    public HashMap<String, Map<String, Fields>> generatecomponentToStreamToFields(StormTopology stormTopology) {
        HashMap<String, Map<String, Fields>> componentToStreams = new HashMap<>();
        for (String componentId : ThriftTopologyUtils.getComponentIds(stormTopology)) {
            Map<String, Fields> streamToFields = new HashMap<>();
            for (Map.Entry<String, StreamInfo> stream : ThriftTopologyUtils.getComponentCommon(stormTopology, componentId).get_streams().entrySet()) {
                streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields()));
            }
            componentToStreams.put(componentId, streamToFields);
        }
        return componentToStreams;
    }

    /** @return the holder of the current Kryo tuple deserializer */
    public AtomicReference<KryoTupleDeserializer> getAtomKryoDeserializer() {
        return atomKryoDeserializer;
    }

    /** @return the holder of the current Kryo tuple serializer */
    public AtomicReference<KryoTupleSerializer> getAtomKryoSerializer() {
        return atomKryoSerializer;
    }
}
