/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Stream processor registered as {@code unique:deduplicate}.
 *
 * <p>For every non-timer event the unique key expression is evaluated and its string form is
 * looked up in the per-state map. If an entry for that key is still retained, the event is removed
 * from the chunk; otherwise a copy of the event is stored under the key and the event is passed on.
 * Stored entries are discarded, oldest first, once an incoming event's timestamp exceeds theirs by
 * more than the configured interval.
 */
@Extension(name="deduplicate", namespace="unique", description="Removes duplicate events based on the `unique.key` parameter that arrive within the `time.interval` gap from one another.", parameters={@Parameter(name="unique.key", description="Parameter to uniquely identify events.", type={DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING}, dynamic=true), @Parameter(name="time.interval", description="The sliding time period within which the duplicate events are dropped.", type={DataType.INT, DataType.LONG})}, parameterOverloads={@ParameterOverload(parameterNames={"unique.key", "time.interval"})}, examples={@Example(syntax="define stream TemperatureStream (sensorId string, temperature double)\n\nfrom TemperatureStream#unique:deduplicate(sensorId, 30 sec)\nselect *\ninsert into UniqueTemperatureStream;", description="Query that removes duplicate events of `TemperatureStream` stream based on `sensorId` attribute when they arrive within 30 seconds.")})
public class DeduplicateStreamProcessor
extends StreamProcessor<DeduplicateStreamProcessor.DeduplicateState>
implements SchedulingProcessor {
    /** Scheduler used to request a timer notification when a stored entry is due to expire. */
    private Scheduler scheduler;
    /** Executor for the first parameter ({@code unique.key}); its result identifies an event. */
    private ExpressionExecutor uniqueExpressionExecutor;
    /** Value of the second parameter ({@code time.interval}), compared against event timestamps. */
    private long timestampInterval;

    /**
     * @return the scheduler previously supplied through {@link #setScheduler(Scheduler)}
     */
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * @param scheduler the scheduler this processor uses to request timer notifications
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Reads the two extension parameters and returns the factory for this processor's state.
     *
     * @param attributeExpressionExecutors index 0 is the unique key expression, index 1 the
     *        constant time interval
     * @return a factory producing a fresh {@link DeduplicateState}
     */
    protected StateFactory<DeduplicateState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean outputExpectsExpiredEvents, boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) {
        // `time.interval` is declared as INT or LONG, so the constant may be an Integer or a Long.
        // Converting through Number accepts both; a direct (Long) cast fails for an Integer.
        Object intervalValue = ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
        this.timestampInterval = ((Number) intervalValue).longValue();
        this.uniqueExpressionExecutor = attributeExpressionExecutors[0];
        return () -> new DeduplicateState();
    }

    /**
     * @return a new empty list; this processor declares no additional attributes
     */
    public List<Attribute> getReturnAttributes() {
        return new ArrayList<Attribute>();
    }

    /**
     * Removes from the chunk every event the state reports as droppable (duplicates and timer
     * events), then hands the chunk to the next processor.
     */
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, DeduplicateState state) {
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) streamEventChunk.next();
            if (state.process(streamEvent, streamEventCloner)) {
                streamEventChunk.remove();
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * @return {@link ProcessingMode#BATCH}
     */
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /** No start-up work is required. */
    public void start() {
    }

    /** No shutdown work is required. */
    public void stop() {
    }

    /**
     * Holds the most recently accepted event per unique key. Entries are kept in insertion order
     * so that expiry can stop at the first entry that is still within the interval.
     */
    class DeduplicateState
    extends State {
        private Map<String, StreamEvent> map = new LinkedHashMap<String, StreamEvent>();

        DeduplicateState() {
        }

        /**
         * Expires old entries and decides whether the given event must be dropped.
         *
         * @param streamEvent the incoming event (data or timer)
         * @param streamEventCloner used to copy an accepted event before it is stored
         * @return {@code true} if the event should be removed from the chunk (it is a timer event
         *         or its key is already stored), {@code false} if it was accepted and stored
         */
        synchronized boolean process(StreamEvent streamEvent, StreamEventCloner streamEventCloner) {
            long eventTimestamp = streamEvent.getTimestamp();

            // Drop leading entries that are older than the interval; stop at the first one that
            // is not, since later entries were inserted afterwards.
            // NOTE(review): the comparison is inclusive, so an event whose timestamp is exactly
            // `stored + interval` does not evict the stored entry. If timer events carry exactly
            // the time requested from notifyAt below, the timer alone will not evict the entry
            // that scheduled it - confirm against the Scheduler implementation.
            Iterator<StreamEvent> iterator = this.map.values().iterator();
            while (iterator.hasNext()) {
                StreamEvent storedEvent = iterator.next();
                if (eventTimestamp - storedEvent.getTimestamp() <= DeduplicateStreamProcessor.this.timestampInterval) {
                    break;
                }
                iterator.remove();
            }

            // Timer events only trigger the expiry above and are never forwarded.
            if (streamEvent.getType() == ComplexEvent.Type.TIMER) {
                return true;
            }

            // NOTE(review): a null key value makes toString() throw NullPointerException;
            // behaviour kept as in the original.
            String key = DeduplicateStreamProcessor.this.uniqueExpressionExecutor.execute((ComplexEvent) streamEvent).toString();
            if (this.map.get(key) != null) {
                return true;
            }

            // Copy only when the event is actually stored, so duplicates do not pay for a clone.
            this.map.put(key, streamEventCloner.copyStreamEvent(streamEvent));
            DeduplicateStreamProcessor.this.scheduler.notifyAt(eventTimestamp + DeduplicateStreamProcessor.this.timestampInterval);
            return false;
        }

        /**
         * @return {@code true} when no entries are stored
         */
        public synchronized boolean canDestroy() {
            return this.map.isEmpty();
        }

        /**
         * @return a map holding a copy of the stored entries under the key {@code "map"}
         */
        public synchronized Map<String, Object> snapshot() {
            Map<String, Object> state = new HashMap<String, Object>();
            state.put("map", new LinkedHashMap<String, StreamEvent>(this.map));
            return state;
        }

        /**
         * Replaces the stored entries with a copy of the map found under the key {@code "map"}.
         *
         * @param state a map in the form produced by {@link #snapshot()}
         */
        public synchronized void restore(Map<String, Object> state) {
            // The value under "map" is written by snapshot() as a Map<String, StreamEvent>.
            @SuppressWarnings("unchecked")
            Map<String, StreamEvent> restored = (Map<String, StreamEvent>) state.get("map");
            LinkedHashMap<String, StreamEvent> newMap = new LinkedHashMap<String, StreamEvent>();
            newMap.putAll(restored);
            this.map = newMap;
        }
    }
}

