package org.apache.samza.coordinator.stream;

import joptsimple.OptionSet;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.execution.JobPlanner;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/coordinator/stream/CoordinatorStreamWriter.class */
/**
 * Command-line utility that writes a single message through a
 * {@link CoordinatorStreamSystemProducer}.
 *
 * <p>The only message type currently supported is {@link #SET_CONFIG_TYPE}, which sends a
 * {@link SetConfig} message carrying a key/value pair.
 *
 * <p>Typical usage is {@link #start()}, one or more {@link #sendMessage(String, String, String)}
 * calls, then {@link #stop()}.
 */
public class CoordinatorStreamWriter {
    private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamWriter.class);

    /** Source name registered with the producer and stamped on every message sent by this writer. */
    public static final String SOURCE = "coordinator-stream-writer";

    /** Message type accepted by {@link #sendMessage(String, String, String)}. */
    public static final String SET_CONFIG_TYPE = "set-config";

    private final CoordinatorStreamSystemProducer coordinatorStreamSystemProducer;

    /**
     * Creates a writer whose producer is built from the given configuration and a fresh
     * {@link MetricsRegistryMap}.
     *
     * @param config configuration handed to the underlying {@link CoordinatorStreamSystemProducer}
     */
    public CoordinatorStreamWriter(Config config) {
        this.coordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
    }

    /** Registers {@link #SOURCE} with the producer and starts it. */
    public void start() {
        this.coordinatorStreamSystemProducer.register(SOURCE);
        this.coordinatorStreamSystemProducer.start();
        log.info("Started coordinator stream writer.");
    }

    /** Stops the underlying producer. */
    public void stop() {
        log.info("Stopping the coordinator stream producer.");
        this.coordinatorStreamSystemProducer.stop();
    }

    /**
     * Sends a message of the given type through the producer.
     *
     * @param type message type; must equal {@link #SET_CONFIG_TYPE}
     * @param key key of the message
     * @param value value of the message
     * @throws IllegalArgumentException if {@code type} is {@code null} or not a supported type
     */
    public void sendMessage(String type, String key, String value) {
        // Constant-first comparison so a null type is reported as an invalid argument
        // rather than surfacing as a NullPointerException.
        if (!SET_CONFIG_TYPE.equals(type)) {
            throw new IllegalArgumentException("Type is invalid. The possible values are {set-config}");
        }
        sendSetConfigMessage(key, value);
    }

    /**
     * Sends a {@link SetConfig} message with {@link #SOURCE} as its source.
     *
     * @param key config key
     * @param value config value
     */
    private void sendSetConfigMessage(String key, String value) {
        log.info("sent SetConfig message with key = {} and value = {}", key, value);
        this.coordinatorStreamSystemProducer.send(new SetConfig(SOURCE, key, value));
    }

    /**
     * Entry point: parses the command line, builds the job config, and sends one message.
     *
     * @param args command-line arguments parsed by {@link CoordinatorStreamWriterCommandLine}
     */
    public static void main(String[] args) {
        CoordinatorStreamWriterCommandLine commandLine = new CoordinatorStreamWriterCommandLine();
        OptionSet options = commandLine.parser().parse(args);
        MapConfig jobConfig = JobPlanner.generateSingleJobConfig(commandLine.loadConfig(options));
        String type = commandLine.loadType(options);
        String key = commandLine.loadKey(options);
        String value = commandLine.loadValue(options);

        CoordinatorStreamWriter writer = new CoordinatorStreamWriter(jobConfig);
        writer.start();
        try {
            writer.sendMessage(type, key, value);
        } finally {
            // Always stop the started producer, even when the send fails
            // (e.g. an unsupported message type was supplied on the command line).
            writer.stop();
        }
    }
}
