/*
 * Decompiled with CFR 0.152.
 */
package org.apache.heron.api.bolt;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;
import org.apache.heron.api.bolt.BaseWindowedBolt;
import org.apache.heron.api.bolt.IOutputCollector;
import org.apache.heron.api.bolt.IRichBolt;
import org.apache.heron.api.bolt.IWindowedBolt;
import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.state.HashMapState;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.api.windowing.Event;
import org.apache.heron.api.windowing.EvictionPolicy;
import org.apache.heron.api.windowing.TimestampExtractor;
import org.apache.heron.api.windowing.TriggerPolicy;
import org.apache.heron.api.windowing.TupleWindowImpl;
import org.apache.heron.api.windowing.WaterMarkEventGenerator;
import org.apache.heron.api.windowing.WindowLifecycleListener;
import org.apache.heron.api.windowing.WindowManager;
import org.apache.heron.api.windowing.evictors.CountEvictionPolicy;
import org.apache.heron.api.windowing.evictors.TimeEvictionPolicy;
import org.apache.heron.api.windowing.evictors.WatermarkCountEvictionPolicy;
import org.apache.heron.api.windowing.evictors.WatermarkTimeEvictionPolicy;
import org.apache.heron.api.windowing.triggers.CountTriggerPolicy;
import org.apache.heron.api.windowing.triggers.TimeTriggerPolicy;
import org.apache.heron.api.windowing.triggers.WatermarkCountTriggerPolicy;
import org.apache.heron.api.windowing.triggers.WatermarkTimeTriggerPolicy;
import org.apache.heron.common.basics.TypeUtils;

/**
 * An {@link IRichBolt} that wraps an {@link IWindowedBolt} and drives it with windows of tuples.
 *
 * <p>Every incoming tuple is handed to a {@link WindowManager}. The lifecycle listener created by
 * {@link #newWindowLifecycleListener()} executes the wrapped bolt with a {@link TupleWindowImpl}
 * on window activation and acks tuples on expiry. When the wrapped bolt supplies a
 * {@link TimestampExtractor}, tuples are additionally tracked by a
 * {@link WaterMarkEventGenerator}; tuples it rejects are treated as late.
 *
 * <p>The window manager's state is stored under an internal key of the {@link State} passed to
 * {@link #initState(State)} and is written back in {@link #preSave(String)}.
 */
