package org.apache.camel.component.salesforce.internal.streaming;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceConsumer;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.support.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.class */
public class SubscriptionHelper extends ServiceSupport {
    // Shared replay extension: added to every client built by createClient(), and fed
    // per-channel replay ids by setupReplay().
    static final CometDReplayExtension REPLAY_EXTENSION = new CometDReplayExtension();
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
    // Reported (in seconds) by the handshake-timeout error raised from doStart().
    private static final int CONNECT_TIMEOUT = 110;
    // Reported (in seconds) by the timeout error raised from unsubscribe().
    private static final int CHANNEL_TIMEOUT = 40;
    // Message keys inspected by getFailure() to locate an exception attached to a reply.
    private static final String FAILURE_FIELD = "failure";
    private static final String EXCEPTION_FIELD = "exception";
    // NOTE(review): not referenced by name in this file; presumably the 5000 ms poll interval
    // used while restartClient() waits for the client to disconnect - confirm.
    private static final int DISCONNECT_INTERVAL = 5000;
    // Current CometD client; assigned in doStart() and reset to null at the end of doStop().
    BayeuxClient client;
    private final SalesforceComponent component;
    private final SalesforceSession session;
    // Upper bound for the restart backoff; restartClient() gives up once it is exceeded.
    private final long maxBackoff;
    // Amount added to restartBackoff on every restart attempt.
    private final long backoffIncrement;
    // Meta-channel listeners, created lazily on the first doStart() and reused on restarts.
    private ClientSessionChannel.MessageListener handshakeListener;
    private ClientSessionChannel.MessageListener connectListener;
    private ClientSessionChannel.MessageListener disconnectListener;
    // Last handshake/connect failure details; cleared at the beginning of doStart() and
    // written by the handshake and connect listeners.
    private volatile String handshakeError;
    private volatile Exception handshakeException;
    private volatile String connectError;
    private volatile Exception connectException;
    // Set by the handshake listener when a successful handshake happens while subscriptions
    // exist; the connect listener then re-subscribes every consumer and clears the flag.
    private volatile boolean reconnecting;
    // NOTE(review): presumably the disconnect timeout in msec matching the 60000L literal
    // in doStop() - confirm.
    private final long timeout = 60000;
    // Consumers with an active subscription, mapped to the listener registered on their channel.
    private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap = new ConcurrentHashMap();
    // Delay (msec) to pause before the next restart attempt; grows by backoffIncrement per attempt.
    private final AtomicLong restartBackoff = new AtomicLong(0);

    public SubscriptionHelper(SalesforceComponent salesforceComponent) throws SalesforceException {
        this.component = salesforceComponent;
        this.session = salesforceComponent.getSession();
        this.backoffIncrement = salesforceComponent.getConfig().getBackoffIncrement();
        this.maxBackoff = salesforceComponent.getConfig().getMaxBackoff();
    }

