package org.apache.eagle.jobrunning.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.eagle.job.JobFilterByPartition;
import org.apache.eagle.job.JobPartitioner;
import org.apache.eagle.jobrunning.callback.DefaultRunningJobInputStreamCallback;
import org.apache.eagle.jobrunning.callback.RunningJobCallback;
import org.apache.eagle.jobrunning.callback.RunningJobMessageId;
import org.apache.eagle.jobrunning.common.JobConstants;
import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
import org.apache.eagle.jobrunning.crawler.JobContext;
import org.apache.eagle.jobrunning.crawler.RunningJobCrawler;
import org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl;
import org.apache.eagle.jobrunning.zkres.JobRunningZKStateManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eagle/jobrunning/storm/JobRunningSpout.class */
public class JobRunningSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(JobRunningSpout.class);
    private RunningJobCrawlConfig config;
    private JobRunningZKStateManager zkStateManager;
    private transient RunningJobCrawler crawler;
    private JobRunningSpoutCollectorInterceptor interceptor;
    private RunningJobCallback callback;
    private ReadWriteLock readWriteLock;
    private static final int DEFAULT_WAIT_SECONDS_BETWEEN_ROUNDS = 10;

    public JobRunningSpout(RunningJobCrawlConfig runningJobCrawlConfig) {
        this(runningJobCrawlConfig, new JobRunningSpoutCollectorInterceptor());
    }

    public JobRunningSpout(RunningJobCrawlConfig runningJobCrawlConfig, JobRunningSpoutCollectorInterceptor jobRunningSpoutCollectorInterceptor) {
        this.config = runningJobCrawlConfig;
        this.interceptor = jobRunningSpoutCollectorInterceptor;
        this.callback = new DefaultRunningJobInputStreamCallback(jobRunningSpoutCollectorInterceptor);
        this.readWriteLock = new ReentrantReadWriteLock();
    }

    private int calculatePartitionId(TopologyContext topologyContext) {
        int thisTaskId = topologyContext.getThisTaskId();
        int i = 0;
        Iterator it = topologyContext.getComponentTasks(topologyContext.getComponentId(thisTaskId)).iterator();
        while (it.hasNext()) {
            if (((Integer) it.next()).intValue() == thisTaskId) {
                return i;
            }
            i++;
        }
        throw new IllegalStateException();
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        int calculatePartitionId = calculatePartitionId(topologyContext);
        if (calculatePartitionId < 0 || calculatePartitionId > this.config.controlConfig.numTotalPartitions) {
            throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " + calculatePartitionId + " and numTotalPartitions " + this.config.controlConfig.numTotalPartitions);
        }
        Class<? extends JobPartitioner> cls = this.config.controlConfig.partitionerCls;
        try {
            JobFilterByPartition jobFilterByPartition = new JobFilterByPartition(cls.newInstance(), this.config.controlConfig.numTotalPartitions, calculatePartitionId);
            this.interceptor.setSpoutOutputCollector(spoutOutputCollector);
            try {
                this.zkStateManager = new JobRunningZKStateManager(this.config);
                this.crawler = new RunningJobCrawlerImpl(this.config, this.zkStateManager, this.callback, jobFilterByPartition, this.readWriteLock);
            } catch (Exception e) {
                LOG.error("failing creating crawler driver");
                throw new IllegalStateException(e);
            }
        } catch (Exception e2) {
            LOG.error("failing instantiating job partitioner class " + cls.getCanonicalName());
            throw new IllegalStateException(e2);
        }
    }

    public void nextTuple() {
        try {
            this.crawler.crawl();
        } catch (Exception e) {
            LOG.error("fail crawling running job and continue ...", e);
        }
        try {
            Thread.sleep(10000L);
        } catch (Exception e2) {
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public void ack(Object obj) {
        RunningJobMessageId runningJobMessageId = (RunningJobMessageId) obj;
        JobConstants.ResourceType resourceType = runningJobMessageId.type;
        LOG.info("Ack on messageId: " + runningJobMessageId.toString());
        switch (resourceType) {
            case JOB_CONFIGURATION:
            case JOB_COMPLETE_INFO:
                try {
                    this.readWriteLock.readLock().lock();
                    this.zkStateManager.addProcessedJob(resourceType, runningJobMessageId.jobID);
                    this.crawler.removeFromProcessingList(resourceType, new JobContext(runningJobMessageId.jobID, null, null));
                    try {
                        this.readWriteLock.readLock().unlock();
                        LOG.info("Read lock released");
                        return;
                    } catch (Throwable th) {
                        LOG.error("Fail to release Read lock", th);
                        return;
                    }
                } catch (Throwable th2) {
                    try {
                        this.readWriteLock.readLock().unlock();
                        LOG.info("Read lock released");
                    } catch (Throwable th3) {
                        LOG.error("Fail to release Read lock", th3);
                    }
                    throw th2;
                }
            default:
                return;
        }
    }

    public void fail(Object obj) {
        RunningJobMessageId runningJobMessageId = (RunningJobMessageId) obj;
        JobConstants.ResourceType resourceType = runningJobMessageId.type;
        if (resourceType.equals(JobConstants.ResourceType.JOB_COMPLETE_INFO) || resourceType.equals(JobConstants.ResourceType.JOB_CONFIGURATION)) {
            try {
                this.readWriteLock.readLock().lock();
                this.crawler.removeFromProcessingList(resourceType, new JobContext(runningJobMessageId.jobID, null, null));
                try {
                    this.readWriteLock.readLock().unlock();
                    LOG.info("Read lock released");
                } catch (Throwable th) {
                    LOG.error("Fail to release Read lock", th);
                }
            } catch (Throwable th2) {
                try {
                    this.readWriteLock.readLock().unlock();
                    LOG.info("Read lock released");
                } catch (Throwable th3) {
                    LOG.error("Fail to release Read lock", th3);
                }
                throw th2;
            }
        }
    }

    public void deactivate() {
    }

    public void close() {
    }
}
