package org.apache.iotdb.db.pipe.connector.protocol.websocket;

import java.util.Arrays;
import java.util.Optional;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.class */
public class WebSocketConnector implements PipeConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnector.class);
    private Integer port;
    private WebSocketConnectorServer server;
    private String pipeName;

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        this.port = Integer.valueOf(pipeParameterValidator.getParameters().getIntOrDefault(Arrays.asList("connector.websocket.port", "sink.websocket.port"), 8080));
        this.server = WebSocketConnectorServer.getOrCreateInstance(this.port.intValue());
        if (this.server.getPort() != this.port.intValue()) {
            throw new PipeException(String.format("The websocket server has already been created with port = %d. Please set the option cdc.port = %d.", Integer.valueOf(this.server.getPort()), Integer.valueOf(this.server.getPort())));
        }
    }

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) {
        this.pipeName = pipeConnectorRuntimeConfiguration.getRuntimeEnvironment().getPipeName();
    }

    public void handshake() {
        this.server = WebSocketConnectorServer.getOrCreateInstance(this.port.intValue());
        this.server.register(this);
        if (this.server.isStarted()) {
            return;
        }
        synchronized (WebSocketConnectorServer.class) {
            if (!this.server.isStarted()) {
                this.server.start();
            }
        }
    }

    public void heartbeat() throws Exception {
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) {
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("WebsocketConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Current event: {}.", tabletInsertionEvent);
            return;
        }
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)) {
            ((EnrichedEvent) tabletInsertionEvent).increaseReferenceCount(WebSocketConnector.class.getName());
            this.server.addEvent(tabletInsertionEvent, this);
            return;
        }
        PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent = (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
        for (PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent : pipeInsertNodeTabletInsertionEvent.toRawTabletInsertionEvents()) {
            pipeInsertNodeTabletInsertionEvent.skipReportOnCommit();
            transfer(pipeRawTabletInsertionEvent);
        }
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("WebsocketConnector only support PipeTsFileInsertionEvent. Current event: {}.", tsFileInsertionEvent);
            return;
        }
        try {
            for (TabletInsertionEvent tabletInsertionEvent : tsFileInsertionEvent.toTabletInsertionEvents()) {
                ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
                transfer(tabletInsertionEvent);
            }
        } finally {
            tsFileInsertionEvent.close();
        }
    }

    public void transfer(Event event) throws Exception {
    }

    public void close() throws Exception {
        if (this.server != null) {
            this.server.unregister(this);
        }
    }

    public void commit(EnrichedEvent enrichedEvent) {
        Optional.ofNullable(enrichedEvent).ifPresent(enrichedEvent2 -> {
            enrichedEvent2.decreaseReferenceCount(WebSocketConnector.class.getName(), true);
        });
    }

    public String getPipeName() {
        return this.pipeName;
    }
}
