package com.alibaba.jstorm.event;

import com.alibaba.jstorm.callback.RunnableCallback;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/event/EventManagerImp.class */
public class EventManagerImp extends RunnableCallback implements EventManager {
    private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class);
    private AtomicInteger added = new AtomicInteger();
    private AtomicInteger processed = new AtomicInteger();
    private LinkedBlockingQueue<RunnableCallback> queue = new LinkedBlockingQueue<>();
    private Exception e;

    public void proccessinc() {
        this.processed.incrementAndGet();
    }

    @Override // com.alibaba.jstorm.event.EventManager
    public void add(RunnableCallback runnableCallback) {
        this.added.incrementAndGet();
        this.queue.add(runnableCallback);
    }

    @Override // com.alibaba.jstorm.event.EventManager
    public boolean waiting() {
        return this.processed.get() == this.added.get();
    }

    @Override // com.alibaba.jstorm.callback.RunnableCallback
    public Exception error() {
        return this.e;
    }

    @Override // com.alibaba.jstorm.callback.RunnableCallback, java.lang.Runnable
    public void run() {
        try {
            RunnableCallback take = this.queue.take();
            if (take == null) {
                return;
            }
            take.run();
            this.e = take.error();
            proccessinc();
        } catch (InterruptedException e) {
            LOG.info("InterruptedException when processing event ");
        }
    }
}
