package io.debezium.pipeline.signal;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.document.DocumentReader;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.signal.actions.Log;
import io.debezium.pipeline.signal.actions.SignalAction;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.spi.schema.DataCollectionId;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/pipeline/signal/SignalProcessorTest.class */
public class SignalProcessorTest {
    private SignalProcessor<TestPartition, OffsetContext> signalProcess;
    private final DocumentReader documentReader = DocumentReader.defaultReader();
    private Offsets<TestPartition, OffsetContext> initialOffset;

    /* loaded from: input_file:io/debezium/pipeline/signal/SignalProcessorTest$TestOffset.class */
    private static class TestOffset extends CommonOffsetContext {
        TestOffset(BaseSourceInfo baseSourceInfo) {
            super(baseSourceInfo);
        }

        public Map<String, ?> getOffset() {
            return Map.of("LSN", 12345);
        }

        public Schema getSourceInfoSchema() {
            return null;
        }

        public boolean isSnapshotRunning() {
            return false;
        }

        public void preSnapshotStart() {
        }

        public void preSnapshotCompletion() {
        }

        public void event(DataCollectionId dataCollectionId, Instant instant) {
        }

        public TransactionContext getTransactionContext() {
            return null;
        }
    }

    /* loaded from: input_file:io/debezium/pipeline/signal/SignalProcessorTest$TestPartition.class */
    private static class TestPartition implements Partition {
        private TestPartition() {
        }

        public Map<String, String> getSourcePartition() {
            throw new UnsupportedOperationException();
        }
    }

    @Before
    public void setUp() {
        this.initialOffset = Offsets.of(new TestPartition(), new TestOffset(new BaseSourceInfo(baseConfig()) { // from class: io.debezium.pipeline.signal.SignalProcessorTest.1
            protected Instant timestamp() {
                return Instant.now();
            }

            protected String database() {
                return "test_db";
            }
        }));
    }

    @Test
    public void shouldExecuteLog() throws InterruptedException {
        SignalChannelReader signalChannelReader = (SignalChannelReader) Mockito.mock(SignalChannelReader.class);
        Mockito.when(signalChannelReader.name()).thenReturn("generic");
        Mockito.when(signalChannelReader.read()).thenReturn(List.of(new SignalRecord("log1", "log", "{\"message\": \"signallog {}\"}", -1L, Map.of("channelOffset", -1L))), new List[]{List.of()});
        LogInterceptor logInterceptor = new LogInterceptor((Class<?>) Log.class);
        this.signalProcess = new SignalProcessor<>(SourceConnector.class, baseConfig(), Map.of("log", new Log()), List.of(signalChannelReader), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        Awaitility.await().atMost(200L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            logInterceptor.containsMessage("signallog {LSN=12345}");
        });
        this.signalProcess.stop();
        Assertions.assertThat(logInterceptor.containsMessage("signallog {LSN=12345}")).isTrue();
    }

