package org.wso2.siddhi.core.query.processor.join;

import java.util.Iterator;
import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.event.AtomicEvent;
import org.wso2.siddhi.core.event.BundleEvent;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.ListAtomicEvent;
import org.wso2.siddhi.core.event.ListEvent;
import org.wso2.siddhi.core.event.StateEvent;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InStream;
import org.wso2.siddhi.core.executor.conditon.ConditionExecutor;
import org.wso2.siddhi.core.query.QueryPostProcessingElement;
import org.wso2.siddhi.core.query.processor.PreSelectProcessingElement;
import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
import org.wso2.siddhi.core.query.selector.QuerySelector;
import org.wso2.siddhi.core.util.LogHelper;

/* loaded from: input_file:org/wso2/siddhi/core/query/processor/join/JoinProcessor.class */
public abstract class JoinProcessor implements QueryPostProcessingElement, PreSelectProcessingElement {
    static final Logger log = Logger.getLogger(JoinProcessor.class);
    private WindowProcessor windowProcessor;
    protected ConditionExecutor onConditionExecutor;
    private boolean triggerEvent;
    private WindowProcessor oppositeWindowProcessor;
    protected long within = -1;
    private boolean distributedProcessing;
    private QuerySelector querySelector;
    private Lock lock;
    private boolean fromDB;

    public WindowProcessor getWindowProcessor() {
        return this.windowProcessor;
    }

    public JoinProcessor(ConditionExecutor conditionExecutor, boolean z, boolean z2, Lock lock, boolean z3) {
        this.fromDB = false;
        this.onConditionExecutor = conditionExecutor;
        this.triggerEvent = z;
        this.distributedProcessing = z2;
        this.lock = lock;
        this.fromDB = z3;
    }

    private boolean isEventsWithin(ComplexEvent complexEvent, ComplexEvent complexEvent2) {
        if (this.within <= -1) {
            return true;
        }
        long abs = Math.abs(complexEvent.getTimeStamp() - complexEvent2.getTimeStamp());
        return abs <= this.within && abs != Long.MIN_VALUE;
    }

    protected void sendEventList(ListAtomicEvent listAtomicEvent) {
        if (log.isDebugEnabled()) {
            log.debug(listAtomicEvent);
        }
        int activeEvents = listAtomicEvent.getActiveEvents();
        if (activeEvents > 1) {
            this.querySelector.process(listAtomicEvent);
        } else if (activeEvents == 1) {
            this.querySelector.process(listAtomicEvent.getEvent0());
        }
    }

    public void setOppositeWindowProcessor(WindowProcessor windowProcessor) {
        this.oppositeWindowProcessor = windowProcessor;
    }

    public void setWindowProcessor(WindowProcessor windowProcessor) {
        this.windowProcessor = windowProcessor;
    }

    public void setWithin(long j) {
        this.within = j;
    }

    @Override // org.wso2.siddhi.core.query.processor.PreSelectProcessingElement
    public void setNext(QuerySelector querySelector) {
        this.querySelector = querySelector;
    }

    @Override // org.wso2.siddhi.core.query.QueryPostProcessingElement
    public void process(AtomicEvent atomicEvent) {
        LogHelper.logMethod(log, atomicEvent);
        if ((atomicEvent instanceof Event) && triggerEventTypeCheck((Event) atomicEvent)) {
            if (!this.triggerEvent) {
                if (atomicEvent instanceof InStream) {
                    this.windowProcessor.process(atomicEvent);
                    return;
                }
                return;
            }
            acquireLock();
            ListAtomicEvent createNewListAtomicEvent = createNewListAtomicEvent();
            Iterator<StreamEvent> streamEventIterator = getStreamEventIterator((Event) atomicEvent);
            while (streamEventIterator.hasNext()) {
                StreamEvent next = streamEventIterator.next();
                if (next instanceof Event) {
                    if (!isEventsWithin((Event) atomicEvent, next)) {
                        break;
                    }
                    StateEvent createNewEvent = createNewEvent((Event) atomicEvent, (Event) next);
                    if (this.onConditionExecutor.execute(createNewEvent)) {
                        createNewListAtomicEvent.addEvent(createNewEvent);
                    }
                } else if (next instanceof ListEvent) {
                    for (Event event : ((ListEvent) next).getEvents()) {
                        if (isEventsWithin((Event) atomicEvent, next)) {
                            StateEvent createNewEvent2 = createNewEvent((Event) atomicEvent, event);
                            if (this.onConditionExecutor.execute(createNewEvent2)) {
                                createNewListAtomicEvent.addEvent(createNewEvent2);
                            }
                        }
                    }
                }
            }
            if (createNewListAtomicEvent.getActiveEvents() > 0) {
                sendEventList(createNewListAtomicEvent);
            }
            if (atomicEvent instanceof InStream) {
                this.windowProcessor.process(atomicEvent);
            }
            releaseLock();
        }
    }