    /**
     * Creates a fresh CometD client, registers the handshake/connect/disconnect meta-channel
     * listeners (creating them on first use), performs the handshake and waits for the client
     * to reach the CONNECTED state.
     *
     * @throws Exception a {@link CamelException} describing the handshake or connect failure
     *                   (or a timeout) when the client does not become connected in time;
     *                   anything thrown by {@code createClient} is propagated as is
     */
    protected void doStart() throws Exception {
        this.client = createClient(this.component);
        // Forget failures recorded by a previous start attempt.
        this.handshakeError = null;
        this.handshakeException = null;
        this.connectError = null;
        this.connectException = null;
        if (this.handshakeListener == null) {
            // First start: build the handshake listener; later restarts reuse it.
            this.handshakeListener = new ClientSessionChannel.MessageListener() {
                public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                    SubscriptionHelper.LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
                    if (message.isSuccessful()) {
                        if (SubscriptionHelper.this.listenerMap.isEmpty()) {
                            return;
                        }
                        // Subscriptions exist, so the next successful connect must re-subscribe them.
                        SubscriptionHelper.this.reconnecting = true;
                        return;
                    }
                    SubscriptionHelper.LOG.warn("Handshake failure: {}", message);
                    SubscriptionHelper.this.handshakeError = (String) message.get("error");
                    SubscriptionHelper.this.handshakeException = SubscriptionHelper.this.getFailure(message);
                    if (SubscriptionHelper.this.handshakeError != null) {
                        // 401: log in again, passing the current access token to the session.
                        if (SubscriptionHelper.this.handshakeError.startsWith("401::")) {
                            try {
                                SubscriptionHelper.LOG.info("Refreshing OAuth token...");
                                SubscriptionHelper.this.session.login(SubscriptionHelper.this.session.getAccessToken());
                                SubscriptionHelper.LOG.info("Refreshed OAuth token for re-handshake");
                            } catch (SalesforceException e) {
                                SubscriptionHelper.LOG.warn("Error renewing OAuth token on 401 error: " + e.getMessage(), (Throwable) e);
                            }
                        }
                        // 403: log the session out before the client is restarted.
                        if (SubscriptionHelper.this.handshakeError.startsWith("403::")) {
                            try {
                                SubscriptionHelper.LOG.info("Cleaning session (logout) from SalesforceSession before restarting client");
                                SubscriptionHelper.this.session.logout();
                            } catch (SalesforceException e2) {
                                SubscriptionHelper.LOG.warn("Error while cleaning session: " + e2.getMessage(), (Throwable) e2);
                            }
                        }
                    }
                    // Any failed handshake triggers an asynchronous restart.
                    SubscriptionHelper.this.restartClient();
                }
            };
        }
        this.client.getChannel("/meta/handshake").addListener(this.handshakeListener);
        if (this.connectListener == null) {
            // First start: build the connect listener; later restarts reuse it.
            this.connectListener = new ClientSessionChannel.MessageListener() {
                public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                    SubscriptionHelper.LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
                    if (!message.isSuccessful()) {
                        // Only record the failure here; no restart is triggered from this listener.
                        SubscriptionHelper.LOG.warn("Connect failure: {}", message);
                        SubscriptionHelper.this.connectError = (String) message.get("error");
                        SubscriptionHelper.this.connectException = SubscriptionHelper.this.getFailure(message);
                        return;
                    }
                    if (SubscriptionHelper.this.reconnecting) {
                        SubscriptionHelper.this.reconnecting = false;
                        SubscriptionHelper.LOG.debug("Refreshing subscriptions to {} channels on reconnect", Integer.valueOf(SubscriptionHelper.this.listenerMap.size()));
                        // Work on a snapshot: the map is cleared and repopulated as subscribe()
                        // confirmations arrive.
                        HashMap hashMap = new HashMap();
                        hashMap.putAll(SubscriptionHelper.this.listenerMap);
                        SubscriptionHelper.this.listenerMap.clear();
                        Iterator it = hashMap.entrySet().iterator();
                        while (it.hasNext()) {
                            SalesforceConsumer salesforceConsumer = (SalesforceConsumer) ((Map.Entry) it.next()).getKey();
                            SubscriptionHelper.this.subscribe(salesforceConsumer.getTopicName(), salesforceConsumer);
                        }
                    }
                }
            };
        }
        this.client.getChannel("/meta/connect").addListener(this.connectListener);
        if (this.disconnectListener == null) {
            // First start: build the disconnect listener; every disconnect message restarts the client.
            this.disconnectListener = new ClientSessionChannel.MessageListener() {
                public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                    SubscriptionHelper.this.restartClient();
                }
            };
        }
        this.client.getChannel("/meta/disconnect").addListener(this.disconnectListener);
        this.client.handshake();
        // Wait up to 110 seconds (converted to milliseconds) for the CONNECTED state.
        if (this.client.waitFor(TimeUnit.MILLISECONDS.convert(110L, TimeUnit.SECONDS), BayeuxClient.State.CONNECTED, new BayeuxClient.State[0])) {
            return;
        }
        // Not connected: report the recorded cause, checking handshake failures before
        // connect failures and falling back to a plain timeout.
        if (this.handshakeException != null) {
            throw new CamelException(String.format("Exception during HANDSHAKE: %s", this.handshakeException.getMessage()), this.handshakeException);
        }
        if (this.handshakeError != null) {
            throw new CamelException(String.format("Error during HANDSHAKE: %s", this.handshakeError));
        }
        if (this.connectException != null) {
            throw new CamelException(String.format("Exception during CONNECT: %s", this.connectException.getMessage()), this.connectException);
        }
        if (this.connectError == null) {
            throw new CamelException(String.format("Handshake request timeout after %s seconds", Integer.valueOf(CONNECT_TIMEOUT)));
        }
        throw new CamelException(String.format("Error during CONNECT: %s", this.connectError));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules an asynchronous restart of the CometD client on the HTTP client's executor.
     *
     * <p>The task waits for the current client to disconnect, pauses for the current backoff
     * (which grows by {@code backoffIncrement} on every attempt), then runs {@code doStop()}
     * followed by {@code doStart()}. When the restart fails and the next backoff would exceed
     * {@code maxBackoff}, every subscribed consumer is notified through
     * {@code handleException}.
     *
     * <p>An interrupt while waiting aborts the restart immediately and restores the thread's
     * interrupt status.
     */
    public void restartClient() {
        this.component.getConfig().getHttpClient().getExecutor().execute(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                SubscriptionHelper.LOG.info("Restarting on unexpected disconnect from Salesforce...");

                SubscriptionHelper.LOG.debug("Waiting to disconnect...");
                while (!SubscriptionHelper.this.client.isDisconnected()) {
                    try {
                        Thread.sleep(DISCONNECT_INTERVAL);
                    } catch (InterruptedException e) {
                        // Stop polling right away instead of looping until the disconnect completes,
                        // and keep the interrupt visible to the executor.
                        SubscriptionHelper.LOG.error("Aborting restart on interrupt!");
                        Thread.currentThread().interrupt();
                        return;
                    }
                }

                final long backoff = SubscriptionHelper.this.restartBackoff.getAndAdd(SubscriptionHelper.this.backoffIncrement);
                if (backoff > SubscriptionHelper.this.maxBackoff) {
                    SubscriptionHelper.LOG.error("Restart aborted after exceeding {} msecs backoff", Long.valueOf(SubscriptionHelper.this.maxBackoff));
                    return;
                }
                SubscriptionHelper.LOG.debug("Pausing for {} msecs before restart attempt", Long.valueOf(backoff));
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException e) {
                    SubscriptionHelper.LOG.error("Aborting restart on interrupt!");
                    Thread.currentThread().interrupt();
                    return;
                }

                // Placeholder cause, replaced by whatever the stop/start sequence throws.
                Exception lastError = new SalesforceException("Unknown error", (Throwable) null);
                try {
                    SubscriptionHelper.this.doStop();
                    SubscriptionHelper.this.doStart();
                } catch (Exception e) {
                    SubscriptionHelper.LOG.error("Error restarting: " + e.getMessage(), (Throwable) e);
                    lastError = e;
                }

                // doStop() clears the client and doStart() may fail before assigning a new one,
                // so the field can legitimately be null here.
                final BayeuxClient restarted = SubscriptionHelper.this.client;
                if (restarted != null && restarted.isHandshook()) {
                    SubscriptionHelper.LOG.info("Successfully restarted!");
                    SubscriptionHelper.this.restartBackoff.set(restarted.getBackoffIncrement());
                    return;
                }

                SubscriptionHelper.LOG.error("Failed to restart after pausing for {} msecs", Long.valueOf(backoff));
                if (backoff + SubscriptionHelper.this.backoffIncrement > SubscriptionHelper.this.maxBackoff) {
                    // No further attempt will fit in the backoff budget: tell the consumers.
                    final String abortMessage = "Aborting restart attempt due to: " + lastError.getMessage();
                    final SalesforceException abortCause = new SalesforceException(abortMessage, (Throwable) lastError);
                    for (SalesforceConsumer consumer : SubscriptionHelper.this.listenerMap.keySet()) {
                        consumer.handleException(abortMessage, abortCause);
                    }
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the exception attached to a CometD message, if any.
     *
     * <p>The top-level {@code "exception"} entry takes precedence; otherwise the
     * {@code "exception"} entry of the {@code "failure"} map is used.
     *
     * @param message message to inspect
     * @return the attached exception, or {@code null} when the message carries none
     */
    public Exception getFailure(Message message) {
        final Object directException = message.get(EXCEPTION_FIELD);
        if (directException != null) {
            return (Exception) directException;
        }

        final Object failureDetails = message.get(FAILURE_FIELD);
        if (failureDetails != null) {
            return (Exception) ((Map<?, ?>) failureDetails).get(EXCEPTION_FIELD);
        }

        return null;
    }

    /**
     * Removes the meta-channel listeners from the current client, disconnects it (waiting up
     * to {@code timeout} msec) and clears the client reference.
     *
     * <p>Does nothing when there is no client, which is the state left behind by a restart
     * whose {@code doStart()} failed before a new client could be created.
     *
     * @throws Exception propagated from the underlying client calls
     */
    protected void doStop() throws Exception {
        final BayeuxClient current = this.client;
        if (current == null) {
            // Already stopped (or never successfully started): nothing to tear down.
            return;
        }

        current.getChannel("/meta/disconnect").removeListener(this.disconnectListener);
        current.getChannel("/meta/connect").removeListener(this.connectListener);
        current.getChannel("/meta/handshake").removeListener(this.handshakeListener);

        if (!current.disconnect(this.timeout)) {
            LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(this.component), Long.valueOf(this.timeout));
        }
        this.client = null;
    }

    /**
     * Builds a CometD client for the component's streaming endpoint.
     *
     * <p>The client uses a long-polling transport on the component's HTTP client, adds an
     * {@code Authorization: OAuth <token>} header to every request, and has the shared replay
     * extension installed. The session is logged in first when it has no access token.
     *
     * @param salesforceComponent component providing the HTTP client, session and configuration
     * @return a new client that has not been handshaken yet
     * @throws SalesforceException if logging in to obtain an access token fails
     */
    static BayeuxClient createClient(SalesforceComponent salesforceComponent) throws SalesforceException {
        final SalesforceHttpClient transportHttpClient = salesforceComponent.getConfig().getHttpClient();

        final Map<String, Object> transportOptions = new HashMap<>();
        transportOptions.put("maxNetworkDelay", Long.valueOf(transportHttpClient.getTimeout()));

        // Make sure a token exists before the first request is customized.
        final SalesforceSession session = salesforceComponent.getSession();
        if (session.getAccessToken() == null) {
            session.login(null);
        }

        final String endpointUrl = getEndpointUrl(salesforceComponent);
        final LongPollingTransport oauthTransport = new LongPollingTransport(transportOptions, transportHttpClient) {
            @Override
            protected void customize(Request request) {
                super.customize(request);
                // Read the token on every request so a refreshed token is picked up.
                request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + session.getAccessToken());
            }
        };

        final BayeuxClient cometdClient = new BayeuxClient(endpointUrl, oauthTransport, new ClientTransport[0]);
        cometdClient.addExtension(REPLAY_EXTENSION);
        return cometdClient;
    }

    /**
     * Subscribes the consumer to the channel of the given topic.
     *
     * <p>Replay settings for the consumer's endpoint are applied first. A temporary listener
     * on {@code /meta/subscribe} waits for the reply addressed to this channel: on success the
     * consumer is recorded in {@code listenerMap}, on failure the consumer is notified through
     * {@code handleException}. The temporary listener removes itself once that reply has been
     * handled.
     *
     * @param str                topic name (without the channel prefix)
     * @param salesforceConsumer consumer that receives the channel's messages
     */
    public void subscribe(final String str, final SalesforceConsumer salesforceConsumer) {
        final String channelName = getChannelName(str);
        setupReplay((SalesforceEndpoint) salesforceConsumer.getEndpoint());
        LOG.info("Subscribing to channel {}...", channelName);

        // Delivers channel messages to the consumer.
        final ClientSessionChannel.MessageListener messageListener = new ClientSessionChannel.MessageListener() {
            @Override
            public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                SubscriptionHelper.LOG.debug("Received Message: {}", message);
                salesforceConsumer.processMessage(clientSessionChannel, message);
            }
        };

        ClientSessionChannel channel = this.client.getChannel(channelName);
        this.client.getChannel("/meta/subscribe").addListener(new ClientSessionChannel.MessageListener() {
            @Override
            public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                SubscriptionHelper.LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);

                // A reply without a subscription field cannot be matched to this channel;
                // ignore it (as unsubscribe() does) instead of failing with a NullPointerException.
                final Object subscription = message.get("subscription");
                if (subscription == null) {
                    return;
                }
                final String subscribedChannel = subscription.toString();
                if (!channelName.equals(subscribedChannel)) {
                    return;
                }

                if (message.isSuccessful()) {
                    SubscriptionHelper.LOG.info("Subscribed to channel {}", subscribedChannel);
                    SubscriptionHelper.this.listenerMap.put(salesforceConsumer, messageListener);
                } else {
                    String error = (String) message.get("error");
                    if (error == null) {
                        error = "Missing error message";
                    }
                    final Exception failure = SubscriptionHelper.this.getFailure(message);
                    final String description = String.format("Error subscribing to %s: %s",
                            str, failure != null ? failure.getMessage() : error);
                    salesforceConsumer.handleException(description, new SalesforceException(description, failure));
                }

                // The reply for this channel has been handled; stop listening.
                SubscriptionHelper.this.client.getChannel("/meta/subscribe").removeListener(this);
            }
        });
        channel.subscribe(messageListener);
    }

    /**
     * Registers the replay id to use for the endpoint's topic with the shared replay
     * extension, when one can be determined; otherwise leaves the extension untouched.
     *
     * @param salesforceEndpoint endpoint whose topic and replay settings are used
     */
    void setupReplay(SalesforceEndpoint salesforceEndpoint) {
        final String topic = salesforceEndpoint.getTopicName();

        determineReplayIdFor(salesforceEndpoint, topic).ifPresent(replayId -> {
            final String channel = getChannelName(topic);
            LOG.info("Set Replay extension to replay from `{}` for channel `{}`", replayId, channel);
            REPLAY_EXTENSION.addChannelReplayId(channel, replayId.longValue());
        });
    }

    /**
     * Determines the replay id to use for a topic.
     *
     * <p>The first non-null candidate wins, in this order: the endpoint's explicit replay id,
     * the endpoint configuration's initial replay id for the topic (looked up by topic name,
     * falling back to the channel name), the component configuration's initial replay id
     * (same lookup), the endpoint configuration's default replay id, and finally the
     * component configuration's default replay id.
     *
     * @param salesforceEndpoint endpoint supplying the endpoint- and component-level settings
     * @param str                topic name (without the channel prefix)
     * @return the selected replay id, or an empty {@code Optional} when none is configured
     */
    static Optional<Long> determineReplayIdFor(SalesforceEndpoint salesforceEndpoint, String str) {
        final String channelName = getChannelName(str);

        final Long endpointReplayId = salesforceEndpoint.getReplayId();
        final SalesforceComponent owningComponent = salesforceEndpoint.m6getComponent();

        final SalesforceEndpointConfig endpointConfig = salesforceEndpoint.getConfiguration();
        final Map<String, Long> endpointInitialIds = endpointConfig.getInitialReplayIdMap();
        final Long endpointInitialId = endpointInitialIds.getOrDefault(str, endpointInitialIds.get(channelName));
        final Long endpointDefaultId = endpointConfig.getDefaultReplayId();

        final SalesforceEndpointConfig componentConfig = owningComponent.getConfig();
        final Map<String, Long> componentInitialIds = componentConfig.getInitialReplayIdMap();
        final Long componentInitialId = componentInitialIds.getOrDefault(str, componentInitialIds.get(channelName));
        final Long componentDefaultId = componentConfig.getDefaultReplayId();

        // Keep the stream typed as Stream<Long> so the result is an Optional<Long>.
        return Stream.of(endpointReplayId, endpointInitialId, componentInitialId, endpointDefaultId, componentDefaultId)
                .filter(Objects::nonNull)
                .findFirst();
    }

    /**
     * Builds the channel name for a topic by prefixing it with {@code /topic/}.
     *
     * @param str topic name
     * @return the channel name for the topic
     */
    static String getChannelName(String str) {
        final StringBuilder channel = new StringBuilder("/topic/");
        channel.append(str);
        return channel.toString();
    }

    /**
     * Unsubscribes the consumer from the channel of the given topic and waits for the
     * server's confirmation.
     *
     * <p>Nothing is sent when the consumer has no recorded subscription. Otherwise the call
     * blocks for up to {@code CHANNEL_TIMEOUT} seconds for the {@code /meta/unsubscribe} reply
     * addressed to this channel. If the thread is interrupted while waiting, the interrupt
     * status is restored and the method returns without confirmation.
     *
     * @param str                topic name (without the channel prefix)
     * @param salesforceConsumer consumer whose subscription is removed
     * @throws CamelException if the server reports a failed unsubscribe, or no reply arrives
     *                        within the timeout
     */
    public void unsubscribe(String str, SalesforceConsumer salesforceConsumer) throws CamelException {
        final String channelName = getChannelName(str);
        final CountDownLatch replyLatch = new CountDownLatch(1);
        // Single-element holders written by the listener and read after the latch is released.
        final String[] unsubscribeError = {null};
        final Exception[] unsubscribeException = {null};

        final ClientSessionChannel.MessageListener replyListener = new ClientSessionChannel.MessageListener() {
            @Override
            public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                SubscriptionHelper.LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", message);
                final Object subscription = message.get("subscription");
                if (subscription == null) {
                    return;
                }
                final String repliedChannel = subscription.toString();
                if (!channelName.equals(repliedChannel)) {
                    return;
                }
                if (message.isSuccessful()) {
                    SubscriptionHelper.LOG.info("Unsubscribed from channel {}", repliedChannel);
                } else {
                    // Always record something so the failure is reported even without details.
                    final String error = (String) message.get("error");
                    unsubscribeError[0] = error != null ? error : "Missing error message";
                    unsubscribeException[0] = SubscriptionHelper.this.getFailure(message);
                }
                replyLatch.countDown();
            }
        };

        this.client.getChannel("/meta/unsubscribe").addListener(replyListener);
        try {
            final ClientSessionChannel.MessageListener subscribed = this.listenerMap.remove(salesforceConsumer);
            if (subscribed == null) {
                return;
            }
            LOG.info("Unsubscribing from channel {}...", channelName);
            this.client.getChannel(channelName).unsubscribe(subscribed);

            final boolean replied;
            try {
                replied = replyLatch.await(CHANNEL_TIMEOUT, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            // A failed reply releases the latch as well, so the recorded failure must be
            // checked regardless of whether the wait timed out.
            String failureMessage = null;
            if (unsubscribeException[0] != null) {
                failureMessage = String.format("Error unsubscribing from topic %s: %s", str, unsubscribeException[0].getMessage());
            } else if (unsubscribeError[0] != null) {
                failureMessage = String.format("Error unsubscribing from topic %s: %s", str, unsubscribeError[0]);
            } else if (!replied) {
                failureMessage = String.format("Timeout error unsubscribing from topic %s after %s seconds", str, Integer.valueOf(CHANNEL_TIMEOUT));
            }
            if (failureMessage != null) {
                throw new CamelException(failureMessage, unsubscribeException[0]);
            }
        } finally {
            this.client.getChannel("/meta/unsubscribe").removeListener(replyListener);
        }
    }

    /**
     * Computes the CometD endpoint URL for the component's session and API version.
     *
     * <p>For API version 36.0 with any replay option configured (a default replay id or a
     * non-empty initial replay id map) the {@code /cometd/replay/} path is used; in every
     * other case the plain {@code /cometd/} path is used.
     *
     * @param salesforceComponent component providing the session and configuration
     * @return instance URL followed by the CometD path and the API version
     */
    static String getEndpointUrl(SalesforceComponent salesforceComponent) {
        final double apiVersion = Double.valueOf(salesforceComponent.getConfig().getApiVersion()).doubleValue();

        if (apiVersion == 36.0d) {
            final boolean replayConfigured = salesforceComponent.getConfig().getDefaultReplayId() != null
                    || !salesforceComponent.getConfig().getInitialReplayIdMap().isEmpty();
            if (replayConfigured) {
                return salesforceComponent.getSession().getInstanceUrl() + "/cometd/replay/" + salesforceComponent.getConfig().getApiVersion();
            }
        }

        return salesforceComponent.getSession().getInstanceUrl() + "/cometd/" + salesforceComponent.getConfig().getApiVersion();
    }
}
