package org.apache.samza.coordinator.stream;

import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.class */
/**
 * Writes {@link CoordinatorStreamMessage}s to the coordinator stream.
 *
 * <p>Keys and message bodies are serialized with {@link JsonSerde} and handed to the wrapped
 * {@link SystemProducer}. The matching {@link SystemAdmin} is started and stopped together with
 * the producer.
 *
 * <p>NOTE(review): {@code isStarted} is a plain field with no synchronization; this class does
 * not appear to be designed for concurrent start/stop — confirm against callers.
 */
public class CoordinatorStreamSystemProducer {
    private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemProducer.class);
    private final Serde<List<?>> keySerde;
    private final Serde<Map<String, Object>> messageSerde;
    private final SystemStream systemStream;
    private final SystemProducer systemProducer;
    private final SystemAdmin systemAdmin;
    private boolean isStarted;

    /**
     * Builds a producer from configuration: the coordinator system stream and system factory are
     * resolved through {@link CoordinatorStreamUtil}, and the factory supplies the admin and the
     * producer for that stream's system.
     *
     * @param config job configuration used to resolve the coordinator stream and its factory
     * @param metricsRegistry registry handed to the factory when creating the producer
     */
    public CoordinatorStreamSystemProducer(Config config, MetricsRegistry metricsRegistry) {
        SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
        SystemFactory coordinatorSystemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
        // The simple class name is passed to the factory as the admin/producer label.
        String label = getClass().getSimpleName();
        String systemName = coordinatorSystemStream.getSystem();
        SystemAdmin admin = coordinatorSystemFactory.getAdmin(systemName, config, label);
        SystemProducer producer = coordinatorSystemFactory.getProducer(systemName, config, metricsRegistry, label);
        this.systemStream = coordinatorSystemStream;
        this.systemProducer = producer;
        this.systemAdmin = admin;
        this.keySerde = new JsonSerde<>();
        this.messageSerde = new JsonSerde<>();
    }

    /**
     * Builds a producer from already-constructed collaborators.
     *
     * @param systemStream stream that every message is sent to
     * @param systemProducer producer used to send and flush messages
     * @param systemAdmin admin that is started and stopped alongside the producer
     */
    public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
        this.systemStream = systemStream;
        this.systemProducer = systemProducer;
        this.systemAdmin = systemAdmin;
        this.keySerde = new JsonSerde<>();
        this.messageSerde = new JsonSerde<>();
    }

    /**
     * Registers a source with the underlying {@link SystemProducer}.
     *
     * @param str source name forwarded to {@link SystemProducer#register(String)}
     */
    public void register(String str) {
        this.systemProducer.register(str);
    }

    /**
     * Starts the underlying producer and then the admin. Does nothing except log when this
     * instance is already marked as started.
     */
    public void start() {
        if (this.isStarted) {
            log.info("Coordinator stream producer already started");
            return;
        }
        log.info("Starting coordinator stream producer.");
        this.systemProducer.start();
        this.systemAdmin.start();
        this.isStarted = true;
    }

    /**
     * Stops the underlying producer and then the admin, and marks this instance as not started.
     *
     * <p>The admin is stopped and the started flag is cleared even when stopping the producer
     * throws; the exception still propagates to the caller. If both stop calls throw, the
     * admin's exception is the one that propagates.
     */
    public void stop() {
        log.info("Stopping coordinator stream producer.");
        try {
            this.systemProducer.stop();
        } finally {
            try {
                this.systemAdmin.stop();
            } finally {
                this.isStarted = false;
            }
        }
    }

    /**
     * Serializes a message and sends it to the coordinator stream under the message's own source.
     *
     * <p>The key is the JSON-serialized key array. The value is the JSON-serialized message map,
     * or {@code null} when the message is a delete.
     *
     * @param coordinatorStreamMessage message to send
     * @throws SamzaException wrapping any exception raised while serializing or sending
     */
    public void send(CoordinatorStreamMessage coordinatorStreamMessage) {
        log.debug("Sending {}", coordinatorStreamMessage);
        try {
            String source = coordinatorStreamMessage.getSource();
            byte[] keyBytes = this.keySerde.toBytes(Arrays.asList(coordinatorStreamMessage.getKeyArray()));
            // Delete messages carry no payload.
            byte[] valueBytes = coordinatorStreamMessage.isDelete()
                    ? null
                    : this.messageSerde.toBytes(coordinatorStreamMessage.getMessageMap());
            // The constant 0 is presumably the partition key, so all messages go to one
            // partition — confirm against OutgoingMessageEnvelope.
            this.systemProducer.send(source, new OutgoingMessageEnvelope(this.systemStream, 0, keyBytes, valueBytes));
        } catch (Exception e) {
            throw new SamzaException(e);
        }
    }

    /**
     * Sends one {@link SetConfig} message per configuration entry, then flushes the producer for
     * the given source.
     *
     * @param str source name used for every {@link SetConfig} message and for the flush
     * @param config configuration whose entries are written
     */
    public void writeConfig(String str, Config config) {
        log.debug("Writing config: {}", config);
        for (Map.Entry<String, String> entry : config.entrySet()) {
            send(new SetConfig(str, entry.getKey(), entry.getValue()));
        }
        this.systemProducer.flush(str);
    }

    /**
     * Reports whether {@link #start()} has completed without a subsequent {@link #stop()}.
     *
     * @return the current value of the started flag
     */
    @VisibleForTesting
    boolean isStarted() {
        return this.isStarted;
    }
}
