package org.apache.heron.api.bolt;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;
import org.apache.heron.api.Config;
import org.apache.heron.api.bolt.BaseWindowedBolt;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.state.HashMapState;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.Event;
import org.apache.heron.api.windowing.EvictionPolicy;
import org.apache.heron.api.windowing.TimestampExtractor;
import org.apache.heron.api.windowing.TriggerPolicy;
import org.apache.heron.api.windowing.TupleWindowImpl;
import org.apache.heron.api.windowing.WaterMarkEventGenerator;
import org.apache.heron.api.windowing.WindowLifecycleListener;
import org.apache.heron.api.windowing.WindowManager;
import org.apache.heron.api.windowing.WindowingConfigs;
import org.apache.heron.api.windowing.evictors.CountEvictionPolicy;
import org.apache.heron.api.windowing.evictors.TimeEvictionPolicy;
import org.apache.heron.api.windowing.evictors.WatermarkCountEvictionPolicy;
import org.apache.heron.api.windowing.evictors.WatermarkTimeEvictionPolicy;
import org.apache.heron.api.windowing.triggers.CountTriggerPolicy;
import org.apache.heron.api.windowing.triggers.TimeTriggerPolicy;
import org.apache.heron.api.windowing.triggers.WatermarkCountTriggerPolicy;
import org.apache.heron.api.windowing.triggers.WatermarkTimeTriggerPolicy;
import org.apache.heron.common.basics.TypeUtils;

