/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.signal;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.document.DocumentReader;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.actions.Log;
import io.debezium.pipeline.signal.actions.SignalAction;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.spi.schema.DataCollectionId;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceConnector;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link SignalProcessor}. Each test feeds the processor signals through mocked
 * {@link SignalChannelReader}s and observes the outcome either through captured log output
 * or through a custom {@link SignalAction}.
 */
public class SignalProcessorTest {
    // Processor under test; every test constructs its own instance with the channels and actions it needs.
    private SignalProcessor<TestPartition, OffsetContext> signalProcess;
    // Document reader handed to every SignalProcessor built by these tests.
    private final DocumentReader documentReader = DocumentReader.defaultReader();
    // Partition/offset pair created in setUp() and handed to every SignalProcessor built by these tests.
    private Offsets<TestPartition, OffsetContext> initialOffset;

    /**
     * Builds the partition/offset pair that every test hands to the {@link SignalProcessor}.
     * The offset is a {@link TestOffset} backed by a minimal {@link BaseSourceInfo} stub that
     * reports the current time and a fixed database name.
     */
    @Before
    public void setUp() {
        BaseSourceInfo sourceInfo = new BaseSourceInfo(baseConfig()) {

            @Override
            protected Instant timestamp() {
                return Instant.now();
            }

            @Override
            protected String database() {
                return "test_db";
            }
        };
        // Pass the concrete types and let the field's declared type drive inference; the
        // decompiler-inserted upcasts to Partition/OffsetContext are not needed.
        this.initialOffset = Offsets.of(new TestPartition(), new TestOffset(sourceInfo));
    }

