/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.extension.siddhi.io.twitter.source;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.twitter.source.TwitterPoller;
import org.wso2.extension.siddhi.io.twitter.source.TwitterStatusListener;
import org.wso2.extension.siddhi.io.twitter.util.QueryBuilder;
import org.wso2.extension.siddhi.io.twitter.util.TwitterConstants;
import org.wso2.extension.siddhi.io.twitter.util.Util;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;
import twitter4j.FilterQuery;
import twitter4j.Query;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

@Extension(name="twitter", namespace="source", description="The twitter source receives the events from a twitter App. The events will be received in a key-value map. \n\nKey values of the map of a tweet and their descriptions.\n\t1.  createdAt - UTC time when this Tweet was created.\n\t2.  tweetId - The integer representation of the unique identifier for this Tweet.\n\t3.  text - The actual UTF-8 text of the status update.\n\t4.  user.createdAt - The UTC datetime that the user account was created on Twitter.\n\t5.  user.id - The integer representation of the unique identifier for this User.\n\t6.  user.screenName - The screen name, that this user identifies themselves with.\n\t7.  user.name - The name of the user, as they've defined it.\n\t8.  user.mail - The mail.id of the user.\n\t9.  user.location - Nullable. The user-defined location for this account's profile.\n\t10. hashtags - Represents hashtags which have been parsed out of the Tweet.\n\t11. userMentions - Represents other Twitter users mentioned in the text of the Tweet.\n\t12. mediaUrls - Represents media elements uploaded with the Tweet.\n\t13. urls - Represents URLs included in the text of a Tweet.\n\t14. language - The language inwhich tweep tweeted.\n\t15. source - Utility used to post the Tweet, as an HTML-formatted string\n\t16. isRetweet - Indicates whether this is a Retweeted Tweet.\n\t17. retweetCount - Number of times this Tweet has been retweeted.\n\t18. favouriteCount = Nullable. Indicates approximately how many times this Tweet has been liked by Twitter users.\n\t19. geoLocation - Nullable. Represents the geographic location of this Tweet as reported by the user or client application.\n\t20. quotedStatusId - This field only surfaces when the Tweet is a quote Tweet. This field contains the integer value Tweet ID of the quoted Tweet.\n\t21. in.reply.to.status.id - Nullable. If the represented Tweet is a reply, this field will contain the integer representation of the original Tweet's ID.\n\t22. place.id - ID representing this place. This is represented as a string, not an integer.\n\t23. place.name - Short human-readable representation of the place's name.\n\t24. place.fullName - Full human-readable representation of the place's name.\n\t25. place.country_code - Shortened country code representing the country containing this place.\n\t26. place.country - Name of the country containing this place.\n\t27. track.words - Keywords given by the user to track.\n\t28. polling.query - Query given by the user.\n\t", parameters={@Parameter(name="consumer.key", description="Consumer key is the API key to access created twitter app", type={DataType.STRING}), @Parameter(name="consumer.secret", description="Consumer secret is the API secret to access created twitter app", type={DataType.STRING}), @Parameter(name="access.token", description="Access token is used to make API requests on behalf of your account.", type={DataType.STRING}), @Parameter(name="access.token.secret", description="Access token secret is used to make API requests on behalf of your account.", type={DataType.STRING}), @Parameter(name="mode", description="There are two possible values for mode. \n1. streaming - Retrieves real time tweets, \n2. polling - Retrieves historical tweets within one week.", type={DataType.STRING}), @Parameter(name="filter.level", description="Filters tweets by the level of engagement based on the  filter.level. The highest level(medium) corresponds loosely to the 'top tweets'filter the service already offers in its on-site search function. Values will be one of either none, low, or medium.", optional=true, defaultValue="none", type={DataType.STRING}), @Parameter(name="track", description="Filters the tweets that include the given keywords.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="follow", description="Filters the tweets that is tweeted by the given user ids", optional=true, defaultValue="null", type={DataType.LONG}), @Parameter(name="location", description="Filters tweets based on the locations. Here, We have to specify latitude and the longitude of the location. For Example : 51.683979:0.278970", optional=true, defaultValue="null", type={DataType.DOUBLE}), @Parameter(name="language", description="Filters tweets in the given language, given by an ISO 639-1 code.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="query", description="Filters tweets that matches the given Query, UTF-8, URL-encoded search query of 500 characters maximum, including operators. \nFor example : '@NASA' - mentioning Twitter account 'NASA'.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="count", description="Returns specified number of tweets per page, up to a maximum of 100.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="geocode", description="Returns tweets by users located within a given radius of the given latitude/longitude. The location is preferentially taking from the Geotagging API, but will fall back to their Twitter profile. The parameter value is specified by latitude,longitude,radius, where radius units must be specified as either 'mi' (miles) or 'km' (kilometers).", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="result.type", description="Returns tweets based on what type of results you would prefer to receive.Valid values include:\n* mixed : Include both popular and recent results in the response.\n* recent : return only the most recent results in the response\n* popular : return only the most popular results in the response.)", optional=true, defaultValue="mixed", type={DataType.STRING}), @Parameter(name="max.id", description="Returns tweets with an tweet ID less than (that is, older than) or equal to the specified ID", optional=true, defaultValue="-1", type={DataType.LONG}), @Parameter(name="since.id", description="Returns tweets with an tweet ID greater than (that is, more recent than) the specified ID.", optional=true, defaultValue="-1", type={DataType.LONG}), @Parameter(name="until", description="Returns tweets created before the given date. Date should be formatted as YYYY-MM-DD. Search index has a 7-day limit. So no tweets will be found for a date older than one week.", optional=true, defaultValue="null", type={DataType.STRING}), @Parameter(name="polling.interval", description="Specifies the period of time (in seconds) to poll tweets periodically", optional=true, defaultValue="3600", type={DataType.LONG})}, examples={@Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="Under this configuration, it starts listening on random sample of public statuses and they are passed to the rcvEvents stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', track = 'Amazon,Google,Apple', language = 'en', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="Under this configuration, it starts listening tweets in English that containing the keywords Amazon,google or apple and they are passed to the rcvEvents stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'streaming', track = 'Amazon,Google,Apple', language = 'en', filter.level = 'low', follow = '11348282,20536157,15670515,17193794,58561993,18139619',location = '51.280430:-0.563160,51.683979:0.278970', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="Under this configuration, it starts listening tweets in English that containing the keywords Amazon,google,apple or tweeted by the given followers or tweeted from the given location based on the filter.level. and they are passed to the rcvEvents stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = 'happy hour', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text', hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="Under this configuration, it starts polling tweets containing the exact phrase 'happy hour' and they are passed to the rcvEvents stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = '#Amazon', since.id = '973439483906420736', @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text',hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="Under this configuration, it starts polling tweets, containing the hashtag '#Amazon' and tweet Id is greater than since.id and they are passed to the rcvEvents stream."), @Example(syntax="@source(type='twitter', consumer.key='consumer.key',consumer.secret='consumerSecret', access.token='accessToken',access.token.secret='accessTokenSecret', mode= 'polling', query = '@NASA', language = 'en', result.type = 'recent', geocode = '43.913723261972855,-72.54272478125,150km', since.id = 24012619984051000, max.id = 250126199840518145, until = 2018-03-10, @map(type='keyvalue', @attributes(createdAt = 'createdAt', id = 'tweetId', text= 'text', hashtags = 'hashtags'))) \ndefine stream inputStream(createdAt String, id long, text String, hashtags string);", description="Under this configuration, it starts polling recent tweets in english that is  having tweet id greater than since.id and less than max.id, mentioning NASA  and they are passed to the rcvEvents stream.")})
public class TwitterSource
extends Source {
    private static final Logger log = Logger.getLogger(TwitterSource.class);
    private TwitterPoller twitterPoller;
    private TwitterStatusListener twitterStatusListener;
    private SourceEventListener sourceEventListener;
    private TwitterStream twitterStream;
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessSecret;
    private String mode;
    private String followParam;
    private String locationParam;
    private String trackParam;
    private String languageParam;
    private String filterLevel;
    private String queryParam;
    private int count;
    private String geocode;
    private long maxId;
    private Long sinceId;
    private String searchLang;
    private String until;
    private String since;
    private String resultType;
    private long pollingInterval;
    private long[] follow;
    private double[][] locations;
    private double latitude;
    private double longitude;
    private double radius;
    private String unitName;
    private Set<String> staticOptionsKeys;
    private ScheduledExecutorService scheduledExecutorService;
    private ScheduledFuture scheduledFuture;

    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requestedTransportPropertyNames, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.consumerKey = optionHolder.validateAndGetStaticValue("consumer.key");
        this.consumerSecret = optionHolder.validateAndGetStaticValue("consumer.secret");
        this.accessToken = optionHolder.validateAndGetStaticValue("access.token");
        this.accessSecret = optionHolder.validateAndGetStaticValue("access.token.secret");
        this.mode = optionHolder.validateAndGetStaticValue("mode");
        this.locationParam = optionHolder.validateAndGetStaticValue("location", "");
        this.followParam = optionHolder.validateAndGetStaticValue("follow", "");
        this.languageParam = optionHolder.validateAndGetStaticValue("language", "");
        this.trackParam = optionHolder.validateAndGetStaticValue("track", "");
        this.filterLevel = optionHolder.validateAndGetStaticValue("filter.level", "none");
        this.queryParam = optionHolder.validateAndGetStaticValue("query", "");
        this.count = Integer.parseInt(optionHolder.validateAndGetStaticValue("count", "-1"));
        this.geocode = optionHolder.validateAndGetStaticValue("geocode", "");
        this.maxId = Long.parseLong(optionHolder.validateAndGetStaticValue("max.id", "-1"));
        this.sinceId = Long.parseLong(optionHolder.validateAndGetStaticValue("since.id", "-1"));
        this.searchLang = optionHolder.validateAndGetStaticValue("language", "");
        this.until = optionHolder.validateAndGetStaticValue("until", "");
        this.since = optionHolder.validateAndGetStaticValue("since", "");
        this.resultType = optionHolder.validateAndGetStaticValue("result.type", "mixed");
        this.pollingInterval = Long.parseLong(optionHolder.validateAndGetStaticValue("polling.interval", "3600"));
        this.staticOptionsKeys = optionHolder.getStaticOptionsKeys();
        this.scheduledExecutorService = siddhiAppContext.getScheduledExecutorService();
        this.validateParameter();
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{Map.class};
    }

    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        try {
            ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
            configurationBuilder.setOAuthConsumerKey(this.consumerKey).setOAuthConsumerSecret(this.consumerSecret).setOAuthAccessToken(this.accessToken).setOAuthAccessTokenSecret(this.accessSecret).setJSONStoreEnabled(true);
            if (this.mode.equalsIgnoreCase("STREAMING")) {
                this.twitterStream = new TwitterStreamFactory(configurationBuilder.build()).getInstance();
                FilterQuery filterQuery = QueryBuilder.createFilterQuery(this.languageParam, this.trackParam, this.follow, this.filterLevel, this.locations);
                this.twitterStatusListener = new TwitterStatusListener(this.sourceEventListener);
                this.twitterStream.addListener(this.twitterStatusListener);
                if (this.staticOptionsKeys.size() == 6) {
                    this.twitterStream.sample();
                } else {
                    this.twitterStream.filter(filterQuery);
                }
            } else {
                Twitter twitter = new TwitterFactory(configurationBuilder.build()).getInstance();
                Query query = QueryBuilder.createQuery(this.queryParam, this.count, this.searchLang, this.sinceId, this.maxId, this.until, this.since, this.resultType, this.geocode, this.latitude, this.longitude, this.radius, this.unitName);
                this.twitterPoller = new TwitterPoller(twitter, query, this.sourceEventListener);
                this.scheduledFuture = this.scheduledExecutorService.scheduleAtFixedRate(this.twitterPoller, 0L, this.pollingInterval, TimeUnit.SECONDS);
            }
        }
        catch (Exception e) {
            throw new ConnectionUnavailableException("Error in connecting with the Twitter API : " + e.getMessage(), (Throwable)e);
        }
    }

    public void disconnect() {
        if (this.twitterStream != null) {
            this.twitterStream.clearListeners();
            if (log.isDebugEnabled()) {
                log.debug((Object)"The status listener has been cleared!");
            }
        }
    }

    public void destroy() {
        if (this.twitterStream != null) {
            this.twitterStream.shutdown();
            if (log.isDebugEnabled()) {
                log.debug((Object)"The twitter stream has been shutdown !");
            }
        }
        if (this.mode.equalsIgnoreCase("POLLING")) {
            this.scheduledFuture.cancel(true);
        }
    }

    public void pause() {
        if (this.mode.equalsIgnoreCase("STREAMING")) {
            this.twitterStatusListener.pause();
        } else {
            this.twitterPoller.pause();
        }
    }

    public void resume() {
        if (this.mode.equalsIgnoreCase("STREAMING")) {
            this.twitterStatusListener.resume();
        } else {
            this.twitterPoller.resume();
        }
    }

    public Map<String, Object> currentState() {
        HashMap<String, Object> currentState = new HashMap<String, Object>();
        currentState.put("since.id", this.twitterPoller.tweetId);
        return currentState;
    }

    public void restoreState(Map<String, Object> map) {
        this.sinceId = Long.parseLong(map.get("since.id").toString());
    }

    private void validateParameter() {
        Query.ResultType resultType1 = Query.ResultType.valueOf(this.resultType);
        if (this.mode.equalsIgnoreCase("STREAMING")) {
            for (String staticOptionKey : this.staticOptionsKeys) {
                if (TwitterConstants.STREAMING_PARAM.contains(staticOptionKey) || TwitterConstants.MANDATORY_PARAM.contains(staticOptionKey)) continue;
                throw new SiddhiAppValidationException(staticOptionKey + " is not valid for the " + this.mode + " " + "mode");
            }
        } else if (this.mode.equalsIgnoreCase("POLLING")) {
            if (this.queryParam.isEmpty()) {
                throw new SiddhiAppValidationException("For polling mode, query should be given.");
            }
            for (String staticOptionkey : this.staticOptionsKeys) {
                if (TwitterConstants.POLLING_PARAM.contains(staticOptionkey) || TwitterConstants.MANDATORY_PARAM.contains(staticOptionkey)) continue;
                throw new SiddhiAppValidationException(staticOptionkey + " is not valid for the " + this.mode + " " + "mode");
            }
        } else {
            throw new SiddhiAppValidationException("There are only two possible values for mode : streaming or polling. But found '" + this.mode + "'.");
        }
        if (!this.followParam.isEmpty()) {
            this.follow = Util.followParam(this.followParam);
        }
        if (!this.locationParam.isEmpty()) {
            this.locations = Util.locationParam(this.locationParam);
        }
        if (!this.geocode.isEmpty()) {
            Enum unit = null;
            String[] parts = this.geocode.split(",");
            String radiusstr = parts[2].trim();
            try {
                this.latitude = Double.parseDouble(parts[0]);
                this.longitude = Double.parseDouble(parts[1]);
                this.radius = Double.parseDouble(radiusstr.substring(0, radiusstr.length() - 2));
            }
            catch (NumberFormatException e) {
                throw new SiddhiAppValidationException("In geocode, Latitude,Longitude,Radius should be a double value : " + e.getMessage());
            }
            for (Query.Unit value : Query.Unit.values()) {
                if (!radiusstr.endsWith(value.name())) continue;
                unit = value;
                break;
            }
            if (unit == null) {
                throw new SiddhiAppValidationException("Unrecognized geocode radius: " + radiusstr + ". Radius units" + " must be specified as either 'mi' (miles) or 'km' (kilometers).");
            }
            this.unitName = unit.name();
        }
        if (!TwitterConstants.FILTER_LEVELS.contains(this.filterLevel)) {
            throw new SiddhiAppValidationException("There are only three possible values for filter.level : low or medium or none. But found '" + this.filterLevel + "'.");
        }
        if (!TwitterConstants.RESULT_TYPES.contains((Object)resultType1)) {
            throw new SiddhiAppValidationException("There are only three possible values for result.type : mixed or popular or recent. But found '" + this.resultType + "'.");
        }
    }
}

