package io.siddhi.extension.execution.unique;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.SchedulingProcessor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.Scheduler;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Stream processor behind {@code unique:deduplicate(unique.key, time.interval)}.
 *
 * <p>For every incoming event the value of the {@code unique.key} expression is turned into a
 * string key. The first event seen for a key is forwarded and remembered; later events carrying
 * the same key are dropped until the remembered event has aged out, i.e. until an event arrives
 * whose timestamp is more than {@code time.interval} later than the remembered one.
 *
 * <p>All mutable state lives in {@link DeduplicateState}, whose methods are {@code synchronized}.
 */
@Extension(
        name = "deduplicate",
        namespace = "unique",
        description = "Removes duplicate events based on the `unique.key` parameter that arrive within the `time.interval` gap from one another.",
        parameters = {
                @Parameter(
                        name = "unique.key",
                        description = "Parameter to uniquely identify events.",
                        type = {DataType.INT, DataType.LONG, DataType.FLOAT, DataType.BOOL, DataType.DOUBLE, DataType.STRING},
                        dynamic = true),
                @Parameter(
                        name = "time.interval",
                        description = "The sliding time period within which the duplicate events are dropped.",
                        type = {DataType.INT, DataType.LONG})
        },
        parameterOverloads = {
                @ParameterOverload(parameterNames = {"unique.key", "time.interval"})
        },
        examples = {
                @Example(
                        syntax = "define stream TemperatureStream (sensorId string, temperature double)\n\nfrom TemperatureStream#unique:deduplicate(sensorId, 30 sec)\nselect *\ninsert into UniqueTemperatureStream;",
                        description = "Query that removes duplicate events of `TemperatureStream` stream based on `sensorId` attribute when they arrive within 30 seconds.")
        })
public class DeduplicateStreamProcessor
        extends StreamProcessor<DeduplicateStreamProcessor.DeduplicateState>
        implements SchedulingProcessor {

    /** Scheduler asked for a notification each time a new key is remembered. */
    private Scheduler scheduler;

    /** Evaluates the {@code unique.key} expression against an event. */
    private ExpressionExecutor uniqueExpressionExecutor;

    /** Value of the {@code time.interval} constant, compared directly against event timestamps. */
    private long timestampInterval;

    /**
     * Per-query state: the most recent forwarded event for every key that is still inside the
     * de-duplication interval.
     */
    public class DeduplicateState extends State {

        /**
         * Key -> copy of the event that was forwarded for that key. A {@link LinkedHashMap} is
         * used so that iteration follows insertion order, which the eviction loop relies on.
         */
        private Map<String, StreamEvent> map = new LinkedHashMap<>();

        DeduplicateState() {
        }

        /**
         * Evicts aged-out entries and decides whether {@code streamEvent} must be dropped.
         *
         * @param streamEvent       the event being processed (may be a {@code TIMER} event)
         * @param streamEventCloner used to copy the event before it is stored in the state
         * @return {@code true} if the caller should remove the event from the chunk (timer
         *         event or duplicate), {@code false} if the event is the first for its key
         */
        synchronized boolean process(StreamEvent streamEvent, StreamEventCloner streamEventCloner) {
            long currentTimestamp = streamEvent.getTimestamp();

            // Walk the entries in insertion order and stop at the first one that is still
            // inside the interval; everything visited before it is discarded.
            Iterator<StreamEvent> remembered = this.map.values().iterator();
            while (remembered.hasNext()) {
                long age = currentTimestamp - remembered.next().getTimestamp();
                if (age <= DeduplicateStreamProcessor.this.timestampInterval) {
                    break;
                }
                remembered.remove();
            }

            // Timer events only trigger the eviction above; they are never forwarded.
            if (streamEvent.getType() == ComplexEvent.Type.TIMER) {
                return true;
            }

            // String.valueOf keeps a null key from throwing in the middle of a chunk; all
            // null-keyed events share the key "null".
            String key = String.valueOf(
                    DeduplicateStreamProcessor.this.uniqueExpressionExecutor.execute(streamEvent));
            if (this.map.containsKey(key)) {
                return true;
            }

            // Clone only when the event is actually stored, so duplicates cost no copy.
            this.map.put(key, streamEventCloner.copyStreamEvent(streamEvent));
            DeduplicateStreamProcessor.this.scheduler.notifyAt(
                    currentTimestamp + DeduplicateStreamProcessor.this.timestampInterval);
            return false;
        }

        /**
         * @return {@code true} when no key is currently remembered
         */
        public synchronized boolean canDestroy() {
            return this.map.isEmpty();
        }

        /**
         * Captures the remembered events.
         *
         * @return a map with the single entry {@code "map"} holding an insertion-ordered copy
         *         of the key-to-event map
         */
        public synchronized Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<>();
            snapshot.put("map", new LinkedHashMap<>(this.map));
            return snapshot;
        }

        /**
         * Replaces the remembered events with a copy of the {@code "map"} entry of the given
         * snapshot.
         *
         * @param map a snapshot in the shape produced by {@link #snapshot()}
         */
        @SuppressWarnings("unchecked") // the "map" entry is written by snapshot() with this type
        public synchronized void restore(Map<String, Object> map) {
            this.map = new LinkedHashMap<>((Map<String, StreamEvent>) map.get("map"));
        }
    }

    /**
     * @return the scheduler previously supplied through {@link #setScheduler(Scheduler)}
     */
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * @param scheduler the scheduler to notify when a new key is remembered
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Reads the two extension arguments and returns the factory for the per-query state.
     *
     * <p>{@code expressionExecutorArr[0]} is kept as the key expression.
     * {@code expressionExecutorArr[1]} must be a {@link ConstantExpressionExecutor}; its value
     * is read as a {@link Number} so that both INT and LONG constants are accepted, matching
     * the types declared for {@code time.interval}.
     *
     * @throws ClassCastException if the second argument is not a constant or its value is not
     *                            numeric
     */
    protected StateFactory<DeduplicateState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        Object interval = ((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue();
        this.timestampInterval = ((Number) interval).longValue();
        this.uniqueExpressionExecutor = expressionExecutorArr[0];
        return () -> new DeduplicateState();
    }

    /**
     * @return a new empty list; this processor returns no attributes of its own
     */
    public List<Attribute> getReturnAttributes() {
        return new ArrayList<>();
    }

    /**
     * Removes timer events and duplicates from the chunk, then hands what is left to the next
     * processor.
     *
     * @param complexEventChunk     the events to filter; modified in place
     * @param processor             the next processor, which receives the filtered chunk
     * @param streamEventCloner     used to copy events that are stored in the state
     * @param complexEventPopulater not used by this processor
     * @param deduplicateState      the state holding the remembered events
     */
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, DeduplicateState deduplicateState) {
        while (complexEventChunk.hasNext()) {
            StreamEvent event = complexEventChunk.next();
            if (deduplicateState.process(event, streamEventCloner)) {
                complexEventChunk.remove();
            }
        }
        processor.process(complexEventChunk);
    }

    /**
     * @return {@link ProcessingMode#BATCH}
     */
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    /** No resources to acquire. */
    public void start() {
    }

    /** No resources to release. */
    public void stop() {
    }
}
