package eu.icolumbo.breeze;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/icolumbo/breeze/SpringSpout.class */
public class SpringSpout extends SpringComponent implements ConfiguredSpout {
    private static final Logger logger = LoggerFactory.getLogger(SpringSpout.class);
    private static final long serialVersionUID = 8;
    private SpoutOutputCollector collector;
    private final Map<Class<? extends Exception>, Long> delayExceptions;
    private FunctionSignature ackSignature;
    private FunctionSignature failSignature;
    private transient Method ackMethod;
    private transient Method failMethod;

    public SpringSpout(Class<?> cls, String str, String... strArr) {
        super(cls, str, strArr);
        this.delayExceptions = new HashMap();
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        logger.trace("{} Storm init", this);
        this.collector = spoutOutputCollector;
        super.init(map, topologyContext);
        try {
            if (this.ackSignature != null) {
                this.ackMethod = this.ackSignature.findMethod(this.beanType);
                logger.info("{} uses {} for transaction acknowledgement", this, this.ackMethod.toGenericString());
            }
            if (this.failSignature != null) {
                this.failMethod = this.failSignature.findMethod(this.beanType);
                logger.info("{} uses {} for transaction failures", this, this.failMethod.toGenericString());
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unusable transaction signature", e);
        }
    }

    public void nextTuple() {
        logger.trace("{} next", this);
        try {
            Object[] invoke = invoke(EMPTY_ARRAY);
            String outputStreamId = getOutputStreamId();
            logger.debug("{} provides {} tuples to stream {}", new Object[]{this, Integer.valueOf(invoke.length), outputStreamId});
            for (Object obj : invoke) {
                try {
                    Values mapping = getMapping(obj);
                    if (this.failSignature == null && this.ackSignature == null) {
                        logger.trace("Tuple emit");
                        this.collector.emit(outputStreamId, mapping);
                    } else {
                        logger.trace("Transactional tuple emit");
                        TransactionMessageId transactionMessageId = new TransactionMessageId();
                        if (this.failSignature != null) {
                            transactionMessageId.setFailParams(mapOutputFields(obj, this.failSignature.getArguments()));
                        }
                        if (this.ackSignature != null) {
                            transactionMessageId.setAckParams(mapOutputFields(obj, this.ackSignature.getArguments()));
                        }
                        this.collector.emit(outputStreamId, mapping, transactionMessageId);
                    }
                } catch (Exception e) {
                    throw new InvocationTargetException(e);
                }
            }
        } catch (IllegalAccessException e2) {
            throw new SecurityException(e2);
        } catch (InvocationTargetException e3) {
            Throwable cause = e3.getCause();
            Class<?> cls = cause.getClass();
            for (Map.Entry<Class<? extends Exception>, Long> entry : this.delayExceptions.entrySet()) {
                if (entry.getKey().isAssignableFrom(cls)) {
                    Long value = entry.getValue();
                    logger.info("{} triggers a {}ms delay", cls.getSimpleName(), value);
                    Utils.sleep(value.longValue());
                    return;
                }
            }
            this.collector.reportError(cause);
        }
    }

    public void close() {
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public void ack(Object obj) {
        if (!(obj instanceof TransactionMessageId)) {
            logger.warn("Ack with unknown message ID: {}", obj);
            return;
        }
        Object[] ackParams = ((TransactionMessageId) obj).getAckParams();
        logger.trace("Ack with: {}", ackParams);
        try {
            invoke(this.ackMethod, ackParams);
        } catch (Exception e) {
            logger.error("Ack notification abort", e);
        }
    }

    public void fail(Object obj) {
        if (!(obj instanceof TransactionMessageId)) {
            logger.warn("Fail with unknown message ID: {}", obj);
            return;
        }
        Object[] failParams = ((TransactionMessageId) obj).getFailParams();
        logger.trace("Fail with: {}", failParams);
        try {
            invoke(this.failMethod, failParams);
        } catch (Exception e) {
            logger.error("Fail notification abort", e);
        }
    }

    public void setAckSignature(String str) {
        this.ackSignature = FunctionSignature.valueOf(str);
    }

    public void setFailSignature(String str) {
        this.failSignature = FunctionSignature.valueOf(str);
    }

    public void setDelayExceptions(Map<Class<? extends Exception>, Long> map) {
        this.delayExceptions.clear();
        for (Map.Entry<Class<? extends Exception>, Long> entry : map.entrySet()) {
            putDelayException(entry.getKey(), entry.getValue().longValue());
        }
    }

    public void putDelayException(Class<? extends Exception> cls, long j) {
        this.delayExceptions.put(cls, Long.valueOf(j));
    }

    @Override // eu.icolumbo.breeze.ConfiguredComponent
    public String toString() {
        StringBuilder sb = new StringBuilder("[spout '");
        sb.append(getId()).append("']");
        return sb.toString();
    }
}
