package acromusashi.stream.spout;

import backtype.storm.task.TopologyContext;
import backtype.storm.utils.RotatingMap;
import java.text.MessageFormat;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.thrift7.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;

/* loaded from: input_file:acromusashi/stream/spout/DrpcTridentEmitter.class */
/**
 * Base Trident emitter that pulls DRPC requests through a {@link DrpcFetchHelper}
 * and emits the request arguments as a batch.
 *
 * <p>Each fetched request is remembered per {@link TransactionAttempt} in a rotating map.
 * A request is acknowledged to the fetch helper when {@link #success(TransactionAttempt)}
 * is called for its attempt, and reported as failed when its entry is rotated out of the map
 * or when the same attempt is emitted again.</p>
 *
 * <p>Subclasses supply {@link #prepare(Map, TopologyContext)} and
 * {@link #emitTuples(String, TridentCollector)}.</p>
 */
public abstract class DrpcTridentEmitter implements ITridentSpout.Emitter<Object> {
    private static final Logger logger = LoggerFactory.getLogger(DrpcTridentEmitter.class);

    /** Number of buckets used by the rotating map of in-flight requests. */
    private static final int ROTATE_SIZE = 3;

    /** DRPC function name handed to the fetch helper on initialization. */
    protected String function;

    /** Whether the lazy preparation in {@link #emitBatch} has already completed. */
    protected boolean prepared;

    /** In-flight DRPC requests keyed by the transaction attempt that emitted them. */
    protected RotatingMap<TransactionAttempt, DrpcRequestInfo> idsMap;

    /** Time (as returned by {@link #getCurrentTime()}) at which the map was last rotated. */
    protected long lastRotate;

    /** Interval between map rotations in milliseconds. */
    protected long rotateTime;

    /** Storm configuration received in {@link #initialize}. */
    protected Map stormConf;

    /** Topology context received in {@link #initialize}. */
    protected TopologyContext context;

    /** Helper used to fetch requests and to report their outcome; created on first emit. */
    protected transient DrpcFetchHelper fetchHelper;

    /**
     * Stores the configuration and sets up the in-flight request bookkeeping.
     *
     * <p>The rotation interval is read from the {@code topology.message.timeout.secs}
     * entry of the configuration, which must be present and numeric.</p>
     *
     * @param map             Storm configuration
     * @param topologyContext topology context
     * @param str             DRPC function name
     */
    public void initialize(Map map, TopologyContext topologyContext, String str) {
        this.context = topologyContext;
        this.stormConf = map;
        this.function = str;
        this.idsMap = new RotatingMap<>(ROTATE_SIZE);
        this.rotateTime = TimeUnit.SECONDS.toMillis(((Number) map.get("topology.message.timeout.secs")).intValue());
        this.lastRotate = getCurrentTime();
    }

    /**
     * Subclass hook invoked once, on the first call to {@link #emitBatch}, before the
     * fetch helper is created.
     *
     * @param map             Storm configuration
     * @param topologyContext topology context
     */
    protected abstract void prepare(Map map, TopologyContext topologyContext);

    /**
     * Fetches at most one DRPC request and emits its arguments for the given attempt.
     *
     * @param transactionAttempt attempt the batch belongs to
     * @param obj                coordinator metadata (not used here)
     * @param tridentCollector   collector the tuples are emitted to
     */
    public void emitBatch(TransactionAttempt transactionAttempt, Object obj, TridentCollector tridentCollector) {
        long currentTime = getCurrentTime();
        // Requests that fall out of the rotating map were never reported as succeeded; fail them.
        if (currentTime - this.lastRotate > this.rotateTime) {
            for (Map.Entry entry : this.idsMap.rotate().entrySet()) {
                fail((TransactionAttempt) entry.getKey(), (DrpcRequestInfo) entry.getValue());
            }
            this.lastRotate = currentTime;
        }
        // The same attempt being emitted again means its previous request did not complete.
        if (this.idsMap.containsKey(transactionAttempt)) {
            fail(transactionAttempt, (DrpcRequestInfo) this.idsMap.remove(transactionAttempt));
        }
        // Lazy one-time setup of the subclass and of the fetch helper.
        if (!this.prepared) {
            prepare(this.stormConf, this.context);
            this.fetchHelper = createFetchHelper();
            this.fetchHelper.initialize(this.stormConf, this.function);
            this.prepared = true;
        }
        DrpcRequestInfo drpcRequestInfo = null;
        try {
            drpcRequestInfo = this.fetchHelper.fetch();
        } catch (TException e) {
            // A failed fetch only skips this batch; nothing is emitted or tracked.
            logger.warn("DRPC fetch failed.", e);
        }
        if (drpcRequestInfo != null) {
            if (logger.isDebugEnabled()) {
                logger.debug(MessageFormat.format("Request info get succeeded. TransactionAttempt={0}, DrpcRequestInfo={1}", transactionAttempt, drpcRequestInfo));
            }
            emitTuples(drpcRequestInfo.getFuncArgs(), tridentCollector);
            this.idsMap.put(transactionAttempt, drpcRequestInfo);
        }
    }

