/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.extension.siddhi.io.twitter.source;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.twitter.source.TwitterPoller;
import org.wso2.extension.siddhi.io.twitter.source.TwitterStatusListener;
import org.wso2.extension.siddhi.io.twitter.util.QueryBuilder;
import org.wso2.extension.siddhi.io.twitter.util.TwitterConstants;
import org.wso2.extension.siddhi.io.twitter.util.Util;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import twitter4j.FilterQuery;
import twitter4j.Query;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

@Extension(name="twitter", namespace="source", description="The Twitter source receives events from a Twitter app. The events are received in the form of key-value mappings. \n\nThe following are key values of the map of a tweet and their descriptions:\n\t1.  createdAt: The UTC time at which the Tweet was created.\n\t2.  tweetId: The integer representation for the unique identifier of the Tweet.\n\t3.  text: The actual UTF-8 text of the status update.\n\t4.  user.createdAt: The UTC date and time at which the user account was created on Twitter.\n\t5.  user.id: The integer representation for the unique identifier of the user who posted the Tweet.\n\t6.  user.screenName: The screen name with which the user identifies himself/herself.\n\t7.  user.name: The name of the user (as specified by the user).\n\t8.  user.mail: The `mail.id` of the user.\n\t9.  user.location: The location in which the current user account profile is saved. This parameter can have a null value.\n\t10. hashtags: The hashtags that have been parsed out of the Tweet.\n\t11. userMentions: The other Twitter users who are mentioned in the text of the Tweet.\n\t12. mediaUrls: The media elements uploaded with the Tweet.\n\t13. urls: The URLs included in the text of a Tweet.\n\t14. language: The language in which the Tweet is posted.\n\t15. source: the utility used to post the Tweet as an HTML-formatted string.\n\t16. isRetweet: This indicates whether the Tweet is a Retweet or not.\n\t17. retweetCount: The number of times the Tweet has been retweeted.\n\t18. favouriteCount: This indicates the number of times the Tweet has been liked by Twitter users. The value for this field can be null.\n\t19. geoLocation: The geographic location from which the Tweet was posted by the user or client application. The value for this field can be null.\n\t20. quotedStatusId: This field appears only when the Tweet is a quote Tweet. It displays the integer value Tweet ID of the quoted Tweet.\n\t21. in.reply.to.status.id: If the Tweet is a reply to another Tweet, this field displays the integer representation of the original Tweet's ID. The value for this field can be null.\n\t22. place.id: An ID representing the current location from which the Tweet is read. This is represented as a string and not an integer.\n\t23. place.name: A short, human-readable representation of the name of the place.\n\t24. place.fullName: A complete human-readable representation of the name of the place.\n\t25. place.country_code: A shortened country code representing the country in which the place is located.\n\t26. place.country: The name of the country in which the place is located.\n\t27. track.words: The keywords given by the user to track.\n\t28. polling.query: The query provided by the user.\n\t", parameters={@Parameter(name="consumer.key", description="The API key to access the Twitter application created.", type={DataType.STRING}), @Parameter(name="consumer.secret", description="The API secret to access the Twitter application created.", type={DataType.STRING}), @Parameter(name="access.token", description="The access token to be used to make API requests on behalf of your account.", type={DataType.STRING}), @Parameter(name="access.token.secret", description="The access token secret to be used to make API requests on behalf of your account.", type={DataType.STRING}), @Parameter(name="mode", description="The mode in which the Twitter application is run. Possible values are as follows: \n`streaming`: This retrieves real time tweets. \n2`polling`: This retrieves historical tweets that were posted within one week.", type={DataType.STRING}), @Parameter(name="filter.level", description="This is assigned to Tweets based on the level of engagement. The filter level can be `none`, `low`, or `medium`. The highest level (i.e., `medium`) corresponds loosely with the `top tweets` filter that the service already offers in its on-site search function.", optional=true, defaultValue="none", type={DataType.STRING}), @Parameter(name="track", description="This filters the Tweets that include the specified keywords.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="follow", description="This filters the Tweets that are tweeted by the specified user IDs.", optional=true, defaultValue="null", type={DataType.LONG}), @Parameter(name="location", description="This filters Tweets based on the locations. Here, you need to specify thelatitude and the longitude of the location e.g., `51.683979:0.278970`.", optional=true, defaultValue="null", type={DataType.DOUBLE}), @Parameter(name="language", description="This filters Tweets that are posted in the specified language, given by an ISO 639-1 code.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="query", description="This filters Tweets that match the specified UTF-8, URL-encoded search query with a maximum of 500 characters including operators. \n e.g., '@NASA' - mentioning Twitter account 'NASA'.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="count", description="This returns a specified number of Tweets per page up to a maximum of 100.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="geocode", description="This returns Tweets by users who are located within a specified radius of the given latitude/longitude. The location is preferentially taken from the Geotagging API, but it falls back to their Twitter profile. The parameter value is specified in the `latitude,longitude,radius` format where theradius units must be specified as either `mi` (miles) or `km` (kilometers).", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="result.type", description="This parameter allows to to specify the whether you want to receive only popular Tweets, the most recent Tweets or a mix of both.The possible values are as follows:\n* `mixed`: This includes both popular and recent results in the response.\n* `recent`: This includes only the most recent results in the response.\n* `popular`: This includes only the most popular results in the response.)", optional=true, defaultValue="mixed", type={DataType.STRING}), @Parameter(name="max.id", description="This returns Tweets of which the Tweet ID is equal to or less than (i.e., older than) the specified ID", optional=true, defaultValue="-1", type={DataType.LONG}), @Parameter(name="since.id", description="This returns Tweets of which the Tweet ID is greater than (i.e., more recent than) the specified ID.", optional=true, defaultValue="-1", type={DataType.LONG}), @Parameter(name="until", description="This returns Tweets that were created before the given date. Date needs to be formatted as `YYYY-MM-DD`. The search index has a 7-day limit. Therefore, it is not possible to return Tweets that were created more than a week before the current date.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="polling.interval", description="This specifies a time interval (in seconds) to poll the Tweets periodically.", optional=true, defaultValue="3600", type={DataType.LONG})}, examples={@Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="In this example, the twitter source starts listening to a random sample of public statuses and passes the events to the `rcvEvents` stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', track = 'Amazon,Google,Apple', language = 'en', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="In this example, the twitter source starts listening to Tweets in English that include the keywords `Amazon`, `google`, or `apple`. Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', track = 'Amazon,Google,Apple', language = 'en', filter.level = 'low', follow = '11348282,20536157,15670515,17193794,58561993,18139619',location = '51.280430:-0.563160,51.683979:0.278970', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="In this example, the twitter source starts listening to Tweets in English that either include the keywords `Amazon`, `google`, `apple`, tweeted by the specified followers, or tweeted from the given location based on the filter.level. Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = 'happy hour', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text', hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="In this example, the twitter source starts polling Tweets that contain the exact phrase `happy hour`. Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = '#Amazon', since.id = '973439483906420736', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="In this example, the twitter source starts polling tweets that contain the `#Amazon` hashtag and have a Tweet Id that is greater than `since.id`. Then these Tweets are passed to the `rcvEvents` stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = '@NASA', language = 'en', result.type = 'recent', geocode = '43.913723261972855,-72.54272478125,150km', since.id = 24012619984051000, max.id = 250126199840518145, until = 2018-03-10, @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text', hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="In this example, the twitter source starts polling the recent Tweets in English that mention `NASA`, and have Tweet IDs that are greater than the `since.id` and less than the `max.id`. Then these events are passed to the `rcvEvents` stream.")})
public class TwitterSource
extends Source {
    private static final Logger log = Logger.getLogger(TwitterSource.class);
    private TwitterPoller twitterPoller;
    private TwitterStatusListener twitterStatusListener;
    private SourceEventListener sourceEventListener;
    private TwitterStream twitterStream;
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessSecret;
    private String mode;
    private String followParam;
    private String locationParam;
    private String trackParam;
    private String languageParam;
    private String filterLevel;
    private String queryParam;
    private int count;
    private String geocode;
    private long maxId;
    private Long sinceId;
    private String searchLang;
    private String until;
    private String since;
    private String resultType;
    private long pollingInterval;
    private long[] follow;
    private double[][] locations;
    private double latitude;
    private double longitude;
    private double radius;
    private String unitName;
    private Set<String> staticOptionsKeys;
    private ScheduledExecutorService scheduledExecutorService;
    private ScheduledFuture scheduledFuture;

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.consumerKey = optionHolder.validateAndGetStaticValue("consumer.key");
        this.consumerSecret = optionHolder.validateAndGetStaticValue("consumer.secret");
        this.accessToken = optionHolder.validateAndGetStaticValue("access.token");
        this.accessSecret = optionHolder.validateAndGetStaticValue("access.token.secret");
        this.mode = optionHolder.validateAndGetStaticValue("mode");
        this.locationParam = optionHolder.validateAndGetStaticValue("location", "");
        this.followParam = optionHolder.validateAndGetStaticValue("follow", "");
        this.languageParam = optionHolder.validateAndGetStaticValue("language", "");
        this.trackParam = optionHolder.validateAndGetStaticValue("track", "");
        this.filterLevel = optionHolder.validateAndGetStaticValue("filter.level", "none");
        this.queryParam = optionHolder.validateAndGetStaticValue("query", "");
        this.count = Integer.parseInt(optionHolder.validateAndGetStaticValue("count", "-1"));
        this.geocode = optionHolder.validateAndGetStaticValue("geocode", "");
        this.maxId = Long.parseLong(optionHolder.validateAndGetStaticValue("max.id", "-1"));
        this.sinceId = Long.parseLong(optionHolder.validateAndGetStaticValue("since.id", "-1"));
        this.searchLang = optionHolder.validateAndGetStaticValue("language", "");
        this.until = optionHolder.validateAndGetStaticValue("until", "");
        this.since = optionHolder.validateAndGetStaticValue("since", "");
        this.resultType = optionHolder.validateAndGetStaticValue("result.type", "mixed");
        this.pollingInterval = Long.parseLong(optionHolder.validateAndGetStaticValue("polling.interval", "3600"));
        this.staticOptionsKeys = optionHolder.getStaticOptionsKeys();
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        this.validateParameter();
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{Map.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        try {
            ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
            configurationBuilder.setOAuthConsumerKey(this.consumerKey).setOAuthConsumerSecret(this.consumerSecret).setOAuthAccessToken(this.accessToken).setOAuthAccessTokenSecret(this.accessSecret).setJSONStoreEnabled(true);
            if (this.mode.equalsIgnoreCase("STREAMING")) {
                this.twitterStream = new TwitterStreamFactory(configurationBuilder.build()).getInstance();
                FilterQuery filterQuery = QueryBuilder.createFilterQuery(this.languageParam, this.trackParam, this.follow, this.filterLevel, this.locations);
                this.twitterStatusListener = new TwitterStatusListener(this.sourceEventListener);
                this.twitterStream.addListener(this.twitterStatusListener);
                if (this.staticOptionsKeys.size() == 6) {
                    this.twitterStream.sample();
                } else {
                    this.twitterStream.filter(filterQuery);
                }
            } else if (this.mode.equalsIgnoreCase("POLLING")) {
                Twitter twitter = new TwitterFactory(configurationBuilder.build()).getInstance();
                Query query = QueryBuilder.createQuery(this.queryParam, this.count, this.searchLang, this.sinceId, this.maxId, this.until, this.since, this.resultType, this.geocode, this.latitude, this.longitude, this.radius, this.unitName);
                this.twitterPoller = new TwitterPoller(twitter, query, this.sourceEventListener);
                this.scheduledFuture = this.scheduledExecutorService.scheduleAtFixedRate(this.twitterPoller, 0L, this.pollingInterval, TimeUnit.SECONDS);
            }
        }
        catch (Exception e) {
            throw new ConnectionUnavailableException("Error in connecting with the Twitter API : " + e.getMessage(), (Throwable)e);
        }
    }

    public void disconnect() {
        if (this.twitterStream != null) {
            this.twitterStream.clearListeners();
            if (log.isDebugEnabled()) {
                log.debug((Object)"The status listener has been cleared!");
            }
        }
    }

    public void destroy() {
        if (this.twitterStream != null) {
            this.twitterStream.shutdown();
            if (log.isDebugEnabled()) {
                log.debug((Object)"The twitter stream has been shutdown !");
            }
        }
        if (this.mode.equalsIgnoreCase("POLLING")) {
            this.scheduledFuture.cancel(true);
            this.twitterPoller.kill();
        }
    }

    public void pause() {
        if (this.mode.equalsIgnoreCase("STREAMING")) {
            this.twitterStatusListener.pause();
        } else {
            this.twitterPoller.pause();
        }
    }

    public void resume() {
        if (this.mode.equalsIgnoreCase("STREAMING")) {
            this.twitterStatusListener.resume();
        } else {
            this.twitterPoller.resume();
        }
    }

    public Map<String, Object> currentState() {
        HashMap<String, Object> currentState = new HashMap<String, Object>();
        currentState.put("since.id", this.twitterPoller.tweetId);
        return currentState;
    }

    public void restoreState(Map<String, Object> map) {
        this.sinceId = Long.parseLong(map.get("since.id").toString());
    }

    private void validateParameter() {
        Query.ResultType resultType1 = Query.ResultType.valueOf(this.resultType);
        if (this.mode.equalsIgnoreCase("STREAMING")) {
            for (String staticOptionKey : this.staticOptionsKeys) {
                if (TwitterConstants.STREAMING_PARAM.contains(staticOptionKey) || TwitterConstants.MANDATORY_PARAM.contains(staticOptionKey)) continue;
                throw new SiddhiAppValidationException(staticOptionKey + " is not valid for the " + this.mode + " " + "mode");
            }
        } else if (this.mode.equalsIgnoreCase("POLLING")) {
            if (this.queryParam.isEmpty()) {
                throw new SiddhiAppValidationException("For polling mode, query should be given.");
            }
            for (String staticOptionkey : this.staticOptionsKeys) {
                if (TwitterConstants.POLLING_PARAM.contains(staticOptionkey) || TwitterConstants.MANDATORY_PARAM.contains(staticOptionkey)) continue;
                throw new SiddhiAppValidationException(staticOptionkey + " is not valid for the " + this.mode + " " + "mode");
            }
        } else {
            throw new SiddhiAppValidationException("There are only two possible values for mode : streaming or polling. But found '" + this.mode + "'.");
        }
        if (!this.followParam.isEmpty()) {
            this.follow = Util.followParam(this.followParam);
        }
        if (!this.locationParam.isEmpty()) {
            this.locations = Util.locationParam(this.locationParam);
        }
        if (!this.geocode.isEmpty()) {
            Enum unit = null;
            String[] parts = this.geocode.split(",");
            String radiusstr = parts[2].trim();
            try {
                this.latitude = Double.parseDouble(parts[0]);
                this.longitude = Double.parseDouble(parts[1]);
                this.radius = Double.parseDouble(radiusstr.substring(0, radiusstr.length() - 2));
            }
            catch (NumberFormatException e) {
                throw new SiddhiAppValidationException("In geocode, Latitude,Longitude,Radius should be a double value : " + e.getMessage());
            }
            for (Query.Unit value : Query.Unit.values()) {
                if (!radiusstr.endsWith(value.name())) continue;
                unit = value;
                break;
            }
            if (unit == null) {
                throw new SiddhiAppValidationException("Unrecognized geocode radius: " + radiusstr + ". Radius units" + " must be specified as either 'mi' (miles) or 'km' (kilometers).");
            }
            this.unitName = unit.name();
        }
        if (!TwitterConstants.FILTER_LEVELS.contains(this.filterLevel)) {
            throw new SiddhiAppValidationException("There are only three possible values for filter.level : low or medium or none. But found '" + this.filterLevel + "'.");
        }
        if (!TwitterConstants.RESULT_TYPES.contains((Object)resultType1)) {
            throw new SiddhiAppValidationException("There are only three possible values for result.type : mixed or popular or recent. But found '" + this.resultType + "'.");
        }
    }
}

