/*
 * Decompiled with CFR 0.152.
 */
package com.stratio.streaming.api;

import com.google.gson.Gson;
import com.stratio.streaming.api.StratioStreamingAPIConfig;
import com.stratio.streaming.api.StratioStreamingAPIConfig$class;
import com.stratio.streaming.commons.exceptions.StratioEngineOperationException;
import com.stratio.streaming.commons.messages.StratioStreamingMessage;
import com.stratio.streaming.kafka.KafkaProducer;
import com.stratio.streaming.zookeeper.ZookeeperConsumer;
import com.typesafe.config.Config;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.mutable.StringBuilder;
import scala.concurrent.Await$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u001d4A!\u0001\u0002\u0001\u0017\t)2\u000b\u001e:fC6LgnZ!Q\u0013>\u0003XM]1uS>t'BA\u0002\u0005\u0003\r\t\u0007/\u001b\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011aB:ue\u0006$\u0018n\u001c\u0006\u0002\u0013\u0005\u00191m\\7\u0004\u0001M\u0019\u0001\u0001\u0004\n\u0011\u00055\u0001R\"\u0001\b\u000b\u0003=\tQa]2bY\u0006L!!\u0005\b\u0003\r\u0005s\u0017PU3g!\t\u0019B#D\u0001\u0003\u0013\t)\"AA\rTiJ\fG/[8TiJ,\u0017-\\5oO\u0006\u0003\u0016jQ8oM&<\u0007\"B\f\u0001\t\u0003A\u0012A\u0002\u001fj]&$h\bF\u0001\u001a!\t\u0019\u0002\u0001C\u0004\u001c\u0001\t\u0007I\u0011\u0003\u000f\u0002\u00071|w-F\u0001\u001e!\tq2%D\u0001 \u0015\t\u0001\u0013%A\u0003tY\u001a$$NC\u0001#\u0003\ry'oZ\u0005\u0003I}\u0011a\u0001T8hO\u0016\u0014\bB\u0002\u0014\u0001A\u0003%Q$\u0001\u0003m_\u001e\u0004\u0003\"\u0002\u0015\u0001\t#I\u0013AF1eI6+7o]1hKR{7*\u00194lCR{\u0007/[2\u0015\t)js\u0007\u0011\t\u0003\u001b-J!\u0001\f\b\u0003\tUs\u0017\u000e\u001e\u0005\u0006]\u001d\u0002\raL\u0001\b[\u0016\u001c8/Y4f!\t\u0001T'D\u00012\u0015\t\u00114'\u0001\u0005nKN\u001c\u0018mZ3t\u0015\t!D!A\u0004d_6lwN\\:\n\u0005Y\n$aF*ue\u0006$\u0018n\\*ue\u0016\fW.\u001b8h\u001b\u0016\u001c8/Y4f\u0011\u0015At\u00051\u0001:\u0003A\u0019'/Z1uS>tWK\\5rk\u0016LE\r\u0005\u0002;{9\u0011QbO\u0005\u0003y9\ta\u0001\u0015:fI\u00164\u0017B\u0001 @\u0005\u0019\u0019FO]5oO*\u0011AH\u0004\u0005\u0006\u0003\u001e\u0002\rAQ\u0001\u000ei\u0006\u0014G.\u001a)s_\u0012,8-\u001a:\u0011\u0005\r3U\"\u0001#\u000b\u0005\u0015#\u0011!B6bM.\f\u0017BA$E\u00055Y\u0015MZ6b!J|G-^2fe\")\u0011\n\u0001C\t\u0015\u0006Ir-\u001a;Pa\u0016\u0014\u0018\r^5p]jsu\u000eZ3Gk2d\u0007+\u0019;i)\rI4*\u0014\u0005\u0006\u0019\"\u0003\r!O\u0001\n_B,'/\u0019;j_:DQA\u0014%A\u0002e\n\u0001\"\u001e8jcV,\u0017\n\u001a\u0005\u0006!\u0002!\t\"U\u0001\u001co\u0006LGOR8s)\",7\u000b\u001e:fC6Lgn\u001a*fgB|gn]3\u0015\tIK\u0016M\u0019\t\u0003'vr!\u0001V\u001e\u000f\u0005UCV\"\u0001,\u000b\u0005]S\u0011A\u0002\u001fs_>$h(C\u0001\u0010\u0011\u0015Qv\n1\u0001\\\u0003EQxn\\6fKB,'oQ8ogVlWM\u001d\t\u00039~k\u0011!\u0018\u0006\u0003=\u0012\t\u0011B_8pW\u0016,\u0007/\u001a:\n\u0005\u0001l&!\u0005.p_.,W\r]3s\u0007>t7/^7fe\")af\u0014a\u0001_!)1m\u0014a\u0001I\u0006q\u0011mY6US6,w*\u001e;J]6\u001b\bCA\u0007f\u0013\t1gBA\u0002J]R\u0004")
public class StreamingAPIOperation
implements StratioStreamingAPIConfig {
    private final Logger log;
    private final Config config;

    @Override
    public Config config() {
        return this.config;
    }

    @Override
    public void com$stratio$streaming$api$StratioStreamingAPIConfig$_setter_$config_$eq(Config x$1) {
        this.config = x$1;
    }

    public Logger log() {
        return this.log;
    }

    public void addMessageToKafkaTopic(StratioStreamingMessage message, String creationUniqueId, KafkaProducer tableProducer) {
        String kafkaMessage = new Gson().toJson((Object)message);
        tableProducer.send(kafkaMessage, message.getOperation());
    }

    public String getOperationZNodeFullPath(String operation, String uniqueId) {
        String zookeeperBasePath = "/stratio/streaming";
        String zookeeperPath = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/", "/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{zookeeperBasePath, operation, uniqueId}));
        return zookeeperPath;
    }

    public String waitForTheStreamingResponse(ZookeeperConsumer zookeeperConsumer, StratioStreamingMessage message, int ackTimeOutInMs) {
        String zNodeFullPath = this.getOperationZNodeFullPath(message.getOperation().toLowerCase(), message.getRequest_id());
        try {
            Await$.MODULE$.result(zookeeperConsumer.readZNode(zNodeFullPath), (Duration)new package.DurationInt(package$.MODULE$.DurationInt(ackTimeOutInMs)).milliseconds());
            Option<String> response = zookeeperConsumer.getZNodeData(zNodeFullPath);
            zookeeperConsumer.removeZNode(zNodeFullPath);
            return (String)response.get();
        }
        catch (TimeoutException timeoutException) {
            this.log().error(new StringBuilder().append((Object)"Ack timeout expired for: ").append((Object)message.getRequest()).toString());
            throw new StratioEngineOperationException(new StringBuilder().append((Object)"Acknowledge timeout expired").append((Object)message.getRequest()).toString());
        }
    }

    public StreamingAPIOperation() {
        StratioStreamingAPIConfig$class.$init$(this);
        this.log = LoggerFactory.getLogger(this.getClass());
    }
}