    /**
     * A {@code log} signal read from an enabled channel must be executed by the {@link Log}
     * action. The test expects the {@code {}} placeholder of the signal message to appear
     * filled with the offset map ({@code LSN=12345}) in the captured log output.
     *
     * @throws InterruptedException if stopping the processor is interrupted
     */
    @Test
    public void shouldExecuteLog() throws InterruptedException {
        final String expectedMessage = "signallog {LSN=12345}";

        SignalChannelReader genericChannel = Mockito.mock(SignalChannelReader.class);
        Mockito.when(genericChannel.name()).thenReturn("generic");
        // The first read yields one log signal; every later read yields nothing.
        Mockito.when(genericChannel.read()).thenReturn(
                List.of(new SignalRecord("log1", "log", "{\"message\": \"signallog {}\"}", Long.valueOf(-1L), Map.of("channelOffset", -1L))),
                List.of());
        LogInterceptor log = new LogInterceptor(Log.class);

        this.signalProcess = new SignalProcessor(SourceConnector.class, this.baseConfig(), Map.of("log", new Log()), List.of(genericChannel), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        try {
            // untilAsserted() only keeps polling while the lambda throws, so the condition has
            // to be a real assertion. A bare boolean expression is silently discarded and the
            // wait would return immediately, racing the processor thread.
            Awaitility.await()
                    .atMost(200L, TimeUnit.MILLISECONDS)
                    .untilAsserted(() -> Assertions.assertThat(log.containsMessage(expectedMessage)).isTrue());
        } finally {
            // Always stop the processor, even when the wait times out.
            this.signalProcess.stop();
        }
        Assertions.assertThat(log.containsMessage(expectedMessage)).isTrue();
    }

    /**
     * Registers two channels that both offer the same {@code log} signal, but enables only
     * {@code generic1} through {@link CommonConnectorConfig#SIGNAL_ENABLED_CHANNELS}. The test
     * then asserts that the interceptor counted exactly one occurrence of the signal message.
     *
     * @throws InterruptedException if stopping the processor is interrupted
     */
    @Test
    public void onlyEnabledConnectorShouldExecute() throws InterruptedException {
        SignalChannelReader genericChannel1 = Mockito.mock(SignalChannelReader.class);
        Mockito.when(genericChannel1.name()).thenReturn("generic1");
        // The first read yields one log signal; every later read yields nothing.
        Mockito.when(genericChannel1.read()).thenReturn(
                List.of(new SignalRecord("log1", "log", "{\"message\": \"signallog {}\"}", Long.valueOf(-1L), Map.of("channelOffset", -1L))),
                List.of());

        SignalChannelReader genericChannel2 = Mockito.mock(SignalChannelReader.class);
        Mockito.when(genericChannel2.name()).thenReturn("generic2");
        Mockito.when(genericChannel2.read()).thenReturn(
                List.of(new SignalRecord("log1", "log", "{\"message\": \"signallog {}\"}", Long.valueOf(-1L), Map.of("channelOffset", -1L))),
                List.of());

        LogInterceptor log = new LogInterceptor(Log.class);
        CommonConnectorConfig config = this.baseConfig(Map.of(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS.name(), "generic1"));

        this.signalProcess = new SignalProcessor(SourceConnector.class, config, Map.of("log", new Log()), List.of(genericChannel1, genericChannel2), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        try {
            Awaitility.await()
                    .atMost(300L, TimeUnit.MILLISECONDS)
                    .until(() -> log.containsMessage("signallog {LSN=12345}"));
        } finally {
            // Stop the processor even when the wait times out so it cannot leak into later tests.
            this.signalProcess.stop();
        }
        Assertions.assertThat(log.countOccurrences("signallog {}")).isEqualTo(1L);
    }

    /**
     * A signal whose type has no registered action must not be executed; the test waits for
     * the "type ... is not recognized" message on the {@link SignalProcessor} logger.
     *
     * @throws InterruptedException if stopping the processor is interrupted
     */
    @Test
    public void shouldIgnoreInvalidSignalType() throws InterruptedException {
        SignalChannelReader genericChannel = Mockito.mock(SignalChannelReader.class);
        Mockito.when(genericChannel.name()).thenReturn("generic");
        // The first read yields one signal of an unknown type; every later read yields nothing.
        Mockito.when(genericChannel.read()).thenReturn(
                List.of(new SignalRecord("log1", "invalidType", "{\"message\": \"signallog {}\"}", Long.valueOf(-1L), Map.of("channelOffset", -1L))),
                List.of());
        LogInterceptor log = new LogInterceptor(SignalProcessor.class);

        // No actions are registered, so "invalidType" cannot be resolved.
        this.signalProcess = new SignalProcessor(SourceConnector.class, this.baseConfig(), Map.of(), List.of(genericChannel), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        try {
            Awaitility.await()
                    .atMost(200L, TimeUnit.SECONDS)
                    .untilAsserted(() -> Assertions.assertThat(log.containsMessage("Signal 'log1' has been received but the type 'invalidType' is not recognized")).isTrue());
        } finally {
            // Stop the processor even when the wait times out so it cannot leak into later tests.
            this.signalProcess.stop();
        }
    }

    /**
     * A signal whose data is not well-formed JSON must be reported instead of executed; the
     * test waits for the "cannot be parsed" message on the {@link SignalProcessor} logger.
     *
     * @throws InterruptedException if stopping the processor is interrupted
     */
    @Test
    public void shouldIgnoreUnparseableData() throws InterruptedException {
        SignalChannelReader genericChannel = Mockito.mock(SignalChannelReader.class);
        Mockito.when(genericChannel.name()).thenReturn("generic");
        // The data string is deliberately malformed: the closing quote of the key is missing.
        Mockito.when(genericChannel.read()).thenReturn(
                List.of(new SignalRecord("log1", "log", "{\"message: \"signallog\"}", Long.valueOf(-1L), Map.of("channelOffset", -1L))),
                List.of());
        LogInterceptor log = new LogInterceptor(SignalProcessor.class);

        this.signalProcess = new SignalProcessor(SourceConnector.class, this.baseConfig(), Map.of("log", new Log()), List.of(genericChannel), this.documentReader, this.initialOffset);
        this.signalProcess.start();
        try {
            Awaitility.await()
                    .atMost(40L, TimeUnit.SECONDS)
                    .untilAsserted(() -> Assertions.assertThat(log.containsMessage("Signal 'log1' has been received but the data '{\"message: \"signallog\"}' cannot be parsed")).isTrue());
        } finally {
            // The processor was previously left running after this test; always stop it.
            this.signalProcess.stop();
        }
    }

    /**
     * An action added through {@code registerSignalAction} after construction must receive
     * signals of its type: the custom action stores the {@code v} field of the signal data
     * and the test waits until that value (5) has been recorded.
     *
     * @throws InterruptedException if stopping the processor is interrupted
     */
    @Test
    public void shouldRegisterAdditionalAction() throws InterruptedException {
        SignalChannelReader genericChannel = Mockito.mock(SignalChannelReader.class);
        Mockito.when(genericChannel.name()).thenReturn("generic");
        // The first read yields one "custom" signal; every later read yields nothing.
        Mockito.when(genericChannel.read()).thenReturn(
                List.of(new SignalRecord("log1", "custom", "{\"v\": 5}", Long.valueOf(-1L), Map.of("channelOffset", -1L))),
                List.of());

        AtomicInteger called = new AtomicInteger();
        SignalAction testAction = signalPayload -> {
            called.set(signalPayload.data.getInteger("v"));
            return true;
        };

        // Start with no actions and register the custom one afterwards.
        this.signalProcess = new SignalProcessor(SourceConnector.class, this.baseConfig(), Map.of(), List.of(genericChannel), this.documentReader, this.initialOffset);
        this.signalProcess.registerSignalAction("custom", testAction);
        this.signalProcess.start();
        try {
            Awaitility.await()
                    .atMost(40L, TimeUnit.SECONDS)
                    .untilAsserted(() -> Assertions.assertThat(called.intValue()).isEqualTo(5));
        } finally {
            // The processor was previously left running after this test; always stop it.
            this.signalProcess.stop();
        }
    }

    /**
     * Returns the default test configuration with no additional settings applied.
     *
     * @return the configuration produced by {@link #baseConfig(Map)} for an empty map
     */
    protected CommonConnectorConfig baseConfig() {
        Map<String, Object> noAdditionalSettings = Map.of();
        return baseConfig(noAdditionalSettings);
    }

    /**
     * Builds the connector configuration used by the tests. The defaults set the signal data
     * collection to {@code debezium.signal}, the topic prefix to {@code core}, the signal poll
     * interval to 100 ms and the enabled channels to {@code source,generic}.
     *
     * @param additionalConfig extra settings keyed by property name, applied after the defaults
     * @return a stub {@link CommonConnectorConfig} wrapping the resulting settings
     */
    protected CommonConnectorConfig baseConfig(Map<String, Object> additionalConfig) {
        Configuration.Builder confBuilder = Configuration.create()
                .with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")
                .with(CommonConnectorConfig.TOPIC_PREFIX, "core")
                .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 100)
                .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,generic");
        // Applied last, after all defaults have been set on the builder.
        additionalConfig.forEach(confBuilder::with);

        // The connector-specific hooks are not exercised by these tests, so they are stubbed with null.
        return new CommonConnectorConfig(confBuilder.build(), 0) {

            @Override
            protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
                return null;
            }

            @Override
            public String getContextName() {
                return null;
            }

            @Override
            public String getConnectorName() {
                return null;
            }
        };
    }

    private static class TestOffset
    extends CommonOffsetContext {
        TestOffset(BaseSourceInfo sourceInfo) {
            super(sourceInfo);
        }

        public Map<String, ?> getOffset() {
            return Map.of("LSN", 12345);
        }

        public Schema getSourceInfoSchema() {
            return null;
        }

        public boolean isSnapshotRunning() {
            return false;
        }

        public void preSnapshotStart() {
        }

        public void preSnapshotCompletion() {
        }

        public void event(DataCollectionId collectionId, Instant timestamp) {
        }

        public TransactionContext getTransactionContext() {
            return null;
        }
    }

    private static class TestPartition
    implements Partition {
        private TestPartition() {
        }

        public Map<String, String> getSourcePartition() {
            throw new UnsupportedOperationException();
        }
    }
}

