package io.siddhi.core.partition;

import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.Event;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.event.stream.converter.StreamEventConverter;
import io.siddhi.core.event.stream.converter.StreamEventConverterFactory;
import io.siddhi.core.partition.executor.PartitionExecutor;
import io.siddhi.core.query.QueryRuntime;
import io.siddhi.core.query.QueryRuntimeImpl;
import io.siddhi.core.query.input.stream.StreamRuntime;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.query.api.definition.StreamDefinition;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:io/siddhi/core/partition/PartitionStreamReceiver.class */
/**
 * {@link StreamJunction.Receiver} that feeds one input stream into a partition.
 *
 * <p>Every incoming event is copied into a fresh {@link StreamEvent}. Each configured
 * {@link PartitionExecutor} is then asked for a partition key; events are forwarded to this
 * receiver's inner {@link StreamJunction} while the partition flow for that key is active
 * (see {@link #send(String, ComplexEvent)}). When no executors are configured, events are
 * forwarded once for every key currently reported by the partition runtime
 * (see {@link #send(ComplexEvent)}).
 *
 * <p>{@link #init()} must be called before any {@code receive} method, because it creates the
 * event converter. {@link #addStreamJunction(List)} must be called before events are forwarded,
 * because it registers the inner junction.
 *
 * <p>NOTE(review): {@code m18newInstance()} looks like a decompiler-renamed {@code newInstance()};
 * the name is kept because the factory is declared outside this file.
 */
public class PartitionStreamReceiver implements StreamJunction.Receiver {
    private final StreamEventFactory streamEventFactory;
    /** Created in {@link #init()}; {@code null} until then. */
    private StreamEventConverter streamEventConverter;
    private final String streamId;
    private final MetaStreamEvent metaStreamEvent;
    private final StreamDefinition streamDefinition;
    private final SiddhiAppContext siddhiAppContext;
    private final PartitionRuntimeImpl partitionRuntime;
    private final List<PartitionExecutor> partitionExecutors;
    /** Inner junctions keyed by stream id; only this receiver's own stream id is ever stored. */
    private final Map<String, StreamJunction> streamJunctionMap = new HashMap<String, StreamJunction>();

    /**
     * Creates a receiver for the stream described by {@code streamDefinition}.
     *
     * @param siddhiAppContext application context, used when an inner junction has to be created
     * @param metaStreamEvent  meta event used to build the event factory and converter
     * @param streamDefinition definition of the received stream; its id becomes the stream id
     * @param list             partition executors that compute partition keys (may be empty)
     * @param partitionRuntime owning partition runtime; must be a {@link PartitionRuntimeImpl}
     * @throws ClassCastException if {@code partitionRuntime} is not a {@link PartitionRuntimeImpl}
     */
    public PartitionStreamReceiver(SiddhiAppContext siddhiAppContext, MetaStreamEvent metaStreamEvent, StreamDefinition streamDefinition, List<PartitionExecutor> list, PartitionRuntime partitionRuntime) {
        this.metaStreamEvent = metaStreamEvent;
        this.streamDefinition = streamDefinition;
        this.partitionRuntime = (PartitionRuntimeImpl) partitionRuntime;
        this.partitionExecutors = list;
        this.siddhiAppContext = siddhiAppContext;
        this.streamId = streamDefinition.getId();
        this.streamEventFactory = new StreamEventFactory(metaStreamEvent);
    }

    /** Builds the event converter from the meta stream event supplied at construction. */
    public void init() {
        this.streamEventConverter = StreamEventConverterFactory.constructEventConverter(this.metaStreamEvent);
    }

    /** @return the id of the stream this receiver listens to */
    @Override
    public String getStreamId() {
        return this.streamId;
    }

    /**
     * Receives a single complex event or a chain of them linked through {@code getNext()}.
     *
     * @param complexEvent head of the incoming chain
     */
    @Override
    public void receive(ComplexEvent complexEvent) {
        if (this.partitionExecutors.isEmpty()) {
            broadcastCopyOfChain(complexEvent);
        } else if (complexEvent.getNext() == null) {
            // Single event: every executor gets its own fresh copy.
            for (PartitionExecutor partitionExecutor : this.partitionExecutors) {
                StreamEvent copy = toStreamEvent(complexEvent);
                send(partitionExecutor.execute(copy), copy);
            }
        } else {
            receivePartitionedChain(complexEvent);
        }
    }

    /** Copies the whole chain into stream events and forwards the copy for every partition key. */
    private void broadcastCopyOfChain(ComplexEvent complexEvent) {
        ComplexEventChunk copiedChunk = new ComplexEventChunk(false);
        for (ComplexEvent current = complexEvent; current != null; current = current.getNext()) {
            copiedChunk.add(toStreamEvent(current));
        }
        send(copiedChunk.getFirst());
    }

