package acromusashi.stream.component.kestrel.spout;

import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.RawMultiScheme;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import net.lag.kestrel.thrift.Item;
import org.apache.thrift7.TException;
import org.elasticsearch.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/component/kestrel/spout/KestrelSpout.class */
public class KestrelSpout extends BaseRichSpout {
    private static final long serialVersionUID = -4508764195597194300L;
    public static final long DEFAULT_BLACKLIST_TIME_MS = 60000;
    public static final int DEFAULT_BATCH_SIZE = 4000;
    private static final Logger logger = LoggerFactory.getLogger(KestrelSpout.class);
    protected long blackListTimeMs;
    protected int batchSize;
    protected int messageTimeoutMs;
    protected List<HostInfo> hostInfos;
    private String queueName;
    private transient SpoutOutputCollector collector;
    private MultiScheme messageScheme;
    private transient List<KestrelClientInfo> clientInfoList;
    private int emitIndex;
    private transient Queue<EmitItem> emitBuffer;

    public KestrelSpout(List<String> list, int i, String str, Scheme scheme) {
        this(list, i, str, (MultiScheme) new SchemeAsMultiScheme(scheme));
    }

    public KestrelSpout(List<String> list, String str, MultiScheme multiScheme) {
        this.blackListTimeMs = DEFAULT_BLACKLIST_TIME_MS;
        this.batchSize = DEFAULT_BATCH_SIZE;
        this.hostInfos = null;
        this.queueName = null;
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Must configure at least one host");
        }
        this.hostInfos = new ArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            String[] split = it.next().split(":");
            this.hostInfos.add(new HostInfo(split[0], Integer.parseInt(split[1])));
        }
        this.queueName = str;
        this.messageScheme = multiScheme;
    }

    public KestrelSpout(List<String> list, int i, String str, MultiScheme multiScheme) {
        this.blackListTimeMs = DEFAULT_BLACKLIST_TIME_MS;
        this.batchSize = DEFAULT_BATCH_SIZE;
        this.hostInfos = null;
        this.queueName = null;
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Must configure at least one host");
        }
        this.hostInfos = Lists.newArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            this.hostInfos.add(new HostInfo(it.next(), i));
        }
        this.queueName = str;
        this.messageScheme = multiScheme;
    }

    public KestrelSpout(String str, int i, String str2, Scheme scheme) {
        this(str, i, str2, (MultiScheme) new SchemeAsMultiScheme(scheme));
    }

    public KestrelSpout(String str, int i, String str2, MultiScheme multiScheme) {
        this(Lists.newArrayList(new String[]{str}), i, str2, multiScheme);
    }

    public KestrelSpout(String str, int i, String str2) {
        this(str, i, str2, (MultiScheme) new RawMultiScheme());
    }

    public KestrelSpout(List<String> list, int i, String str) {
        this(list, i, str, (MultiScheme) new RawMultiScheme());
    }

    public Fields getOutputFields() {
        return this.messageScheme.getOutputFields();
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.messageTimeoutMs = (int) TimeUnit.SECONDS.toMillis(((Number) map.get("topology.message.timeout.secs")).longValue());
        this.collector = spoutOutputCollector;
        this.emitIndex = 0;
        this.clientInfoList = Lists.newArrayList();
        int size = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        int thisTaskIndex = topologyContext.getThisTaskIndex();
        int size2 = this.hostInfos.size();
        if (size < size2) {
            for (HostInfo hostInfo : this.hostInfos) {
                this.clientInfoList.add(new KestrelClientInfo(hostInfo.getHost(), hostInfo.getPort()));
            }
        } else {
            HostInfo hostInfo2 = this.hostInfos.get(thisTaskIndex % size2);
            this.clientInfoList.add(new KestrelClientInfo(hostInfo2.getHost(), hostInfo2.getPort()));
        }
        this.emitBuffer = new LinkedList();
    }

    public void close() {
        Iterator<KestrelClientInfo> it = this.clientInfoList.iterator();
        while (it.hasNext()) {
            it.next().closeClient();
        }
        this.emitBuffer.clear();
        this.clientInfoList.clear();
    }

    public boolean bufferKestrelGet(int i) {
        KestrelClientInfo kestrelClientInfo = this.clientInfoList.get(i);
        if (System.currentTimeMillis() <= kestrelClientInfo.blacklistTillTimeMs.longValue()) {
            return false;
        }
        try {
            List<Item> list = kestrelClientInfo.getValidClient().get(this.queueName, DEFAULT_BATCH_SIZE, 0, this.messageTimeoutMs);
            HashSet newHashSet = Sets.newHashSet();
            for (Item item : list) {
                Iterable deserialize = this.messageScheme.deserialize(item.get_data());
                if (deserialize != null) {
                    Iterator it = deserialize.iterator();
                    while (it.hasNext()) {
                        if (!this.emitBuffer.offer(generateEmitItem((List) it.next(), new KestrelSourceId(i, item.get_id())))) {
                            throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
                        }
                    }
                } else {
                    newHashSet.add(Long.valueOf(item.get_id()));
                }
            }
            if (newHashSet.size() > 0) {
                try {
                    kestrelClientInfo.getClient().confirm(this.queueName, newHashSet);
                } catch (TException e) {
                    blacklist(kestrelClientInfo, e);
                }
            }
            return list.size() > 0;
        } catch (TException e2) {
            blacklist(kestrelClientInfo, e2);
            return false;
        }
    }

    public void tryEachKestrelUntilBufferFilled() {
        int i = 0;
        while (true) {
            if (i >= this.clientInfoList.size()) {
                break;
            }
            int size = (this.emitIndex + i) % this.clientInfoList.size();
            if (bufferKestrelGet(size)) {
                this.emitIndex = size;
                break;
            }
            i++;
        }
        this.emitIndex = (this.emitIndex + 1) % this.clientInfoList.size();
    }

    public void nextTuple() {
        if (isRestricted()) {
            return;
        }
        if (this.emitBuffer.isEmpty()) {
            tryEachKestrelUntilBufferFilled();
        }
        EmitItem poll = this.emitBuffer.poll();
        if (poll != null) {
            this.collector.emit(poll.getTuple(), poll.getSourceId());
        }
    }

    protected EmitItem generateEmitItem(List<Object> list, KestrelSourceId kestrelSourceId) {
        return new EmitItem(list, kestrelSourceId);
    }

    protected boolean isRestricted() {
        return false;
    }

    private void blacklist(KestrelClientInfo kestrelClientInfo, Throwable th) {
        logger.warn("Failed to read from Kestrel at " + kestrelClientInfo.host + ":" + kestrelClientInfo.port, th);
        kestrelClientInfo.closeClient();
        kestrelClientInfo.blacklistTillTimeMs = Long.valueOf(System.currentTimeMillis() + DEFAULT_BLACKLIST_TIME_MS);
        int indexOf = this.clientInfoList.indexOf(kestrelClientInfo);
        Iterator<EmitItem> it = this.emitBuffer.iterator();
        while (it.hasNext()) {
            if (it.next().getSourceId().getIndex() == indexOf) {
                it.remove();
            }
        }
    }

    public void ack(Object obj) {
        KestrelSourceId kestrelSourceId = (KestrelSourceId) obj;
        KestrelClientInfo kestrelClientInfo = this.clientInfoList.get(kestrelSourceId.getIndex());
        try {
            if (kestrelClientInfo.getClient() != null) {
                kestrelClientInfo.getClient().confirm(this.queueName, Sets.newHashSet(new Long[]{Long.valueOf(kestrelSourceId.getId())}));
            }
        } catch (TException e) {
            blacklist(kestrelClientInfo, e);
        }
    }

    public void fail(Object obj) {
        KestrelSourceId kestrelSourceId = (KestrelSourceId) obj;
        KestrelClientInfo kestrelClientInfo = this.clientInfoList.get(kestrelSourceId.getIndex());
        try {
            if (kestrelClientInfo.getClient() != null) {
                kestrelClientInfo.getClient().abort(this.queueName, Sets.newHashSet(new Long[]{Long.valueOf(kestrelSourceId.getId())}));
            }
        } catch (TException e) {
            blacklist(kestrelClientInfo, e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(getOutputFields());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getQueueName() {
        return this.queueName;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setQueueName(String str) {
        this.queueName = str;
    }
}
