/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task.execute.spout;

import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.execute.spout.SpoutExecutors;
import com.alibaba.jstorm.utils.RotatingMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleThreadSpoutExecutors
extends SpoutExecutors {
    private static Logger LOG = LoggerFactory.getLogger(MultipleThreadSpoutExecutors.class);

    public MultipleThreadSpoutExecutors(Task task) {
        super(task);
        this.ackerRunnableThread = new AsyncLoopThread(new AckerRunnable(), false, 5, false);
    }

    @Override
    public void mkPending() {
        this.pending = new RotatingMap(3, null, false);
    }

    @Override
    public void init() throws Exception {
        super.init();
        this.ackerRunnableThread.start();
    }

    @Override
    public String getThreadName() {
        return this.idStr + "-" + MultipleThreadSpoutExecutors.class.getSimpleName();
    }

    @Override
    public void run() {
        if (!this.isFinishInit) {
            this.initWrapper();
        }
        super.nextTuple();
    }

    class AckerRunnable
    extends RunnableCallback {
        private AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();

        AckerRunnable() {
        }

        @Override
        public String getThreadName() {
            return MultipleThreadSpoutExecutors.this.idStr + "-" + AckerRunnable.class.getSimpleName();
        }

        @Override
        public void preRun() {
            WorkerClassLoader.switchThreadContext();
        }

        @Override
        public void postRun() {
            WorkerClassLoader.restoreThreadContext();
        }

        @Override
        public void run() {
            LOG.info("Successfully start Spout's acker thread " + MultipleThreadSpoutExecutors.this.idStr);
            while (!this.shutdown.get()) {
                try {
                    MultipleThreadSpoutExecutors.this.exeQueue.consumeBatchWhenAvailable(MultipleThreadSpoutExecutors.this);
                    MultipleThreadSpoutExecutors.this.processControlEvent();
                }
                catch (Exception e) {
                    if (this.shutdown.get()) continue;
                    LOG.error("Actor occur unknow exception ", (Throwable)e);
                    MultipleThreadSpoutExecutors.this.report_error.report(e);
                }
            }
            LOG.info("Successfully shutdown Spout's acker thread " + MultipleThreadSpoutExecutors.this.idStr);
        }

        @Override
        public Object getResult() {
            LOG.info("Begin to shutdown Spout's acker thread " + MultipleThreadSpoutExecutors.this.idStr);
            return -1;
        }
    }
}