    /**
     * Converts the DRPC function arguments into tuples and emits them.
     *
     * @param str              function arguments of the fetched DRPC request
     * @param tridentCollector collector the tuples are emitted to
     */
    protected abstract void emitTuples(String str, TridentCollector tridentCollector);

    /**
     * Acknowledges the request tracked for the given attempt, if there is one.
     *
     * @param transactionAttempt attempt that completed successfully
     */
    public void success(TransactionAttempt transactionAttempt) {
        DrpcRequestInfo drpcRequestInfo = (DrpcRequestInfo) this.idsMap.remove(transactionAttempt);
        if (drpcRequestInfo != null) {
            ack(transactionAttempt, drpcRequestInfo);
        }
    }

    /**
     * Reports a successful request to the fetch helper. A notification failure is logged
     * with its cause and otherwise ignored.
     *
     * @param transactionAttempt attempt the request belongs to
     * @param drpcRequestInfo    request to acknowledge
     */
    protected void ack(TransactionAttempt transactionAttempt, DrpcRequestInfo drpcRequestInfo) {
        if (logger.isDebugEnabled()) {
            logger.debug(MessageFormat.format("Transaction succeeded. TransactionAttempt={0}, DrpcRequestInfo={1}", transactionAttempt, drpcRequestInfo));
        }
        try {
            this.fetchHelper.ack(drpcRequestInfo.getRequestId(), "Succeeded");
        } catch (TException e) {
            // Keep the cause in the log so notification failures can be diagnosed.
            logger.warn(MessageFormat.format("Success notify failed. TransactionAttempt={0}, DrpcRequestInfo={1}", transactionAttempt, drpcRequestInfo), e);
        }
    }

    /**
     * Reports a failed request to the fetch helper. A notification failure is logged
     * with its cause and otherwise ignored.
     *
     * @param transactionAttempt attempt the request belongs to
     * @param drpcRequestInfo    request to report as failed
     */
    protected void fail(TransactionAttempt transactionAttempt, DrpcRequestInfo drpcRequestInfo) {
        if (logger.isDebugEnabled()) {
            logger.debug(MessageFormat.format("Transaction failed. TransactionAttempt={0}, DrpcRequestInfo={1}", transactionAttempt, drpcRequestInfo));
        }
        try {
            this.fetchHelper.fail(drpcRequestInfo.getRequestId());
        } catch (TException e) {
            // Keep the cause in the log so notification failures can be diagnosed.
            logger.warn(MessageFormat.format("Fail notify failed. TransactionAttempt={0}, DrpcRequestInfo={1}", transactionAttempt, drpcRequestInfo), e);
        }
    }

    /** No resources are released here. */
    public void close() {
    }

    /**
     * Returns the current time in milliseconds; overridable so tests can control the clock.
     *
     * @return current time in milliseconds
     */
    protected long getCurrentTime() {
        return System.currentTimeMillis();
    }

    /**
     * Creates the helper used to talk to the DRPC server; overridable to substitute another helper.
     *
     * @return a new fetch helper
     */
    protected DrpcFetchHelper createFetchHelper() {
        return new DrpcFetchHelper();
    }
}
