package org.apache.iotdb.db.engine.trigger.builtin;

import java.util.HashMap;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.trigger.exception.TriggerExecutionException;
import org.apache.iotdb.db.engine.trigger.api.Trigger;
import org.apache.iotdb.db.engine.trigger.sink.api.Configuration;
import org.apache.iotdb.db.engine.trigger.sink.api.Event;
import org.apache.iotdb.db.engine.trigger.sink.api.Handler;
import org.apache.iotdb.db.engine.trigger.sink.exception.SinkException;
import org.apache.iotdb.db.engine.trigger.sink.forward.ForwardEvent;
import org.apache.iotdb.db.engine.trigger.sink.forward.http.HTTPForwardConfiguration;
import org.apache.iotdb.db.engine.trigger.sink.forward.http.HTTPForwardEvent;
import org.apache.iotdb.db.engine.trigger.sink.forward.http.HTTPForwardHandler;
import org.apache.iotdb.db.engine.trigger.sink.forward.mqtt.MQTTForwardConfiguration;
import org.apache.iotdb.db.engine.trigger.sink.forward.mqtt.MQTTForwardEvent;
import org.apache.iotdb.db.engine.trigger.sink.forward.mqtt.MQTTForwardHandler;
import org.apache.iotdb.db.engine.trigger.utils.BatchHandlerQueue;
import org.apache.iotdb.trigger.api.TriggerAttributes;
import org.apache.iotdb.tsfile.utils.Binary;

