package org.apache.eventmesh.connector.rabbitmq.source.connector;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.GetResponse;
import io.cloudevents.CloudEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.connector.rabbitmq.client.RabbitmqClient;
import org.apache.eventmesh.connector.rabbitmq.client.RabbitmqConnectionFactory;
import org.apache.eventmesh.connector.rabbitmq.cloudevent.RabbitmqCloudEvent;
import org.apache.eventmesh.connector.rabbitmq.source.config.RabbitMQSourceConfig;
import org.apache.eventmesh.connector.rabbitmq.source.config.SourceConnectorConfig;
import org.apache.eventmesh.openconnect.api.config.Config;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector.class */
public class RabbitMQSourceConnector implements Source {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQSourceConnector.class);
    private RabbitMQSourceConfig sourceConfig;
    private static final int DEFAULT_BATCH_SIZE = 10;
    private BlockingQueue<CloudEvent> queue;
    private RabbitMQSourceHandler rabbitMQSourceHandler;
    private RabbitmqClient rabbitmqClient;
    private Connection connection;
    private Channel channel;
    private volatile boolean started = false;
    private final RabbitmqConnectionFactory rabbitmqConnectionFactory = new RabbitmqConnectionFactory();
    private final ThreadPoolExecutor executor = ThreadPoolFactory.createThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, Runtime.getRuntime().availableProcessors() * 2, "EventMesh-RabbitMQSourceConnector-");

    /* loaded from: input_file:org/apache/eventmesh/connector/rabbitmq/source/connector/RabbitMQSourceConnector$RabbitMQSourceHandler.class */
    public class RabbitMQSourceHandler implements Runnable {
        private final Channel channel;
        private final SourceConnectorConfig connectorConfig;
        private final AtomicBoolean stop = new AtomicBoolean(false);

        public RabbitMQSourceHandler(Channel channel, SourceConnectorConfig sourceConnectorConfig) {
            this.channel = channel;
            this.connectorConfig = sourceConnectorConfig;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.stop.get()) {
                try {
                    GetResponse basicGet = this.channel.basicGet(this.connectorConfig.getQueueName(), this.connectorConfig.isAutoAck());
                    if (basicGet != null) {
                        CloudEvent convertToCloudEvent = RabbitmqCloudEvent.getFromByteArray(basicGet.getBody()).convertToCloudEvent();
                        if (convertToCloudEvent != null) {
                            RabbitMQSourceConnector.this.queue.add(convertToCloudEvent);
                        }
                        if (!this.connectorConfig.isAutoAck()) {
                            this.channel.basicAck(basicGet.getEnvelope().getDeliveryTag(), false);
                        }
                    }
                } catch (Exception e) {
                    RabbitMQSourceConnector.log.error("[RabbitMQSourceHandler] thread run happen exception.", e);
                }
            }
        }

        public void stop() {
            this.stop.compareAndSet(false, true);
        }
    }

    public Class<? extends Config> configClass() {
        return RabbitMQSourceConfig.class;
    }

    public void init(Config config) throws Exception {
    }

    public void init(ConnectorContext connectorContext) throws Exception {
        this.queue = new LinkedBlockingQueue(1000);
        this.sourceConfig = (RabbitMQSourceConfig) ((SourceConnectorContext) connectorContext).getSourceConfig();
        this.rabbitmqClient = new RabbitmqClient(this.rabbitmqConnectionFactory);
        this.connection = this.rabbitmqClient.getConnection(this.sourceConfig.getConnectorConfig().getHost(), this.sourceConfig.getConnectorConfig().getUsername(), this.sourceConfig.getConnectorConfig().getPasswd(), this.sourceConfig.getConnectorConfig().getPort(), this.sourceConfig.getConnectorConfig().getVirtualHost());
        this.channel = this.rabbitmqConnectionFactory.createChannel(this.connection);
        this.rabbitMQSourceHandler = new RabbitMQSourceHandler(this.channel, this.sourceConfig.getConnectorConfig());
    }

    public void start() throws Exception {
        if (this.started) {
            return;
        }
        this.rabbitmqClient.binding(this.channel, this.sourceConfig.getConnectorConfig().getExchangeType(), this.sourceConfig.getConnectorConfig().getExchangeName(), this.sourceConfig.getConnectorConfig().getRoutingKey(), this.sourceConfig.getConnectorConfig().getQueueName());
        this.executor.execute(this.rabbitMQSourceHandler);
        this.started = true;
    }

    public void commit(ConnectRecord connectRecord) {
    }

    public String name() {
        return this.sourceConfig.getConnectorConfig().getConnectorName();
    }

    public void stop() {
        if (this.started) {
            try {
                this.rabbitmqClient.unbinding(this.channel, this.sourceConfig.getConnectorConfig().getExchangeName(), this.sourceConfig.getConnectorConfig().getRoutingKey(), this.sourceConfig.getConnectorConfig().getQueueName());
                this.rabbitmqClient.closeConnection(this.connection);
                this.rabbitmqClient.closeChannel(this.channel);
                this.rabbitMQSourceHandler.stop();
            } finally {
                this.started = false;
            }
        }
    }

    public List<ConnectRecord> poll() {
        ArrayList arrayList = new ArrayList(DEFAULT_BATCH_SIZE);
        for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
            try {
                CloudEvent poll = this.queue.poll(3L, TimeUnit.SECONDS);
                if (poll == null) {
                    break;
                }
                arrayList.add(CloudEventUtil.convertEventToRecord(poll));
            } catch (InterruptedException e) {
            }
        }
        return arrayList;
    }
}
