/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.partition;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.state.MetaStateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.partition.PartitionRuntime;
import io.siddhi.core.partition.PartitionStreamReceiver;
import io.siddhi.core.partition.StreamPartitioner;
import io.siddhi.core.partition.executor.PartitionExecutor;
import io.siddhi.core.query.QueryRuntime;
import io.siddhi.core.query.QueryRuntimeImpl;
import io.siddhi.core.query.input.stream.join.JoinStreamRuntime;
import io.siddhi.core.query.input.stream.single.SingleStreamRuntime;
import io.siddhi.core.query.input.stream.state.StateStreamRuntime;
import io.siddhi.core.query.output.callback.InsertIntoStreamCallback;
import io.siddhi.core.query.output.callback.InsertIntoWindowCallback;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.core.util.parser.helper.DefinitionParserHelper;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.util.statistics.MemoryUsageTracker;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.exception.DuplicateAnnotationException;
import io.siddhi.query.api.execution.partition.Partition;
import io.siddhi.query.api.execution.query.Query;
import io.siddhi.query.api.execution.query.input.state.CountStateElement;
import io.siddhi.query.api.execution.query.input.state.EveryStateElement;
import io.siddhi.query.api.execution.query.input.state.LogicalStateElement;
import io.siddhi.query.api.execution.query.input.state.NextStateElement;
import io.siddhi.query.api.execution.query.input.state.StateElement;
import io.siddhi.query.api.execution.query.input.state.StreamStateElement;
import io.siddhi.query.api.execution.query.input.stream.BasicSingleInputStream;
import io.siddhi.query.api.execution.query.input.stream.JoinInputStream;
import io.siddhi.query.api.execution.query.input.stream.SingleInputStream;
import io.siddhi.query.api.execution.query.input.stream.StateInputStream;
import io.siddhi.query.api.execution.query.output.stream.InsertIntoStream;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.util.AnnotationHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

