/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.query.selector.attribute.aggregator;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.exception.OperationNotSupportedException;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.selector.attribute.aggregator.AttributeAggregatorExecutor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.Attribute;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Attribute aggregator that maintains the union of all {@link java.util.Set} values it is given.
 *
 * <p>Every element of each incoming set is added to a running union. When the state is created
 * with a per-element counter (see {@link AggregatorState}), each element is reference counted so
 * that it is only dropped from the union once every set that contributed it has been removed.
 * Without a counter, a removal drops the element immediately.
 *
 * <p>Each {@code process*} call returns a fresh copy of the union, so callers never hold a
 * reference to the internal, mutable set.
 */
@Extension(name="unionSet", namespace="", description="Union multiple sets. \n This attribute aggregator maintains a union of sets. The given input set is put into the union set and the union set is returned.", parameters={@Parameter(name="set", description="The java.util.Set object that needs to be added into the union set.", type={DataType.OBJECT})}, returnAttributes={@ReturnAttribute(description="Returns a java.util.Set object which is the union of aggregated sets", type={DataType.OBJECT})}, examples={@Example(syntax="from stockStream \nselect createSet(symbol) as initialSet \ninsert into initStream \n\nfrom initStream#window.timeBatch(10 sec) \nselect unionSet(initialSet) as distinctSymbols \ninsert into distinctStockStream;", description="distinctStockStream will return the set object which contains the distinct set of stock symbols received during a sliding window of 10 seconds.")})
public class UnionSetAttributeAggregatorExecutor
extends AttributeAggregatorExecutor<AggregatorState> {
    /**
     * Validates the aggregator's arguments and supplies the factory for its state.
     *
     * @param attributeExpressionExecutors executors for the aggregator's arguments; exactly one,
     *                                     of type {@code OBJECT}, is required
     * @param processingMode               processing mode of the query; forwarded to the state
     * @param outputExpectsExpiredEvents   whether expired events are expected; forwarded to the state
     * @param configReader                 unused here
     * @param siddhiQueryContext           unused here
     * @return a factory producing a new, empty {@link AggregatorState}
     * @throws OperationNotSupportedException if the argument count is not 1 or the argument is
     *                                        not of type {@code OBJECT}
     */
    @Override
    protected StateFactory<AggregatorState> init(ExpressionExecutor[] attributeExpressionExecutors, ProcessingMode processingMode, boolean outputExpectsExpiredEvents, ConfigReader configReader, SiddhiQueryContext siddhiQueryContext) {
        if (attributeExpressionExecutors.length != 1) {
            throw new OperationNotSupportedException("unionSet aggregator has to have exactly 1 parameter, currently " + attributeExpressionExecutors.length + " parameters provided");
        }
        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.OBJECT) {
            throw new OperationNotSupportedException("Parameter passed to unionSet aggregator should be of type object but found: " + attributeExpressionExecutors[0].getReturnType());
        }
        return () -> new AggregatorState(processingMode, outputExpectsExpiredEvents);
    }

    /**
     * @return {@code OBJECT}; the aggregated value is a {@link java.util.Set}
     */
    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.OBJECT;
    }

    /**
     * Adds every element of the given set to the union.
     *
     * @param data  a {@link java.util.Set} of elements to add; {@code null} is treated as
     *              contributing no elements
     * @param state the aggregation state to update
     * @return a new set holding a copy of the union after the addition
     * @throws ClassCastException if {@code data} is non-null and not a {@link java.util.Set}
     */
    @Override
    public Object processAdd(Object data, AggregatorState state) {
        if (data != null) {
            Set<?> inputSet = (Set<?>) data;
            for (Object element : inputSet) {
                state.set.add(element);
                if (state.counter != null) {
                    // Track how many times the element was contributed so that a later
                    // removal only drops it once all contributions are gone.
                    state.counter.merge(element, 1, Integer::sum);
                }
            }
        }
        return new HashSet<Object>(state.set);
    }

    /**
     * Multi-argument variant; not supported by this aggregator.
     *
     * <p>{@code init} rejects any argument count other than one, so this variant is not
     * expected to be used (presumably the framework only calls it for multi-argument
     * aggregators; confirm against the base class).
     *
     * @return always {@code null}
     */
    @Override
    public Object processAdd(Object[] data, AggregatorState state) {
        return null;
    }

    /**
     * Removes the contribution of the given set from the union.
     *
     * <p>With a counter, each element's count is decremented and the element is dropped from the
     * union (and from the counter) when its last contribution is removed. Elements that are not
     * tracked by the counter are ignored. Without a counter, each element is removed from the
     * union directly.
     *
     * @param data  a {@link java.util.Set} of elements to remove; {@code null} is treated as
     *              contributing no elements
     * @param state the aggregation state to update
     * @return a new set holding a copy of the union after the removal
     * @throws IllegalStateException if the counter holds a count of zero for an element
     * @throws ClassCastException    if {@code data} is non-null and not a {@link java.util.Set}
     */
    @Override
    public Object processRemove(Object data, AggregatorState state) {
        if (data != null) {
            Set<?> expiredSet = (Set<?>) data;
            for (Object element : expiredSet) {
                if (state.counter == null) {
                    state.set.remove(element);
                    continue;
                }
                Integer currentCount = state.counter.get(element);
                if (currentCount == null) {
                    // Element is not tracked, hence nothing to remove.
                    continue;
                }
                if (currentCount == 0) {
                    throw new IllegalStateException("Error occurred when removing element from union-set for element: " + element);
                }
                if (currentCount == 1) {
                    // Last contribution gone: drop the counter entry together with the set
                    // entry. Leaving a stale count of 1 behind would make a later
                    // add/remove pair keep the element in the union forever.
                    state.counter.remove(element);
                    state.set.remove(element);
                } else {
                    state.counter.put(element, currentCount - 1);
                }
            }
        }
        return new HashSet<Object>(state.set);
    }

    /**
     * Multi-argument variant; not supported by this aggregator.
     *
     * @return always {@code null}
     */
    @Override
    public Object processRemove(Object[] data, AggregatorState state) {
        return null;
    }

    /**
     * Clears the union and, when present, the per-element counter.
     *
     * @param state the aggregation state to clear
     * @return a new, empty set
     */
    @Override
    public Object reset(AggregatorState state) {
        state.set.clear();
        if (state.counter != null) {
            state.counter.clear();
        }
        return new HashSet<Object>();
    }

    /**
     * Mutable state of the aggregation: the current union and an optional per-element
     * contribution counter.
     */
    class AggregatorState
    extends State {
        /** Number of contributions per element; {@code null} when counting is disabled. */
        private Map<Object, Integer> counter = null;
        /** The current union of all added sets. */
        private Set<Object> set = new HashSet<>();

        /**
         * Creates an empty state. The counter is only allocated when the processing mode is
         * {@code SLIDE} or expired events are expected.
         *
         * @param processingMode             processing mode of the query
         * @param outputExpectsExpiredEvents whether expired events are expected
         */
        public AggregatorState(ProcessingMode processingMode, boolean outputExpectsExpiredEvents) {
            if (processingMode == ProcessingMode.SLIDE || outputExpectsExpiredEvents) {
                this.counter = new HashMap<>();
            }
        }

        /**
         * @return {@code true} when the union is empty
         */
        @Override
        public boolean canDestroy() {
            return this.set.isEmpty();
        }

        /**
         * Captures the state under the keys {@code "Set"} and {@code "Counter"}.
         *
         * <p>The returned map references the live collections; they are not copied.
         *
         * @return a map holding the union and the counter (the counter may be {@code null})
         */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> state = new HashMap<>();
            state.put("Set", this.set);
            state.put("Counter", this.counter);
            return state;
        }

        /**
         * Replaces the union and the counter with the values stored under {@code "Set"} and
         * {@code "Counter"} in the given map.
         *
         * @param state a map in the layout produced by {@link #snapshot()}
         */
        @Override
        @SuppressWarnings("unchecked") // the map layout is defined by snapshot() above
        public void restore(Map<String, Object> state) {
            this.set = (Set<Object>) state.get("Set");
            this.counter = (Map<Object, Integer>) state.get("Counter");
        }
    }
}

