package org.wso2.extension.siddhi.io.twitter.source;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.twitter.util.Util;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import twitter4j.FilterQuery;
import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;

/* loaded from: input_file:org/wso2/extension/siddhi/io/twitter/source/TwitterConsumer.class */
public enum TwitterConsumer {
    INSTANCE;

    private static final Logger log = Logger.getLogger(TwitterConsumer.class);
    private boolean paused;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();
    Map<String, Object> event = new HashMap();
    long tweetId = -1;

    /* loaded from: input_file:org/wso2/extension/siddhi/io/twitter/source/TwitterConsumer$TwitterPoller.class */
    class TwitterPoller implements Runnable {
        Twitter twitter;
        Query query;
        QueryResult result;
        SourceEventListener sourceEventListener;

        TwitterPoller(Twitter twitter, Query query, SourceEventListener sourceEventListener) {
            this.twitter = twitter;
            this.query = query;
            this.sourceEventListener = sourceEventListener;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = true;
            do {
                try {
                    this.result = this.twitter.search(this.query);
                    for (Status status : this.result.getTweets()) {
                        TwitterConsumer.this.event = Util.createMap(status);
                        if (z) {
                            TwitterConsumer.this.tweetId = status.getId();
                            z = false;
                        }
                        if (TwitterConsumer.this.paused) {
                            TwitterConsumer.this.lock.lock();
                            while (TwitterConsumer.this.paused) {
                                try {
                                    try {
                                        TwitterConsumer.this.condition.await();
                                    } catch (InterruptedException e) {
                                        Thread.currentThread().interrupt();
                                        TwitterConsumer.this.lock.unlock();
                                    }
                                } catch (Throwable th) {
                                    TwitterConsumer.this.lock.unlock();
                                    throw th;
                                    break;
                                }
                            }
                            TwitterConsumer.this.lock.unlock();
                        }
                        this.sourceEventListener.onEvent(TwitterConsumer.this.event, (String[]) null);
                    }
                    if (this.result.nextQuery() != null) {
                        this.query = this.result.nextQuery();
                    } else {
                        this.query.setSinceId(TwitterConsumer.this.tweetId);
                        this.query.setMaxId(-1L);
                    }
                    checkRateLimit(this.result);
                } catch (TwitterException e2) {
                    TwitterConsumer.log.error("Failed to search tweets: " + e2.getMessage());
                }
            } while (this.result.nextQuery() != null);
        }

        private void checkRateLimit(QueryResult queryResult) {
            if (queryResult.getRateLimitStatus().getRemaining() <= 0) {
                try {
                    Thread.sleep(queryResult.getRateLimitStatus().getSecondsUntilReset());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    TwitterConsumer.log.error("Thread was interrupted during wait : " + e);
                }
            }
        }
    }

    /* loaded from: input_file:org/wso2/extension/siddhi/io/twitter/source/TwitterConsumer$TwitterStatusListener.class */
    class TwitterStatusListener implements StatusListener {
        private SourceEventListener sourceEventListener;

        TwitterStatusListener(SourceEventListener sourceEventListener) {
            this.sourceEventListener = sourceEventListener;
        }

        @Override // twitter4j.StatusListener
        public void onStatus(Status status) {
            if (TwitterConsumer.this.paused) {
                TwitterConsumer.this.lock.lock();
                while (TwitterConsumer.this.paused) {
                    try {
                        TwitterConsumer.this.condition.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        TwitterConsumer.this.lock.unlock();
                    }
                }
            }
            TwitterConsumer.this.event = Util.createMap(status);
            this.sourceEventListener.onEvent(TwitterConsumer.this.event, (String[]) null);
        }

        @Override // twitter4j.StatusListener
        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            TwitterConsumer.log.debug("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
        }

        @Override // twitter4j.StatusListener
        public void onTrackLimitationNotice(int i) {
            TwitterConsumer.log.debug("Got track limitation notice: " + i);
        }

        @Override // twitter4j.StatusListener
        public void onScrubGeo(long j, long j2) {
            TwitterConsumer.log.debug("Got scrub_geo event userId:" + j + " upToStatusId:" + j2);
        }

        @Override // twitter4j.StatusListener
        public void onStallWarning(StallWarning stallWarning) {
            TwitterConsumer.log.debug("Got stall warning:" + stallWarning);
        }

        @Override // twitter4j.StreamListener
        public void onException(Exception exc) {
            TwitterConsumer.log.error("Twitter source threw an exception", exc);
        }
    }

    TwitterConsumer() {
    }

    public void consume(TwitterStream twitterStream, SourceEventListener sourceEventListener, FilterQuery filterQuery, int i) {
        twitterStream.addListener(new TwitterStatusListener(sourceEventListener));
        if (i == 6) {
            twitterStream.sample();
        } else {
            twitterStream.filter(filterQuery);
        }
    }

    public void consume(Twitter twitter, Query query, SourceEventListener sourceEventListener, SiddhiAppContext siddhiAppContext, long j) {
        siddhiAppContext.getScheduledExecutorService().scheduleAtFixedRate(new TwitterPoller(twitter, query, sourceEventListener), 0L, j, TimeUnit.SECONDS);
    }

    public void pause() {
        this.paused = true;
    }

    public void resume() {
        this.paused = false;
        try {
            this.lock.lock();
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }
}
