package org.apache.eagle.jobrunning.crawler;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.job.JobFilter;
import org.apache.eagle.jobrunning.callback.RunningJobCallback;
import org.apache.eagle.jobrunning.common.JobConstants;
import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
import org.apache.eagle.jobrunning.util.JobUtils;
import org.apache.eagle.jobrunning.yarn.model.AppInfo;
import org.apache.eagle.jobrunning.zkres.JobRunningZKStateManager;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eagle/jobrunning/crawler/RunningJobCrawlerImpl.class */
public class RunningJobCrawlerImpl implements RunningJobCrawler {
    protected RunningJobCrawlConfig.RunningJobEndpointConfig endpointConfig;
    protected RunningJobCrawlConfig.ControlConfig controlConfig;
    protected JobFilter jobFilter;
    private ResourceFetcher fetcher;
    private JobRunningZKStateManager zkStateManager;
    private Thread jobConfigProcessThread;
    private Thread jobCompleteInfoProcessThread;
    private Thread jobCompleteStatusCheckerThread;
    private Thread zkCleanupThread;
    private final RunningJobCallback callback;
    private ReadWriteLock readWriteLock;
    private BlockingQueue<JobContext> queueOfConfig;
    private BlockingQueue<JobContext> queueOfCompleteJobInfo;
    private static final int DEFAULT_CONFIG_THREAD_COUNT = 20;
    private static final Logger LOG = LoggerFactory.getLogger(RunningJobCrawlerImpl.class);
    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
    private Map<JobConstants.ResourceType, Map<String, JobContext>> processingJobMap = new ConcurrentHashMap();
    private final long DELAY_TO_UPDATE_COMPLETION_JOB_INFO = 300000;