    /**
     * Walks a multi-event chain, grouping consecutive events that map to the same partition key
     * and flushing the group whenever the key changes.
     */
    private void receivePartitionedChain(ComplexEvent complexEvent) {
        ComplexEventChunk pendingChunk = new ComplexEventChunk(false);
        pendingChunk.add(complexEvent);
        ComplexEventChunk outputChunk = new ComplexEventChunk(false);
        String currentKey = null;
        while (pendingChunk.hasNext()) {
            ComplexEvent source = pendingChunk.next();
            pendingChunk.remove();
            StreamEvent converted = toStreamEvent(source);
            // True once an earlier executor has already produced a key for this same event.
            boolean matchedEarlierExecutor = false;
            for (PartitionExecutor partitionExecutor : this.partitionExecutors) {
                String key = partitionExecutor.execute(converted);
                if (key == null) {
                    continue;
                }
                if (currentKey == null) {
                    currentKey = key;
                } else if (!currentKey.equals(key)) {
                    // Key changed: flush what was collected for the previous key.
                    send(currentKey, outputChunk.getFirst());
                    currentKey = key;
                    outputChunk.clear();
                    if (matchedEarlierExecutor) {
                        // The converted event already went out with the previous key,
                        // so the new key receives a separate copy of the source event.
                        outputChunk.add(toStreamEvent(source));
                    }
                }
                if (!matchedEarlierExecutor) {
                    outputChunk.add(converted);
                }
                matchedEarlierExecutor = true;
            }
        }
        send(currentKey, outputChunk.getFirst());
        outputChunk.clear();
    }

    /**
     * Receives a single event.
     *
     * @param event the incoming event
     */
    @Override
    public void receive(Event event) {
        StreamEvent converted = toStreamEvent(event);
        if (this.partitionExecutors.isEmpty()) {
            send(converted);
            return;
        }
        sendToEachExecutorKey(converted);
    }

    /**
     * Receives a single event given as a timestamp and its attribute data.
     *
     * @param j      event timestamp
     * @param objArr event data
     */
    @Override
    public void receive(long j, Object[] objArr) {
        StreamEvent converted = this.streamEventFactory.m18newInstance();
        this.streamEventConverter.convertData(j, objArr, converted);
        if (this.partitionExecutors.isEmpty()) {
            send(converted);
            return;
        }
        sendToEachExecutorKey(converted);
    }

    /**
     * Receives a batch of events. Consecutive events with the same partition key are chained
     * and forwarded together. An empty batch is ignored.
     *
     * @param eventArr the incoming events
     */
    @Override
    public void receive(Event[] eventArr) {
        if (eventArr.length == 0) {
            // Nothing to forward; also avoids indexing element 0 of an empty array below.
            return;
        }
        if (this.partitionExecutors.isEmpty()) {
            StreamEvent head = toStreamEvent(eventArr[0]);
            StreamEvent tail = head;
            for (int i = 1; i < eventArr.length; i++) {
                StreamEvent converted = toStreamEvent(eventArr[i]);
                tail.setNext(converted);
                tail = converted;
            }
            send(head);
            return;
        }
        String currentKey = null;
        StreamEvent head = null;
        StreamEvent tail = null;
        for (Event event : eventArr) {
            StreamEvent converted = toStreamEvent(event);
            for (PartitionExecutor partitionExecutor : this.partitionExecutors) {
                String key = partitionExecutor.execute(converted);
                if (key == null) {
                    continue;
                }
                if (currentKey == null) {
                    currentKey = key;
                    head = converted;
                } else if (!key.equals(currentKey)) {
                    send(currentKey, head);
                    currentKey = key;
                    head = converted;
                } else if (tail != converted) {
                    // Skip the link when a second executor yields the same key for the same
                    // event; linking an event to itself would create a cyclic chain.
                    tail.setNext(converted);
                }
                tail = converted;
            }
        }
        send(currentKey, head);
    }

    /**
     * Receives a batch of events. Consecutive events with the same partition key are chained
     * and forwarded together. An empty batch is ignored.
     *
     * @param list the incoming events
     */
    @Override
    public void receive(List<Event> list) {
        if (list.isEmpty()) {
            // Nothing to forward; avoids handing a null chain to the junction.
            return;
        }
        if (this.partitionExecutors.isEmpty()) {
            StreamEvent head = null;
            StreamEvent tail = null;
            for (Event event : list) {
                StreamEvent converted = toStreamEvent(event);
                if (head == null) {
                    head = converted;
                } else {
                    tail.setNext(converted);
                }
                tail = converted;
            }
            send(head);
            return;
        }
        String currentKey = null;
        StreamEvent head = null;
        StreamEvent tail = null;
        for (Event event : list) {
            StreamEvent converted = toStreamEvent(event);
            for (PartitionExecutor partitionExecutor : this.partitionExecutors) {
                String key = partitionExecutor.execute(converted);
                if (key == null) {
                    continue;
                }
                if (currentKey == null) {
                    currentKey = key;
                    head = converted;
                } else if (!key.equals(currentKey)) {
                    send(currentKey, head);
                    currentKey = key;
                    head = converted;
                } else if (tail != converted) {
                    // Skip the link when a second executor yields the same key for the same
                    // event; linking an event to itself would create a cyclic chain.
                    tail.setNext(converted);
                }
                tail = converted;
            }
        }
        send(currentKey, head);
    }

