package org.apache.flume.source.jms;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurables;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractPollableSource;
import org.apache.flume.source.jms.DefaultJMSMessageConverter;
import org.apache.flume.source.jms.JMSMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/flume/source/jms/JMSSource.class */
public class JMSSource extends AbstractPollableSource implements BatchSizeSupported {
    private static final Logger logger = LoggerFactory.getLogger(JMSSource.class);
    private static final String JAVA_SCHEME = "java";
    private final InitialContextFactory initialContextFactory;
    private ConnectionFactory connectionFactory;
    private int batchSize;
    private JMSMessageConverter converter;
    private JMSMessageConsumer consumer;
    private String initialContextFactoryName;
    private String providerUrl;
    private String destinationName;
    private JMSDestinationType destinationType;
    private JMSDestinationLocator destinationLocator;
    private String messageSelector;
    private Optional<String> userName;
    private Optional<String> password;
    private SourceCounter sourceCounter;
    private int errorThreshold;
    private long pollTimeout;
    private Optional<String> clientId;
    private boolean createDurableSubscription;
    private String durableSubscriptionName;
    private int jmsExceptionCounter;
    private InitialContext initialContext;

    public JMSSource() {
        this(new InitialContextFactory());
    }

    public JMSSource(InitialContextFactory initialContextFactory) {
        this.initialContextFactory = initialContextFactory;
    }

