package org.apache.iotdb.db.pipe.connector.protocol.websocket;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.utils.Pair;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.class */
public class WebSocketConnectorServer extends WebSocketServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketConnectorServer.class);
    private final PriorityBlockingQueue<Pair<Long, Event>> events;
    private final WebSocketConnector websocketConnector;
    private final ConcurrentMap<Long, Event> eventMap;

    public WebSocketConnectorServer(InetSocketAddress inetSocketAddress, WebSocketConnector webSocketConnector) {
        super(inetSocketAddress);
        this.events = new PriorityBlockingQueue<>(11, Comparator.comparing(pair -> {
            return (Long) pair.left;
        }));
        this.eventMap = new ConcurrentHashMap();
        this.websocketConnector = webSocketConnector;
    }

    public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
        LOGGER.info(String.format("The connection from client %s:%d has been opened!", webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort())));
    }

    public void onClose(WebSocket webSocket, int i, String str, boolean z) {
        LOGGER.info(String.format("The client from %s:%d has been closed!", webSocket.getRemoteSocketAddress().getAddress(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort())));
    }

    public void onMessage(WebSocket webSocket, String str) {
        if (str.startsWith("START")) {
            LOGGER.info("Received a start message from {}:{}", webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort()));
            handleStart(webSocket);
        } else if (str.startsWith("ACK")) {
            handleAck(webSocket, str);
        } else if (str.startsWith("ERROR")) {
            LOGGER.error("Received an error message {} from {}:{}", new Object[]{str, webSocket.getRemoteSocketAddress().getHostName(), Integer.valueOf(webSocket.getRemoteSocketAddress().getPort())});
            handleError(webSocket, str);
        }
    }

    public void onError(WebSocket webSocket, Exception exc) {
        LOGGER.error(webSocket.getRemoteSocketAddress() != null ? String.format("Got an error `%s` from %s:%d", exc.getMessage(), webSocket.getLocalSocketAddress().getHostName(), Integer.valueOf(webSocket.getLocalSocketAddress().getPort())) : String.format("Got an error `%s` from client", exc.getMessage()));
    }

    public void onStart() {
        LOGGER.info(String.format("The webSocket server %s:%d has been started!", getAddress().getHostName(), Integer.valueOf(getPort())));
    }

    public void addEvent(Pair<Long, Event> pair) {
        if (this.events.size() >= 5) {
            synchronized (this.events) {
                while (this.events.size() >= 5) {
                    try {
                        this.events.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new PipeException(e.getMessage());
                    }
                }
            }
        }
        this.events.put(pair);
    }

    private void handleStart(WebSocket webSocket) {
        while (true) {
            try {
                Pair<Long, Event> take = this.events.take();
                synchronized (this.events) {
                    this.events.notifyAll();
                }
                if (transfer(take, webSocket)) {
                    return;
                } else {
                    this.websocketConnector.commit(((Long) take.getLeft()).longValue(), take.getRight() instanceof EnrichedEvent ? (EnrichedEvent) take.getRight() : null);
                }
            } catch (InterruptedException e) {
                LOGGER.warn(String.format("The event can't be taken, because: %s", e.getMessage()));
                Thread.currentThread().interrupt();
                throw new PipeException(e.getMessage());
            }
        }
    }

    private void handleAck(WebSocket webSocket, String str) {
        long parseLong = Long.parseLong(str.replace("ACK:", SubStringFunctionColumnTransformer.EMPTY_STRING));
        Event remove = this.eventMap.remove(Long.valueOf(parseLong));
        if (remove != null) {
            this.websocketConnector.commit(parseLong, remove instanceof EnrichedEvent ? (EnrichedEvent) remove : null);
        }
        handleStart(webSocket);
    }

    private void handleError(WebSocket webSocket, String str) {
        long parseLong = Long.parseLong(str.replace("ERROR:", SubStringFunctionColumnTransformer.EMPTY_STRING));
        LOGGER.warn(String.format("The tablet of commitId: %d can't be parsed by client, it will be retried later.", Long.valueOf(parseLong)));
        Event remove = this.eventMap.remove(Long.valueOf(parseLong));
        if (remove != null) {
            this.events.put(new Pair<>(Long.valueOf(parseLong), remove));
        }
        handleStart(webSocket);
    }

    private boolean transfer(Pair<Long, Event> pair, WebSocket webSocket) {
        ByteBuffer serialize;
        Long l = (Long) pair.getLeft();
        Event event = (Event) pair.getRight();
        try {
            if (event instanceof PipeInsertNodeTabletInsertionEvent) {
                serialize = ((PipeInsertNodeTabletInsertionEvent) event).convertToTablet().serialize();
            } else {
                if (!(event instanceof PipeRawTabletInsertionEvent)) {
                    throw new NotImplementedException("IoTDBCDCConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
                }
                serialize = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize();
            }
            if (serialize == null) {
                return false;
            }
            ByteBuffer allocate = ByteBuffer.allocate(8 + serialize.limit());
            allocate.putLong(l.longValue());
            allocate.put(serialize);
            allocate.flip();
            broadcast(allocate, Collections.singletonList(webSocket));
            this.eventMap.put((Long) pair.getLeft(), (Event) pair.getRight());
            return true;
        } catch (Exception e) {
            this.events.put(pair);
            throw new PipeException(e.getMessage());
        }
    }
}