    /** Copies an {@link Event} into a newly created stream event. */
    private StreamEvent toStreamEvent(Event event) {
        StreamEvent converted = this.streamEventFactory.m18newInstance();
        this.streamEventConverter.convertEvent(event, converted);
        return converted;
    }

    /** Copies a {@link ComplexEvent} into a newly created stream event. */
    private StreamEvent toStreamEvent(ComplexEvent complexEvent) {
        StreamEvent converted = this.streamEventFactory.m18newInstance();
        this.streamEventConverter.convertComplexEvent(complexEvent, converted);
        return converted;
    }

    /** Sends the same stream event once per executor, under the key that executor returns. */
    private void sendToEachExecutorKey(StreamEvent streamEvent) {
        for (PartitionExecutor partitionExecutor : this.partitionExecutors) {
            send(partitionExecutor.execute(streamEvent), streamEvent);
        }
    }

    /**
     * Forwards {@code complexEvent} to the inner junction inside the partition flow of
     * {@code str}. Does nothing when the key is {@code null}.
     *
     * @param str          partition key, or {@code null} when no executor matched
     * @param complexEvent head of the event chain to forward
     */
    private void send(String str, ComplexEvent complexEvent) {
        if (str == null) {
            return;
        }
        SiddhiAppContext.startPartitionFlow(str);
        try {
            this.partitionRuntime.initPartition();
            this.streamJunctionMap.get(this.streamId).sendEvent(complexEvent);
        } finally {
            SiddhiAppContext.stopPartitionFlow();
        }
    }

    /**
     * Forwards {@code complexEvent} to the inner junction once for every partition key
     * reported by the partition runtime, each time inside that key's partition flow.
     *
     * @param complexEvent head of the event chain to forward
     */
    private void send(ComplexEvent complexEvent) {
        for (String partitionKey : this.partitionRuntime.getPartitionKeys()) {
            SiddhiAppContext.startPartitionFlow(partitionKey);
            try {
                this.streamJunctionMap.get(this.streamId).sendEvent(complexEvent);
            } finally {
                // Always leave the partition flow, even if the junction throws.
                SiddhiAppContext.stopPartitionFlow();
            }
        }
    }

    /**
     * Ensures an inner junction exists for this stream and subscribes to it every process
     * stream receiver of the given query runtimes whose stream id equals this receiver's.
     * The junction is reused from the partition runtime when one is already registered
     * there, and otherwise created and registered.
     *
     * @param list query runtimes of the partition; each must be a {@link QueryRuntimeImpl}
     */
    public void addStreamJunction(List<QueryRuntime> list) {
        StreamJunction streamJunction = this.streamJunctionMap.get(this.streamId);
        if (streamJunction == null) {
            streamJunction = this.partitionRuntime.getInnerPartitionStreamReceiverStreamJunctionMap().get(this.streamId);
            if (streamJunction == null) {
                streamJunction = createStreamJunction();
                this.partitionRuntime.addInnerpartitionStreamReceiverStreamJunction(this.streamId, streamJunction);
            }
            this.streamJunctionMap.put(this.streamId, streamJunction);
        }
        for (QueryRuntime queryRuntime : list) {
            QueryRuntimeImpl queryRuntimeImpl = (QueryRuntimeImpl) queryRuntime;
            StreamRuntime streamRuntime = queryRuntimeImpl.getStreamRuntime();
            for (int i = 0; i < queryRuntimeImpl.getInputStreamId().size(); i++) {
                if (streamRuntime.getSingleStreamRuntimes().get(i).getProcessStreamReceiver().getStreamId().equals(this.streamId)) {
                    streamJunction.subscribe(streamRuntime.getSingleStreamRuntimes().get(i).getProcessStreamReceiver());
                }
            }
        }
    }

    /** Creates a new junction for this stream from the application context's settings. */
    private StreamJunction createStreamJunction() {
        return new StreamJunction(this.streamDefinition, this.siddhiAppContext.getExecutorService(), this.siddhiAppContext.getBufferSize(), null, this.siddhiAppContext);
    }
}