    protected void doConfigure(Context context) throws FlumeException {
        this.sourceCounter = new SourceCounter(getName());
        this.initialContextFactoryName = context.getString(JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY, JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME).trim();
        this.providerUrl = context.getString(JMSSourceConfiguration.PROVIDER_URL, JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME).trim();
        this.destinationName = context.getString(JMSSourceConfiguration.DESTINATION_NAME, JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME).trim();
        String upperCase = context.getString(JMSSourceConfiguration.DESTINATION_TYPE, JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME).trim().toUpperCase(Locale.ENGLISH);
        String upperCase2 = context.getString(JMSSourceConfiguration.DESTINATION_LOCATOR, JMSSourceConfiguration.DESTINATION_LOCATOR_DEFAULT).trim().toUpperCase(Locale.ENGLISH);
        this.messageSelector = context.getString(JMSSourceConfiguration.MESSAGE_SELECTOR, JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME).trim();
        this.batchSize = context.getInteger(JMSSourceConfiguration.BATCH_SIZE, 100).intValue();
        this.errorThreshold = context.getInteger(JMSSourceConfiguration.ERROR_THRESHOLD, 10).intValue();
        this.userName = Optional.fromNullable(context.getString(JMSSourceConfiguration.USERNAME));
        this.pollTimeout = context.getLong(JMSSourceConfiguration.POLL_TIMEOUT, 1000L).longValue();
        this.clientId = Optional.fromNullable(context.getString(JMSSourceConfiguration.CLIENT_ID));
        this.createDurableSubscription = context.getBoolean(JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION, false).booleanValue();
        this.durableSubscriptionName = context.getString(JMSSourceConfiguration.DURABLE_SUBSCRIPTION_NAME, JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME);
        String trim = context.getString(JMSSourceConfiguration.PASSWORD_FILE, JMSSourceConfiguration.DEFAULT_DURABLE_SUBSCRIPTION_NAME).trim();
        if (trim.isEmpty()) {
            this.password = Optional.absent();
        } else {
            try {
                this.password = Optional.of(Files.toString(new File(trim), Charsets.UTF_8).trim());
            } catch (IOException e) {
                throw new FlumeException(String.format("Could not read password file %s", trim), e);
            }
        }
        String trim2 = context.getString(JMSSourceConfiguration.CONVERTER_TYPE, JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT).trim();
        if (JMSSourceConfiguration.CONVERTER_TYPE_DEFAULT.equalsIgnoreCase(trim2)) {
            trim2 = DefaultJMSMessageConverter.Builder.class.getName();
        }
        Context context2 = new Context(context.getSubProperties("converter."));
        try {
            Class<?> cls = Class.forName(trim2);
            if (JMSMessageConverter.Builder.class.isAssignableFrom(cls)) {
                this.converter = ((JMSMessageConverter.Builder) cls.newInstance()).build(context2);
            } else {
                Preconditions.checkState(JMSMessageConverter.class.isAssignableFrom(cls), String.format("Class %s is not a subclass of JMSMessageConverter", cls.getName()));
                this.converter = (JMSMessageConverter) cls.newInstance();
                boolean configure = Configurables.configure(this.converter, context2);
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("Attempted configuration of %s, result = %s", trim2, String.valueOf(configure)));
                }
            }
            String trim3 = context.getString(JMSSourceConfiguration.CONNECTION_FACTORY, JMSSourceConfiguration.CONNECTION_FACTORY_DEFAULT).trim();
            try {
                String scheme = new URI(trim3).getScheme();
                assertTrue(scheme == null || scheme.equals(JAVA_SCHEME), "Unsupported JNDI URI: " + trim3);
            } catch (URISyntaxException e2) {
                logger.warn("Invalid JNDI URI - {}", trim3);
            }
            assertNotEmpty(this.initialContextFactoryName, String.format("Initial Context Factory is empty. This is specified by %s", JMSSourceConfiguration.INITIAL_CONTEXT_FACTORY));
            assertNotEmpty(this.providerUrl, String.format("Provider URL is empty. This is specified by %s", JMSSourceConfiguration.PROVIDER_URL));
            assertNotEmpty(this.destinationName, String.format("Destination Name is empty. This is specified by %s", JMSSourceConfiguration.DESTINATION_NAME));
            assertNotEmpty(upperCase, String.format("Destination Type is empty. This is specified by %s", JMSSourceConfiguration.DESTINATION_TYPE));
            try {
                this.destinationType = JMSDestinationType.valueOf(upperCase);
                if (this.createDurableSubscription) {
                    if (JMSDestinationType.TOPIC != this.destinationType) {
                        throw new FlumeException(String.format("Only Destination type '%s' supports durable subscriptions.", JMSDestinationType.TOPIC.toString()));
                    }
                    if (!this.clientId.isPresent()) {
                        throw new FlumeException(String.format("You have to specify '%s' when using durable subscriptions.", JMSSourceConfiguration.CLIENT_ID));
                    }
                    if (StringUtils.isEmpty(this.durableSubscriptionName)) {
                        throw new FlumeException(String.format("If '%s' is set to true, '%s' has to be specified.", JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION, JMSSourceConfiguration.DURABLE_SUBSCRIPTION_NAME));
                    }
                } else if (!StringUtils.isEmpty(this.durableSubscriptionName)) {
                    logger.warn(String.format("'%s' is set, but '%s' is false.If you want to create a durable subscription, set %s to true.", JMSSourceConfiguration.DURABLE_SUBSCRIPTION_NAME, JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION, JMSSourceConfiguration.CREATE_DURABLE_SUBSCRIPTION));
                }
                try {
                    this.destinationLocator = JMSDestinationLocator.valueOf(upperCase2);
                    Preconditions.checkArgument(this.batchSize > 0, "Batch size must be greater than 0");
                    try {
                        Properties properties = new Properties();
                        properties.setProperty("java.naming.factory.initial", this.initialContextFactoryName);
                        properties.setProperty("java.naming.provider.url", this.providerUrl);
                        if (this.userName.isPresent()) {
                            properties.setProperty("java.naming.security.principal", (String) this.userName.get());
                        }
                        if (this.password.isPresent()) {
                            properties.setProperty("java.naming.security.credentials", (String) this.password.get());
                        }
                        this.initialContext = this.initialContextFactory.create(properties);
                        try {
                            this.connectionFactory = (ConnectionFactory) this.initialContext.lookup(trim3);
                        } catch (NamingException e3) {
                            throw new FlumeException("Could not lookup ConnectionFactory", e3);
                        }
                    } catch (NamingException e4) {
                        throw new FlumeException(String.format("Could not create initial context %s provider %s", this.initialContextFactoryName, this.providerUrl), e4);
                    }
                } catch (IllegalArgumentException e5) {
                    throw new FlumeException(String.format("Destination locator '%s' is invalid.", upperCase2), e5);
                }
            } catch (IllegalArgumentException e6) {
                throw new FlumeException(String.format("Destination type '%s' is invalid.", upperCase), e6);
            }
        } catch (Exception e7) {
            throw new FlumeException(String.format("Unable to create instance of converter %s", trim2), e7);
        }
    }

    private void assertNotEmpty(String str, String str2) {
        Preconditions.checkArgument(!str.isEmpty(), str2);
    }

    private void assertTrue(boolean z, String str) {
        Preconditions.checkArgument(z, str);
    }

    protected synchronized PollableSource.Status doProcess() throws EventDeliveryException {
        try {
            try {
                try {
                    try {
                        if (this.consumer == null) {
                            this.consumer = createConsumer();
                        }
                        List<Event> take = this.consumer.take();
                        int size = take.size();
                        if (size == 0) {
                            PollableSource.Status status = PollableSource.Status.BACKOFF;
                            if (0 != 0) {
                                if (this.consumer != null) {
                                    this.consumer.rollback();
                                }
                            } else if (this.consumer != null) {
                                this.consumer.commit();
                                this.jmsExceptionCounter = 0;
                            }
                            return status;
                        }
                        this.sourceCounter.incrementAppendBatchReceivedCount();
                        this.sourceCounter.addToEventReceivedCount(size);
                        getChannelProcessor().processEventBatch(take);
                        this.sourceCounter.addToEventAcceptedCount(size);
                        this.sourceCounter.incrementAppendBatchAcceptedCount();
                        PollableSource.Status status2 = PollableSource.Status.READY;
                        if (0 != 0) {
                            if (this.consumer != null) {
                                this.consumer.rollback();
                            }
                        } else if (this.consumer != null) {
                            this.consumer.commit();
                            this.jmsExceptionCounter = 0;
                        }
                        return status2;
                    } catch (JMSException e) {
                        logger.warn("JMSException consuming events", e);
                        int i = this.jmsExceptionCounter + 1;
                        this.jmsExceptionCounter = i;
                        if (i > this.errorThreshold && this.consumer != null) {
                            logger.warn("Exceeded JMSException threshold, closing consumer");
                            this.sourceCounter.incrementEventReadFail();
                            this.consumer.rollback();
                            this.consumer.close();
                            this.consumer = null;
                        }
                        if (1 != 0) {
                            if (this.consumer != null) {
                                this.consumer.rollback();
                            }
                        } else if (this.consumer != null) {
                            this.consumer.commit();
                            this.jmsExceptionCounter = 0;
                        }
                        return PollableSource.Status.BACKOFF;
                    }
                } catch (Throwable th) {
                    logger.error("Unexpected error processing events", th);
                    this.sourceCounter.incrementEventReadFail();
                    if (th instanceof Error) {
                        throw ((Error) th);
                    }
                    if (1 != 0) {
                        if (this.consumer != null) {
                            this.consumer.rollback();
                        }
                    } else if (this.consumer != null) {
                        this.consumer.commit();
                        this.jmsExceptionCounter = 0;
                    }
                    return PollableSource.Status.BACKOFF;
                }
            } catch (ChannelException e2) {
                logger.warn("Error appending event to channel. Channel might be full. Consider increasing the channel capacity or make sure the sinks perform faster.", e2);
                this.sourceCounter.incrementChannelWriteFail();
                if (1 != 0) {
                    if (this.consumer != null) {
                        this.consumer.rollback();
                    }
                } else if (this.consumer != null) {
                    this.consumer.commit();
                    this.jmsExceptionCounter = 0;
                }
                return PollableSource.Status.BACKOFF;
            }
        } catch (Throwable th2) {
            if (1 != 0) {
                if (this.consumer != null) {
                    this.consumer.rollback();
                }
            } else if (this.consumer != null) {
                this.consumer.commit();
                this.jmsExceptionCounter = 0;
            }
            throw th2;
        }
    }

    protected synchronized void doStart() {
        try {
            this.consumer = createConsumer();
            this.jmsExceptionCounter = 0;
            this.sourceCounter.start();
        } catch (JMSException e) {
            throw new FlumeException("Unable to create consumer", e);
        }
    }

    protected synchronized void doStop() {
        if (this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
        this.sourceCounter.stop();
    }

    @VisibleForTesting
    JMSMessageConsumer createConsumer() throws JMSException {
        logger.info("Creating new consumer for " + this.destinationName);
        JMSMessageConsumer jMSMessageConsumer = new JMSMessageConsumer(this.initialContext, this.connectionFactory, this.destinationName, this.destinationLocator, this.destinationType, this.messageSelector, this.batchSize, this.pollTimeout, this.converter, this.userName, this.password, this.clientId, this.createDurableSubscription, this.durableSubscriptionName);
        this.jmsExceptionCounter = 0;
        return jMSMessageConsumer;
    }

    public long getBatchSize() {
        return this.batchSize;
    }
}