    @Test
    public void onlyEnabledConnectorShouldExecute() throws InterruptedException {
        SignalChannelReader signalChannelReader = (SignalChannelReader) Mockito.mock(SignalChannelReader.class);
        Mockito.when(signalChannelReader.name()).thenReturn("generic1");
        Mockito.when(signalChannelReader.read()).thenReturn(List.of(new SignalRecord("log1", "log", "{\"message\": \"signallog {}\"}", -1L, Map.of("channelOffset", -1L))), new List[]{List.of()});
        SignalChannelReader signalChannelReader2 = (SignalChannelReader) Mockito.mock(SignalChannelReader.class);
        Mockito.when(signalChannelReader2.name()).thenReturn("generic2");
        Mockito.when(signalChannelReader2.read()).thenReturn(List.of(new SignalRecord("log1", "log", "{\"message\": \"signallog {}\"}", -1L, Map.of("channelOffset", -1L))), new List[]{List.of()});
        LogInterceptor logInterceptor = new LogInterceptor((Class<?>) Log.class);
        this.signalProcess = new SignalProcessor<>(SourceConnector.class, baseConfig(Map.of(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS.name(), "generic1")), Map.of("log", new Log()), List.of(signalChannelReader, signalChannelReader2), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        Awaitility.await().atMost(300L, TimeUnit.MILLISECONDS).until(() -> {
            return Boolean.valueOf(logInterceptor.containsMessage("signallog {LSN=12345}"));
        });
        this.signalProcess.stop();
        Assertions.assertThat(logInterceptor.countOccurrences("signallog {}")).isEqualTo(1L);
    }

    @Test
    public void shouldIgnoreInvalidSignalType() throws InterruptedException {
        SignalChannelReader signalChannelReader = (SignalChannelReader) Mockito.mock(SignalChannelReader.class);
        Mockito.when(signalChannelReader.name()).thenReturn("generic");
        Mockito.when(signalChannelReader.read()).thenReturn(List.of(new SignalRecord("log1", "invalidType", "{\"message\": \"signallog {}\"}", -1L, Map.of("channelOffset", -1L))), new List[]{List.of()});
        LogInterceptor logInterceptor = new LogInterceptor((Class<?>) SignalProcessor.class);
        this.signalProcess = new SignalProcessor<>(SourceConnector.class, baseConfig(), Map.of(), List.of(signalChannelReader), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        Awaitility.await().atMost(200L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(logInterceptor.containsMessage("Signal 'log1' has been received but the type 'invalidType' is not recognized")).isTrue();
        });
        this.signalProcess.stop();
    }

    @Test
    public void shouldIgnoreUnparseableData() {
        SignalChannelReader signalChannelReader = (SignalChannelReader) Mockito.mock(SignalChannelReader.class);
        Mockito.when(signalChannelReader.name()).thenReturn("generic");
        Mockito.when(signalChannelReader.read()).thenReturn(List.of(new SignalRecord("log1", "log", "{\"message: \"signallog\"}", -1L, Map.of("channelOffset", -1L))), new List[]{List.of()});
        LogInterceptor logInterceptor = new LogInterceptor((Class<?>) SignalProcessor.class);
        this.signalProcess = new SignalProcessor<>(SourceConnector.class, baseConfig(), Map.of("log", new Log()), List.of(signalChannelReader), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        Awaitility.await().atMost(40L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(logInterceptor.containsMessage("Signal 'log1' has been received but the data '{\"message: \"signallog\"}' cannot be parsed")).isTrue();
        });
    }

    @Test
    public void shouldRegisterAdditionalAction() {
        SignalChannelReader signalChannelReader = (SignalChannelReader) Mockito.mock(SignalChannelReader.class);
        Mockito.when(signalChannelReader.name()).thenReturn("generic");
        Mockito.when(signalChannelReader.read()).thenReturn(List.of(new SignalRecord("log1", "custom", "{\"v\": 5}", -1L, Map.of("channelOffset", -1L))), new List[]{List.of()});
        AtomicInteger atomicInteger = new AtomicInteger();
        SignalAction signalAction = signalPayload -> {
            atomicInteger.set(signalPayload.data.getInteger("v").intValue());
            return true;
        };
        this.signalProcess = new SignalProcessor<>(SourceConnector.class, baseConfig(), Map.of(), List.of(signalChannelReader), this.documentReader, this.initialOffset);
        this.signalProcess.registerSignalAction("custom", signalAction);
        this.signalProcess.start();
        Awaitility.await().atMost(40L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(atomicInteger.intValue()).isEqualTo(5);
        });
    }

    protected CommonConnectorConfig baseConfig() {
        return baseConfig(Map.of());
    }

    protected CommonConnectorConfig baseConfig(Map<String, Object> map) {
        Configuration.Builder with = Configuration.create().with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").with(CommonConnectorConfig.TOPIC_PREFIX, "core").with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 100).with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,generic");
        Objects.requireNonNull(with);
        map.forEach(with::with);
        return new CommonConnectorConfig(with.build(), 0) { // from class: io.debezium.pipeline.signal.SignalProcessorTest.2
            protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
                return null;
            }

            public String getContextName() {
                return null;
            }

            public String getConnectorName() {
                return null;
            }
        };
    }
}
