/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.notification;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.notification.Notification;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.notification.channels.LogNotificationChannel;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.schema.SchemaFactory;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.class)
public class NotificationServiceTest {
    public static final String NOTIFICATION_ID = UUID.fromString("a5dc3ab8-933d-4aae-a994-2e5f9d47acd2").toString();
    private final SourceRecord expectedRecord = this.buildRecord();
    private boolean isConsumerCalled = false;
    @Mock
    private CommonConnectorConfig connectorConfig;

    private SourceRecord buildRecord() {
        Schema keySchema = SchemaBuilder.struct().name("io.debezium.connector.common.NotificationKey").field("id", SchemaBuilder.STRING_SCHEMA).build();
        Schema valueSchema = SchemaBuilder.struct().name("io.debezium.connector.common.Notification").field("id", SchemaBuilder.STRING_SCHEMA).field("type", SchemaBuilder.STRING_SCHEMA).field("aggregate_type", SchemaBuilder.STRING_SCHEMA).field("additional_data", (Schema)SchemaBuilder.map((Schema)SchemaBuilder.STRING_SCHEMA, (Schema)SchemaBuilder.STRING_SCHEMA)).build();
        Struct key = new Struct(keySchema).put("id", (Object)NOTIFICATION_ID);
        Struct value = new Struct(valueSchema).put("id", (Object)NOTIFICATION_ID).put("type", (Object)"Test").put("aggregate_type", (Object)"Test").put("additional_data", Map.of("k1", "v1"));
        return new SourceRecord(Map.of(), Map.of(), "notificationTopic", null, keySchema, (Object)key, valueSchema, (Object)value);
    }

    @Test
    public void testNotificationWithLogNotificationChannel() {
        Mockito.when((Object)this.connectorConfig.getEnabledNotificationChannels()).thenReturn(List.of("log"));
        LogInterceptor logInterceptor = new LogInterceptor(LogNotificationChannel.class);
        NotificationService notificationService = new NotificationService(List.of(new LogNotificationChannel()), this.connectorConfig, new SchemaFactory(), this::consume);
        notificationService.notify(Notification.Builder.builder().withId(NOTIFICATION_ID).withType("Test").withAggregateType("Test").withAdditionalData(Map.of("Key1", "Value1")).build());
        Assertions.assertThat((boolean)logInterceptor.containsMessage("[Notification Service]  {aggregateType='Test', type='Test', additionalData={Key1=Value1}}")).isTrue();
    }

    @Test
    public void notificationSentOnKafkaChannelWillBeCorrectlyProcessed() {
        Mockito.when((Object)this.connectorConfig.getNotificationTopic()).thenReturn((Object)"io.debezium.notification");
        Mockito.when((Object)this.connectorConfig.getEnabledNotificationChannels()).thenReturn(List.of("sink"));
        NotificationService notificationService = new NotificationService(List.of(new SinkNotificationChannel()), this.connectorConfig, new SchemaFactory(), this::consume);
        notificationService.notify(Notification.Builder.builder().withId(NOTIFICATION_ID).withType("Test").withAggregateType("Test").withAdditionalData(Map.of("k1", "v1")).build());
        Assertions.assertThat((boolean)this.isConsumerCalled).isTrue();
    }

    private void consume(SourceRecord sourceRecord) {
        Struct value = (Struct)sourceRecord.value();
        Struct expectedValue = (Struct)this.expectedRecord.value();
        Assertions.assertThat((String)value.toString()).isEqualTo((Object)expectedValue.toString());
        Struct key = (Struct)sourceRecord.key();
        Struct expectedKey = (Struct)this.expectedRecord.key();
        Assertions.assertThat((String)key.toString()).isEqualTo((Object)expectedKey.toString());
        Assertions.assertThat((String)sourceRecord.topic()).isEqualTo((Object)"io.debezium.notification");
        this.isConsumerCalled = true;
    }
}