    protected abstract boolean triggerEventTypeCheck(ComplexEvent complexEvent);

    @Override // org.wso2.siddhi.core.query.QueryPostProcessingElement
    public void process(BundleEvent bundleEvent) {
        ListEvent listEvent = (ListEvent) bundleEvent;
        if (triggerEventTypeCheck(listEvent)) {
            if (!this.triggerEvent) {
                if (listEvent instanceof InStream) {
                    this.windowProcessor.process(listEvent);
                    return;
                }
                return;
            }
            ListAtomicEvent createNewListAtomicEvent = createNewListAtomicEvent();
            if (log.isDebugEnabled()) {
                log.debug("Joining input events " + listEvent.getActiveEvents());
            }
            acquireLock();
            try {
                Iterator<StreamEvent> it = this.oppositeWindowProcessor.iterator();
                while (it.hasNext()) {
                    ComplexEvent complexEvent = (StreamEvent) it.next();
                    if (!isEventsWithin(listEvent, complexEvent)) {
                        break;
                    }
                    for (int i = 0; i < listEvent.getActiveEvents(); i++) {
                        ComplexEvent event = listEvent.getEvent(i);
                        if (complexEvent instanceof Event) {
                            AtomicEvent createNewEvent = createNewEvent(event, complexEvent);
                            if (this.onConditionExecutor.execute(createNewEvent)) {
                                createNewListAtomicEvent.addEvent(createNewEvent);
                            }
                        } else if (complexEvent instanceof ListEvent) {
                            for (int i2 = 0; i2 < ((ListEvent) complexEvent).getActiveEvents(); i2++) {
                                AtomicEvent createNewEvent2 = createNewEvent(event, ((ListEvent) complexEvent).getEvent(i2));
                                if (this.onConditionExecutor.execute(createNewEvent2)) {
                                    createNewListAtomicEvent.addEvent(createNewEvent2);
                                }
                            }
                        }
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug("Sending join output events " + createNewListAtomicEvent.getActiveEvents());
                }
                sendEventList(createNewListAtomicEvent);
                if (listEvent instanceof InStream) {
                    this.windowProcessor.process(listEvent);
                }
            } finally {
                releaseLock();
            }
        }
    }

    protected abstract ListAtomicEvent createNewListAtomicEvent();

    private Iterator<StreamEvent> getStreamEventIterator(Event event) {
        Iterator<StreamEvent> it;
        if (this.distributedProcessing) {
            StateEvent createNewEvent = createNewEvent(event, null);
            if (this.fromDB) {
                it = this.oppositeWindowProcessor.iterator(event, this.onConditionExecutor);
            } else {
                String constructFilterQuery = this.onConditionExecutor.constructFilterQuery(createNewEvent, 1);
                if (this.within > -1) {
                    constructFilterQuery = constructFilterQuery + " and ( timeStamp < " + (event.getTimeStamp() + this.within) + ")";
                }
                log.debug("Join sql predicate: " + constructFilterQuery);
                it = this.oppositeWindowProcessor.iterator(constructFilterQuery);
            }
        } else {
            it = this.fromDB ? event != null ? this.oppositeWindowProcessor.iterator(event, this.onConditionExecutor) : this.oppositeWindowProcessor.iterator() : this.oppositeWindowProcessor.iterator();
        }
        return it;
    }

    public void acquireLock() {
        if (this.lock != null) {
            if (log.isDebugEnabled()) {
                log.debug(this.lock + " trying to acquire join locked");
            }
            this.lock.lock();
            if (log.isDebugEnabled()) {
                log.debug(this.lock + " join locked");
            }
        }
    }

    public void releaseLock() {
        if (this.lock != null) {
            this.lock.unlock();
            if (log.isDebugEnabled()) {
                log.debug(this.lock + " join unlocked");
            }
        }
    }

    protected abstract StateEvent createNewEvent(ComplexEvent complexEvent, ComplexEvent complexEvent2);
}
