package org.apache.flink.cep.nfa;

import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFAUtils;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/cep/nfa/NFAStatusChangeITCase.class */
public class NFAStatusChangeITCase {
    private SharedBuffer<Event> sharedBuffer;
    private SharedBufferAccessor<Event> sharedBufferAccessor;

    @Before
    public void init() {
        this.sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
        this.sharedBufferAccessor = this.sharedBuffer.getAccessor();
    }

    @After
    public void clear() throws Exception {
        this.sharedBufferAccessor.close();
    }

    @Test
    public void testNFAChange() throws Exception {
        NFA compile = NFAUtils.compile(Pattern.begin("start").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.4
            private static final long serialVersionUID = 1858562682635302605L;

            public boolean filter(Event event) throws Exception {
                return event.getName().equals("a");
            }
        }).followedByAny("middle").where(new IterativeCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.3
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event event, IterativeCondition.Context<Event> context) throws Exception {
                return event.getName().equals("b");
            }

            public /* bridge */ /* synthetic */ boolean filter(Object obj, IterativeCondition.Context context) throws Exception {
                return filter((Event) obj, (IterativeCondition.Context<Event>) context);
            }
        }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.2
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event event, IterativeCondition.Context<Event> context) throws Exception {
                return event.getName().equals("d");
            }

            public /* bridge */ /* synthetic */ boolean filter(Object obj, IterativeCondition.Context context) throws Exception {
                return filter((Event) obj, (IterativeCondition.Context<Event>) context);
            }
        }).followedBy("end").where(new IterativeCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.1
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event event, IterativeCondition.Context<Event> context) throws Exception {
                return event.getName().equals("e");
            }

            public /* bridge */ /* synthetic */ boolean filter(Object obj, IterativeCondition.Context context) throws Exception {
                return filter((Event) obj, (IterativeCondition.Context<Event>) context);
            }
        }).within(Time.milliseconds(10L)), true);
        NFAState createInitialNFAState = compile.createInitialNFAState();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(1, "b", 1.0d), 1L);
        Assert.assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", createInitialNFAState.isStateChanged());
        createInitialNFAState.resetStateChanged();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(2, "a", 1.0d), 2L);
        Assert.assertTrue("NFA status should change as the event matches the take condition of the 'start' state", createInitialNFAState.isStateChanged());
        createInitialNFAState.resetStateChanged();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(3, "f", 1.0d), 3L);
        Assert.assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", createInitialNFAState.isStateChanged());
        createInitialNFAState.resetStateChanged();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(4, "f", 1.0d), 4L);
        Assert.assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", createInitialNFAState.isStateChanged());
        createInitialNFAState.resetStateChanged();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(5, "b", 1.0d), 5L);
        Assert.assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", createInitialNFAState.isStateChanged());
        createInitialNFAState.resetStateChanged();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(6, "d", 1.0d), 6L);
        Assert.assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", createInitialNFAState.isStateChanged());
        createInitialNFAState.resetStateChanged();
        compile.advanceTime(this.sharedBufferAccessor, createInitialNFAState, 8L);
        Assert.assertFalse("NFA status should not change as the timestamp is within the window", createInitialNFAState.isStateChanged());
        createInitialNFAState.resetStateChanged();
        Assert.assertTrue("NFA status should change as timeout happens", createInitialNFAState.isStateChanged() && !compile.advanceTime(this.sharedBufferAccessor, createInitialNFAState, 12L).isEmpty());
    }

    @Test
    public void testNFAChangedOnOneNewComputationState() throws Exception {
        NFA compile = NFAUtils.compile(Pattern.begin("start").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.7
            public boolean filter(Event event) throws Exception {
                return event.getName().equals("start");
            }
        }).followedBy("a*").where(new SimpleCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.6
            private static final long serialVersionUID = 1858562682635302605L;

            public boolean filter(Event event) throws Exception {
                return event.getName().equals("a");
            }
        }).oneOrMore().optional().next("end").where(new IterativeCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.5
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event event, IterativeCondition.Context<Event> context) throws Exception {
                return event.getName().equals("b");
            }

            public /* bridge */ /* synthetic */ boolean filter(Object obj, IterativeCondition.Context context) throws Exception {
                return filter((Event) obj, (IterativeCondition.Context<Event>) context);
            }
        }).within(Time.milliseconds(10L)), true);
        NFAState createInitialNFAState = compile.createInitialNFAState();
        createInitialNFAState.resetStateChanged();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(6, "start", 1.0d), 6L);
        createInitialNFAState.resetStateChanged();
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(6, "a", 1.0d), 7L);
        Assert.assertTrue(createInitialNFAState.isStateChanged());
    }

    @Test
    public void testNFAChangedOnTimeoutWithoutPrune() throws Exception {
        NFA compile = NFAUtils.compile(Pattern.begin("start").where(new IterativeCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.9
            public boolean filter(Event event, IterativeCondition.Context<Event> context) throws Exception {
                return event.getName().equals("start");
            }

            public /* bridge */ /* synthetic */ boolean filter(Object obj, IterativeCondition.Context context) throws Exception {
                return filter((Event) obj, (IterativeCondition.Context<Event>) context);
            }
        }).followedBy("end").where(new IterativeCondition<Event>() { // from class: org.apache.flink.cep.nfa.NFAStatusChangeITCase.8
            private static final long serialVersionUID = 8061969839441121955L;

            public boolean filter(Event event, IterativeCondition.Context<Event> context) throws Exception {
                return event.getName().equals("end");
            }

            public /* bridge */ /* synthetic */ boolean filter(Object obj, IterativeCondition.Context context) throws Exception {
                return filter((Event) obj, (IterativeCondition.Context<Event>) context);
            }
        }).within(Time.milliseconds(10L)), true);
        NFAState createInitialNFAState = compile.createInitialNFAState();
        createInitialNFAState.resetStateChanged();
        compile.advanceTime(this.sharedBufferAccessor, createInitialNFAState, 6L);
        compile.process(this.sharedBufferAccessor, createInitialNFAState, new Event(6, "start", 1.0d), 6L);
        createInitialNFAState.resetStateChanged();
        compile.advanceTime(this.sharedBufferAccessor, createInitialNFAState, 17L);
        Assert.assertTrue(createInitialNFAState.isStateChanged());
    }
}