    public RunningJobCrawlerImpl(RunningJobCrawlConfig runningJobCrawlConfig, JobRunningZKStateManager jobRunningZKStateManager, RunningJobCallback runningJobCallback, JobFilter jobFilter, ReadWriteLock readWriteLock) {
        this.endpointConfig = runningJobCrawlConfig.endPointConfig;
        this.controlConfig = runningJobCrawlConfig.controlConfig;
        this.callback = runningJobCallback;
        this.fetcher = new RMResourceFetcher(this.endpointConfig);
        this.jobFilter = jobFilter;
        this.readWriteLock = readWriteLock;
        if (runningJobCrawlConfig.controlConfig.jobInfoEnabled) {
            this.jobCompleteInfoProcessThread = new Thread() { // from class: org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    RunningJobCrawlerImpl.this.startCompleteJobInfoProcessThread();
                }
            };
            this.jobCompleteInfoProcessThread.setName("JobCompleteInfo-process-thread");
            this.jobCompleteInfoProcessThread.setDaemon(true);
            this.jobCompleteStatusCheckerThread = new Thread() { // from class: org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    RunningJobCrawlerImpl.this.startCompleteStatusCheckerThread();
                }
            };
            this.jobCompleteStatusCheckerThread.setName("JobComplete-statusChecker-thread");
            this.jobCompleteStatusCheckerThread.setDaemon(true);
        }
        if (runningJobCrawlConfig.controlConfig.jobConfigEnabled) {
            this.jobConfigProcessThread = new Thread() { // from class: org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl.3
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    RunningJobCrawlerImpl.this.startJobConfigProcessThread();
                }
            };
            this.jobConfigProcessThread.setName("JobConfig-process-thread");
            this.jobConfigProcessThread.setDaemon(true);
        }
        this.zkCleanupThread = new Thread() { // from class: org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl.4
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                RunningJobCrawlerImpl.this.startzkCleanupThread();
            }
        };
        this.zkCleanupThread.setName("zk-cleanup-thread");
        this.zkCleanupThread.setDaemon(true);
        this.zkStateManager = jobRunningZKStateManager;
        this.processingJobMap.put(JobConstants.ResourceType.JOB_CONFIGURATION, new ConcurrentHashMap());
        this.processingJobMap.put(JobConstants.ResourceType.JOB_COMPLETE_INFO, new ConcurrentHashMap());
        this.queueOfConfig = new ArrayBlockingQueue(this.controlConfig.sizeOfJobConfigQueue);
        this.queueOfCompleteJobInfo = new ArrayBlockingQueue(this.controlConfig.sizeOfJobCompletedInfoQueue);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startJobConfigProcessThread() {
        LOG.info("Job Config crawler main thread started, pool size: 20");
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CONFIG_THREAD_COUNT, DEFAULT_CONFIG_THREAD_COUNT, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactory() { // from class: org.apache.eagle.jobrunning.crawler.RunningJobCrawlerImpl.5
            private final AtomicInteger count = new AtomicInteger(0);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                this.count.incrementAndGet();
                Thread newThread = Executors.defaultThreadFactory().newThread(runnable);
                newThread.setName("config-crawler-workthread-" + this.count.get());
                return newThread;
            }
        });
        while (true) {
            try {
                JobContext take = this.queueOfConfig.take();
                LOG.info("queueOfConfig size: " + this.queueOfConfig.size());
                threadPoolExecutor.execute(new ConfigWorkTask(new JobContext(take), this.fetcher, this.callback, this));
            } catch (InterruptedException e) {
                LOG.warn("Got an InterruptedException: " + e.getMessage());
            } catch (RejectedExecutionException e2) {
                LOG.warn("Got RejectedExecutionException: " + e2.getMessage());
            } catch (Throwable th) {
                LOG.warn("Got an throwable t, " + th.getMessage());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startCompleteJobInfoProcessThread() {
        while (true) {
            JobContext jobContext = null;
            try {
                jobContext = this.queueOfCompleteJobInfo.take();
            } catch (InterruptedException e) {
            }
            while (System.currentTimeMillis() < jobContext.fetchedTime.longValue() + 300000) {
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e2) {
                }
            }
            try {
                this.callback.onJobRunningInformation(jobContext, JobConstants.ResourceType.JOB_COMPLETE_INFO, this.fetcher.getResource(JobConstants.ResourceType.JOB_COMPLETE_INFO, JobUtils.getAppIDByJobID(jobContext.jobId)));
            } catch (Exception e3) {
                if (e3.getMessage().contains("Server returned HTTP response code: 500")) {
                    LOG.warn("The server returns 500 error, it's probably caused by job ACL setting, going to skip this job");
                } else {
                    LOG.error("Got an exception when fetching resource ", e3);
                }
            }
        }
    }

    public void startCompleteStatusCheckerThread() {
        while (true) {
            try {
                List<Object> resource = this.fetcher.getResource(JobConstants.ResourceType.JOB_LIST, JobConstants.JobState.COMPLETED.name());
                if (resource == null) {
                    LOG.warn("Current Completed Job List is Empty");
                } else {
                    List<AppInfo> list = (List) resource.get(0);
                    HashSet hashSet = new HashSet();
                    for (AppInfo appInfo : list) {
                        if (JobConstants.YarnApplicationType.MAPREDUCE.name().equals(appInfo.getApplicationType()) && this.jobFilter.accept(appInfo.getUser())) {
                            if (System.currentTimeMillis() - appInfo.getFinishedTime() < this.controlConfig.completedJobOutofDateTimeInMin * 60000) {
                                hashSet.add(new JobContext(JobUtils.getJobIDByAppID(appInfo.getId()), appInfo.getUser(), Long.valueOf(System.currentTimeMillis())));
                            }
                        }
                    }
                    if (this.controlConfig.jobConfigEnabled) {
                        addIntoProcessingQueueAndList(hashSet, this.queueOfConfig, JobConstants.ResourceType.JOB_CONFIGURATION);
                    }
                    if (this.controlConfig.jobInfoEnabled) {
                        addIntoProcessingQueueAndList(hashSet, this.queueOfCompleteJobInfo, JobConstants.ResourceType.JOB_COMPLETE_INFO);
                    }
                    Thread.sleep(20000L);
                }
            } catch (Throwable th) {
                LOG.error("Got a throwable in fetching job completed list :", th);
            }
        }
    }

    public void startzkCleanupThread() {
        LOG.info("zk cleanup thread started");
        while (true) {
            try {
                String format = DateTimeUtil.format(System.currentTimeMillis() - (this.controlConfig.zkCleanupTimeInday * 86400000), JobRunningZKStateManager.DATE_FORMAT_PATTERN);
                this.zkStateManager.truncateJobBefore(JobConstants.ResourceType.JOB_CONFIGURATION, format);
                this.zkStateManager.truncateJobBefore(JobConstants.ResourceType.JOB_COMPLETE_INFO, format);
                Thread.sleep(1800000L);
            } catch (Throwable th) {
                LOG.warn("Got an throwable, t: ", th);
            }
        }
    }

    public void addIntoProcessingQueueAndList(Set<JobContext> set, BlockingQueue<JobContext> blockingQueue, JobConstants.ResourceType resourceType) {
        try {
            this.readWriteLock.writeLock().lock();
            LOG.info("Write lock acquired");
            List<String> readProcessedJobs = this.zkStateManager.readProcessedJobs(resourceType);
            readProcessedJobs.addAll(extractJobList(resourceType));
            for (JobContext jobContext : set) {
                if (!readProcessedJobs.contains(jobContext.jobId)) {
                    addIntoProcessingList(resourceType, jobContext);
                    blockingQueue.add(jobContext);
                }
            }
        } finally {
            try {
                this.readWriteLock.writeLock().unlock();
                LOG.info("Write lock released");
            } catch (Throwable th) {
                LOG.error("Fail to release Write lock", th);
            }
        }
    }

    private List<String> extractJobList(JobConstants.ResourceType resourceType) {
        return Arrays.asList(this.processingJobMap.get(resourceType).keySet().toArray(new String[0]));
    }

    @Override // org.apache.eagle.jobrunning.crawler.RunningJobCrawler
    public void crawl() throws Exception {
        if (this.jobConfigProcessThread != null && !this.jobConfigProcessThread.isAlive()) {
            this.jobConfigProcessThread.start();
        }
        if (this.jobCompleteInfoProcessThread != null && !this.jobCompleteInfoProcessThread.isAlive()) {
            this.jobCompleteInfoProcessThread.start();
        }
        if (this.jobCompleteStatusCheckerThread != null && !this.jobCompleteStatusCheckerThread.isAlive()) {
            this.jobCompleteStatusCheckerThread.start();
        }
        if (!this.zkCleanupThread.isAlive()) {
            this.zkCleanupThread.start();
        }
        List<Object> resource = this.fetcher.getResource(JobConstants.ResourceType.JOB_LIST, JobConstants.JobState.RUNNING.name());
        if (resource == null) {
            LOG.warn("Current Running Job List is Empty");
            return;
        }
        List<AppInfo> list = (List) resource.get(0);
        LOG.info("Current Running Job List size : " + list.size());
        HashSet hashSet = new HashSet();
        for (AppInfo appInfo : list) {
            if (JobConstants.YarnApplicationType.MAPREDUCE.name().equals(appInfo.getApplicationType()) && this.jobFilter.accept(appInfo.getUser())) {
                hashSet.add(new JobContext(JobUtils.getJobIDByAppID(appInfo.getId()), appInfo.getUser(), Long.valueOf(System.currentTimeMillis())));
            }
        }
        if (this.controlConfig.jobConfigEnabled) {
            addIntoProcessingQueueAndList(hashSet, this.queueOfConfig, JobConstants.ResourceType.JOB_CONFIGURATION);
        }
        if (this.controlConfig.jobInfoEnabled) {
            for (JobContext jobContext : hashSet) {
                try {
                    this.callback.onJobRunningInformation(jobContext, JobConstants.ResourceType.JOB_RUNNING_INFO, this.fetcher.getResource(JobConstants.ResourceType.JOB_RUNNING_INFO, JobUtils.getAppIDByJobID(jobContext.jobId)));
                } catch (Exception e) {
                    if (e.getMessage().contains("Server returned HTTP response code: 500")) {
                        LOG.warn("The server returns 500 error, it's probably caused by job ACL setting, going to skip this job");
                    } else {
                        LOG.error("Got an exception when fetching resource, jobId: " + jobContext.jobId, e);
                    }
                }
            }
        }
    }

    @Override // org.apache.eagle.jobrunning.crawler.RunningJobCrawler
    public void addIntoProcessingList(JobConstants.ResourceType resourceType, JobContext jobContext) {
        this.processingJobMap.get(resourceType).put(jobContext.jobId, jobContext);
    }

    @Override // org.apache.eagle.jobrunning.crawler.RunningJobCrawler
    public void removeFromProcessingList(JobConstants.ResourceType resourceType, JobContext jobContext) {
        this.processingJobMap.get(resourceType).remove(jobContext.jobId);
    }

    static {
        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
    }
}
