package org.wso2.extension.siddhi.io.twitter.source;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.twitter.util.QueryBuilder;
import org.wso2.extension.siddhi.io.twitter.util.TwitterConstants;
import org.wso2.extension.siddhi.io.twitter.util.Util;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import twitter4j.FilterQuery;
import twitter4j.Query;
import twitter4j.TwitterFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

@Extension(name = "twitter", namespace = TwitterConstants.STATUS_SOURCE, description = "The Twitter source receives events from a Twitter app. The events are received in the form of key-value mappings. \n\nThe following are key values of the map of a tweet and their descriptions:\n\t1.  createdAt: The UTC time at which the Tweet was created.\n\t2.  tweetId: The integer representation for the unique identifier of the Tweet.\n\t3.  text: The actual UTF-8 text of the status update.\n\t4.  user.createdAt: The UTC date and time at which the user account was created on Twitter.\n\t5.  user.id: The integer representation for the unique identifier of the user who posted the Tweet.\n\t6.  user.screenName: The screen name with which the user identifies himself/herself.\n\t7.  user.name: The name of the user (as specified by the user).\n\t8.  user.mail: The `mail.id` of the user.\n\t9.  user.location: The location in which the current user account profile is saved. This parameter can have a null value.\n\t10. hashtags: The hashtags that have been parsed out of the Tweet.\n\t11. userMentions: The other Twitter users who are mentioned in the text of the Tweet.\n\t12. mediaUrls: The media elements uploaded with the Tweet.\n\t13. urls: The URLs included in the text of a Tweet.\n\t14. language: The language in which the Tweet is posted.\n\t15. source: the utility used to post the Tweet as an HTML-formatted string.\n\t16. isRetweet: This indicates whether the Tweet is a Retweet or not.\n\t17. retweetCount: The number of times the Tweet has been retweeted.\n\t18. favouriteCount: This indicates the number of times the Tweet has been liked by Twitter users. The value for this field can be null.\n\t19. geoLocation: The geographic location from which the Tweet was posted by the user or client application. The value for this field can be null.\n\t20. quotedStatusId: This field appears only when the Tweet is a quote Tweet. It displays the integer value Tweet ID of the quoted Tweet.\n\t21. 
in.reply.to.status.id: If the Tweet is a reply to another Tweet, this field displays the integer representation of the original Tweet's ID. The value for this field can be null.\n\t22. place.id: An ID representing the current location from which the Tweet is read. This is represented as a string and not an integer.\n\t23. place.name: A short, human-readable representation of the name of the place.\n\t24. place.fullName: A complete human-readable representation of the name of the place.\n\t25. place.country_code: A shortened country code representing the country in which the place is located.\n\t26. place.country: The name of the country in which the place is located.\n\t27. track.words: The keywords given by the user to track.\n\t28. polling.query: The query provided by the user.\n\t", parameters = {@Parameter(name = TwitterConstants.CONSUMER_KEY, description = "The API key to access the Twitter application created.", type = {DataType.STRING}), @Parameter(name = TwitterConstants.CONSUMER_SECRET, description = "The API secret to access the Twitter application created.", type = {DataType.STRING}), @Parameter(name = TwitterConstants.ACCESS_TOKEN, description = "The access token to be used to make API requests on behalf of your account.", type = {DataType.STRING}), @Parameter(name = TwitterConstants.ACCESS_SECRET, description = "The access token secret to be used to make API requests on behalf of your account.", type = {DataType.STRING}), @Parameter(name = TwitterConstants.MODE, description = "The mode in which the Twitter application is run. Possible values are as follows: \n`streaming`: This retrieves real time tweets. \n2`polling`: This retrieves historical tweets that were posted within one week.", type = {DataType.STRING}), @Parameter(name = TwitterConstants.STREAMING_FILTER_FILTER_LEVEL, description = "This is assigned to Tweets based on the level of engagement. The filter level can be `none`, `low`, or `medium`. 
The highest level (i.e., `medium`) corresponds loosely with the `top tweets` filter that the service already offers in its on-site search function.", optional = true, defaultValue = "none", type = {DataType.STRING}), @Parameter(name = TwitterConstants.STREAMING_FILTER_TRACK, description = "This filters the Tweets that include the specified keywords.", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.STRING}), @Parameter(name = TwitterConstants.STREAMING_FILTER_FOLLOW, description = "This filters the Tweets that are tweeted by the specified user IDs.", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.LONG}), @Parameter(name = TwitterConstants.STREAMING_FILTER_LOCATIONS, description = "This filters Tweets based on the locations. Here, you need to specify thelatitude and the longitude of the location e.g., `51.683979:0.278970`.", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.DOUBLE}), @Parameter(name = "language", description = "This filters Tweets that are posted in the specified language, given by an ISO 639-1 code.", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.STRING}), @Parameter(name = TwitterConstants.POLLING_SEARCH_QUERY, description = "This filters Tweets that match the specified UTF-8, URL-encoded search query with a maximum of 500 characters including operators. \n e.g., '@NASA' - mentioning Twitter account 'NASA'.", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.STRING}), @Parameter(name = TwitterConstants.POLLING_SEARCH_COUNT, description = "This returns a specified number of Tweets per page up to a maximum of 100.", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.STRING}), @Parameter(name = TwitterConstants.POLLING_SEARCH_GEOCODE, description = "This returns Tweets by users who are located within a specified radius of the given latitude/longitude. 
The location is preferentially taken from the Geotagging API, but it falls back to their Twitter profile. The parameter value is specified in the `latitude,longitude,radius` format where theradius units must be specified as either `mi` (miles) or `km` (kilometers).", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.STRING}), @Parameter(name = TwitterConstants.POLLING_SEARCH_RESULT_TYPE, description = "This parameter allows to to specify the whether you want to receive only popular Tweets, the most recent Tweets or a mix of both.The possible values are as follows:\n* `mixed`: This includes both popular and recent results in the response.\n* `recent`: This includes only the most recent results in the response.\n* `popular`: This includes only the most popular results in the response.)", optional = true, defaultValue = "mixed", type = {DataType.STRING}), @Parameter(name = TwitterConstants.POLLING_SEARCH_MAXID, description = "This returns Tweets of which the Tweet ID is equal to or less than (i.e., older than) the specified ID", optional = true, defaultValue = "-1", type = {DataType.LONG}), @Parameter(name = TwitterConstants.POLLING_SEARCH_SINCEID, description = "This returns Tweets of which the Tweet ID is greater than (i.e., more recent than) the specified ID.", optional = true, defaultValue = "-1", type = {DataType.LONG}), @Parameter(name = TwitterConstants.POLLING_SEARCH_UNTIL, description = "This returns Tweets that were created before the given date. Date needs to be formatted as `YYYY-MM-DD`. The search index has a 7-day limit. 
Therefore, it is not possible to return Tweets that were created more than a week before the current date.", optional = true, defaultValue = TwitterConstants.NULL_STRING, type = {DataType.STRING}), @Parameter(name = TwitterConstants.POLLING_INTERVAL, description = "This specifies a time interval (in seconds) to poll the Tweets periodically.", optional = true, defaultValue = "3600", type = {DataType.LONG})}, examples = {@Example(syntax = "@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description = "In this example, the twitter source starts listening to a random sample of public statuses and passes the events to the `rcvEvents` stream."), @Example(syntax = "@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', track = 'Amazon,Google,Apple', language = 'en', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description = "In this example, the twitter source starts listening to Tweets in English that include the keywords `Amazon`, `google`, or `apple`. 
Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax = "@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', track = 'Amazon,Google,Apple', language = 'en', filter.level = 'low', follow = '11348282,20536157,15670515,17193794,58561993,18139619',location = '51.280430:-0.563160,51.683979:0.278970', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description = "In this example, the twitter source starts listening to Tweets in English that either include the keywords `Amazon`, `google`, `apple`, tweeted by the specified followers, or tweeted from the given location based on the filter.level. Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax = "@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = 'happy hour', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text', hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description = "In this example, the twitter source starts polling Tweets that contain the exact phrase `happy hour`. 
Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax = "@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = '#Amazon', since.id = '973439483906420736', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description = "In this example, the twitter source starts polling tweets that contain the `#Amazon` hashtag and have a Tweet Id that is greater than `since.id`. Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax = "@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = '@NASA', language = 'en', result.type = 'recent', geocode = '43.913723261972855,-72.54272478125,150km', since.id = 24012619984051000, max.id = 250126199840518145, until = 2018-03-10, @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text', hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description = "In this example, the twitter source starts polling the recent Tweets in English that mention `NASA`, and have Tweet IDs that are greater than the `since.id` and less than the `max.id`. Then these events are passed to the `rcvEvents` stream.")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/twitter/source/TwitterSource.class */
public class TwitterSource extends Source {
    // Class-wide log4j logger; only used for debug messages in disconnect() and destroy().
    private static final Logger log = Logger.getLogger(TwitterSource.class);
    // Polling-mode worker; created in connect() and scheduled on scheduledExecutorService.
    private TwitterPoller twitterPoller;
    // Streaming-mode listener; created in connect() and registered on twitterStream.
    private TwitterStatusListener twitterStatusListener;
    // Listener received in init(); handed to TwitterStatusListener / TwitterPoller in connect().
    private SourceEventListener sourceEventListener;
    // Streaming-mode connection; created in connect(), cleared in disconnect(), shut down in destroy().
    private TwitterStream twitterStream;
    // OAuth credentials; applied to the twitter4j ConfigurationBuilder in connect().
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessSecret;
    // Operating mode; compared case-insensitively against TwitterConstants.MODE_STREAMING / MODE_POLLING.
    private String mode;
    // Raw text of the follow / locations options; converted into `follow` / `locations` by validateParameter().
    private String followParam;
    private String locationParam;
    // Streaming filter options passed to QueryBuilder.createFilterQuery() in connect().
    // NOTE: languageParam and searchLang are both read from the same "language" option in init().
    private String trackParam;
    private String languageParam;
    private String filterLevel;
    // Polling search options passed to QueryBuilder.createQuery() in connect().
    private String queryParam;
    private int count;
    private String geocode;
    private long maxId;
    // Boxed because restoreState() replaces it with the value stored by currentState().
    private Long sinceId;
    private String searchLang;
    private String until;
    private String since;
    private String resultType;
    // Period between poller runs; connect() schedules it with TimeUnit.SECONDS.
    private long pollingInterval;
    // Parsed forms of followParam / locationParam, produced by Util.followParam() / Util.locationParam().
    private long[] follow;
    private double[][] locations;
    // Components parsed out of `geocode` by validateParameter(); left at their defaults when geocode is empty.
    private double latitude;
    private double longitude;
    private double radius;
    private String unitName;
    // Keys of the static options supplied on the source; used for per-mode validation and the sample/filter choice.
    private Set<String> staticOptionsKeys;
    // Executor obtained from the SiddhiAppContext in init().
    private ScheduledExecutorService scheduledExecutorService;
    // Handle of the scheduled poller task; cancelled in destroy().
    private ScheduledFuture scheduledFuture;

    /**
     * Copies the static options of the source definition into fields and validates them.
     *
     * <p>Options are read in a fixed order: the credentials and the mode first, then the
     * streaming filter options, then the polling search options. Numeric options are read
     * as text and parsed immediately, so a malformed number surfaces as a
     * {@link NumberFormatException} from this method. The last step delegates to
     * {@code validateParameter()}.
     *
     * @param sourceEventListener listener that will receive the events produced by this source
     * @param optionHolder        holder of the options given on the source annotation
     * @param strArr              not used by this implementation
     * @param configReader        not used by this implementation
     * @param siddhiAppContext    context supplying the scheduled executor used in polling mode
     */
    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;

        // Credentials and mode are read without a fallback value.
        consumerKey = optionHolder.validateAndGetStaticValue(TwitterConstants.CONSUMER_KEY);
        consumerSecret = optionHolder.validateAndGetStaticValue(TwitterConstants.CONSUMER_SECRET);
        accessToken = optionHolder.validateAndGetStaticValue(TwitterConstants.ACCESS_TOKEN);
        accessSecret = optionHolder.validateAndGetStaticValue(TwitterConstants.ACCESS_SECRET);
        mode = optionHolder.validateAndGetStaticValue(TwitterConstants.MODE);

        // Streaming filter options.
        locationParam = optionHolder.validateAndGetStaticValue(TwitterConstants.STREAMING_FILTER_LOCATIONS, TwitterConstants.EMPTY_STRING);
        followParam = optionHolder.validateAndGetStaticValue(TwitterConstants.STREAMING_FILTER_FOLLOW, TwitterConstants.EMPTY_STRING);
        languageParam = optionHolder.validateAndGetStaticValue("language", TwitterConstants.EMPTY_STRING);
        trackParam = optionHolder.validateAndGetStaticValue(TwitterConstants.STREAMING_FILTER_TRACK, TwitterConstants.EMPTY_STRING);
        filterLevel = optionHolder.validateAndGetStaticValue(TwitterConstants.STREAMING_FILTER_FILTER_LEVEL, "none");

        // Polling search options; "-1" is the fallback text for the numeric ones.
        queryParam = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_QUERY, TwitterConstants.EMPTY_STRING);
        String countText = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_COUNT, "-1");
        count = Integer.parseInt(countText);
        geocode = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_GEOCODE, TwitterConstants.EMPTY_STRING);
        String maxIdText = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_MAXID, "-1");
        maxId = Long.parseLong(maxIdText);
        String sinceIdText = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_SINCEID, "-1");
        sinceId = Long.valueOf(sinceIdText);
        // Same "language" option as languageParam above, kept separately for the search query.
        searchLang = optionHolder.validateAndGetStaticValue("language", TwitterConstants.EMPTY_STRING);
        until = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_UNTIL, TwitterConstants.EMPTY_STRING);
        since = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_SINCE, TwitterConstants.EMPTY_STRING);
        resultType = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_SEARCH_RESULT_TYPE, "mixed");
        String intervalText = optionHolder.validateAndGetStaticValue(TwitterConstants.POLLING_INTERVAL, "3600");
        pollingInterval = Long.parseLong(intervalText);

        staticOptionsKeys = optionHolder.getStaticOptionsKeys();
        scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();

        validateParameter();
    }

    /**
     * Lists the event payload types this source emits.
     *
     * @return a one-element array containing {@code Map.class}
     */
    public Class[] getOutputEventClasses() {
        Class[] emittedTypes = {Map.class};
        return emittedTypes;
    }

    /**
     * Opens the connection to Twitter according to the configured mode.
     *
     * <p>In streaming mode a {@link TwitterStream} is created, a {@code TwitterStatusListener}
     * is registered on it, and the stream is started either with {@code sample()} (when exactly
     * six static options were supplied) or with the filter query built from the streaming
     * options. In polling mode a {@code TwitterPoller} is created and scheduled at a fixed rate
     * of {@code pollingInterval} seconds with no initial delay. Any other mode does nothing.
     *
     * @param connectionCallback not used by this implementation
     * @throws ConnectionUnavailableException wrapping any exception raised while connecting
     */
    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        try {
            ConfigurationBuilder credentials = new ConfigurationBuilder();
            credentials.setOAuthConsumerKey(consumerKey)
                    .setOAuthConsumerSecret(consumerSecret)
                    .setOAuthAccessToken(accessToken)
                    .setOAuthAccessTokenSecret(accessSecret)
                    .setJSONStoreEnabled(true);

            if (mode.equalsIgnoreCase(TwitterConstants.MODE_STREAMING)) {
                twitterStream = new TwitterStreamFactory(credentials.build()).getInstance();
                FilterQuery filterQuery = QueryBuilder.createFilterQuery(languageParam, trackParam, follow, filterLevel, locations);
                twitterStatusListener = new TwitterStatusListener(sourceEventListener);
                twitterStream.addListener(twitterStatusListener);

                // Six static options selects the unfiltered sample stream; anything else is filtered.
                boolean filtered = staticOptionsKeys.size() != 6;
                if (filtered) {
                    twitterStream.filter(filterQuery);
                } else {
                    twitterStream.sample();
                }
                return;
            }

            if (mode.equalsIgnoreCase(TwitterConstants.MODE_POLLING)) {
                twitterPoller = new TwitterPoller(
                        new TwitterFactory(credentials.build()).getInstance(),
                        QueryBuilder.createQuery(queryParam, count, searchLang, sinceId.longValue(), maxId, until, since,
                                resultType, geocode, latitude, longitude, radius, unitName),
                        sourceEventListener);
                scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(twitterPoller, 0L, pollingInterval, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            throw new ConnectionUnavailableException("Error in connecting with the Twitter API : " + e.getMessage(), e);
        }
    }

    /**
     * Removes all listeners from the Twitter stream, if one was created.
     * Does nothing when no stream exists.
     */
    public void disconnect() {
        if (twitterStream == null) {
            return;
        }
        twitterStream.clearListeners();
        if (log.isDebugEnabled()) {
            log.debug("The status listener has been cleared!");
        }
    }

    /**
     * Releases everything {@code connect()} may have created.
     *
     * <p>Shuts down the Twitter stream if present, cancels the scheduled polling task if
     * present, and kills the poller if present. Each resource is checked individually, so
     * this method is safe to call when {@code connect()} was never invoked or failed before
     * the polling task was scheduled; previously that case threw a
     * {@link NullPointerException} in polling mode.
     */
    public void destroy() {
        if (this.twitterStream != null) {
            this.twitterStream.shutdown();
            if (log.isDebugEnabled()) {
                log.debug("The twitter stream has been shutdown !");
            }
        }
        // The future and the poller are only ever assigned by connect() in polling mode,
        // so checking them directly replaces the former mode test without changing when they run.
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
        }
        if (this.twitterPoller != null) {
            this.twitterPoller.kill();
        }
    }

    /**
     * Pauses event delivery: the poller is paused unless the mode is streaming,
     * in which case the status listener is paused instead.
     */
    public void pause() {
        final boolean streaming = mode.equalsIgnoreCase(TwitterConstants.MODE_STREAMING);
        if (!streaming) {
            twitterPoller.pause();
            return;
        }
        twitterStatusListener.pause();
    }

    /**
     * Resumes event delivery: the poller is resumed unless the mode is streaming,
     * in which case the status listener is resumed instead.
     */
    public void resume() {
        final boolean streaming = mode.equalsIgnoreCase(TwitterConstants.MODE_STREAMING);
        if (!streaming) {
            twitterPoller.resume();
            return;
        }
        twitterStatusListener.resume();
    }

    /**
     * Captures the state to persist: the tweet id from which polling should continue.
     *
     * <p>When a poller exists, its current {@code tweetId} is stored. When no poller exists
     * (streaming mode, or {@code connect()} has not run yet) the configured since-id is
     * stored instead, falling back to {@code -1} if it was never set. Previously this case
     * dereferenced the missing poller and threw a {@link NullPointerException}.
     *
     * @return a map holding a single entry keyed by {@code TwitterConstants.POLLING_SEARCH_SINCEID};
     *         the value is never {@code null}, so {@code restoreState} can always read it back
     */
    public Map<String, Object> currentState() {
        Object lastSeenId;
        if (this.twitterPoller != null) {
            lastSeenId = Long.valueOf(this.twitterPoller.tweetId);
        } else if (this.sinceId != null) {
            lastSeenId = this.sinceId;
        } else {
            // Same value init() uses as the fallback text for the since-id option.
            lastSeenId = Long.valueOf(-1L);
        }
        Map<String, Object> state = new HashMap<String, Object>();
        state.put(TwitterConstants.POLLING_SEARCH_SINCEID, lastSeenId);
        return state;
    }

    /**
     * Restores the since-id from a previously captured state.
     *
     * <p>A {@code null} map, or a map without a since-id entry, leaves the configured
     * since-id untouched instead of throwing a {@link NullPointerException}.
     *
     * @param map state produced by {@code currentState()}; may be {@code null}
     * @throws NumberFormatException if the stored value is not a decimal long
     */
    public void restoreState(Map<String, Object> map) {
        if (map == null) {
            return;
        }
        Object storedSinceId = map.get(TwitterConstants.POLLING_SEARCH_SINCEID);
        if (storedSinceId == null) {
            return;
        }
        // NOTE(review): only the field is updated here; a poller that connect() already
        // created was built from the earlier value — confirm the expected call order.
        this.sinceId = Long.valueOf(storedSinceId.toString());
    }

    /**
     * Validates the options gathered by {@code init} and derives the parsed values
     * ({@code follow}, {@code locations}, {@code latitude}, {@code longitude},
     * {@code radius}, {@code unitName}) from their textual form.
     *
     * <p>Checks, in order: the mode and the options allowed for it, the follow and locations
     * options, the geocode, the filter level, and finally the result type. Every rejection
     * is reported as a {@link SiddhiAppValidationException}; an unknown result type or a
     * malformed geocode no longer escapes as a raw {@link IllegalArgumentException} or an
     * index-out-of-bounds error.
     *
     * @throws SiddhiAppValidationException if any option is invalid for the chosen mode or malformed
     */
    private void validateParameter() {
        if (this.mode.equalsIgnoreCase(TwitterConstants.MODE_STREAMING)) {
            for (String optionKey : this.staticOptionsKeys) {
                if (!TwitterConstants.STREAMING_PARAM.contains(optionKey) && !TwitterConstants.MANDATORY_PARAM.contains(optionKey)) {
                    throw new SiddhiAppValidationException(optionKey + " is not valid for the " + this.mode + " " + TwitterConstants.MODE);
                }
            }
        } else {
            if (!this.mode.equalsIgnoreCase(TwitterConstants.MODE_POLLING)) {
                throw new SiddhiAppValidationException("There are only two possible values for mode : streaming or polling. But found '" + this.mode + "'.");
            }
            if (this.queryParam.isEmpty()) {
                throw new SiddhiAppValidationException("For polling mode, query should be given.");
            }
            for (String optionKey : this.staticOptionsKeys) {
                if (!TwitterConstants.POLLING_PARAM.contains(optionKey) && !TwitterConstants.MANDATORY_PARAM.contains(optionKey)) {
                    throw new SiddhiAppValidationException(optionKey + " is not valid for the " + this.mode + " " + TwitterConstants.MODE);
                }
            }
        }
        if (!this.followParam.isEmpty()) {
            this.follow = Util.followParam(this.followParam);
        }
        if (!this.locationParam.isEmpty()) {
            this.locations = Util.locationParam(this.locationParam);
        }
        if (!this.geocode.isEmpty()) {
            parseGeocode();
        }
        if (!TwitterConstants.FILTER_LEVELS.contains(this.filterLevel)) {
            throw new SiddhiAppValidationException("There are only three possible values for filter.level : low or medium or none. But found '" + this.filterLevel + "'.");
        }
        validateResultType();
    }

    /** Splits {@code geocode} into latitude, longitude, radius and radius unit, rejecting malformed input. */
    private void parseGeocode() {
        String[] parts = this.geocode.split(TwitterConstants.DELIMITER);
        // Extra trailing parts are still tolerated as before; only a missing part is rejected.
        if (parts.length < 3) {
            throw new SiddhiAppValidationException("Geocode should be given in the 'latitude,longitude,radius' format. But found '" + this.geocode + "'.");
        }
        String radiusText = parts[2].trim();
        try {
            this.latitude = Double.parseDouble(parts[0]);
            this.longitude = Double.parseDouble(parts[1]);

            Query.Unit matchedUnit = null;
            for (Query.Unit candidate : Query.Unit.values()) {
                if (radiusText.endsWith(candidate.name())) {
                    matchedUnit = candidate;
                    break;
                }
            }
            if (matchedUnit == null) {
                throw new SiddhiAppValidationException("Unrecognized geocode radius: " + radiusText + ". Radius units must be specified as either 'mi' (miles) or 'km' (kilometers).");
            }
            this.unitName = matchedUnit.name();
            // Strip exactly the matched unit suffix; the unit is known to be a suffix here,
            // so the substring bounds are always valid.
            String radiusNumber = radiusText.substring(0, radiusText.length() - this.unitName.length());
            this.radius = Double.parseDouble(radiusNumber);
        } catch (NumberFormatException e) {
            throw new SiddhiAppValidationException("In geocode, Latitude,Longitude,Radius should be a double value : " + e.getMessage());
        }
    }

    /** Rejects a {@code resultType} that is not a known {@link Query.ResultType} listed in {@code TwitterConstants.RESULT_TYPES}. */
    private void validateResultType() {
        String rejection = "There are only three possible values for result.type : mixed or popular or recent. But found '" + this.resultType + "'.";
        Query.ResultType parsedResultType;
        try {
            parsedResultType = Query.ResultType.valueOf(this.resultType);
        } catch (IllegalArgumentException e) {
            // valueOf rejects unknown names; report it with the validation message, which
            // already carries the offending value.
            throw new SiddhiAppValidationException(rejection);
        }
        if (!TwitterConstants.RESULT_TYPES.contains(parsedResultType)) {
            throw new SiddhiAppValidationException(rejection);
        }
    }
}
