/*
 * Decompiled with CFR 0.152.
 */
package com.stratio.streaming.api;

import com.stratio.streaming.api.IStratioStreamingAPI;
import com.stratio.streaming.api.StratioStreamingAPI$;
import com.stratio.streaming.api.StreamingAPIAsyncOperation;
import com.stratio.streaming.api.StreamingAPIListOperation;
import com.stratio.streaming.api.StreamingAPISyncOperation;
import com.stratio.streaming.api.messaging.ColumnNameType;
import com.stratio.streaming.api.messaging.ColumnNameValue;
import com.stratio.streaming.api.messaging.MessageBuilder$;
import com.stratio.streaming.api.messaging.MessageBuilderWithColumns;
import com.stratio.streaming.api.zookeeper.ZookeeperConsumer;
import com.stratio.streaming.commons.constants.InternalTopic;
import com.stratio.streaming.commons.exceptions.StratioEngineConnectionException;
import com.stratio.streaming.commons.exceptions.StratioEngineOperationException;
import com.stratio.streaming.commons.exceptions.StratioEngineStatusException;
import com.stratio.streaming.commons.kafka.service.KafkaTopicService;
import com.stratio.streaming.commons.kafka.service.TopicService;
import com.stratio.streaming.commons.messages.ColumnNameTypeValue;
import com.stratio.streaming.commons.messages.StratioStreamingMessage;
import com.stratio.streaming.commons.messages.StreamQuery;
import com.stratio.streaming.commons.streams.StratioStream;
import com.stratio.streaming.dto.StratioQueryStream;
import com.stratio.streaming.kafka.KafkaConsumer;
import com.stratio.streaming.kafka.KafkaConsumer$;
import com.stratio.streaming.kafka.KafkaProducer;
import com.stratio.streaming.kafka.KafkaProducer$;
import com.stratio.streaming.messaging.InsertMessageBuilder;
import com.stratio.streaming.messaging.QueryMessageBuilder;
import com.stratio.streaming.messaging.StreamMessageBuilder;
import com.typesafe.config.Config;
import kafka.consumer.KafkaStream;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.Pathable;
import org.apache.curator.retry.RetryOneTime;
import org.slf4j.Logger;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\r-e\u0001B\u0001\u0003\u0001-\u00111c\u0015;sCRLwn\u0015;sK\u0006l\u0017N\\4B!&S!a\u0001\u0003\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\u0006\r\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u000f!\tqa\u001d;sCRLwNC\u0001\n\u0003\r\u0019w.\\\u0002\u0001'\r\u0001AB\u0005\t\u0003\u001bAi\u0011A\u0004\u0006\u0002\u001f\u0005)1oY1mC&\u0011\u0011C\u0004\u0002\u0007\u0003:L(+\u001a4\u0011\u0005M!R\"\u0001\u0002\n\u0005U\u0011!\u0001F%TiJ\fG/[8TiJ,\u0017-\\5oO\u0006\u0003\u0016\nC\u0003\u0018\u0001\u0011\u0005\u0001$\u0001\u0004=S:LGO\u0010\u000b\u00023A\u00111\u0003\u0001\u0005\u00067\u0001!\t\u0001H\u0001\rGJ,\u0017\r^3TiJ,\u0017-\u001c\u000b\u0004;\u0001J\u0003CA\u0007\u001f\u0013\tybB\u0001\u0003V]&$\b\"B\u0011\u001b\u0001\u0004\u0011\u0013AC:ue\u0016\fWNT1nKB\u00111E\n\b\u0003\u001b\u0011J!!\n\b\u0002\rA\u0013X\rZ3g\u0013\t9\u0003F\u0001\u0004TiJLgn\u001a\u0006\u0003K9AQA\u000b\u000eA\u0002-\nqaY8mk6t7\u000fE\u0002-cMj\u0011!\f\u0006\u0003]=\nA!\u001e;jY*\t\u0001'\u0001\u0003kCZ\f\u0017B\u0001\u001a.\u0005\u0011a\u0015n\u001d;\u0011\u0005Q:T\"A\u001b\u000b\u0005Y\u0012\u0011!C7fgN\fw-\u001b8h\u0013\tATG\u0001\bD_2,XN\u001c(b[\u0016$\u0016\u0010]3\t\u000bi\u0002A\u0011A\u001e\u0002\u0017\u0005dG/\u001a:TiJ,\u0017-\u001c\u000b\u0004;qj\u0004\"B\u0011:\u0001\u0004\u0011\u0003\"\u0002\u0016:\u0001\u0004Y\u0003\"B 
\u0001\t\u0003\u0001\u0015AC5og\u0016\u0014H\u000fR1uCR\u0019Q$\u0011\"\t\u000b\u0005r\u0004\u0019\u0001\u0012\t\u000b\rs\u0004\u0019\u0001#\u0002\t\u0011\fG/\u0019\t\u0004YE*\u0005C\u0001\u001bG\u0013\t9UGA\bD_2,XN\u001c(b[\u00164\u0016\r\\;f\u0011\u0015I\u0005\u0001\"\u0001K\u0003!\tG\rZ)vKJLHc\u0001\u0012L\u0019\")\u0011\u0005\u0013a\u0001E!)Q\n\u0013a\u0001E\u0005)\u0011/^3ss\")q\n\u0001C\u0001!\u0006Qq-\u001a;Rk\u0016\u0014\u00180\u00133\u0015\u0007\t\n&\u000bC\u0003\"\u001d\u0002\u0007!\u0005C\u0003N\u001d\u0002\u0007!\u0005C\u0003U\u0001\u0011\u0005Q+A\u0006sK6|g/Z)vKJLHcA\u000fW/\")\u0011e\u0015a\u0001E!)\u0001l\u0015a\u0001E\u00059\u0011/^3ss&#\u0007\"\u0002.\u0001\t\u0003Y\u0016A\u00033s_B\u001cFO]3b[R\u0011Q\u0004\u0018\u0005\u0006Ce\u0003\rA\t\u0005\u0006=\u0002!\taX\u0001\rY&\u001cH/\u001a8TiJ,\u0017-\u001c\u000b\u0003AB\u0004B!\u00194#Q6\t!M\u0003\u0002dI\u0006A1m\u001c8tk6,'OC\u0001f\u0003\u0015Y\u0017MZ6b\u0013\t9'MA\u0006LC\u001a\\\u0017m\u0015;sK\u0006l\u0007CA5o\u001b\u0005Q'BA6m\u0003!iWm]:bO\u0016\u001c(BA7\u0005\u0003\u001d\u0019w.\\7p]NL!a\u001c6\u0003/M#(/\u0019;j_N#(/Z1nS:<W*Z:tC\u001e,\u0007\"B\u0011^\u0001\u0004\u0011\u0003\"\u0002:\u0001\t\u0003\u0019\u0018\u0001E:u_Bd\u0015n\u001d;f]N#(/Z1n)\tiB\u000fC\u0003\"c\u0002\u0007!\u0005C\u0003w\u0001\u0011%q/A\u0018tQV$Hm\\<o\u0017\u000647.Y\"p]N,X.\u001a:B]\u0012\u0014V-\\8wKN#(/Z1nS:<G*[:uK:,'\u000f\u0006\u0002\u001eq\")\u0011%\u001ea\u0001E!)!\u0010\u0001C\u0001w\u0006\t\u0012/^3sS\u0016\u001chI]8n'R\u0014X-Y7\u0015\u0007q\f9\u0001E\u0002-cu\u00042A`A\u0002\u001b\u0005y(bAA\u0001\t\u0005\u0019A\r^8\n\u0007\u0005\u0015qP\u0001\nTiJ\fG/[8Rk\u0016\u0014\u0018p\u0015;sK\u0006l\u0007BBA\u0005s\u0002\u0007!%\u0001\u0004tiJ,\u0017-\u001c\u0005\b\u0003\u001b\u0001A\u0011AA\b\u0003E\u0019w\u000e\\;n]N4%o\\7TiJ,\u0017-\u001c\u000b\u0005\u0003#\tI\u0002\u0005\u0003-c\u0005M\u0001cA5\u0002\u0016%\u0019\u0011q\u00036\u0003'\r{G.^7o\u001d\u0006lW\rV=qKZ\u000bG.^3\t\u000f\u0005%\u00111\u0002a\u0001E!9\u001
1Q\u0004\u0001\u0005\u0002\u0005}\u0011a\u00037jgR\u001cFO]3b[N$\"!!\t\u0011\t1\n\u00141\u0005\t\u0005\u0003K\tY#\u0004\u0002\u0002()\u0019\u0011\u0011\u00067\u0002\u000fM$(/Z1ng&!\u0011QFA\u0014\u00055\u0019FO]1uS>\u001cFO]3b[\"9\u0011\u0011\u0007\u0001\u0005\u0002\u0005M\u0012aD:bm\u0016$vnQ1tg\u0006tGM]1\u0015\u0007u\t)\u0004\u0003\u0004\"\u0003_\u0001\rA\t\u0005\b\u0003s\u0001A\u0011AA\u001e\u0003M\u0019Ho\u001c9TCZ,Gk\\\"bgN\fg\u000e\u001a:b)\ri\u0012Q\b\u0005\u0007C\u0005]\u0002\u0019\u0001\u0012\t\u000f\u0005\u0005\u0003\u0001\"\u0001\u0002D\u0005Y1/\u0019<f)>luN\\4p)\ri\u0012Q\t\u0005\u0007C\u0005}\u0002\u0019\u0001\u0012\t\u000f\u0005%\u0003\u0001\"\u0001\u0002L\u0005y1\u000f^8q'\u00064X\rV8N_:<w\u000eF\u0002\u001e\u0003\u001bBa!IA$\u0001\u0004\u0011\u0003bBA)\u0001\u0011\u0005\u00111K\u0001\u000bg\u00064X\rV8T_2\u0014HcA\u000f\u0002V!1\u0011%a\u0014A\u0002\tBq!!\u0017\u0001\t\u0003\tY&\u0001\bti>\u00048+\u0019<f)>\u001cv\u000e\u001c:\u0015\u0007u\ti\u0006\u0003\u0004\"\u0003/\u0002\rA\t\u0005\b\u0003C\u0002A\u0011AA2\u0003-Ig\u000eZ3y'R\u0014X-Y7\u0015\u0007u\t)\u0007\u0003\u0004\"\u0003?\u0002\rA\t\u0005\b\u0003S\u0002A\u0011AA6\u0003=\u0019Ho\u001c9J]\u0012,\u0007p\u0015;sK\u0006lGcA\u000f\u0002n!1\u0011%a\u001aA\u0002\tBq!!\u001d\u0001\t\u0003\t\u0019(\u0001\u0006j]&$\u0018.\u00197ju\u0016$\u0012A\u0005\u0005\b\u0003o\u0002A\u0011AA=\u0003iIg.\u001b;jC2L'0Z,ji\"\u001cVM\u001d<fe\u000e{gNZ5h)%\u0011\u00121PA@\u0003\u0013\u000bi\tC\u0004\u0002~\u0005U\u0004\u0019\u0001\u0012\u0002\u0017-\fgm[1TKJ4XM\u001d\u0005\t\u0003\u0003\u000b)\b1\u0001\u0002\u0004\u0006I1.\u00194lCB{'\u000f\u001e\t\u0004\u001b\u0005\u0015\u0015bAAD\u001d\t\u0019\u0011J\u001c;\t\u000f\u0005-\u0015Q\u000fa\u0001E\u0005\u0011B\u000f[3[_>\\W-\u001a9feN+'O^3s\u0011!\ty)!\u001eA\u0002\u0005\r\u0015\u0001\u0005;iKj{wn[3fa\u0016\u0014\bk\u001c:u\u0011\u001d\t\u0019\n\u0001C!\u0003+\u000b\u0001c^5uQN+'O^3s\u0007>tg-[4\u0015\u000bI\t9*a'\t\u000f\u0005e\u0015\u0011\u0013a\u0001E\u0005Y1.\u00194
lCF+xN];n\u0011\u001d\ti*!%A\u0002\t\nqB_8pW\u0016,\u0007/\u001a:Rk>\u0014X/\u001c\u0005\b\u0003'\u0003A\u0011IAQ)%\u0011\u00121UAT\u0003S\u000bi\u000bC\u0004\u0002&\u0006}\u0005\u0019\u0001\u0012\u0002\u0013-\fgm[1I_N$\b\u0002CAA\u0003?\u0003\r!a!\t\u000f\u0005-\u0016q\u0014a\u0001E\u0005i!p\\8lK\u0016\u0004XM\u001d%pgRD\u0001\"a,\u0002 \u0002\u0007\u00111Q\u0001\u000eu>|7.Z3qKJ\u0004vN\u001d;\t\u000f\u0005M\u0006\u0001\"\u0011\u0002t\u0005!\u0011N\\5u\u0011\u001d\t9\f\u0001C!\u0003s\u000ba![:J]&$HCAA^!\ri\u0011QX\u0005\u0004\u0003\u007fs!a\u0002\"p_2,\u0017M\u001c\u0005\b\u0003\u0007\u0004A\u0011IA]\u0003-I7oQ8o]\u0016\u001cG/\u001a3\t\u000f\u0005\u001d\u0007\u0001\"\u0001\u0002J\u0006AB-\u001a4j]\u0016\f5m\u001b8po2,GmZ3US6,w*\u001e;\u0015\u0007e\tY\r\u0003\u0005\u0002N\u0006\u0015\u0007\u0019AAB\u0003-!\u0018.\\3PkRLe.T:\t\u000f\u0005E\u0007\u0001\"\u0011\u0002T\u0006)1\r\\8tKV\tQ\u0004C\u0005\u0002X\u0002\u0011\r\u0011\"\u0001\u0002Z\u0006\u00112\u000f\u001e:fC6Lgn\u001a+pa&\u001cg*Y7f+\t\tY\u000e\u0005\u0003\u0002^\u0006\rXBAAp\u0015\r\t\toL\u0001\u0005Y\u0006tw-C\u0002(\u0003?D\u0001\"a:\u0001A\u0003%\u00111\\\u0001\u0014gR\u0014X-Y7j]\u001e$v\u000e]5d\u001d\u0006lW\r\t\u0005\n\u0003W\u0004!\u0019!C\u0001\u00033\fac\u001d;sK\u0006l\u0017N\\4ECR\fGk\u001c9jG:\u000bW.\u001a\u0005\t\u0003_\u0004\u0001\u0015!\u0003\u0002\\\u000692\u000f\u001e:fC6Lgn\u001a#bi\u0006$v\u000e]5d\u001d\u0006lW\r\t\u0005\n\u0003g\u0004!\u0019!C\u0001\u00033\f\u0011b]3tg&|g.\u00133\t\u0011\u0005]\b\u0001)A\u0005\u00037\f!b]3tg&|g.\u00133!\u0011)\tY\u0010\u0001EC\u0002\u0013\u0005\u0011\u0011\\\u0001\u0015G>t7/^7fe\n\u0013xn[3s'\u0016\u0014h/\u001a:\t\u0015\u0005}\b\u0001#A!B\u0013\tY.A\u000bd_:\u001cX/\\3s\u0005J|7.\u001a:TKJ4XM\u001d\u0011\t\u0015\t\r\u0001\u0001#b\u0001\n\u0003\u0011)!\u0001\nd_:\u001cX/\\3s\u0005J|7.\u001a:Q_J$XCAAB\u0011)\u0011I\u0001\u0001E\u0001B\u0003&\u00111Q\u0001\u0014G>t7/^7fe\n\u0013xn[3s!>\u0014H\u000f\t\u0005\n\u0005\u001b\u0001\u0001\u0019!C\u0001\u00033\
fAb[1gW\u0006\u001cE.^:uKJD\u0011B!\u0005\u0001\u0001\u0004%\tAa\u0005\u0002!-\fgm[1DYV\u001cH/\u001a:`I\u0015\fHcA\u000f\u0003\u0016!Q!q\u0003B\b\u0003\u0003\u0005\r!a7\u0002\u0007a$\u0013\u0007\u0003\u0005\u0003\u001c\u0001\u0001\u000b\u0015BAn\u00035Y\u0017MZ6b\u00072,8\u000f^3sA!Q!q\u0004\u0001\t\u0006\u0004%\tA!\t\u0002\u0017-\fgm[1Ce>\\WM]\u000b\u0002E!I!Q\u0005\u0001\t\u0002\u0003\u0006KAI\u0001\rW\u000647.\u0019\"s_.,'\u000f\t\u0005\n\u0005S\u0001\u0001\u0019!C\u0001\u00033\fqB_8pW\u0016,\u0007/\u001a:TKJ4XM\u001d\u0005\n\u0005[\u0001\u0001\u0019!C\u0001\u0005_\t1C_8pW\u0016,\u0007/\u001a:TKJ4XM]0%KF$2!\bB\u0019\u0011)\u00119Ba\u000b\u0002\u0002\u0003\u0007\u00111\u001c\u0005\t\u0005k\u0001\u0001\u0015)\u0003\u0002\\\u0006\u0001\"p\\8lK\u0016\u0004XM]*feZ,'\u000f\t\u0005\u000b\u0005s\u0001\u0001R1A\u0005\u0002\t\u0005\u0012\u0001\u0005>p_.,W\r]3s\u00072,8\u000f^3s\u0011%\u0011i\u0004\u0001E\u0001B\u0003&!%A\t{_>\\W-\u001a9fe\u000ecWo\u001d;fe\u0002B\u0011B!\u0011\u0001\u0001\u0004%\tAa\u0011\u0002\u0017M$(/Z1nS:<W\u000b]\u000b\u0003\u0003wC\u0011Ba\u0012\u0001\u0001\u0004%\tA!\u0013\u0002\u001fM$(/Z1nS:<W\u000b]0%KF$2!\bB&\u0011)\u00119B!\u0012\u0002\u0002\u0003\u0007\u00111\u0018\u0005\t\u0005\u001f\u0002\u0001\u0015)\u0003\u0002<\u0006a1\u000f\u001e:fC6LgnZ+qA!I!1\u000b\u0001A\u0002\u0013\u0005!1I\u0001\u0011gR\u0014X-Y7j]\u001e\u0014VO\u001c8j]\u001eD\u0011Ba\u0016\u0001\u0001\u0004%\tA!\u0017\u0002)M$(/Z1nS:<'+\u001e8oS:<w\fJ3r)\ri\"1\f\u0005\u000b\u0005/\u0011)&!AA\u0002\u0005m\u0006\u0002\u0003B0\u0001\u0001\u0006K!a/\u0002#M$(/Z1nS:<'+\u001e8oS:<\u0007\u0005C\u0005\u0003d\u0001\u0011\r\u0011\"\u0001\u0003f\u0005\u00112\u000f\u001e:fC6Lgn\u001a'jgR,g.\u001a:t+\t\u00119\u0007E\u0004\u0003j\tM$Ea\u001e\u000e\u0005\t-$\u0002\u0002B7\u0005_\nq!\\;uC\ndWMC\u0002\u0003r9\t!bY8mY\u0016\u001cG/[8o\u0013\u0011\u0011)Ha\u001b\u0003\u00075\u000b\u0007\u000f\u0005\u0003\u0003z\tuTB\u0001B>\u0015\t)G!\u0003\u0003\u0003\u0000\tm$!D&bM.\f7i\u001c8tk6,'\u000f\u0003\u0
005\u0003\u0004\u0002\u0001\u000b\u0011\u0002B4\u0003M\u0019HO]3b[&tw\rT5ti\u0016tWM]:!\u0011)\u00119\t\u0001EC\u0002\u0013\u0005!\u0011R\u0001\u000eW\u000647.\u0019)s_\u0012,8-\u001a:\u0016\u0005\t-\u0005\u0003\u0002B=\u0005\u001bKAAa$\u0003|\ti1*\u00194lCB\u0013x\u000eZ;dKJD!Ba%\u0001\u0011\u0003\u0005\u000b\u0015\u0002BF\u00039Y\u0017MZ6b!J|G-^2fe\u0002B!Ba&\u0001\u0011\u000b\u0007I\u0011\u0001BE\u0003EY\u0017MZ6b\t\u0006$\u0018\r\u0015:pIV\u001cWM\u001d\u0005\u000b\u00057\u0003\u0001\u0012!Q!\n\t-\u0015AE6bM.\fG)\u0019;b!J|G-^2fe\u0002B\u0011Ba(\u0001\u0005\u0004%\tA!)\u0002\u0017I,GO]=Q_2L7-_\u000b\u0003\u0005G\u0003BA!*\u000386\u0011!q\u0015\u0006\u0005\u0005S\u0013Y+A\u0003sKR\u0014\u0018P\u0003\u0003\u0003.\n=\u0016aB2ve\u0006$xN\u001d\u0006\u0005\u0005c\u0013\u0019,\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0003\u0005k\u000b1a\u001c:h\u0013\u0011\u0011ILa*\u0003\u0019I+GO]=P]\u0016$\u0016.\\3\t\u0011\tu\u0006\u0001)A\u0005\u0005G\u000bAB]3uef\u0004v\u000e\\5ds\u0002B!B!1\u0001\u0011\u000b\u0007I\u0011\u0001Bb\u0003=Qxn\\6fKB,'o\u00117jK:$XC\u0001Bc!\u0011\u00119M!4\u000e\u0005\t%'\u0002\u0002Bf\u0005W\u000b\u0011B\u001a:b[\u0016<xN]6\n\t\t='\u0011\u001a\u0002\u0011\u0007V\u0014\u0018\r^8s\rJ\fW.Z<pe.D!Ba5\u0001\u0011\u0003\u0005\u000b\u0015\u0002Bc\u0003AQxn\\6fKB,'o\u00117jK:$\b\u0005C\u0006\u0003X\u0002\u0001\r\u00111A\u0005\u0002\te\u0017\u0001\u0004;pa&\u001c7+\u001a:wS\u000e,WC\u0001Bn!\u0011\u0011iN!:\u000e\u0005\t}'\u0002\u0002Bq\u0005G\fqa]3sm&\u001cWM\u0003\u0002fY&!!q\u001dBp\u00051!v\u000e]5d'\u0016\u0014h/[2f\u0011-\u0011Y\u000f\u0001a\u0001\u0002\u0004%\tA!<\u0002!Q|\u0007/[2TKJ4\u0018nY3`I\u0015\fHcA\u000f\u0003p\"Q!q\u0003Bu\u0003\u0003\u0005\rAa7\t\u0011\tM\b\u0001)Q\u0005\u00057\fQ\u0002^8qS\u000e\u001cVM\u001d<jG\u0016\u0004\u0003\"\u0003B|\u0001\u0001\u0007I\u0011\u0001B\u0003\u0003)\t7m\u001b+j[\u0016|U\u000f\u001e\u0005\n\u0005w\u0004\u0001\u0019!C\u0001\u0005{\fa\"Y2l)&lWmT;u?\u0012*\u0017\u000fF\u0002\u001e\u0005\u007fD!Ba\u0006\u
0003z\u0006\u0005\t\u0019AAB\u0011!\u0019\u0019\u0001\u0001Q!\n\u0005\r\u0015aC1dWRKW.Z(vi\u0002B!ba\u0002\u0001\u0011\u000b\u0007I\u0011AB\u0005\u0003EQxn\\6fKB,'oQ8ogVlWM]\u000b\u0003\u0007\u0017\u0001Ba!\u0004\u0004\u00145\u00111q\u0002\u0006\u0004\u0007#\u0011\u0011!\u0003>p_.,W\r]3s\u0013\u0011\u0019)ba\u0004\u0003#i{wn[3fa\u0016\u00148i\u001c8tk6,'\u000f\u0003\u0006\u0004\u001a\u0001A\t\u0011)Q\u0005\u0007\u0017\t!C_8pW\u0016,\u0007/\u001a:D_:\u001cX/\\3sA!Q1Q\u0004\u0001\t\u0006\u0004%\taa\b\u0002\u001bMLhnY(qKJ\fG/[8o+\t\u0019\t\u0003E\u0002\u0014\u0007GI1a!\n\u0003\u0005e\u0019FO]3b[&tw-\u0011)J'ft7m\u00149fe\u0006$\u0018n\u001c8\t\u0015\r%\u0002\u0001#A!B\u0013\u0019\t#\u0001\bts:\u001cw\n]3sCRLwN\u001c\u0011\t\u0015\r5\u0002\u0001#b\u0001\n\u0003\u0019y#\u0001\bbgft7m\u00149fe\u0006$\u0018n\u001c8\u0016\u0005\rE\u0002cA\n\u00044%\u00191Q\u0007\u0002\u00035M#(/Z1nS:<\u0017\tU%Bgft7m\u00149fe\u0006$\u0018n\u001c8\t\u0015\re\u0002\u0001#A!B\u0013\u0019\t$A\bbgft7m\u00149fe\u0006$\u0018n\u001c8!\u0011)\u0019i\u0004\u0001EC\u0002\u0013\u00051qH\u0001\u0010gR\fG/^:Pa\u0016\u0014\u0018\r^5p]V\u00111\u0011\t\t\u0004'\r\r\u0013bAB#\u0005\tI2\u000b\u001e:fC6LgnZ!Q\u00132K7\u000f^(qKJ\fG/[8o\u0011)\u0019I\u0005\u0001E\u0001B\u0003&1\u0011I\u0001\u0011gR\fG/^:Pa\u0016\u0014\u0018\r^5p]\u0002Bqa!\u0014\u0001\t\u0013\u0019y%\u0001\ndQ\u0016\u001c7.\u00129iK6,'/\u00197O_\u0012,G#A\u000f\t\u000f\rM\u0003\u0001\"\u0003\u0004P\u000592\u000f^1si\u0016\u0003\b.Z7fe\u0006dgj\u001c3f/\u0006$8\r\u001b\u0005\b\u0007/\u0002A\u0011BB(\u0003=Ig.\u001b;jC2L'0\u001a+pa&\u001c\u0007bBB.\u0001\u0011%1qJ\u0001\u0015G\",7m[*ue\u0016\fW.\u001b8h'R\fG/^:\t\u000f\r}\u0003\u0001\"\u0003\u0004P\u0005Y\u0011\r\u001a3MSN$XM\\3s\u000f\u001d\u0019\u0019G\u0001E\u0001\u0007K\n1c\u0015;sCRLwn\u0015;sK\u0006l\u0017N\\4B!&\u00032aEB4\r\u0019\t!\u0001#\u0001\u0004jM)1q\r\u0007\u0004lA\u00191c!\u001c\n\u0007\r=$AA\rTiJ\fG/[8TiJ,\u0017-\\5oO\u0006\u0003\u0016jQ8oM&<\u0007bB\f\u0004h\u0011\u000511\u000f\
u000b\u0003\u0007KB1ba\u001e\u0004h!\u0015\r\u0011\"\u0001\u0004z\u0005\u0019An\\4\u0016\u0005\rm\u0004\u0003BB?\u0007\u0007k!aa \u000b\t\r\u0005%1W\u0001\u0006g24GG[\u0005\u0005\u0007\u000b\u001byH\u0001\u0004M_\u001e<WM\u001d\u0005\f\u0007\u0013\u001b9\u0007#A!B\u0013\u0019Y(\u0001\u0003m_\u001e\u0004\u0003")
public class StratioStreamingAPI
implements IStratioStreamingAPI {
    // Topic names read from the InternalTopic constants when the instance is constructed.
    private final String streamingTopicName = InternalTopic.TOPIC_REQUEST.getTopicName();
    private final String streamingDataTopicName = InternalTopic.TOPIC_DATA.getTopicName();
    // String form of System.currentTimeMillis() captured at construction; passed to every message builder.
    private final String sessionId = String.valueOf(BoxesRunTime.boxToLong((long)System.currentTimeMillis()));
    // Populated on first use by consumerBrokerServer$lzycompute() / consumerBrokerPort$lzycompute()
    // from the first "host:port" entry of kafkaCluster().
    private String consumerBrokerServer;
    private int consumerBrokerPort;
    // Starts empty. NOTE(review): presumably assigned by an initialisation method outside this view - confirm.
    private String kafkaCluster = "";
    // Populated on first use by kafkaBroker$lzycompute() from kafkaCluster().
    private String kafkaBroker;
    // Starts empty. NOTE(review): presumably assigned by an initialisation method outside this view - confirm.
    private String zookeeperServer = "";
    // Populated on first use by zookeeperCluster$lzycompute() from zookeeperServer().
    private String zookeeperCluster;
    private boolean streamingUp = false;
    private boolean streamingRunning = false;
    // Scala mutable map, initially empty, keyed by stream name; listenStream() adds the consumer it
    // creates and shutdownKafkaConsumerAndRemoveStreamingListener() closes and removes it.
    private final Map<String, KafkaConsumer> streamingListeners = (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
    // Producers for the request topic and the data topic, created on first use by their $lzycompute methods.
    private KafkaProducer kafkaProducer;
    private KafkaProducer kafkaDataProducer;
    // Retry policy handed to CuratorFrameworkFactory.newClient().
    // NOTE(review): 500 is presumably the sleep in milliseconds before the single retry - confirm against Curator.
    private final RetryOneTime retryPolicy = new RetryOneTime(500);
    // Created on first use by zookeeperClient$lzycompute(); started by zookeeperConsumer$lzycompute().
    private CuratorFramework zookeeperClient;
    private TopicService topicService;
    // Passed to the sync and list operations when they are constructed.
    // NOTE(review): presumably a timeout in milliseconds - confirm.
    private int ackTimeOut = 8000;
    // Created on first use by the matching $lzycompute methods below.
    private ZookeeperConsumer zookeeperConsumer;
    private StreamingAPISyncOperation syncOperation;
    private StreamingAPIAsyncOperation asyncOperation;
    private StreamingAPIListOperation statusOperation;
    // Bit flags recording which on-demand fields have been computed; each $lzycompute method
    // tests and then sets its own bit (0x1 through 0x400) while synchronized on this instance.
    private volatile int bitmap$0;

    /**
     * Static forwarder: hands {@code config} to the method of the same name on the
     * {@code StratioStreamingAPI$} singleton.
     *
     * @param config configuration object, passed through unchanged
     */
    public static void com$stratio$streaming$api$StratioStreamingAPIConfig$_setter_$config_$eq(Config config) {
        final StratioStreamingAPI$ companion = StratioStreamingAPI$.MODULE$;
        companion.com$stratio$streaming$api$StratioStreamingAPIConfig$_setter_$config_$eq(config);
    }

    /**
     * Static forwarder to {@code config()} on the {@code StratioStreamingAPI$} singleton.
     *
     * @return whatever the singleton's {@code config()} returns
     */
    public static Config config() {
        final Config current = StratioStreamingAPI$.MODULE$.config();
        return current;
    }

    /**
     * Static forwarder to {@code log()} on the {@code StratioStreamingAPI$} singleton.
     *
     * @return whatever the singleton's {@code log()} returns
     */
    public static Logger log() {
        final Logger logger = StratioStreamingAPI$.MODULE$.log();
        return logger;
    }

    /**
     * Computes {@code consumerBrokerServer} if bit 0x1 of {@code bitmap$0} is still clear:
     * takes the first comma-separated entry of {@code kafkaCluster()} and keeps the text
     * before its first ':'. Runs while holding this instance's monitor.
     *
     * @return the stored broker host
     */
    private String consumerBrokerServer$lzycompute() {
        synchronized (this) {
            final boolean alreadyComputed = (this.bitmap$0 & 1) != 0;
            if (!alreadyComputed) {
                final String firstBroker = this.kafkaCluster().split(",")[0];
                this.consumerBrokerServer = firstBroker.split(":")[0];
                this.bitmap$0 = this.bitmap$0 | 1;
            }
            return this.consumerBrokerServer;
        }
    }

    /**
     * Computes {@code consumerBrokerPort} if bit 0x2 of {@code bitmap$0} is still clear:
     * takes the first comma-separated entry of {@code kafkaCluster()}, reads the segment
     * after its first ':' and converts it with Scala's {@code StringOps.toInt}. Runs while
     * holding this instance's monitor.
     *
     * @return the stored broker port
     */
    private int consumerBrokerPort$lzycompute() {
        synchronized (this) {
            final boolean alreadyComputed = (this.bitmap$0 & 2) != 0;
            if (!alreadyComputed) {
                final String firstBroker = this.kafkaCluster().split(",")[0];
                final String portText = firstBroker.split(":")[1];
                this.consumerBrokerPort = new StringOps(Predef$.MODULE$.augmentString(portText)).toInt();
                this.bitmap$0 = this.bitmap$0 | 2;
            }
            return this.consumerBrokerPort;
        }
    }

    /**
     * Computes {@code kafkaBroker} if bit 0x4 of {@code bitmap$0} is still clear, by running
     * {@code kafkaCluster()} through a Scala {@code StringContext.s} interpolation whose
     * literal parts are both empty. Runs while holding this instance's monitor.
     *
     * @return the stored broker string
     */
    private String kafkaBroker$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 4) != 0) {
                return this.kafkaBroker;
            }
            // Template first, arguments second: same evaluation order as the original one-liner.
            final StringContext template =
                new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""}));
            final Object[] interpolated = new Object[]{this.kafkaCluster()};
            this.kafkaBroker = template.s((Seq)Predef$.MODULE$.genericWrapArray((Object)interpolated));
            this.bitmap$0 = this.bitmap$0 | 4;
            return this.kafkaBroker;
        }
    }

    /**
     * Computes {@code zookeeperCluster} if bit 0x8 of {@code bitmap$0} is still clear, by
     * running {@code zookeeperServer()} through a Scala {@code StringContext.s} interpolation
     * whose literal parts are both empty. Runs while holding this instance's monitor.
     *
     * @return the stored ZooKeeper connection string
     */
    private String zookeeperCluster$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 8) != 0) {
                return this.zookeeperCluster;
            }
            // Template first, arguments second: same evaluation order as the original one-liner.
            final StringContext template =
                new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""}));
            final Object[] interpolated = new Object[]{this.zookeeperServer()};
            this.zookeeperCluster = template.s((Seq)Predef$.MODULE$.genericWrapArray((Object)interpolated));
            this.bitmap$0 = this.bitmap$0 | 8;
            return this.zookeeperCluster;
        }
    }

    /**
     * Creates {@code kafkaProducer} if bit 0x10 of {@code bitmap$0} is still clear. The producer
     * is built for the {@code InternalTopic.TOPIC_REQUEST} topic name and {@code kafkaBroker()},
     * with constructor arguments 3 to 8 taken from the defaults exposed by {@code KafkaProducer$}.
     * Runs while holding this instance's monitor.
     *
     * @return the stored request-topic producer
     */
    private KafkaProducer kafkaProducer$lzycompute() {
        synchronized (this) {
            final boolean alreadyCreated = (this.bitmap$0 & 0x10) != 0;
            if (!alreadyCreated) {
                final String topic = InternalTopic.TOPIC_REQUEST.getTopicName();
                final String brokerList = this.kafkaBroker();
                final KafkaProducer$ defaults = KafkaProducer$.MODULE$;
                this.kafkaProducer = new KafkaProducer(
                    topic,
                    brokerList,
                    defaults.$lessinit$greater$default$3(),
                    defaults.$lessinit$greater$default$4(),
                    defaults.$lessinit$greater$default$5(),
                    defaults.$lessinit$greater$default$6(),
                    defaults.$lessinit$greater$default$7(),
                    defaults.$lessinit$greater$default$8());
                this.bitmap$0 = this.bitmap$0 | 0x10;
            }
            return this.kafkaProducer;
        }
    }

    /**
     * Creates {@code kafkaDataProducer} if bit 0x20 of {@code bitmap$0} is still clear. The
     * producer is built for the {@code InternalTopic.TOPIC_DATA} topic name and
     * {@code kafkaBroker()}, with constructor arguments 3 to 8 taken from the defaults exposed
     * by {@code KafkaProducer$}. Runs while holding this instance's monitor.
     *
     * @return the stored data-topic producer
     */
    private KafkaProducer kafkaDataProducer$lzycompute() {
        synchronized (this) {
            final boolean alreadyCreated = (this.bitmap$0 & 0x20) != 0;
            if (!alreadyCreated) {
                final String topic = InternalTopic.TOPIC_DATA.getTopicName();
                final String brokerList = this.kafkaBroker();
                final KafkaProducer$ defaults = KafkaProducer$.MODULE$;
                this.kafkaDataProducer = new KafkaProducer(
                    topic,
                    brokerList,
                    defaults.$lessinit$greater$default$3(),
                    defaults.$lessinit$greater$default$4(),
                    defaults.$lessinit$greater$default$5(),
                    defaults.$lessinit$greater$default$6(),
                    defaults.$lessinit$greater$default$7(),
                    defaults.$lessinit$greater$default$8());
                this.bitmap$0 = this.bitmap$0 | 0x20;
            }
            return this.kafkaDataProducer;
        }
    }

    /**
     * Creates {@code zookeeperClient} if bit 0x40 of {@code bitmap$0} is still clear, using
     * {@code CuratorFrameworkFactory.newClient} with {@code zookeeperCluster()} and
     * {@code retryPolicy()}. The client is only created here, not started. Runs while holding
     * this instance's monitor.
     *
     * @return the stored Curator client
     */
    private CuratorFramework zookeeperClient$lzycompute() {
        synchronized (this) {
            final boolean alreadyCreated = (this.bitmap$0 & 0x40) != 0;
            if (!alreadyCreated) {
                final String connectString = (String)this.zookeeperCluster();
                final RetryPolicy policy = (RetryPolicy)this.retryPolicy();
                this.zookeeperClient = CuratorFrameworkFactory.newClient(connectString, policy);
                this.bitmap$0 = this.bitmap$0 | 0x40;
            }
            return this.zookeeperClient;
        }
    }

    /**
     * Creates {@code zookeeperConsumer} if bit 0x80 of {@code bitmap$0} is still clear. It
     * first calls {@code start()} on {@code zookeeperClient()} and then wraps that client in a
     * new {@code ZookeeperConsumer}. Runs while holding this instance's monitor.
     *
     * @return the stored ZooKeeper consumer
     */
    private ZookeeperConsumer zookeeperConsumer$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 0x80) != 0) {
                return this.zookeeperConsumer;
            }
            // The client must be started before the consumer is built on top of it.
            this.zookeeperClient().start();
            this.zookeeperConsumer = new ZookeeperConsumer(this.zookeeperClient());
            this.bitmap$0 = this.bitmap$0 | 0x80;
            return this.zookeeperConsumer;
        }
    }

    /**
     * Creates {@code syncOperation} if bit 0x100 of {@code bitmap$0} is still clear, from
     * {@code kafkaProducer()}, {@code zookeeperConsumer()} and {@code ackTimeOut()}. Runs while
     * holding this instance's monitor.
     *
     * @return the stored synchronous operation helper
     */
    private StreamingAPISyncOperation syncOperation$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 0x100) != 0) {
                return this.syncOperation;
            }
            this.syncOperation = new StreamingAPISyncOperation(
                this.kafkaProducer(),
                this.zookeeperConsumer(),
                this.ackTimeOut());
            this.bitmap$0 = this.bitmap$0 | 0x100;
            return this.syncOperation;
        }
    }

    /**
     * Creates {@code asyncOperation} if bit 0x200 of {@code bitmap$0} is still clear, from
     * {@code kafkaDataProducer()}. Runs while holding this instance's monitor.
     *
     * @return the stored asynchronous operation helper
     */
    private StreamingAPIAsyncOperation asyncOperation$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 0x200) != 0) {
                return this.asyncOperation;
            }
            this.asyncOperation = new StreamingAPIAsyncOperation(this.kafkaDataProducer());
            this.bitmap$0 = this.bitmap$0 | 0x200;
            return this.asyncOperation;
        }
    }

    /**
     * Creates {@code statusOperation} if bit 0x400 of {@code bitmap$0} is still clear, from
     * {@code kafkaProducer()}, {@code zookeeperConsumer()} and {@code ackTimeOut()}. Runs while
     * holding this instance's monitor.
     *
     * @return the stored list operation helper
     */
    private StreamingAPIListOperation statusOperation$lzycompute() {
        synchronized (this) {
            if ((this.bitmap$0 & 0x400) != 0) {
                return this.statusOperation;
            }
            this.statusOperation = new StreamingAPIListOperation(
                this.kafkaProducer(),
                this.zookeeperConsumer(),
                this.ackTimeOut());
            this.bitmap$0 = this.bitmap$0 | 0x400;
            return this.statusOperation;
        }
    }

    /**
     * Calls {@code checkStreamingStatus()}, builds a "create" message for {@code streamName}
     * with the given columns and submits it through the synchronous operation.
     *
     * @param streamName name of the stream the message refers to
     * @param columns    column definitions placed in the message
     */
    @Override
    public void createStream(String streamName, java.util.List<ColumnNameType> columns) {
        this.checkStreamingStatus();
        final String op = "CREATE".toLowerCase();
        final MessageBuilderWithColumns builder = new MessageBuilderWithColumns(this.sessionId(), op);
        final StratioStreamingMessage request = builder.build(streamName, columns);
        this.syncOperation().performSyncOperation(request);
    }

    /**
     * Calls {@code checkStreamingStatus()}, builds an "alter" message for {@code streamName}
     * with the given columns and submits it through the synchronous operation.
     *
     * @param streamName name of the stream the message refers to
     * @param columns    column definitions placed in the message
     */
    @Override
    public void alterStream(String streamName, java.util.List<ColumnNameType> columns) {
        this.checkStreamingStatus();
        final String op = "ALTER".toLowerCase();
        final MessageBuilderWithColumns builder = new MessageBuilderWithColumns(this.sessionId(), op);
        final StratioStreamingMessage request = builder.build(streamName, columns);
        this.syncOperation().performSyncOperation(request);
    }

    /**
     * Calls {@code checkStreamingStatus()}, builds an insert message for {@code streamName}
     * from {@code data} and hands it to the asynchronous operation.
     *
     * @param streamName name of the stream the message refers to
     * @param data       column name/value pairs placed in the message
     */
    @Override
    public void insertData(String streamName, java.util.List<ColumnNameValue> data) {
        this.checkStreamingStatus();
        final InsertMessageBuilder builder = new InsertMessageBuilder(this.sessionId());
        final StratioStreamingMessage request = builder.build(streamName, data);
        this.asyncOperation().performAsyncOperation(request);
    }

    /**
     * Calls {@code checkStreamingStatus()}, submits an "add_query" message for the given stream
     * and query through the synchronous operation, then looks the query's id up with
     * {@link #getQueryId(String, String)}.
     *
     * @param streamName name of the stream the query is attached to
     * @param query      query text
     * @return the result of {@code getQueryId(streamName, query)}
     */
    @Override
    public String addQuery(String streamName, String query) {
        this.checkStreamingStatus();
        final String op = "ADD_QUERY".toLowerCase();
        final QueryMessageBuilder builder = new QueryMessageBuilder(this.sessionId());
        final StratioStreamingMessage request = builder.build(streamName, query, op);
        this.syncOperation().performSyncOperation(request);
        final String queryId = this.getQueryId(streamName, query);
        return queryId;
    }

    /**
     * Looks up the id of the first query on {@code streamName} whose text equals {@code query}.
     *
     * <p>The decompiled Scala closure and raw {@code Option}/{@code Some} handling have been
     * replaced with a plain loop over the Java list returned by
     * {@link #queriesFromStream(String)}; the first match wins, as with Scala's {@code find}.
     *
     * @param streamName stream whose queries are searched
     * @param query      query text to match with {@code equals}
     * @return the matching query's id, or the empty string when no query matches
     */
    public String getQueryId(String streamName, String query) {
        java.util.List<StratioQueryStream> queries = this.queriesFromStream(streamName);
        for (StratioQueryStream candidate : queries) {
            if (candidate.query().equals(query)) {
                return candidate.queryId();
            }
        }
        return "";
    }

    /**
     * Calls {@code checkStreamingStatus()}, builds a "remove_query" message for the given
     * stream and query id and submits it through the synchronous operation.
     *
     * @param streamName name of the stream the query belongs to
     * @param queryId    identifier placed in the message's query slot
     */
    @Override
    public void removeQuery(String streamName, String queryId) {
        this.checkStreamingStatus();
        final String op = "REMOVE_QUERY".toLowerCase();
        final QueryMessageBuilder builder = new QueryMessageBuilder(this.sessionId());
        final StratioStreamingMessage request = builder.build(streamName, queryId, op);
        this.syncOperation().performSyncOperation(request);
    }

    /**
     * Calls {@code checkStreamingStatus()}, builds a "drop" message for {@code streamName}
     * and submits it through the synchronous operation.
     *
     * @param streamName name of the stream the message refers to
     */
    @Override
    public void dropStream(String streamName) {
        this.checkStreamingStatus();
        final String op = "DROP".toLowerCase();
        final StreamMessageBuilder builder = new StreamMessageBuilder(this.sessionId());
        final StratioStreamingMessage request = builder.build(streamName, op);
        this.syncOperation().performSyncOperation(request);
    }

    /**
     * Calls {@code checkStreamingStatus()}, submits a "listen" message for {@code streamName}
     * through the synchronous operation, then creates a {@code KafkaConsumer} for the stream,
     * registers it in {@code streamingListeners} under the stream name and returns its stream.
     *
     * @param streamName name of the stream to listen to
     * @return the stream exposed by the newly created consumer
     */
    @Override
    public KafkaStream<String, StratioStreamingMessage> listenStream(String streamName) {
        this.checkStreamingStatus();
        // Locale.ROOT keeps the operation name stable: under a Turkish or Azeri default locale
        // the no-arg toLowerCase() maps 'I' to a dotless 'ı', producing "lısten".
        String operation = "LISTEN".toLowerCase(java.util.Locale.ROOT);
        StratioStreamingMessage listenStreamMessage = new StreamMessageBuilder(this.sessionId()).build(streamName, operation);
        this.syncOperation().performSyncOperation(listenStreamMessage);
        // Arguments are evaluated in the same order as before: connection string, then the
        // default for the third constructor parameter.
        String zookeeperConnection = this.zookeeperCluster();
        String thirdArgumentDefault = KafkaConsumer$.MODULE$.$lessinit$greater$default$3();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(streamName, zookeeperConnection, thirdArgumentDefault, false);
        this.streamingListeners().put((Object)streamName, (Object)kafkaConsumer);
        return kafkaConsumer.stream();
    }

    /**
     * Calls {@code checkStreamingStatus()}, closes and unregisters the local consumer for
     * {@code streamName} (if any), then submits a "stop_listen" message through the
     * synchronous operation.
     *
     * @param streamName name of the stream to stop listening to
     */
    @Override
    public void stopListenStream(String streamName) {
        this.checkStreamingStatus();
        // Locale.ROOT keeps the operation name stable: under a Turkish or Azeri default locale
        // the no-arg toLowerCase() maps 'I' to a dotless 'ı', producing "stop_lısten".
        String operation = "STOP_LISTEN".toLowerCase(java.util.Locale.ROOT);
        StratioStreamingMessage stopListenStreamMessage = new StreamMessageBuilder(this.sessionId()).build(streamName, operation);
        // The local consumer is shut down before the stop request is sent, as in the original.
        this.shutdownKafkaConsumerAndRemoveStreamingListener(streamName);
        this.syncOperation().performSyncOperation(stopListenStreamMessage);
    }

    /**
     * Closes the consumer registered in {@code streamingListeners} under {@code streamName},
     * when one exists, and then removes that key from the map in either case.
     *
     * @param streamName key of the listener to shut down and unregister
     */
    private void shutdownKafkaConsumerAndRemoveStreamingListener(String streamName) {
        final Option registered = this.streamingListeners().get((Object)streamName);
        if (registered instanceof Some) {
            final KafkaConsumer consumer = (KafkaConsumer)((Some)registered).x();
            consumer.close();
        }
        this.streamingListeners().remove((Object)streamName);
    }

    /**
     * Returns the queries attached to the stream named {@code stream}.
     *
     * <p>Searches the result of {@link #listStreams()} for the first stream whose name equals
     * {@code stream} and converts each of its {@code StreamQuery} entries into a
     * {@code StratioQueryStream} carrying the same query text and query id. The decompiled
     * Scala closures, the convoluted {@code None} comparison and the unreachable
     * {@code MatchError} branch have been replaced with plain loops.
     *
     * @param stream name of the stream to look up
     * @return a new list with one {@code StratioQueryStream} per query of the stream
     * @throws StratioEngineOperationException when no listed stream has that name
     */
    @Override
    public java.util.List<StratioQueryStream> queriesFromStream(String stream) {
        for (StratioStream element : this.listStreams()) {
            if (element.getStreamName().equals(stream)) {
                java.util.List<StreamQuery> sourceQueries = element.getQueries();
                java.util.List<StratioQueryStream> converted = new java.util.ArrayList<StratioQueryStream>();
                for (StreamQuery query : sourceQueries) {
                    converted.add(new StratioQueryStream(query.getQuery(), query.getQueryId()));
                }
                return converted;
            }
        }
        throw new StratioEngineOperationException("StratioEngine error: STREAM DOES NOT EXIST");
    }

    /**
     * Returns the columns of the stream named {@code stream}.
     *
     * <p>Searches the result of {@link #listStreams()} for the first stream whose name equals
     * {@code stream} and returns the list from its {@code getColumns()}. The decompiled Scala
     * closure, the convoluted {@code None} comparison and the unreachable {@code MatchError}
     * branch have been replaced with a plain loop.
     *
     * @param stream name of the stream to look up
     * @return the column list reported by the matching stream
     * @throws StratioEngineOperationException when no listed stream has that name
     */
    @Override
    public java.util.List<ColumnNameTypeValue> columnsFromStream(String stream) {
        for (StratioStream element : this.listStreams()) {
            if (element.getStreamName().equals(stream)) {
                return element.getColumns();
            }
        }
        throw new StratioEngineOperationException("StratioEngine error: STREAM DOES NOT EXIST");
    }

    /**
     * Calls {@code checkStreamingStatus()}, builds a "list" message carrying this instance's
     * session id and returns what the status operation's {@code getListStreams} yields for it.
     *
     * @return the streams returned by the status operation
     */
    @Override
    public java.util.List<StratioStream> listStreams() {
        this.checkStreamingStatus();
        // Locale.ROOT keeps the operation name stable: under a Turkish or Azeri default locale
        // the no-arg toLowerCase() maps 'I' to a dotless 'ı', producing "lıst".
        String operation = "LIST".toLowerCase(java.util.Locale.ROOT);
        StratioStreamingMessage listStreamMessage = MessageBuilder$.MODULE$.builder().withOperation(operation).withSessionId(this.sessionId()).build();
        return this.statusOperation().getListStreams(listStreamMessage);
    }

    /**
     * Sends the lower-cased {@code SAVETO_CASSANDRA} operation for the given stream through
     * the synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void saveToCassandra(String streamName) {
        this.checkStreamingStatus();
        final String operationName = "SAVETO_CASSANDRA".toLowerCase();
        final StreamMessageBuilder messageBuilder = new StreamMessageBuilder(this.sessionId());
        final StratioStreamingMessage request = messageBuilder.build(streamName, operationName);
        this.syncOperation().performSyncOperation(request);
    }

    /**
     * Sends the lower-cased {@code STOP_SAVETO_CASSANDRA} operation for the given stream
     * through the synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void stopSaveToCassandra(String streamName) {
        this.checkStreamingStatus();
        final String operationName = "STOP_SAVETO_CASSANDRA".toLowerCase();
        final StreamMessageBuilder messageBuilder = new StreamMessageBuilder(this.sessionId());
        final StratioStreamingMessage request = messageBuilder.build(streamName, operationName);
        this.syncOperation().performSyncOperation(request);
    }

    /**
     * Sends the lower-cased {@code SAVETO_MONGO} operation for the given stream through the
     * synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void saveToMongo(String streamName) {
        this.checkStreamingStatus();
        final String operationName = "SAVETO_MONGO".toLowerCase();
        final StreamMessageBuilder messageBuilder = new StreamMessageBuilder(this.sessionId());
        final StratioStreamingMessage saveToMongoMessage = messageBuilder.build(streamName, operationName);
        this.syncOperation().performSyncOperation(saveToMongoMessage);
    }

    /**
     * Sends the lower-cased {@code STOP_SAVETO_MONGO} operation for the given stream through
     * the synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void stopSaveToMongo(String streamName) {
        this.checkStreamingStatus();
        final String operationName = "STOP_SAVETO_MONGO".toLowerCase();
        final StreamMessageBuilder messageBuilder = new StreamMessageBuilder(this.sessionId());
        final StratioStreamingMessage stopSaveToMongoMessage = messageBuilder.build(streamName, operationName);
        this.syncOperation().performSyncOperation(stopSaveToMongoMessage);
    }

    /**
     * Sends the lower-cased {@code SAVETO_SOLR} operation for the given stream through the
     * synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void saveToSolr(String streamName) {
        this.checkStreamingStatus();
        final String operationName = "SAVETO_SOLR".toLowerCase();
        final StreamMessageBuilder messageBuilder = new StreamMessageBuilder(this.sessionId());
        final StratioStreamingMessage saveToSolrMessage = messageBuilder.build(streamName, operationName);
        this.syncOperation().performSyncOperation(saveToSolrMessage);
    }

    /**
     * Sends the lower-cased {@code STOP_SAVETO_SOLR} operation for the given stream through
     * the synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void stopSaveToSolr(String streamName) {
        this.checkStreamingStatus();
        final String operationName = "STOP_SAVETO_SOLR".toLowerCase();
        final StreamMessageBuilder messageBuilder = new StreamMessageBuilder(this.sessionId());
        final StratioStreamingMessage request = messageBuilder.build(streamName, operationName);
        this.syncOperation().performSyncOperation(request);
    }

    /**
     * Sends the lower-cased {@code INDEX} operation for the given stream through the
     * synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void indexStream(String streamName) {
        this.checkStreamingStatus();
        // Locale.ROOT keeps the operation name stable: the default-locale overload would
        // yield "\u0131ndex" (dotless i) under Turkish/Azeri locales.
        String operation = "INDEX".toLowerCase(java.util.Locale.ROOT);
        StratioStreamingMessage indexStreamMessage = new StreamMessageBuilder(this.sessionId()).build(streamName, operation);
        this.syncOperation().performSyncOperation(indexStreamMessage);
    }

    /**
     * Sends the lower-cased {@code STOP_INDEX} operation for the given stream through the
     * synchronous operation channel.
     *
     * @param streamName name of the stream the operation applies to
     * @throws StratioEngineStatusException if the engine is flagged as down or not yet running
     */
    @Override
    public void stopIndexStream(String streamName) {
        this.checkStreamingStatus();
        // Locale.ROOT keeps the operation name stable: the default-locale overload would
        // yield "stop_\u0131ndex" (dotless i) under Turkish/Azeri locales.
        String operation = "STOP_INDEX".toLowerCase(java.util.Locale.ROOT);
        StratioStreamingMessage stopIndexStreamMessage = new StreamMessageBuilder(this.sessionId()).build(streamName, operation);
        this.syncOperation().performSyncOperation(stopIndexStreamMessage);
    }

    /**
     * Configures the Kafka and ZooKeeper endpoints from the API configuration and connects.
     *
     * <p>Reads {@code kafka.server}/{@code kafka.port} and {@code zookeeper.server}/
     * {@code zookeeper.port}, stores each pair as {@code host:port}, then checks the status
     * node, registers the status watch and initialises the Kafka topics.
     *
     * @return this instance
     * @throws StratioEngineConnectionException on any failure; only the message of the
     *         underlying throwable is carried over
     */
    @Override
    public IStratioStreamingAPI initialize() {
        try {
            String kafkaHost = StratioStreamingAPI$.MODULE$.config().getString("kafka.server");
            int kafkaPort = StratioStreamingAPI$.MODULE$.config().getInt("kafka.port");
            this.kafkaCluster_$eq(kafkaHost + ":" + kafkaPort);

            String zkHost = StratioStreamingAPI$.MODULE$.config().getString("zookeeper.server");
            int zkPort = StratioStreamingAPI$.MODULE$.config().getInt("zookeeper.port");
            this.zookeeperServer_$eq(zkHost + ":" + zkPort);

            StratioStreamingAPI$.MODULE$.log().info("Establishing connection with the engine...");
            this.checkEphemeralNode();
            this.startEphemeralNodeWatch();

            StratioStreamingAPI$.MODULE$.log().info("Initializing kafka topic...");
            this.initializeTopic();
            return this;
        }
        catch (Throwable failure) {
            String reason = "Unable to connect to stratio streaming. " + failure.getMessage();
            throw new StratioEngineConnectionException(reason);
        }
    }

    /**
     * Configures the Kafka and ZooKeeper endpoints from the given hosts and ports and connects.
     *
     * <p>Each pair is stored as {@code host:port}; afterwards the status node is checked, the
     * status watch is registered and the Kafka topics are initialised.
     *
     * @param kafkaServer Kafka host
     * @param kafkaPort Kafka port
     * @param theZookeeperServer ZooKeeper host
     * @param theZookeeperPort ZooKeeper port
     * @return this instance
     * @throws StratioEngineConnectionException on any failure; only the message of the
     *         underlying throwable is carried over
     */
    @Override
    public IStratioStreamingAPI initializeWithServerConfig(String kafkaServer, int kafkaPort, String theZookeeperServer, int theZookeeperPort) {
        try {
            this.kafkaCluster_$eq(kafkaServer + ":" + kafkaPort);
            this.zookeeperServer_$eq(theZookeeperServer + ":" + theZookeeperPort);

            StratioStreamingAPI$.MODULE$.log().info("Establishing connection with the engine...");
            this.checkEphemeralNode();
            this.startEphemeralNodeWatch();

            StratioStreamingAPI$.MODULE$.log().info("Initializing kafka topic...");
            this.initializeTopic();
            return this;
        }
        catch (Throwable failure) {
            String reason = "Unable to connect to stratio streaming. " + failure.getMessage();
            throw new StratioEngineConnectionException(reason);
        }
    }

    /**
     * Stores the given Kafka and ZooKeeper connection strings verbatim, without connecting.
     *
     * @param kafkaQuorum value assigned to the Kafka cluster setting
     * @param zookeeperQuorum value assigned to the ZooKeeper server setting
     * @return this instance
     */
    @Override
    public IStratioStreamingAPI withServerConfig(String kafkaQuorum, String zookeeperQuorum) {
        kafkaCluster_$eq(kafkaQuorum);
        zookeeperServer_$eq(zookeeperQuorum);
        return this;
    }

    /**
     * Stores the Kafka and ZooKeeper endpoints as {@code host:port} strings, without connecting.
     *
     * @param kafkaHost Kafka host
     * @param kafkaPort Kafka port
     * @param zookeeperHost ZooKeeper host
     * @param zookeeperPort ZooKeeper port
     * @return this instance
     */
    @Override
    public IStratioStreamingAPI withServerConfig(String kafkaHost, int kafkaPort, String zookeeperHost, int zookeeperPort) {
        String kafkaEndpoint = kafkaHost + ":" + kafkaPort;
        this.kafkaCluster_$eq(kafkaEndpoint);
        String zookeeperEndpoint = zookeeperHost + ":" + zookeeperPort;
        this.zookeeperServer_$eq(zookeeperEndpoint);
        return this;
    }

    /**
     * Connects using the endpoints already stored on this instance.
     *
     * <p>Checks the status node, registers the status watch and initialises the Kafka topics.
     *
     * @return this instance
     * @throws StratioEngineConnectionException on any failure; only the message of the
     *         underlying throwable is carried over
     */
    @Override
    public IStratioStreamingAPI init() {
        try {
            StratioStreamingAPI$.MODULE$.log().info("Establishing connection with the engine...");
            this.checkEphemeralNode();
            this.startEphemeralNodeWatch();

            StratioStreamingAPI$.MODULE$.log().info("Initializing kafka topic...");
            this.initializeTopic();
        }
        catch (Throwable failure) {
            String reason = "Unable to connect to stratio streaming. " + failure.getMessage();
            throw new StratioEngineConnectionException(reason);
        }
        return this;
    }

    /**
     * Reports whether both the {@code streamingUp} and {@code streamingRunning} flags are set.
     *
     * @return {@code true} only when both flags are {@code true}
     */
    @Override
    public boolean isInit() {
        if (!this.streamingUp()) {
            return false;
        }
        return this.streamingRunning();
    }

    /**
     * Reports whether both the {@code streamingUp} and {@code streamingRunning} flags are set.
     *
     * @return {@code true} only when both flags are {@code true}
     */
    @Override
    public boolean isConnected() {
        if (!this.streamingUp()) {
            return false;
        }
        return this.streamingRunning();
    }

    /**
     * Stores the acknowledge time-out on this instance.
     *
     * @param timeOutInMs value assigned to {@code ackTimeOut}; not validated here
     * @return this instance
     */
    @Override
    public StratioStreamingAPI defineAcknowledgeTimeOut(int timeOutInMs) {
        ackTimeOut_$eq(timeOutInMs);
        return this;
    }

    /**
     * Closes the two Kafka producers, the topic service and the ZooKeeper client.
     *
     * <p>Every resource gets a close attempt even when an earlier one throws; an exception
     * from a later close replaces an earlier one, as usual for {@code finally}.
     */
    @Override
    public void close() {
        try {
            this.kafkaProducer().close();
        }
        finally {
            try {
                this.kafkaDataProducer().close();
            }
            finally {
                try {
                    // In the visible code topicService is only assigned by initializeTopic(),
                    // so it may still be unset when close() runs without a successful init.
                    // NOTE(review): the field's initial value is declared elsewhere - confirm.
                    TopicService service = this.topicService();
                    if (service != null) {
                        service.close();
                    }
                }
                finally {
                    this.zookeeperClient().close();
                }
            }
        }
    }

    /** Returns the {@code streamingTopicName} field. */
    public String streamingTopicName() {
        return streamingTopicName;
    }

    /** Returns the {@code streamingDataTopicName} field. */
    public String streamingDataTopicName() {
        return streamingDataTopicName;
    }

    /** Returns the {@code sessionId} field. */
    public String sessionId() {
        return sessionId;
    }

    /**
     * Returns the cached {@code consumerBrokerServer}, delegating to
     * {@code consumerBrokerServer$lzycompute()} while bit {@code 0x1} of {@code bitmap$0} is clear.
     */
    public String consumerBrokerServer() {
        if ((this.bitmap$0 & 1) != 0) {
            return this.consumerBrokerServer;
        }
        return this.consumerBrokerServer$lzycompute();
    }

    /**
     * Returns the cached {@code consumerBrokerPort}, delegating to
     * {@code consumerBrokerPort$lzycompute()} while bit {@code 0x2} of {@code bitmap$0} is clear.
     */
    public int consumerBrokerPort() {
        if ((this.bitmap$0 & 2) != 0) {
            return this.consumerBrokerPort;
        }
        return this.consumerBrokerPort$lzycompute();
    }

    /** Returns the {@code kafkaCluster} field. */
    public String kafkaCluster() {
        return kafkaCluster;
    }

    /** Assigns the {@code kafkaCluster} field. */
    public void kafkaCluster_$eq(String x$1) {
        kafkaCluster = x$1;
    }

    /**
     * Returns the cached {@code kafkaBroker}, delegating to {@code kafkaBroker$lzycompute()}
     * while bit {@code 0x4} of {@code bitmap$0} is clear.
     */
    public String kafkaBroker() {
        if ((this.bitmap$0 & 4) != 0) {
            return this.kafkaBroker;
        }
        return this.kafkaBroker$lzycompute();
    }

    /** Returns the {@code zookeeperServer} field. */
    public String zookeeperServer() {
        return zookeeperServer;
    }

    /** Assigns the {@code zookeeperServer} field. */
    public void zookeeperServer_$eq(String x$1) {
        zookeeperServer = x$1;
    }

    /**
     * Returns the cached {@code zookeeperCluster}, delegating to
     * {@code zookeeperCluster$lzycompute()} while bit {@code 0x8} of {@code bitmap$0} is clear.
     */
    public String zookeeperCluster() {
        if ((this.bitmap$0 & 8) != 0) {
            return this.zookeeperCluster;
        }
        return this.zookeeperCluster$lzycompute();
    }

    /** Returns the {@code streamingUp} flag. */
    public boolean streamingUp() {
        return streamingUp;
    }

    /** Assigns the {@code streamingUp} flag. */
    public void streamingUp_$eq(boolean x$1) {
        streamingUp = x$1;
    }

    /** Returns the {@code streamingRunning} flag. */
    public boolean streamingRunning() {
        return streamingRunning;
    }

    /** Assigns the {@code streamingRunning} flag. */
    public void streamingRunning_$eq(boolean x$1) {
        streamingRunning = x$1;
    }

    /** Returns the {@code streamingListeners} map held by this instance (no copy is made). */
    public Map<String, KafkaConsumer> streamingListeners() {
        return streamingListeners;
    }

    /**
     * Returns the cached {@code kafkaProducer}, delegating to {@code kafkaProducer$lzycompute()}
     * while bit {@code 0x10} of {@code bitmap$0} is clear.
     */
    public KafkaProducer kafkaProducer() {
        if ((this.bitmap$0 & 0x10) != 0) {
            return this.kafkaProducer;
        }
        return this.kafkaProducer$lzycompute();
    }

    /**
     * Returns the cached {@code kafkaDataProducer}, delegating to
     * {@code kafkaDataProducer$lzycompute()} while bit {@code 0x20} of {@code bitmap$0} is clear.
     */
    public KafkaProducer kafkaDataProducer() {
        if ((this.bitmap$0 & 0x20) != 0) {
            return this.kafkaDataProducer;
        }
        return this.kafkaDataProducer$lzycompute();
    }

    /** Returns the {@code retryPolicy} field. */
    public RetryOneTime retryPolicy() {
        return retryPolicy;
    }

    /**
     * Returns the cached {@code zookeeperClient}, delegating to
     * {@code zookeeperClient$lzycompute()} while bit {@code 0x40} of {@code bitmap$0} is clear.
     */
    public CuratorFramework zookeeperClient() {
        if ((this.bitmap$0 & 0x40) != 0) {
            return this.zookeeperClient;
        }
        return this.zookeeperClient$lzycompute();
    }

    /** Returns the {@code topicService} field. */
    public TopicService topicService() {
        return topicService;
    }

    /** Assigns the {@code topicService} field. */
    public void topicService_$eq(TopicService x$1) {
        topicService = x$1;
    }

    /** Returns the {@code ackTimeOut} field. */
    public int ackTimeOut() {
        return ackTimeOut;
    }

    /** Assigns the {@code ackTimeOut} field. */
    public void ackTimeOut_$eq(int x$1) {
        ackTimeOut = x$1;
    }

    /**
     * Returns the cached {@code zookeeperConsumer}, delegating to
     * {@code zookeeperConsumer$lzycompute()} while bit {@code 0x80} of {@code bitmap$0} is clear.
     */
    public ZookeeperConsumer zookeeperConsumer() {
        if ((this.bitmap$0 & 0x80) != 0) {
            return this.zookeeperConsumer;
        }
        return this.zookeeperConsumer$lzycompute();
    }

    /**
     * Returns the cached {@code syncOperation}, delegating to {@code syncOperation$lzycompute()}
     * while bit {@code 0x100} of {@code bitmap$0} is clear.
     */
    public StreamingAPISyncOperation syncOperation() {
        if ((this.bitmap$0 & 0x100) != 0) {
            return this.syncOperation;
        }
        return this.syncOperation$lzycompute();
    }

    /**
     * Returns the cached {@code asyncOperation}, delegating to {@code asyncOperation$lzycompute()}
     * while bit {@code 0x200} of {@code bitmap$0} is clear.
     */
    public StreamingAPIAsyncOperation asyncOperation() {
        if ((this.bitmap$0 & 0x200) != 0) {
            return this.asyncOperation;
        }
        return this.asyncOperation$lzycompute();
    }

    /**
     * Returns the cached {@code statusOperation}, delegating to
     * {@code statusOperation$lzycompute()} while bit {@code 0x400} of {@code bitmap$0} is clear.
     */
    public StreamingAPIListOperation statusOperation() {
        if ((this.bitmap$0 & 0x400) != 0) {
            return this.statusOperation;
        }
        return this.statusOperation$lzycompute();
    }

    /**
     * Reads the status znode once and sets the {@code streamingUp}/{@code streamingRunning}
     * flags from it.
     *
     * <ul>
     *   <li>{@code "connected"}: up = true, running = false</li>
     *   <li>{@code "initialized"}: up = true, running = true</li>
     * </ul>
     *
     * @throws StratioEngineStatusException if the status znode does not exist
     * @throws MatchError if the znode holds any other value
     */
    private void checkEphemeralNode() {
        String ephemeralNodePath = "/stratio/streaming/status";
        if (!this.zookeeperConsumer().zNodeExists(ephemeralNodePath)) {
            StratioStreamingAPI$.MODULE$.log().warn("Ephemeral node does not exist");
            throw new StratioEngineStatusException("Can't connect. Check if Stratio streaming is down or connection to Zookeeper");
        }

        // get() on the returned option fails if the node's data is absent at this point.
        String streamingStatus = (String)this.zookeeperConsumer().getZNodeData("/stratio/streaming/status").get();
        if ("connected".equals(streamingStatus)) {
            this.streamingUp_$eq(true);
            this.streamingRunning_$eq(false);
            return;
        }
        if ("initialized".equals(streamingStatus)) {
            this.streamingUp_$eq(true);
            this.streamingRunning_$eq(true);
            return;
        }
        throw new MatchError((Object)streamingStatus);
    }

    /**
     * Issues a watched existence check on the status znode and then registers the Curator
     * listener via {@code addListener()}.
     */
    private void startEphemeralNodeWatch() {
        Pathable watchedExistsCheck = (Pathable)this.zookeeperClient().checkExists().watched();
        watchedExistsCheck.forPath("/stratio/streaming/status");
        this.addListener();
    }

    /**
     * Creates the {@link KafkaTopicService}, stores it as the topic service, and ensures the
     * request topic and the data topic exist (each created with arguments {@code 1, 1}).
     */
    private void initializeTopic() {
        String zkCluster = this.zookeeperCluster();
        String brokerHost = this.consumerBrokerServer();
        int brokerPort = this.consumerBrokerPort();
        TopicService service = new KafkaTopicService(zkCluster, brokerHost, brokerPort, 10000, 10000);
        this.topicService_$eq(service);

        this.topicService().createTopicIfNotExist(this.streamingTopicName(), 1, 1);
        this.topicService().createTopicIfNotExist(this.streamingDataTopicName(), 1, 1);
    }

    /**
     * Guards engine operations: passes silently only when the engine is both up and running.
     *
     * @throws StratioEngineStatusException with "Stratio streaming is down" when the up flag is
     *         clear, or "Stratio streaming not yet initialized" when only the running flag is clear
     */
    private void checkStreamingStatus() {
        if (!this.streamingUp()) {
            throw new StratioEngineStatusException("Stratio streaming is down");
        }
        if (!this.streamingRunning()) {
            throw new StratioEngineStatusException("Stratio streaming not yet initialized");
        }
    }

    private void addListener() {
        this.zookeeperClient().getCuratorListenable().addListener((Object)new CuratorListener(this){
            private final /* synthetic */ StratioStreamingAPI $outer;

            /*
             * Unable to fully structure code
             */
            public void eventReceived(CuratorFramework client, CuratorEvent event) {
                block5: {
                    block2: {
                        block4: {
                            block3: {
                                var3_3 = event.getType();
                                v0 = CuratorEventType.WATCHED;
                                var4_4 = var3_3;
                                if (v0 != null ? v0.equals(var4_4) == false : var4_4 != null) break block2;
                                ((Pathable)this.$outer.zookeeperClient().checkExists().watched()).forPath("/stratio/streaming/status");
                                var6_5 = false;
                                var7_6 = null;
                                var8_7 = this.$outer.zookeeperConsumer().getZNodeData("/stratio/streaming/status");
                                if (!(var8_7 instanceof Some)) break block3;
                                var6_5 = true;
                                var7_6 = (Some)var8_7;
                                var10_9 = var9_8 = (String)var7_6.x();
                                if ("connected" != null ? "connected".equals(var10_9) == false : var10_9 != null) break block3;
                                this.$outer.streamingUp_$eq(true);
                                var11_10 = BoxedUnit.UNIT;
                                break block4;
                            }
                            if (!var6_5) ** GOTO lbl-1000
                            var13_14 = var12_13 = (String)var7_6.x();
                            if (!("initialized" != null ? "initialized".equals(var13_14) == false : var13_14 != null)) {
                                this.$outer.streamingUp_$eq(false);
                                var11_11 = BoxedUnit.UNIT;
                            } else lbl-1000:
                            // 2 sources

                            {
                                this.$outer.streamingUp_$eq(false);
                                this.$outer.streamingRunning_$eq(false);
                                var11_12 = BoxedUnit.UNIT;
                            }
                        }
                        var5_15 = BoxedUnit.UNIT;
                        break block5;
                    }
                    StratioStreamingAPI$.MODULE$.log().debug("Unused curatorEvent {}", new Object[]{var3_3});
                    var5_16 = BoxedUnit.UNIT;
                }
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }
}