/* loaded from: input_file:org/apache/heron/api/bolt/WindowedBoltExecutor.class */
public class WindowedBoltExecutor implements IRichBolt, IStatefulComponent<Serializable, Serializable> {
    private static final long serialVersionUID = -9204275913034895392L;
    private static final Logger LOG = Logger.getLogger(WindowedBoltExecutor.class.getName());
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000;
    private static final int DEFAULT_MAX_LAG_MS = 0;
    public static final String LATE_TUPLE_FIELD = "late_tuple";
    private final IWindowedBolt bolt;
    private transient WindowedOutputCollector windowedOutputCollector;
    private transient WindowLifecycleListener<Tuple> listener;
    private transient WindowManager<Tuple> windowManager;
    private transient int maxLagMs;
    private TimestampExtractor timestampExtractor;
    private transient String lateTupleStream;
    private transient TriggerPolicy<Tuple, ?> triggerPolicy;
    private transient EvictionPolicy<Tuple, ?> evictionPolicy;
    private transient Long windowLengthDurationMs;
    private State<Serializable, Serializable> state;
    private static final String WINDOWING_INTERNAL_STATE = "windowing.internal.state";
    protected transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/heron/api/bolt/WindowedBoltExecutor$WindowedOutputCollector.class */
    public static class WindowedOutputCollector extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector iOutputCollector) {
            super(iOutputCollector);
        }

        void setContext(List<Tuple> list) {
            this.inputTuples = list;
        }

        @Override // org.apache.heron.api.bolt.OutputCollector
        public List<Integer> emit(String str, List<Object> list) {
            return emit(str, this.inputTuples, list);
        }

        @Override // org.apache.heron.api.bolt.OutputCollector
        public void emitDirect(int i, String str, List<Object> list) {
            emitDirect(i, str, this.inputTuples, list);
        }
    }

    public WindowedBoltExecutor(IWindowedBolt iWindowedBolt) {
        this.bolt = iWindowedBolt;
        this.timestampExtractor = iWindowedBolt.getTimestampExtractor();
    }

    protected int getTopologyTimeoutMillis(Map<String, Object> map) {
        if (map.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) != null && !Boolean.parseBoolean((String) map.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS))) {
            return Integer.MAX_VALUE;
        }
        int i = DEFAULT_MAX_LAG_MS;
        if (map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) != null) {
            i = TypeUtils.getInteger(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        }
        return i * DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
    }

    private int getMaxSpoutPending(Map<String, Object> map) {
        int i = Integer.MAX_VALUE;
        if (map.get(Config.TOPOLOGY_MAX_SPOUT_PENDING) != null) {
            i = TypeUtils.getInteger(map.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)).intValue();
        }
        return i;
    }

    private void ensureDurationLessThanTimeout(long j, long j2) {
        if (j > j2) {
            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + j + " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS + " value " + j2);
        }
    }

    private void ensureCountLessThanMaxPending(int i, int i2) {
        if (i > i2) {
            throw new IllegalArgumentException("Window count (length + sliding interval) value " + i + " is more than " + Config.TOPOLOGY_MAX_SPOUT_PENDING + " value " + i2);
        }
    }

    protected void validate(Map<String, Object> map, BaseWindowedBolt.Count count, Long l, BaseWindowedBolt.Count count2, Long l2) {
        int topologyTimeoutMillis = getTopologyTimeoutMillis(map);
        int maxSpoutPending = getMaxSpoutPending(map);
        if (count == null && l == null) {
            throw new IllegalArgumentException("Window length is not specified");
        }
        if (l != null && l2 != null) {
            ensureDurationLessThanTimeout(l.longValue() + l2.longValue(), topologyTimeoutMillis);
        } else if (l != null) {
            ensureDurationLessThanTimeout(l.longValue(), topologyTimeoutMillis);
        } else if (l2 != null) {
            ensureDurationLessThanTimeout(l2.longValue(), topologyTimeoutMillis);
        }
        if (count != null && count2 != null) {
            ensureCountLessThanMaxPending(count.value + count2.value, maxSpoutPending);
        } else if (count != null) {
            ensureCountLessThanMaxPending(count.value, maxSpoutPending);
        } else if (count2 != null) {
            ensureCountLessThanMaxPending(count2.value, maxSpoutPending);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> windowLifecycleListener, Map<String, Object> map, TopologyContext topologyContext, Collection<Event<Tuple>> collection) {
        WindowManager<Tuple> windowManager = new WindowManager<>(windowLifecycleListener, collection);
        BaseWindowedBolt.Count count = DEFAULT_MAX_LAG_MS;
        Long l = DEFAULT_MAX_LAG_MS;
        BaseWindowedBolt.Count count2 = DEFAULT_MAX_LAG_MS;
        if (map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
            count = new BaseWindowedBolt.Count(((Number) map.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
        } else if (map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
            this.windowLengthDurationMs = (Long) map.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS);
        }
        if (map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
            count2 = new BaseWindowedBolt.Count(((Number) map.get(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
        } else if (map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
            l = (Long) map.get(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS);
        } else {
            count2 = new BaseWindowedBolt.Count(1);
        }
        if (this.timestampExtractor != null) {
            this.lateTupleStream = (String) map.get(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
            if (this.lateTupleStream != null && !topologyContext.getThisStreams().contains(this.lateTupleStream)) {
                throw new IllegalArgumentException("Stream for late tuples must be defined with the builder method withLateTupleStream");
            }
            if (map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)) {
                this.maxLagMs = ((Number) map.get(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS)).intValue();
            } else {
                this.maxLagMs = DEFAULT_MAX_LAG_MS;
            }
            this.waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS) ? ((Number) map.get(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS)).intValue() : 1000L, this.maxLagMs, getComponentStreams(topologyContext), map);
        } else if (map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM)) {
            throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
        }
        boolean containsKey = map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER);
        boolean containsKey2 = map.containsKey(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR);
        if (containsKey && containsKey2) {
            this.triggerPolicy = (TriggerPolicy) map.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_TRIGGER);
            this.evictionPolicy = (EvictionPolicy) map.get(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_CUSTOM_EVICTOR);
        } else {
            if (containsKey2 || containsKey) {
                throw new IllegalArgumentException("If either a custom TriggerPolicy or EvictionPolicy is defined, both must be.");
            }
            validate(map, count, this.windowLengthDurationMs, count2, l);
            this.evictionPolicy = getEvictionPolicy(count, this.windowLengthDurationMs);
            this.triggerPolicy = getTriggerPolicy(count2, l);
        }
        this.triggerPolicy.setEvictionPolicy(this.evictionPolicy);
        this.triggerPolicy.setTopologyConfig(map);
        this.triggerPolicy.setTriggerHandler(windowManager);
        this.triggerPolicy.setWindowManager(windowManager);
        windowManager.setEvictionPolicy(this.evictionPolicy);
        windowManager.setTriggerPolicy(this.triggerPolicy);
        if (this.state != null && this.state.get(WINDOWING_INTERNAL_STATE) != 0 && !((HashMapState) this.state.get(WINDOWING_INTERNAL_STATE)).isEmpty()) {
            windowManager.restoreState((Map) this.state.get(WINDOWING_INTERNAL_STATE));
        }
        return windowManager;
    }

    protected Map<String, Serializable> getState() {
        return this.windowManager.getState();
    }

    private Set<TopologyAPI.StreamId> getComponentStreams(TopologyContext topologyContext) {
        HashSet hashSet = new HashSet();
        Iterator<TopologyAPI.StreamId> it = topologyContext.getThisSources().keySet().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next());
        }
        return hashSet;
    }

    protected void start() {
        if (this.waterMarkEventGenerator != null) {
            LOG.fine("Starting waterMarkEventGenerator");
            this.waterMarkEventGenerator.start();
        }
        LOG.fine("Starting trigger policy");
        this.triggerPolicy.start();
    }

    private boolean isTupleTs() {
        return this.timestampExtractor != null;
    }

    private TriggerPolicy<Tuple, ?> getTriggerPolicy(BaseWindowedBolt.Count count, Long l) {
        return count != null ? isTupleTs() ? new WatermarkCountTriggerPolicy(count.value) : new CountTriggerPolicy(count.value) : isTupleTs() ? new WatermarkTimeTriggerPolicy(l.longValue()) : new TimeTriggerPolicy(l.longValue());
    }

    private EvictionPolicy<Tuple, ?> getEvictionPolicy(BaseWindowedBolt.Count count, Long l) {
        return count != null ? isTupleTs() ? new WatermarkCountEvictionPolicy(count.value) : new CountEvictionPolicy(count.value) : isTupleTs() ? new WatermarkTimeEvictionPolicy(l.longValue(), this.maxLagMs) : new TimeEvictionPolicy(l.longValue());
    }

    @Override // org.apache.heron.api.bolt.IBolt
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        doPrepare(map, topologyContext, outputCollector, new ConcurrentLinkedQueue());
    }

    protected void doPrepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector, Collection<Event<Tuple>> collection) {
        Objects.requireNonNull(map);
        Objects.requireNonNull(topologyContext);
        Objects.requireNonNull(outputCollector);
        Objects.requireNonNull(collection);
        this.windowedOutputCollector = new WindowedOutputCollector(outputCollector);
        this.bolt.prepare(map, topologyContext, this.windowedOutputCollector);
        this.listener = newWindowLifecycleListener();
        this.windowManager = initWindowManager(this.listener, map, topologyContext, collection);
        start();
        LOG.info(String.format("Initialized window manager %s", this.windowManager));
    }

    @Override // org.apache.heron.api.bolt.IBolt
    public void execute(Tuple tuple) {
        if (!isTupleTs()) {
            this.windowManager.add((WindowManager<Tuple>) tuple);
            return;
        }
        long extractTimestamp = this.timestampExtractor.extractTimestamp(tuple);
        if (this.waterMarkEventGenerator.track(tuple.getSourceGlobalStreamId(), extractTimestamp)) {
            this.windowManager.add(tuple, extractTimestamp);
            return;
        }
        if (this.lateTupleStream != null) {
            this.windowedOutputCollector.emit(this.lateTupleStream, tuple, new Values(tuple));
        } else {
            LOG.info(String.format("Received a late tuple %s with ts %d. This will not be processed.", tuple, Long.valueOf(extractTimestamp)));
        }
        this.windowedOutputCollector.ack(tuple);
    }

    @Override // org.apache.heron.api.bolt.IBolt
    public void cleanup() {
        if (this.windowManager != null) {
            this.windowManager.shutdown();
        }
        this.bolt.cleanup();
    }

    WindowManager<Tuple> getWindowManager() {
        return this.windowManager;
    }

    @Override // org.apache.heron.api.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        String str = (String) getComponentConfiguration().get(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM);
        if (str != null) {
            outputFieldsDeclarer.declareStream(str, new Fields(LATE_TUPLE_FIELD));
        }
        this.bolt.declareOutputFields(outputFieldsDeclarer);
    }

    @Override // org.apache.heron.api.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return this.bolt.getComponentConfiguration();
    }

    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>() { // from class: org.apache.heron.api.bolt.WindowedBoltExecutor.1
            @Override // org.apache.heron.api.windowing.WindowLifecycleListener
            public void onExpiry(List<Tuple> list) {
                Iterator<Tuple> it = list.iterator();
                while (it.hasNext()) {
                    WindowedBoltExecutor.this.windowedOutputCollector.ack(it.next());
                }
            }

            @Override // org.apache.heron.api.windowing.WindowLifecycleListener
            public void onActivation(List<Tuple> list, List<Tuple> list2, List<Tuple> list3, Long l) {
                WindowedBoltExecutor.this.windowedOutputCollector.setContext(list);
                WindowedBoltExecutor.this.boltExecute(list, list2, list3, l);
            }
        };
    }

    protected void boltExecute(List<Tuple> list, List<Tuple> list2, List<Tuple> list3, Long l) {
        this.bolt.execute(new TupleWindowImpl(list, list2, list3, getWindowStartTs(l), l));
    }

    private Long getWindowStartTs(Long l) {
        Long l2 = DEFAULT_MAX_LAG_MS;
        if (l != null && this.windowLengthDurationMs != null) {
            l2 = Long.valueOf(l.longValue() - this.windowLengthDurationMs.longValue());
        }
        return l2;
    }

    public void initState(State<Serializable, Serializable> state) {
        if (state != null) {
            this.state = state;
            if (this.state.containsKey(WINDOWING_INTERNAL_STATE)) {
                return;
            }
            this.state.put(WINDOWING_INTERNAL_STATE, new HashMapState());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void preSave(String str) {
        if (this.state != null) {
            ((HashMapState) this.state.get(WINDOWING_INTERNAL_STATE)).putAll(getWindowManager().getState());
        }
    }
}
