package com.espertech.esperio.kafka;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EPStatementState;
import com.espertech.esper.client.EPStatementStateListener;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
import com.espertech.esper.client.util.JSONEventRenderer;
import com.espertech.esper.epl.annotation.AnnotationUtil;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:production/esperio-kafka/com/espertech/esperio/kafka/EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class */
public class EsperIOKafkaOutputFlowControllerByAnnotatedStmt implements EsperIOKafkaOutputFlowController {
    private static final Logger log = LoggerFactory.getLogger(EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class);
    private KafkaProducer producer;
    private EPServiceProvider engine;
    private Set<String> topics = new LinkedHashSet();

    /* loaded from: input_file:production/esperio-kafka/com/espertech/esperio/kafka/EsperIOKafkaOutputFlowControllerByAnnotatedStmt$KafkaOutputDefaultListener.class */
    public static class KafkaOutputDefaultListener implements UpdateListener {
        private final JSONEventRenderer jsonEventRenderer;
        private final KafkaProducer producer;
        private final Set<String> topics;

        public KafkaOutputDefaultListener(EPServiceProvider ePServiceProvider, EPStatement ePStatement, KafkaProducer kafkaProducer, Set<String> set) {
            this.jsonEventRenderer = ePServiceProvider.getEPRuntime().getEventRenderer().getJSONRenderer(ePStatement.getEventType());
            this.producer = kafkaProducer;
            this.topics = set;
        }

        @Override // com.espertech.esper.client.UpdateListener
        public void update(EventBean[] eventBeanArr, EventBean[] eventBeanArr2) {
            if (eventBeanArr == null) {
                return;
            }
            for (EventBean eventBean : eventBeanArr) {
                String render = this.jsonEventRenderer.render(eventBean);
                Iterator<String> it = this.topics.iterator();
                while (it.hasNext()) {
                    this.producer.send(new ProducerRecord(it.next(), render));
                }
            }
        }
    }

    @Override // com.espertech.esperio.kafka.EsperIOKafkaOutputFlowController
    public void initialize(EsperIOKafkaOutputFlowControllerContext esperIOKafkaOutputFlowControllerContext) {
        this.engine = esperIOKafkaOutputFlowControllerContext.getEngine();
        try {
            this.producer = new KafkaProducer(esperIOKafkaOutputFlowControllerContext.getProperties());
        } catch (Throwable th) {
            log.error("Error obtaining Kafka producer for URI '{}': {}", new Object[]{esperIOKafkaOutputFlowControllerContext.getEngine().getURI(), th.getMessage(), th});
        }
        for (String str : EsperIOKafkaInputAdapter.getRequiredProperty(esperIOKafkaOutputFlowControllerContext.getProperties(), EsperIOKafkaConfig.TOPICS_CONFIG).split(",")) {
            if (str.trim().length() > 0) {
                this.topics.add(str.trim());
            }
        }
        for (String str2 : esperIOKafkaOutputFlowControllerContext.getEngine().getEPAdministrator().getStatementNames()) {
            processStatement(this.engine.getEPAdministrator().getStatement(str2));
        }
        this.engine.addStatementStateListener(new EPStatementStateListener() { // from class: com.espertech.esperio.kafka.EsperIOKafkaOutputFlowControllerByAnnotatedStmt.1
            @Override // com.espertech.esper.client.EPStatementStateListener
            public void onStatementCreate(EPServiceProvider ePServiceProvider, EPStatement ePStatement) {
            }

            @Override // com.espertech.esper.client.EPStatementStateListener
            public void onStatementStateChange(EPServiceProvider ePServiceProvider, EPStatement ePStatement) {
                if (ePStatement.getState() == EPStatementState.STARTED) {
                    EsperIOKafkaOutputFlowControllerByAnnotatedStmt.this.processStatement(ePStatement);
                } else if (ePStatement.getState() == EPStatementState.STOPPED || ePStatement.getState() == EPStatementState.DESTROYED) {
                    EsperIOKafkaOutputFlowControllerByAnnotatedStmt.this.detachStatement(ePStatement);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processStatement(EPStatement ePStatement) {
        if (ePStatement == null || AnnotationUtil.findAnnotation(ePStatement.getAnnotations(), KafkaOutputDefault.class) == null) {
            return;
        }
        ePStatement.addListener(new KafkaOutputDefaultListener(this.engine, ePStatement, this.producer, this.topics));
        log.info("Added Kafka-Output-Adapter listener to statement '{}' topics {}", ePStatement.getName(), this.topics.toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void detachStatement(EPStatement ePStatement) {
        Iterator<UpdateListener> updateListeners = ePStatement.getUpdateListeners();
        UpdateListener updateListener = null;
        while (true) {
            if (!updateListeners.hasNext()) {
                break;
            }
            UpdateListener next = updateListeners.next();
            if (next instanceof KafkaOutputDefaultListener) {
                updateListener = next;
                break;
            }
        }
        if (updateListener != null) {
            ePStatement.removeListener(updateListener);
        }
        log.info("Removed Kafka-Output-Adapter listener from statement '{}'", ePStatement.getName());
    }

    @Override // com.espertech.esperio.kafka.EsperIOKafkaOutputFlowController
    public void close() {
        this.producer.close();
    }
}
