/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.query.input;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.query.input.ProcessStreamReceiver;
import io.siddhi.core.query.input.stream.state.StreamPreStateProcessor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.selector.QuerySelector;

/**
 * A {@link ProcessStreamReceiver} that pushes incoming stream events, one at a time, through the
 * {@link StreamPreStateProcessor} installed via {@link #setNext(Processor)} and hands every
 * resulting {@link StateEvent} to the {@link QuerySelector} found at the end of that processor's
 * chain.
 *
 * <p>State processing runs while holding the monitor of {@link #lockKey}; the selector is invoked
 * only after that monitor has been released.
 *
 * <p>NOTE(review): the monitor is the {@code String} object itself, so receivers that are handed
 * the very same {@code String} instance contend on one lock. Presumably this is intentional
 * (shared key = shared lock) — confirm against the callers that supply the key.
 */
public class SingleProcessStreamReceiver extends ProcessStreamReceiver {
    /** Object whose monitor guards the state-processing loop in {@link #processAndClear}. */
    protected final String lockKey;

    /**
     * Scratch chunk that carries exactly one event into the pre-state processor per iteration.
     * It is emptied again after every iteration of {@link #processAndClear}.
     */
    protected ComplexEventChunk<StreamEvent> currentStreamEventChunk;

    /** Selector that receives the produced state events; assigned in {@link #setNext(Processor)}. */
    private QuerySelector querySelector;

    /**
     * Creates a receiver for the given stream.
     *
     * @param streamId           id passed through to the superclass constructor
     * @param lockKey            string whose monitor serialises state processing
     * @param siddhiQueryContext query context passed through to the superclass constructor
     */
    public SingleProcessStreamReceiver(String streamId, String lockKey, SiddhiQueryContext siddhiQueryContext) {
        super(streamId, siddhiQueryContext);
        // Explicit type argument instead of a raw type, so no unchecked conversion is needed.
        this.currentStreamEventChunk = new ComplexEventChunk<StreamEvent>(this.batchProcessingAllowed);
        this.lockKey = lockKey;
    }

    /**
     * Installs the next processor and caches the query selector reachable from it.
     *
     * @param next the processor to run after this receiver; it is cast to
     *             {@link StreamPreStateProcessor}, and the processor following its
     *             "this last processor" is cast to {@link QuerySelector}
     * @throws ClassCastException if either of those casts does not hold
     */
    @Override
    public void setNext(Processor next) {
        super.setNext(next);
        StreamPreStateProcessor preStateProcessor = (StreamPreStateProcessor) next;
        this.querySelector = (QuerySelector) preStateProcessor.getThisLastProcessor().getNextProcessor();
    }

    /**
     * Creates a new receiver whose stream id is this receiver's id with {@code key} appended and
     * whose lock key is {@code key}; the query context is shared. The next processor and the
     * selector are not copied by this method.
     *
     * @param key suffix for the stream id, also used as the new receiver's lock key
     * @return the newly constructed receiver
     */
    @Override
    public SingleProcessStreamReceiver clone(String key) {
        return new SingleProcessStreamReceiver(this.streamId + key, key, this.siddhiQueryContext);
    }

    /**
     * Drains {@code streamEventChunk}, processing its events one by one, and then forwards the
     * collected results to the query selector.
     *
     * <p>For each event, while holding the {@link #lockKey} monitor: the event is removed from
     * the input chunk, {@link #stabilizeStates()} is called, and the event is passed on its own
     * to the pre-state processor. If the returned chunk has a first event, that event is added to
     * a local result chunk. After the monitor is released, every collected state event is handed
     * to the selector wrapped in its own single-element, non-batch chunk.
     *
     * @param streamEventChunk the events to process; events are removed from it as they are consumed
     */
    @Override
    protected void processAndClear(ComplexEventChunk<StreamEvent> streamEventChunk) {
        ComplexEventChunk<StateEvent> retEventChunk = new ComplexEventChunk<StateEvent>(false);
        synchronized (this.lockKey) {
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = streamEventChunk.next();
                streamEventChunk.remove();
                this.stabilizeStates();
                try {
                    this.currentStreamEventChunk.add(streamEvent);
                    ComplexEventChunk<StateEvent> eventChunk =
                            ((StreamPreStateProcessor) this.next).processAndReturn(this.currentStreamEventChunk);
                    if (eventChunk.getFirst() != null) {
                        retEventChunk.add(eventChunk.getFirst());
                    }
                    eventChunk.clear();
                } finally {
                    // The scratch chunk is a field that outlives this call: always empty it, even
                    // when processing throws, so a failed event cannot leak into the next call.
                    this.currentStreamEventChunk.clear();
                }
            }
        }
        // Emit outside the lock so downstream selector work does not extend the critical section.
        while (retEventChunk.hasNext()) {
            StateEvent stateEvent = retEventChunk.next();
            retEventChunk.remove();
            this.querySelector.process(new ComplexEventChunk<StateEvent>(stateEvent, stateEvent, false));
        }
    }

    /**
     * Hook invoked under the lock before each event is processed. This implementation does
     * nothing; it is {@code protected} so subclasses can override it.
     */
    protected void stabilizeStates() {
    }
}