public class WindowedBoltExecutor
implements IRichBolt,
IStatefulComponent<Serializable, Serializable> {
    private static final long serialVersionUID = -9204275913034895392L;
    private static final Logger LOG = Logger.getLogger(WindowedBoltExecutor.class.getName());
    /** Watermark interval used when "topology.bolts.watermark.event.interval.ms" is not configured. */
    private static final int DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000;
    /** Maximum lag used when "topology.bolts.tuple.timestamp.max.lag.ms" is not configured. */
    private static final int DEFAULT_MAX_LAG_MS = 0;
    /** Key under which the window manager's state is kept inside the component state. */
    private static final String WINDOWING_INTERNAL_STATE = "windowing.internal.state";
    /** Name of the single field declared on the late tuple stream. */
    public static final String LATE_TUPLE_FIELD = "late_tuple";

    private final IWindowedBolt bolt;
    private transient WindowedOutputCollector windowedOutputCollector;
    private transient WindowLifecycleListener<Tuple> listener;
    private transient WindowManager<Tuple> windowManager;
    private transient int maxLagMs;
    /** Non-null when windowing is based on timestamps extracted from tuples. */
    private TimestampExtractor timestampExtractor;
    private transient String lateTupleStream;
    private transient TriggerPolicy<Tuple, ?> triggerPolicy;
    private transient EvictionPolicy<Tuple, ?> evictionPolicy;
    private transient Long windowLengthDurationMs;
    private State<Serializable, Serializable> state;
    protected transient WaterMarkEventGenerator<Tuple> waterMarkEventGenerator;

    /**
     * Creates an executor around the given windowed bolt.
     *
     * @param bolt the bolt to drive; its timestamp extractor (possibly null) is captured here
     */
    public WindowedBoltExecutor(IWindowedBolt bolt) {
        this.bolt = bolt;
        this.timestampExtractor = bolt.getTimestampExtractor();
    }

    /**
     * Reads the message timeout from the topology configuration, in milliseconds.
     *
     * @param topoConf the topology configuration
     * @return {@link Integer#MAX_VALUE} when "topology.enable.message.timeouts" is present and does
     *     not parse as {@code true}; {@code 0} when "topology.message.timeout.secs" is absent;
     *     otherwise the configured seconds times 1000, clamped to the {@code int} range
     */
    protected int getTopologyTimeoutMillis(Map<String, Object> topoConf) {
        Object timeoutsEnabled = topoConf.get("topology.enable.message.timeouts");
        // String.valueOf accepts both a Boolean and a String value; a plain (String) cast would
        // throw ClassCastException for a Boolean.
        if (timeoutsEnabled != null && !Boolean.parseBoolean(String.valueOf(timeoutsEnabled))) {
            return Integer.MAX_VALUE;
        }
        Object timeoutSecs = topoConf.get("topology.message.timeout.secs");
        if (timeoutSecs == null) {
            return 0;
        }
        // Multiply in long and clamp so that large second values cannot wrap around.
        long timeoutMillis = TypeUtils.getInteger(timeoutSecs) * 1000L;
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMillis));
    }

    /**
     * Reads "topology.max.spout.pending" from the configuration.
     *
     * @return the configured value, or {@link Integer#MAX_VALUE} when it is not set
     */
    private int getMaxSpoutPending(Map<String, Object> topoConf) {
        Object maxPending = topoConf.get("topology.max.spout.pending");
        if (maxPending == null) {
            return Integer.MAX_VALUE;
        }
        return TypeUtils.getInteger(maxPending);
    }

    /**
     * @throws IllegalArgumentException if {@code duration} is greater than {@code timeout}
     */
    private void ensureDurationLessThanTimeout(long duration, long timeout) {
        if (duration > timeout) {
            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration
                    + " is more than topology.message.timeout.secs value " + timeout);
        }
    }

    /**
     * @throws IllegalArgumentException if {@code count} is greater than {@code maxPending}
     */
    private void ensureCountLessThanMaxPending(long count, int maxPending) {
        if (count > maxPending) {
            throw new IllegalArgumentException("Window count (length + sliding interval) value " + count
                    + " is more than topology.max.spout.pending value " + maxPending);
        }
    }

    /**
     * Checks the window parameters against the topology's message timeout and max spout pending.
     *
     * <p>Durations are compared with {@link #getTopologyTimeoutMillis(Map)} and counts with the
     * configured max spout pending. When both a length and a sliding interval of the same kind
     * are given, their sum is checked.
     *
     * @param topoConf the topology configuration
     * @param windowLengthCount window length as a tuple count, or null
     * @param windowLengthDuration window length in milliseconds, or null
     * @param slidingIntervalCount sliding interval as a tuple count, or null
     * @param slidingIntervalDuration sliding interval in milliseconds, or null
     * @throws IllegalArgumentException if no window length is given or a limit is exceeded
     */
    protected void validate(Map<String, Object> topoConf, BaseWindowedBolt.Count windowLengthCount, Long windowLengthDuration, BaseWindowedBolt.Count slidingIntervalCount, Long slidingIntervalDuration) {
        int topologyTimeout = this.getTopologyTimeoutMillis(topoConf);
        int maxSpoutPending = this.getMaxSpoutPending(topoConf);
        if (windowLengthCount == null && windowLengthDuration == null) {
            throw new IllegalArgumentException("Window length is not specified");
        }

        if (windowLengthDuration != null && slidingIntervalDuration != null) {
            this.ensureDurationLessThanTimeout(windowLengthDuration + slidingIntervalDuration, topologyTimeout);
        } else if (windowLengthDuration != null) {
            this.ensureDurationLessThanTimeout(windowLengthDuration, topologyTimeout);
        } else if (slidingIntervalDuration != null) {
            this.ensureDurationLessThanTimeout(slidingIntervalDuration, topologyTimeout);
        }

        if (windowLengthCount != null && slidingIntervalCount != null) {
            // Sum in long: two large int counts must not wrap to a small or negative value.
            this.ensureCountLessThanMaxPending((long) windowLengthCount.value + slidingIntervalCount.value, maxSpoutPending);
        } else if (windowLengthCount != null) {
            this.ensureCountLessThanMaxPending(windowLengthCount.value, maxSpoutPending);
        } else if (slidingIntervalCount != null) {
            this.ensureCountLessThanMaxPending(slidingIntervalCount.value, maxSpoutPending);
        }
    }

    /**
     * Converts a numeric configuration value to a {@code Long}.
     *
     * @param value a {@link Number} of any width, or null
     * @return the value as a {@code Long}, or null when {@code value} is null
     */
    private static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        return Long.valueOf(((Number) value).longValue());
    }

    /**
     * Builds the window manager and wires the trigger and eviction policies from the configuration.
     *
     * <p>Window length and sliding interval are each read as either a count or a duration; the
     * sliding interval defaults to a count of 1. Custom trigger and eviction policies are taken
     * from the configuration only when both are present. State stored by an earlier
     * {@link #preSave(String)} is restored into the manager when it is non-empty.
     *
     * @throws IllegalArgumentException if the late tuple stream or custom policy settings are
     *     inconsistent, or if {@link #validate} rejects the window parameters
     */
    @SuppressWarnings("unchecked")
    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map<String, Object> topoConf, TopologyContext context, Collection<Event<Tuple>> queue) {
        WindowManager<Tuple> manager = new WindowManager<Tuple>(lifecycleListener, queue);

        BaseWindowedBolt.Count windowLengthCount = null;
        if (topoConf.containsKey("topology.bolts.window.length.count")) {
            windowLengthCount = new BaseWindowedBolt.Count(((Number) topoConf.get("topology.bolts.window.length.count")).intValue());
        } else if (topoConf.containsKey("topology.bolts.window.length.duration.ms")) {
            // Accept any Number, not only Long, to match how the count keys are read.
            this.windowLengthDurationMs = toLong(topoConf.get("topology.bolts.window.length.duration.ms"));
        }

        BaseWindowedBolt.Count slidingIntervalCount = null;
        Long slidingIntervalDurationMs = null;
        if (topoConf.containsKey("topology.bolts.window.sliding.interval.count")) {
            slidingIntervalCount = new BaseWindowedBolt.Count(((Number) topoConf.get("topology.bolts.window.sliding.interval.count")).intValue());
        } else if (topoConf.containsKey("topology.bolts.window.sliding.interval.duration.ms")) {
            slidingIntervalDurationMs = toLong(topoConf.get("topology.bolts.window.sliding.interval.duration.ms"));
        } else {
            slidingIntervalCount = new BaseWindowedBolt.Count(1);
        }

        // Must run before the policies are created: getEvictionPolicy reads maxLagMs.
        this.initWatermarking(manager, topoConf, context);

        boolean hasCustomTrigger = topoConf.containsKey("topology.bolts.window.custom.trigger");
        boolean hasCustomEvictor = topoConf.containsKey("topology.bolts.window.custom.evictor");
        if (hasCustomTrigger != hasCustomEvictor) {
            throw new IllegalArgumentException("If either a custom TriggerPolicy or EvictionPolicy is defined, both must be.");
        }
        if (hasCustomTrigger) {
            this.triggerPolicy = (TriggerPolicy<Tuple, ?>) topoConf.get("topology.bolts.window.custom.trigger");
            this.evictionPolicy = (EvictionPolicy<Tuple, ?>) topoConf.get("topology.bolts.window.custom.evictor");
        } else {
            this.validate(topoConf, windowLengthCount, this.windowLengthDurationMs, slidingIntervalCount, slidingIntervalDurationMs);
            this.evictionPolicy = this.getEvictionPolicy(windowLengthCount, this.windowLengthDurationMs);
            this.triggerPolicy = this.getTriggerPolicy(slidingIntervalCount, slidingIntervalDurationMs);
        }

        this.triggerPolicy.setEvictionPolicy(this.evictionPolicy);
        this.triggerPolicy.setTopologyConfig(topoConf);
        this.triggerPolicy.setTriggerHandler(manager);
        this.triggerPolicy.setWindowManager(manager);
        manager.setEvictionPolicy(this.evictionPolicy);
        manager.setTriggerPolicy(this.triggerPolicy);

        this.restoreSavedState(manager);
        return manager;
    }

    /**
     * Sets up late tuple handling and the watermark generator when a timestamp extractor is present.
     *
     * @throws IllegalArgumentException if a late tuple stream is configured without a timestamp
     *     extractor, or is configured but not among this component's streams
     */
    private void initWatermarking(WindowManager<Tuple> manager, Map<String, Object> topoConf, TopologyContext context) {
        if (this.timestampExtractor == null) {
            if (topoConf.containsKey("topology.bolts.late.tuple.stream")) {
                throw new IllegalArgumentException("Late tuple stream can be defined only when specifying a timestamp field");
            }
            return;
        }

        this.lateTupleStream = (String) topoConf.get("topology.bolts.late.tuple.stream");
        if (this.lateTupleStream != null && !context.getThisStreams().contains(this.lateTupleStream)) {
            throw new IllegalArgumentException("Stream for late tuples must be defined with the builder method withLateTupleStream");
        }

        this.maxLagMs = topoConf.containsKey("topology.bolts.tuple.timestamp.max.lag.ms")
                ? ((Number) topoConf.get("topology.bolts.tuple.timestamp.max.lag.ms")).intValue()
                : DEFAULT_MAX_LAG_MS;
        long watermarkIntervalMs = topoConf.containsKey("topology.bolts.watermark.event.interval.ms")
                ? (long) ((Number) topoConf.get("topology.bolts.watermark.event.interval.ms")).intValue()
                : DEFAULT_WATERMARK_EVENT_INTERVAL_MS;
        this.waterMarkEventGenerator = new WaterMarkEventGenerator<Tuple>(manager, watermarkIntervalMs, this.maxLagMs, this.getComponentStreams(context), topoConf);
    }

    /**
     * Restores the internal windowing state into the manager when a non-empty one is stored.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void restoreSavedState(WindowManager<Tuple> manager) {
        if (this.state == null) {
            return;
        }
        Object saved = this.state.get(WINDOWING_INTERNAL_STATE);
        if (saved != null && !((HashMapState) saved).isEmpty()) {
            manager.restoreState((Map) saved);
        }
    }

    /**
     * @return the current state reported by the window manager
     */
    protected Map<String, Serializable> getState() {
        return this.windowManager.getState();
    }

    /**
     * @return a mutable copy of the stream ids this component receives from
     */
    private Set<TopologyAPI.StreamId> getComponentStreams(TopologyContext context) {
        return new HashSet<TopologyAPI.StreamId>(context.getThisSources().keySet());
    }

    /**
     * Starts the watermark generator, if one was created, and then the trigger policy.
     */
    protected void start() {
        if (this.waterMarkEventGenerator != null) {
            LOG.fine("Starting waterMarkEventGenerator");
            this.waterMarkEventGenerator.start();
        }
        LOG.fine("Starting trigger policy");
        this.triggerPolicy.start();
    }

    /**
     * @return true when tuple timestamps come from a {@link TimestampExtractor}
     */
    private boolean isTupleTs() {
        return this.timestampExtractor != null;
    }

    /**
     * Picks the trigger policy: count-based when a sliding count is given, otherwise time-based;
     * the watermark variants are used when tuple timestamps are in effect.
     */
    private TriggerPolicy<Tuple, ?> getTriggerPolicy(BaseWindowedBolt.Count slidingIntervalCount, Long slidingIntervalDurationMs) {
        boolean tupleTs = this.isTupleTs();
        if (slidingIntervalCount != null) {
            return tupleTs
                    ? new WatermarkCountTriggerPolicy<Tuple>(slidingIntervalCount.value)
                    : new CountTriggerPolicy<Tuple>(slidingIntervalCount.value);
        }
        return tupleTs
                ? new WatermarkTimeTriggerPolicy<Tuple>(slidingIntervalDurationMs)
                : new TimeTriggerPolicy<Tuple>(slidingIntervalDurationMs);
    }

    /**
     * Picks the eviction policy: count-based when a window count is given, otherwise time-based;
     * the watermark variants are used when tuple timestamps are in effect.
     */
    private EvictionPolicy<Tuple, ?> getEvictionPolicy(BaseWindowedBolt.Count windowLengthCount, Long windowLengthDurationMs) {
        boolean tupleTs = this.isTupleTs();
        if (windowLengthCount != null) {
            return tupleTs
                    ? new WatermarkCountEvictionPolicy<Tuple>(windowLengthCount.value)
                    : new CountEvictionPolicy<Tuple>(windowLengthCount.value);
        }
        return tupleTs
                ? new WatermarkTimeEvictionPolicy<Tuple>(windowLengthDurationMs, this.maxLagMs)
                : new TimeEvictionPolicy<Tuple>(windowLengthDurationMs);
    }

    /**
     * Prepares the executor with a fresh {@link ConcurrentLinkedQueue} as the event queue.
     */
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.doPrepare(topoConf, context, collector, new ConcurrentLinkedQueue<Event<Tuple>>());
    }

    /**
     * Prepares the wrapped bolt, builds the window manager on the given queue and starts windowing.
     *
     * @param topoConf the topology configuration
     * @param context the topology context
     * @param collector the collector to delegate emits and acks to
     * @param queue the event collection handed to the window manager
     * @throws NullPointerException if any argument is null
     */
    protected void doPrepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector, Collection<Event<Tuple>> queue) {
        Objects.requireNonNull(topoConf);
        Objects.requireNonNull(context);
        Objects.requireNonNull(collector);
        Objects.requireNonNull(queue);
        this.windowedOutputCollector = new WindowedOutputCollector(collector);
        this.bolt.prepare(topoConf, context, this.windowedOutputCollector);
        this.listener = this.newWindowLifecycleListener();
        this.windowManager = this.initWindowManager(this.listener, topoConf, context, queue);
        this.start();
        LOG.info(String.format("Initialized window manager %s", this.windowManager));
    }

    /**
     * Adds the tuple to the window manager.
     *
     * <p>With a timestamp extractor, the tuple is added with its extracted timestamp only if the
     * watermark generator accepts it. Otherwise it is emitted on the late tuple stream when one is
     * configured (or just logged when not) and then acked.
     */
    @Override
    public void execute(Tuple input) {
        if (!this.isTupleTs()) {
            this.windowManager.add(input);
            return;
        }

        long ts = this.timestampExtractor.extractTimestamp(input);
        if (this.waterMarkEventGenerator.track(input.getSourceGlobalStreamId(), ts)) {
            this.windowManager.add(input, ts);
            return;
        }

        // Late tuple: forward or log it, and ack it since it will never enter a window.
        if (this.lateTupleStream != null) {
            this.windowedOutputCollector.emit(this.lateTupleStream, input, new Values(input));
        } else {
            LOG.info(String.format("Received a late tuple %s with ts %d. This will not be processed.", input, ts));
        }
        this.windowedOutputCollector.ack(input);
    }

    /**
     * Shuts down the window manager, if any, and always cleans up the wrapped bolt.
     */
    @Override
    public void cleanup() {
        try {
            if (this.windowManager != null) {
                this.windowManager.shutdown();
            }
        } finally {
            // Run even if the shutdown above throws, so the wrapped bolt can release resources.
            this.bolt.cleanup();
        }
    }

    WindowManager<Tuple> getWindowManager() {
        return this.windowManager;
    }

    /**
     * Declares the late tuple stream, when the component configuration names one, followed by the
     * wrapped bolt's own output fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Map<String, Object> componentConf = this.getComponentConfiguration();
        // A wrapped bolt may have no component configuration at all.
        if (componentConf != null) {
            String lateStreamId = (String) componentConf.get("topology.bolts.late.tuple.stream");
            if (lateStreamId != null) {
                declarer.declareStream(lateStreamId, new Fields(LATE_TUPLE_FIELD));
            }
        }
        this.bolt.declareOutputFields(declarer);
    }

    /**
     * @return the wrapped bolt's component configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this.bolt.getComponentConfiguration();
    }

    /**
     * Creates the listener that connects window events to the wrapped bolt: expired tuples are
     * acked, and an activated window is passed to {@link #boltExecute} after its tuples are set as
     * the output collector's context.
     */
    protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
        return new WindowLifecycleListener<Tuple>(){

            @Override
            public void onExpiry(List<Tuple> tuples) {
                for (Tuple expired : tuples) {
                    WindowedBoltExecutor.this.windowedOutputCollector.ack(expired);
                }
            }

            @Override
            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
                WindowedBoltExecutor.this.windowedOutputCollector.setContext(tuples);
                WindowedBoltExecutor.this.boltExecute(tuples, newTuples, expiredTuples, timestamp);
            }
        };
    }

    /**
     * Executes the wrapped bolt with a window built from the given tuple lists.
     *
     * @param tuples all tuples in the window
     * @param newTuples tuples reported as new for this activation
     * @param expiredTuples tuples reported as expired for this activation
     * @param timestamp the window end timestamp, possibly null
     */
    protected void boltExecute(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples, Long timestamp) {
        this.bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples, this.getWindowStartTs(timestamp), timestamp));
    }

    /**
     * @return {@code endTs} minus the window length duration, or null when either is unknown
     */
    private Long getWindowStartTs(Long endTs) {
        if (endTs == null || this.windowLengthDurationMs == null) {
            return null;
        }
        return endTs - this.windowLengthDurationMs;
    }

    /**
     * Remembers the component state and makes sure it holds an entry for the windowing state.
     * A null state is ignored.
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void initState(State<Serializable, Serializable> state) {
        if (state == null) {
            return;
        }
        this.state = state;
        if (!this.state.containsKey(WINDOWING_INTERNAL_STATE)) {
            this.state.put(WINDOWING_INTERNAL_STATE, new HashMapState());
        }
    }

    /**
     * Copies the window manager's current state into the internal windowing state entry.
     * Does nothing when no state was supplied via {@link #initState(State)}.
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void preSave(String checkpointId) {
        if (this.state != null) {
            ((HashMapState) this.state.get(WINDOWING_INTERNAL_STATE)).putAll(this.getWindowManager().getState());
        }
    }

    /**
     * Output collector that passes the tuples set via {@link #setContext(List)} as the anchors for
     * emits that do not name anchors explicitly.
     */
    private static class WindowedOutputCollector
    extends OutputCollector {
        private List<Tuple> inputTuples;

        WindowedOutputCollector(IOutputCollector delegate) {
            super(delegate);
        }

        /** Sets the tuples to pass along with subsequent anchor-less emits. */
        void setContext(List<Tuple> inputTuples) {
            this.inputTuples = inputTuples;
        }

        @Override
        public List<Integer> emit(String streamId, List<Object> tuple) {
            return this.emit(streamId, this.inputTuples, tuple);
        }

        @Override
        public void emitDirect(int taskId, String streamId, List<Object> tuple) {
            this.emitDirect(taskId, streamId, this.inputTuples, tuple);
        }
    }
}