public class PartitionRuntimeImpl
implements PartitionRuntime {
    // Holder for this partition's PartitionState; created in the constructor via the app context.
    private final StateHolder<PartitionState> stateHolder;
    // Initial delay and delay between purge runs, in milliseconds; overridden by @purge 'interval'.
    private long purgeExecutionInterval = 300000L;
    // Whether idle partition keys are purged; set from @purge 'enable'.
    private boolean purgingEnabled = false;
    // A key is purged once its last recorded timestamp plus this period is below the current time; set from @purge 'idle.period'.
    private long purgeIdlePeriod = 0L;
    // Value of @info 'name' when present, otherwise "partition_" + partitionIndex.
    private String partitionName;
    // Partition definition this runtime was built from.
    private Partition partition;
    // Junctions of streams local to this partition, keyed by stream id (filled by addQuery).
    private ConcurrentMap<String, StreamJunction> localStreamJunctionMap = new ConcurrentHashMap<String, StreamJunction>();
    // Junctions registered through addInnerpartitionStreamReceiverStreamJunction, keyed by the supplied key.
    private ConcurrentMap<String, StreamJunction> innerPartitionStreamReceiverStreamJunctionMap = new ConcurrentHashMap<String, StreamJunction>();
    // Definitions of streams local to this partition, keyed by stream id (filled by addQuery).
    private ConcurrentMap<String, AbstractDefinition> localStreamDefinitionMap = new ConcurrentHashMap<String, AbstractDefinition>();
    // Stream definitions supplied by the constructor's caller; addQuery may add output streams to it.
    private ConcurrentMap<String, AbstractDefinition> streamDefinitionMap;
    // Window definitions supplied by the constructor's caller.
    private ConcurrentMap<String, AbstractDefinition> windowDefinitionMap;
    // Stream junctions supplied by the constructor's caller; addQuery may add output junctions to it.
    private ConcurrentMap<String, StreamJunction> streamJunctionMap;
    // Query runtimes added through addQuery, in insertion order.
    private List<QueryRuntime> queryRuntimeList = new ArrayList<QueryRuntime>();
    // One receiver per partitioned input stream, keyed by the receiver's stream id.
    private ConcurrentMap<String, PartitionStreamReceiver> partitionStreamReceivers = new ConcurrentHashMap<String, PartitionStreamReceiver>();
    // Application context providing executors, buffer size, timestamps and snapshot services.
    private SiddhiAppContext siddhiAppContext;

    public PartitionRuntimeImpl(ConcurrentMap<String, AbstractDefinition> streamDefinitionMap, ConcurrentMap<String, AbstractDefinition> windowDefinitionMap, ConcurrentMap<String, StreamJunction> streamJunctionMap, Partition partition, int partitionIndex, SiddhiAppContext siddhiAppContext) {
        Annotation purge;
        this.siddhiAppContext = siddhiAppContext;
        if (partition.getPartitionTypeMap().isEmpty()) {
            throw new SiddhiAppCreationException("Partition must have at least one partition executor. But found none.");
        }
        try {
            Element element = AnnotationHelper.getAnnotationElement((String)"info", (String)"name", (List)partition.getAnnotations());
            if (element != null) {
                this.partitionName = element.getValue();
            }
        }
        catch (DuplicateAnnotationException e) {
            throw new DuplicateAnnotationException(e.getMessageWithOutContext() + " for the same Query " + partition.toString(), (Throwable)e, e.getQueryContextStartIndex(), e.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
        }
        if (this.partitionName == null) {
            this.partitionName = "partition_" + partitionIndex;
        }
        if ((purge = AnnotationHelper.getAnnotation((String)"purge", (List)partition.getAnnotations())) != null) {
            String purgeEnable;
            if (purge.getElement("enable") != null) {
                purgeEnable = purge.getElement("enable");
                if (!"true".equalsIgnoreCase(purgeEnable) && !"false".equalsIgnoreCase(purgeEnable)) {
                    throw new SiddhiAppCreationException("Invalid value for enable: " + purgeEnable + ". Please use 'true' or 'false'");
                }
            } else {
                throw new SiddhiAppCreationException("Annotation @purge is missing element 'enable'");
            }
            this.purgingEnabled = Boolean.parseBoolean(purgeEnable);
            if (purge.getElement("idle.period") == null) {
                throw new SiddhiAppCreationException("Annotation @purge is missing element 'idle.period'");
            }
            String purgeIdle = purge.getElement("idle.period");
            this.purgeIdlePeriod = Expression.Time.timeToLong((String)purgeIdle);
            if (purge.getElement("interval") != null) {
                String interval = purge.getElement("interval");
                this.purgeExecutionInterval = Expression.Time.timeToLong((String)interval);
            }
        }
        this.partition = partition;
        this.streamDefinitionMap = streamDefinitionMap;
        this.windowDefinitionMap = windowDefinitionMap;
        this.streamJunctionMap = streamJunctionMap;
        this.stateHolder = siddhiAppContext.generateStateHolder(this.partitionName, () -> new PartitionState());
    }

    /**
     * Registers a query runtime with this partition and wires its output and local inputs.
     *
     * <p>For an insert-into-stream output, the output definition is registered (in the local
     * maps when the target is an inner stream, otherwise in the shared maps), validated against
     * the registered definition, and the callback is initialised with the stream's junction.
     * For an insert-into-window output, the definition is validated against the window
     * definition and the window's publisher is built from the shared junction. Finally, when
     * the query reads from local streams, each input whose stream id starts with {@code "#"} is
     * subscribed to the corresponding local junction.
     *
     * @param metaQueryRuntime the query runtime to add
     */
    public void addQuery(QueryRuntimeImpl metaQueryRuntime) {
        Query query = metaQueryRuntime.getQuery();
        if (query.getOutputStream() instanceof InsertIntoStream) {
            if (metaQueryRuntime.getOutputCallback() instanceof InsertIntoStreamCallback) {
                InsertIntoStreamCallback streamCallback = (InsertIntoStreamCallback) metaQueryRuntime.getOutputCallback();
                StreamDefinition outputDefinition = streamCallback.getOutputStreamDefinition();
                String outputId = outputDefinition.getId();
                boolean innerTarget = ((InsertIntoStream) query.getOutputStream()).isInnerStream();
                if (innerTarget) {
                    metaQueryRuntime.setToLocalStream(true);
                }
                // Inner streams live in the partition-local maps, everything else in the shared ones.
                ConcurrentMap<String, AbstractDefinition> definitions = innerTarget ? this.localStreamDefinitionMap : this.streamDefinitionMap;
                ConcurrentMap<String, StreamJunction> junctions = innerTarget ? this.localStreamJunctionMap : this.streamJunctionMap;
                definitions.putIfAbsent(outputId, outputDefinition);
                DefinitionParserHelper.validateOutputStream(outputDefinition, definitions.get(outputId));
                streamCallback.init(this.junctionFor(junctions, outputId, outputDefinition));
            } else if (metaQueryRuntime.getOutputCallback() instanceof InsertIntoWindowCallback) {
                InsertIntoWindowCallback windowCallback = (InsertIntoWindowCallback) metaQueryRuntime.getOutputCallback();
                StreamDefinition outputDefinition = windowCallback.getOutputStreamDefinition();
                String outputId = outputDefinition.getId();
                DefinitionParserHelper.validateOutputStream(outputDefinition, this.windowDefinitionMap.get(outputId));
                StreamJunction windowJunction = this.junctionFor(this.streamJunctionMap, outputId, outputDefinition);
                windowCallback.getWindow().setPublisher(windowJunction.constructPublisher());
            }
        }
        if (metaQueryRuntime.isFromLocalStream()) {
            for (SingleStreamRuntime singleStreamRuntime : metaQueryRuntime.getStreamRuntime().getSingleStreamRuntimes()) {
                String streamId = singleStreamRuntime.getProcessStreamReceiver().getStreamId();
                // Only ids carrying the "#" prefix are wired to partition-local junctions.
                if (!streamId.startsWith("#")) {
                    continue;
                }
                StreamDefinition localDefinition = (StreamDefinition) this.localStreamDefinitionMap.get(streamId);
                this.junctionFor(this.localStreamJunctionMap, streamId, localDefinition)
                        .subscribe(singleStreamRuntime.getProcessStreamReceiver());
            }
        }
        this.queryRuntimeList.add(metaQueryRuntime);
    }

    /**
     * Returns the junction registered under {@code streamId} in {@code junctions}, creating and
     * registering one for {@code streamDefinition} when none exists yet. Registration uses
     * {@code putIfAbsent}, so a junction registered concurrently by another caller wins and is
     * the one returned.
     */
    private StreamJunction junctionFor(ConcurrentMap<String, StreamJunction> junctions, String streamId, StreamDefinition streamDefinition) {
        StreamJunction junction = junctions.get(streamId);
        if (junction == null) {
            StreamJunction created = this.createStreamJunction(streamDefinition);
            StreamJunction alreadyRegistered = junctions.putIfAbsent(streamId, created);
            junction = alreadyRegistered != null ? alreadyRegistered : created;
        }
        return junction;
    }

    /**
     * Creates partition receivers for every input stream of the given query.
     *
     * <p>Partition executors are derived per input stream by a {@link StreamPartitioner}. A
     * single-stream query uses position 0, a join uses position 0 for the left and 1 for the
     * right input, and a pattern/sequence query walks its state-element tree starting at 0.
     *
     * @param queryRuntime the query whose inputs are partitioned
     * @param executors    variable expression executors handed to the stream partitioner
     * @param metaEvent    meta state event describing the query's input streams
     */
    public void addPartitionReceiver(QueryRuntimeImpl queryRuntime, List<VariableExpressionExecutor> executors, MetaStateEvent metaEvent) {
        Query query = queryRuntime.getQuery();
        StreamPartitioner partitioner = new StreamPartitioner(query.getInputStream(), this.partition, metaEvent, executors, queryRuntime.getSiddhiQueryContext());
        List<List<PartitionExecutor>> executorLists = partitioner.getPartitionExecutorLists();

        if (queryRuntime.getStreamRuntime() instanceof SingleStreamRuntime) {
            SingleInputStream input = (SingleInputStream) query.getInputStream();
            addPartitionReceiver(input.getStreamId(), input.isInnerStream(), metaEvent.getMetaStreamEvent(0), executorLists.get(0));
            return;
        }
        if (queryRuntime.getStreamRuntime() instanceof JoinStreamRuntime) {
            SingleInputStream left = (SingleInputStream) ((JoinInputStream) query.getInputStream()).getLeftInputStream();
            addPartitionReceiver(left.getStreamId(), left.isInnerStream(), metaEvent.getMetaStreamEvent(0), executorLists.get(0));
            SingleInputStream right = (SingleInputStream) ((JoinInputStream) query.getInputStream()).getRightInputStream();
            addPartitionReceiver(right.getStreamId(), right.isInnerStream(), metaEvent.getMetaStreamEvent(1), executorLists.get(1));
            return;
        }
        if (queryRuntime.getStreamRuntime() instanceof StateStreamRuntime) {
            StateElement rootElement = ((StateInputStream) query.getInputStream()).getStateElement();
            addPartitionReceiverForStateElement(rootElement, metaEvent, executorLists, 0);
        }
    }

    /**
     * Recursively walks a pattern/sequence state element and registers a partition receiver
     * for each stream leaf, in traversal order.
     *
     * @param stateElement       element to process; anything that is not an every/next/count/
     *                           logical element is cast to {@link StreamStateElement}
     * @param metaEvent          meta state event holding one meta stream event per leaf
     * @param partitionExecutors partition executors, one list per leaf position
     * @param executorIndex      position of the next leaf to register
     * @return the position following the last leaf registered under {@code stateElement}
     */
    private int addPartitionReceiverForStateElement(StateElement stateElement, MetaStateEvent metaEvent, List<List<PartitionExecutor>> partitionExecutors, int executorIndex) {
        if (stateElement instanceof EveryStateElement) {
            StateElement wrapped = ((EveryStateElement) stateElement).getStateElement();
            return addPartitionReceiverForStateElement(wrapped, metaEvent, partitionExecutors, executorIndex);
        } else if (stateElement instanceof NextStateElement) {
            NextStateElement sequence = (NextStateElement) stateElement;
            int afterFirst = addPartitionReceiverForStateElement(sequence.getStateElement(), metaEvent, partitionExecutors, executorIndex);
            return addPartitionReceiverForStateElement(sequence.getNextStateElement(), metaEvent, partitionExecutors, afterFirst);
        } else if (stateElement instanceof CountStateElement) {
            StateElement counted = ((CountStateElement) stateElement).getStreamStateElement();
            return addPartitionReceiverForStateElement(counted, metaEvent, partitionExecutors, executorIndex);
        } else if (stateElement instanceof LogicalStateElement) {
            LogicalStateElement logical = (LogicalStateElement) stateElement;
            int afterFirst = addPartitionReceiverForStateElement(logical.getStreamStateElement1(), metaEvent, partitionExecutors, executorIndex);
            return addPartitionReceiverForStateElement(logical.getStreamStateElement2(), metaEvent, partitionExecutors, afterFirst);
        }

        // Leaf: a plain stream element consumes exactly one executor position.
        BasicSingleInputStream leafInput = ((StreamStateElement) stateElement).getBasicSingleInputStream();
        addPartitionReceiver(leafInput.getStreamId(), leafInput.isInnerStream(), metaEvent.getMetaStreamEvent(executorIndex), partitionExecutors.get(executorIndex));
        return executorIndex + 1;
    }

    /**
     * Creates a {@link PartitionStreamReceiver} for the given stream and subscribes it to the
     * stream's junction, unless a receiver is already registered for the stream, the stream is
     * an inner stream, or its meta event is not of the {@code DEFAULT} type.
     *
     * @param streamId           id of the input stream
     * @param isInnerStream      whether the input is a partition-inner stream
     * @param metaStreamEvent    meta event describing the input stream
     * @param partitionExecutors executors that compute the partition key for this stream
     */
    private void addPartitionReceiver(String streamId, boolean isInnerStream, MetaStreamEvent metaStreamEvent, List<PartitionExecutor> partitionExecutors) {
        if (partitionStreamReceivers.containsKey(streamId) || isInnerStream || metaStreamEvent.getEventType() != MetaStreamEvent.EventType.DEFAULT) {
            return;
        }
        // Look the definition up among the streams first, then among the windows.
        StreamDefinition definition = (StreamDefinition) streamDefinitionMap.get(streamId);
        if (definition == null) {
            definition = (StreamDefinition) windowDefinitionMap.get(streamId);
        }
        PartitionStreamReceiver receiver = new PartitionStreamReceiver(siddhiAppContext, metaStreamEvent, definition, partitionExecutors, this);
        partitionStreamReceivers.put(receiver.getStreamId(), receiver);
        StreamJunction sourceJunction = streamJunctionMap.get(receiver.getStreamId());
        sourceJunction.subscribe(receiver);
    }

    /**
     * Builds a new junction for the given definition using the application's executor service
     * and buffer size; {@code null} is passed as the fourth constructor argument.
     *
     * @param streamDefinition definition of the stream the junction serves
     * @return a freshly constructed, unregistered junction
     */
    private StreamJunction createStreamJunction(StreamDefinition streamDefinition) {
        final SiddhiAppContext context = this.siddhiAppContext;
        return new StreamJunction(streamDefinition, context.getExecutorService(), context.getBufferSize(), null, context);
    }

    /**
     * Registers a junction under the given key, replacing any junction stored under it before.
     *
     * @param key            key to register the junction under
     * @param streamJunction junction to register
     */
    public void addInnerpartitionStreamReceiverStreamJunction(String key, StreamJunction streamJunction) {
        innerPartitionStreamReceiverStreamJunctionMap.put(key, streamJunction);
    }

    /**
     * @return the live map of junctions registered through
     *         {@link #addInnerpartitionStreamReceiverStreamJunction(String, StreamJunction)}
     */
    public ConcurrentMap<String, StreamJunction> getInnerPartitionStreamReceiverStreamJunctionMap() {
        return innerPartitionStreamReceiverStreamJunctionMap;
    }

    /**
     * Hands the registered query runtimes to every partition stream receiver and then
     * initialises that receiver.
     */
    public void init() {
        partitionStreamReceivers.values().forEach(receiver -> {
            receiver.addStreamJunction(queryRuntimeList);
            receiver.init();
        });
    }

    /**
     * @return the name resolved for this partition in the constructor
     */
    @Override
    public String getPartitionName() {
        return partitionName;
    }

    /**
     * @return the live map of partition-local stream definitions, keyed by stream id
     */
    public ConcurrentMap<String, AbstractDefinition> getLocalStreamDefinitionMap() {
        return localStreamDefinitionMap;
    }

    /**
     * @return the live map of partition-local stream junctions, keyed by stream id
     */
    public ConcurrentMap<String, StreamJunction> getLocalStreamJunctionMap() {
        return localStreamJunctionMap;
    }

    /**
     * Registers every query runtime of this partition with the given memory usage tracker
     * under the {@code "Queries"} category.
     *
     * @param memoryUsageTracker tracker to register the queries with
     */
    public void setMemoryUsageTracker(MemoryUsageTracker memoryUsageTracker) {
        queryRuntimeList.forEach(runtime ->
                QueryParserHelper.registerMemoryUsageTracking(runtime.getQueryId(), runtime, "Queries", siddhiAppContext, memoryUsageTracker));
    }

    /** Guards the one-time scheduling of the purge task. */
    private final Object purgeScheduleLock = new Object();

    /** Becomes {@code true} once the recurring purge task has been handed to the scheduler. */
    private volatile boolean purgeTaskScheduled = false;

    /**
     * Ensures the partition key of the current partition flow is initialised and refreshes
     * its last-seen timestamp.
     *
     * <p>When the key is seen for the first time, every query runtime is initialised for it
     * while holding the partition state's monitor; the second lookup inside the monitor keeps
     * two racing callers from initialising the same key twice. In all cases the key's
     * timestamp is set to the current time.
     *
     * <p>When purging is enabled the recurring purge task is scheduled on the first call only.
     * This method is invoked again and again (it has a dedicated path for already-known keys),
     * so scheduling on every call would pile up one more fixed-delay task per invocation.
     */
    public void initPartition() {
        PartitionState state = this.stateHolder.getState();
        try {
            Long lastSeen = state.partitionKeys.get(SiddhiAppContext.getPartitionFlowId());
            if (lastSeen == null) {
                synchronized (state) {
                    lastSeen = state.partitionKeys.get(SiddhiAppContext.getPartitionFlowId());
                    if (lastSeen == null) {
                        for (QueryRuntime queryRuntime : this.queryRuntimeList) {
                            ((QueryRuntimeImpl) queryRuntime).initPartition();
                        }
                    }
                    state.partitionKeys.put(SiddhiAppContext.getPartitionFlowId(), this.siddhiAppContext.getTimestampGenerator().currentTime());
                }
            } else {
                state.partitionKeys.put(SiddhiAppContext.getPartitionFlowId(), this.siddhiAppContext.getTimestampGenerator().currentTime());
            }
        } finally {
            this.stateHolder.returnState(state);
        }
        if (this.purgingEnabled) {
            this.schedulePurgeTaskOnce();
        }
    }

    /**
     * Schedules {@link #purgeIdlePartitions()} with a fixed delay of
     * {@code purgeExecutionInterval} milliseconds, at most once per runtime instance. If the
     * scheduler rejects the task the exception propagates and a later call retries.
     */
    private void schedulePurgeTaskOnce() {
        if (this.purgeTaskScheduled) {
            return;
        }
        synchronized (this.purgeScheduleLock) {
            if (this.purgeTaskScheduled) {
                return;
            }
            this.siddhiAppContext.getScheduledExecutorService().scheduleWithFixedDelay(this::purgeIdlePartitions, this.purgeExecutionInterval, this.purgeExecutionInterval, TimeUnit.MILLISECONDS);
            this.purgeTaskScheduled = true;
        }
    }

    /**
     * Removes every partition key whose last-seen timestamp plus {@code purgeIdlePeriod} is
     * below the current time and, within that key's partition flow, asks all state holders of
     * every query to clean their group-by states.
     */
    private void purgeIdlePartitions() {
        long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
        PartitionState state = this.stateHolder.getState();
        try {
            synchronized (state) {
                // Iterate over a copy so keys can be removed from the live map while looping.
                Map<String, Long> lastSeenByKey = new HashMap<String, Long>(state.partitionKeys);
                for (Map.Entry<String, Long> entry : lastSeenByKey.entrySet()) {
                    if (entry.getValue() + this.purgeIdlePeriod >= currentTime) {
                        continue;
                    }
                    state.partitionKeys.remove(entry.getKey());
                    SiddhiAppContext.startPartitionFlow(entry.getKey());
                    try {
                        for (QueryRuntime queryRuntime : this.queryRuntimeList) {
                            Map<String, StateHolder> holdersByElement = this.siddhiAppContext.getSnapshotService().getStateHolderMap(this.partitionName, queryRuntime.getQueryId());
                            for (StateHolder elementHolder : holdersByElement.values()) {
                                elementHolder.cleanGroupByStates();
                            }
                        }
                    } finally {
                        SiddhiAppContext.stopPartitionFlow();
                    }
                }
            }
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Returns a copy of the partition keys currently recorded in the partition state.
     *
     * @return a new set holding the keys present at the time of the call
     */
    public Set<String> getPartitionKeys() {
        PartitionState state = stateHolder.getState();
        try {
            Set<String> keys = new HashSet<String>();
            keys.addAll(state.partitionKeys.keySet());
            return keys;
        } finally {
            // The state is handed back to the holder whether or not copying succeeded.
            stateHolder.returnState(state);
        }
    }

    /**
     * @return the live list of query runtimes added to this partition, not a copy
     */
    @Override
    public Collection<QueryRuntime> getQueries() {
        return queryRuntimeList;
    }

    /**
     * Snapshot-able state of the partition: the known partition keys mapped to the timestamp
     * at which each key was last seen.
     */
    public class PartitionState extends State {
        // Read and written without holding the state's monitor on the fast path of
        // initPartition, so this must always be a concurrent map.
        private Map<String, Long> partitionKeys = new ConcurrentHashMap<String, Long>();

        /**
         * @return {@code true} when no partition key is recorded
         */
        @Override
        public boolean canDestroy() {
            return this.partitionKeys.isEmpty();
        }

        /**
         * @return a map holding the live partition-key map under the key {@code "PartitionKeys"}
         */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> state = new HashMap<String, Object>();
            state.put("PartitionKeys", this.partitionKeys);
            return state;
        }

        /**
         * Replaces the recorded partition keys with those found under {@code "PartitionKeys"}.
         * The entries are copied into a fresh concurrent map rather than adopting the supplied
         * instance, so the field stays non-null and safe for unsynchronised access whatever
         * map implementation the snapshot provides. A missing entry restores an empty key set.
         *
         * @param state snapshot previously produced by {@link #snapshot()}
         */
        @Override
        public void restore(Map<String, Object> state) {
            @SuppressWarnings("unchecked") // snapshot() stores a Map<String, Long> under this key
            Map<String, Long> restoredKeys = (Map<String, Long>) state.get("PartitionKeys");
            this.partitionKeys = restoredKeys == null
                    ? new ConcurrentHashMap<String, Long>()
                    : new ConcurrentHashMap<String, Long>(restoredKeys);
        }
    }
}

