package io.latent.storm.rabbitmq;

import backtype.storm.topology.ReportedFailedException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.latent.storm.rabbitmq.Declarator;
import io.latent.storm.rabbitmq.config.ProducerConfig;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/latent/storm/rabbitmq/RabbitMQProducer.class */
public class RabbitMQProducer implements Serializable {
    private final Declarator declarator;
    private transient Logger logger;
    private transient ProducerConfig producerConfig;
    private transient Connection connection;
    private transient Channel channel;

    public RabbitMQProducer() {
        this(new Declarator.NoOp());
    }

    public RabbitMQProducer(Declarator declarator) {
        this.declarator = declarator;
    }

    public void send(Message message) {
        send(message, "");
    }

    public void send(Message message, String str) {
        if (message == Message.NONE) {
            return;
        }
        reinitIfNecessary();
        if (this.channel == null) {
            throw new ReportedFailedException("No connection to RabbitMQ");
        }
        try {
            this.channel.basicPublish(this.producerConfig.getExchangeName(), str, new AMQP.BasicProperties.Builder().contentType(this.producerConfig.getContentType()).contentEncoding(this.producerConfig.getContentEncoding()).deliveryMode(Integer.valueOf(this.producerConfig.isPersistent() ? 2 : 1)).build(), message.getBody());
        } catch (AlreadyClosedException e) {
            this.logger.error("already closed exception while attempting to send message", e);
            reset();
            throw new ReportedFailedException(e);
        } catch (IOException e2) {
            this.logger.error("io exception while attempting to send message", e2);
            reset();
            throw new ReportedFailedException(e2);
        }
    }

    public void open(Map map) {
        this.logger = LoggerFactory.getLogger(RabbitMQProducer.class);
        this.producerConfig = ProducerConfig.getFromStormConfig(map);
        internalOpen();
    }

    private void internalOpen() {
        try {
            this.connection = createConnection();
            this.channel = this.connection.createChannel();
            this.declarator.execute(this.channel);
        } catch (Exception e) {
            this.logger.error("could not open connection on exchange " + this.producerConfig.getExchangeName());
            reset();
        }
    }

    public void close() {
        try {
            if (this.channel != null && this.channel.isOpen()) {
                this.channel.close();
            }
        } catch (Exception e) {
            this.logger.debug("error closing channel", e);
        }
        try {
            this.logger.info("closing connection to rabbitmq: " + this.connection);
            this.connection.close();
        } catch (Exception e2) {
            this.logger.debug("error closing connection", e2);
        }
        this.channel = null;
        this.connection = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reset() {
        this.channel = null;
    }

    private void reinitIfNecessary() {
        if (this.channel == null) {
            close();
            internalOpen();
        }
    }

    private Connection createConnection() throws IOException {
        Connection newConnection = this.producerConfig.getConnectionConfig().asConnectionFactory().newConnection();
        newConnection.addShutdownListener(new ShutdownListener() { // from class: io.latent.storm.rabbitmq.RabbitMQProducer.1
            public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                RabbitMQProducer.this.logger.error("shutdown signal received", shutdownSignalException);
                RabbitMQProducer.this.reset();
            }
        });
        this.logger.info("connected to rabbitmq: " + newConnection + " for " + this.producerConfig.getExchangeName());
        return newConnection;
    }
}