/* loaded from: input_file:org/apache/iotdb/db/engine/trigger/builtin/ForwardTrigger.class */
public class ForwardTrigger implements Trigger {
    private static final String PROTOCOL_HTTP = "http";
    private static final String PROTOCOL_MQTT = "mqtt";
    private Handler forwardHandler;
    private Configuration forwardConfig;
    private BatchHandlerQueue<Event> queue;
    private final HashMap<String, String> labels = new HashMap<>();
    private String protocol;

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public void onCreate(TriggerAttributes triggerAttributes) throws Exception {
        this.protocol = triggerAttributes.getStringOrDefault("protocol", PROTOCOL_HTTP).toLowerCase();
        int intOrDefault = triggerAttributes.getIntOrDefault("queueNumber", 8);
        int intOrDefault2 = triggerAttributes.getIntOrDefault("queueSize", 2000);
        int intOrDefault3 = triggerAttributes.getIntOrDefault("batchSize", 50);
        String str = this.protocol;
        boolean z = -1;
        switch (str.hashCode()) {
            case 3213448:
                if (str.equals(PROTOCOL_HTTP)) {
                    z = false;
                    break;
                }
                break;
            case 3359524:
                if (str.equals(PROTOCOL_MQTT)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                this.forwardConfig = createHTTPConfiguration(triggerAttributes);
                this.forwardHandler = new HTTPForwardHandler();
                break;
            case true:
                this.forwardConfig = createMQTTConfiguration(triggerAttributes);
                this.forwardHandler = new MQTTForwardHandler();
                break;
            default:
                throw new TriggerExecutionException("Forward protocol doesn't support.");
        }
        this.queue = new BatchHandlerQueue<>(intOrDefault, intOrDefault2, intOrDefault3, this.forwardHandler);
        this.forwardHandler.open(this.forwardConfig);
    }

    private HTTPForwardConfiguration createHTTPConfiguration(TriggerAttributes triggerAttributes) throws SinkException {
        HTTPForwardConfiguration hTTPForwardConfiguration = new HTTPForwardConfiguration(triggerAttributes.getString("endpoint"), triggerAttributes.getBooleanOrDefault("stopIfException", false));
        hTTPForwardConfiguration.checkConfig();
        return hTTPForwardConfiguration;
    }

    private MQTTForwardConfiguration createMQTTConfiguration(TriggerAttributes triggerAttributes) throws SinkException {
        MQTTForwardConfiguration mQTTForwardConfiguration = new MQTTForwardConfiguration(triggerAttributes.getString("host"), triggerAttributes.getInt("port").intValue(), triggerAttributes.getString("username"), triggerAttributes.getString("password"), triggerAttributes.getString("topic"), triggerAttributes.getLongOrDefault("reconnectDelay", 10L), triggerAttributes.getLongOrDefault("connectAttemptsMax", 3L), triggerAttributes.getStringOrDefault("qos", "exactly_once"), triggerAttributes.getBooleanOrDefault("retain", false), triggerAttributes.getIntOrDefault("poolSize", 4), triggerAttributes.getBooleanOrDefault("stopIfException", false));
        mQTTForwardConfiguration.checkConfig();
        return mQTTForwardConfiguration;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public void onDrop() throws Exception {
        this.forwardHandler.close();
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public void onStart() throws Exception {
        this.forwardHandler.open(this.forwardConfig);
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public void onStop() throws Exception {
        this.forwardHandler.close();
    }

    private void offerEventToQueue(long j, Object obj, PartialPath partialPath) throws Exception {
        ForwardEvent mQTTForwardEvent;
        String str = this.protocol;
        boolean z = -1;
        switch (str.hashCode()) {
            case 3213448:
                if (str.equals(PROTOCOL_HTTP)) {
                    z = false;
                    break;
                }
                break;
            case 3359524:
                if (str.equals(PROTOCOL_MQTT)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                mQTTForwardEvent = new HTTPForwardEvent(j, obj, partialPath);
                break;
            case true:
                mQTTForwardEvent = new MQTTForwardEvent(j, obj, partialPath);
                break;
            default:
                throw new TriggerExecutionException("Forward protocol doesn't support.");
        }
        this.queue.offer(mQTTForwardEvent);
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public Integer fire(long j, Integer num, PartialPath partialPath) throws Exception {
        offerEventToQueue(j, num, partialPath);
        return num;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public int[] fire(long[] jArr, int[] iArr, PartialPath partialPath) throws Exception {
        for (int i = 0; i < jArr.length; i++) {
            offerEventToQueue(jArr[i], Integer.valueOf(iArr[i]), partialPath);
        }
        return iArr;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public Long fire(long j, Long l, PartialPath partialPath) throws Exception {
        offerEventToQueue(j, l, partialPath);
        return l;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public long[] fire(long[] jArr, long[] jArr2, PartialPath partialPath) throws Exception {
        for (int i = 0; i < jArr.length; i++) {
            offerEventToQueue(jArr[i], Long.valueOf(jArr2[i]), partialPath);
        }
        return jArr2;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public Float fire(long j, Float f, PartialPath partialPath) throws Exception {
        offerEventToQueue(j, f, partialPath);
        return f;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public float[] fire(long[] jArr, float[] fArr, PartialPath partialPath) throws Exception {
        for (int i = 0; i < jArr.length; i++) {
            offerEventToQueue(jArr[i], Float.valueOf(fArr[i]), partialPath);
        }
        return fArr;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public Double fire(long j, Double d, PartialPath partialPath) throws Exception {
        offerEventToQueue(j, d, partialPath);
        return d;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public double[] fire(long[] jArr, double[] dArr, PartialPath partialPath) throws Exception {
        for (int i = 0; i < jArr.length; i++) {
            offerEventToQueue(jArr[i], Double.valueOf(dArr[i]), partialPath);
        }
        return dArr;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public Boolean fire(long j, Boolean bool, PartialPath partialPath) throws Exception {
        offerEventToQueue(j, bool, partialPath);
        return bool;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public boolean[] fire(long[] jArr, boolean[] zArr, PartialPath partialPath) throws Exception {
        for (int i = 0; i < jArr.length; i++) {
            offerEventToQueue(jArr[i], Boolean.valueOf(zArr[i]), partialPath);
        }
        return zArr;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public Binary fire(long j, Binary binary, PartialPath partialPath) throws Exception {
        offerEventToQueue(j, binary, partialPath);
        return binary;
    }

    @Override // org.apache.iotdb.db.engine.trigger.api.Trigger
    public Binary[] fire(long[] jArr, Binary[] binaryArr, PartialPath partialPath) throws Exception {
        for (int i = 0; i < jArr.length; i++) {
            offerEventToQueue(jArr[i], binaryArr[i], partialPath);
        }
        return binaryArr;
    }
}
