/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.websocket.DefaultWebsocket;
import org.apache.camel.component.websocket.WebsocketEndpoint;
import org.apache.camel.component.websocket.WebsocketProducerConsumer;
import org.apache.camel.component.websocket.WebsocketSendException;
import org.apache.camel.component.websocket.WebsocketStore;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.StopWatch;

/**
 * Producer that writes the body of an exchange to one websocket connection
 * (selected by the {@code websocket.connectionKey} header) or to every
 * connection held in the {@link WebsocketStore}.
 *
 * <p>The store is injected through {@link #setStore(WebsocketStore)}; the
 * producer registers itself with its endpoint on start and unregisters on stop.
 */
public class WebsocketProducer
extends DefaultProducer
implements WebsocketProducerConsumer {
    /** Upper bound, in milliseconds, for a single pause while polling pending sends. */
    private static final long MAX_POLL_INTERVAL_MILLIS = 1000L;

    private WebsocketStore store;
    /** Endpoint-level default for broadcasting; may be {@code null}. */
    private final Boolean sendToAll;
    private final WebsocketEndpoint endpoint;

    /**
     * Creates a producer bound to the given endpoint and captures the
     * endpoint's send-to-all default.
     *
     * @param endpoint the owning websocket endpoint
     */
    public WebsocketProducer(WebsocketEndpoint endpoint) {
        super(endpoint);
        this.sendToAll = endpoint.getSendToAll();
        this.endpoint = endpoint;
    }

    /**
     * Sends the IN body of the exchange. Bodies that are neither {@code String}
     * nor {@code byte[]} are converted to {@code String} first.
     *
     * <p>When send-to-all is active the message is broadcast via
     * {@link #sendToAll(WebsocketStore, Object, Exchange)}; otherwise it is sent
     * to the connection named by the {@code websocket.connectionKey} header and
     * this method waits up to the endpoint's send timeout for completion.
     *
     * @param exchange the exchange carrying the message
     * @throws WebsocketSendException if no connection key is set, or if the
     *         send does not complete within the send timeout (the underlying
     *         {@link TimeoutException} is attached as the cause)
     * @throws Exception any other failure raised while reading the body or sending
     */
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        Object message = in.getMandatoryBody();
        if (message != null && !(message instanceof String) && !(message instanceof byte[])) {
            message = in.getMandatoryBody(String.class);
        }

        if (this.isSendToAllSet(in)) {
            this.sendToAll(this.store, message, exchange);
            return;
        }

        String connectionKey = in.getHeader("websocket.connectionKey", String.class);
        if (connectionKey == null) {
            throw new WebsocketSendException("Failed to send message to single connection; connection key not set.", exchange);
        }

        DefaultWebsocket websocket = this.store.get(connectionKey);
        this.log.debug("Sending to connection key {} -> {}", connectionKey, message);
        Future<Void> future = this.sendMessage(websocket, message);
        if (future == null) {
            // Nothing was sent (unknown/closed connection or unsupported body type).
            return;
        }

        int timeout = this.endpoint.getSendTimeout();
        try {
            future.get(timeout, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            // Future.get(timeout) signals expiry by throwing, never by returning an
            // incomplete future, so the timeout has to be translated here.
            throw new WebsocketSendException("Failed to send message to the connection within " + timeout + " millis.", exchange, e);
        }
    }

    /**
     * Returns the websocket endpoint this producer was created for.
     */
    @Override
    public WebsocketEndpoint getEndpoint() {
        return this.endpoint;
    }

    /**
     * Starts the producer and then connects it to its endpoint.
     */
    public void doStart() throws Exception {
        super.doStart();
        this.endpoint.connect(this);
    }

    /**
     * Disconnects the producer from its endpoint and then stops it.
     */
    public void doStop() throws Exception {
        this.endpoint.disconnect(this);
        super.doStop();
    }

    /**
     * Decides whether the message must be broadcast. The
     * {@code websocket.sendToAll} header wins; when it is absent the endpoint
     * default captured at construction is used.
     *
     * @param in the incoming message
     * @return {@code true} if the message should go to all connections;
     *         {@code false} when neither header nor default is set
     */
    boolean isSendToAllSet(Message in) {
        Boolean value = in.getHeader("websocket.sendToAll", this.sendToAll, Boolean.class);
        return value != null && value;
    }

    /**
     * Sends the message to every websocket in the store and waits, up to the
     * endpoint's send timeout, for all started sends to finish.
     *
     * <p>A failure to start a send to one recipient does not stop the
     * broadcast; the first such failure is remembered and thrown at the end.
     * If sends are still pending when the timeout expires, a timeout failure
     * is thrown instead (replacing any remembered delivery failure).
     *
     * @param store    the store providing the target connections
     * @param message  the {@code String} or {@code byte[]} payload
     * @param exchange the exchange, used for exception context
     * @throws WebsocketSendException if delivery failed or timed out for one
     *         or more recipients
     * @throws Exception if waiting is interrupted (see
     *         {@link #handleSleepInterruptedException(InterruptedException, Exchange)})
     */
    void sendToAll(WebsocketStore store, Object message, Exchange exchange) throws Exception {
        this.log.debug("Sending to all {}", message);
        Collection<DefaultWebsocket> websockets = store.getAll();
        WebsocketSendException exception = null;
        // Copy-on-write so completed futures can be removed while iterating.
        CopyOnWriteArrayList<Future<Void>> futures = new CopyOnWriteArrayList<Future<Void>>();
        for (DefaultWebsocket websocket : websockets) {
            try {
                Future<Void> future = this.sendMessage(websocket, message);
                if (future != null) {
                    futures.add(future);
                }
            }
            catch (Exception e) {
                // Only the first failure is kept; keep trying the remaining recipients.
                if (exception == null) {
                    exception = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
                }
            }
        }

        StopWatch watch = new StopWatch();
        int timeout = this.endpoint.getSendTimeout();
        while (true) {
            // Drop everything that has finished; this also runs once more after the
            // final sleep so sends completing during that sleep are not misreported.
            for (Future<Void> future : futures) {
                if (future.isDone() || future.isCancelled()) {
                    futures.remove(future);
                }
            }
            if (futures.isEmpty()) {
                break;
            }
            long remaining = (long)timeout - watch.taken();
            if (remaining <= 0L) {
                break;
            }
            // Sleep once per polling pass (not once per future) and never past the deadline.
            long interval = Math.min(MAX_POLL_INTERVAL_MILLIS, remaining);
            this.log.debug("Sleeping {} millis waiting for sendToAll to complete sending with timeout {} millis", interval, timeout);
            try {
                Thread.sleep(interval);
            }
            catch (InterruptedException e) {
                this.handleSleepInterruptedException(e, exchange);
            }
        }

        if (!futures.isEmpty()) {
            exception = new WebsocketSendException("Failed to deliver message within " + timeout + " millis to one or more recipients.", exchange);
        }
        if (exception != null) {
            throw exception;
        }
    }

    /**
     * Starts an asynchronous send of the message on the given websocket.
     *
     * @param websocket the target connection; may be {@code null}
     * @param message   the payload; only {@code String} and {@code byte[]} are sent
     * @return the future tracking the send, or {@code null} if nothing was sent
     *         because the websocket is {@code null}, its session is not open,
     *         or the payload type is unsupported
     * @throws IOException declared for callers; raised by the underlying send if it fails
     */
    Future<Void> sendMessage(DefaultWebsocket websocket, Object message) throws IOException {
        if (websocket == null || !websocket.getSession().isOpen()) {
            return null;
        }
        this.log.trace("Sending to websocket {} -> {}", websocket.getConnectionKey(), message);
        Future<Void> future = null;
        if (message instanceof String) {
            future = websocket.getSession().getRemote().sendStringByFuture((String)message);
        } else if (message instanceof byte[]) {
            ByteBuffer buf = ByteBuffer.wrap((byte[])message);
            future = websocket.getSession().getRemote().sendBytesByFuture(buf);
        }
        return future;
    }

    /**
     * Sets the store used to look up websocket connections.
     *
     * @param store the connection store
     */
    public void setStore(WebsocketStore store) {
        this.store = store;
    }

    /**
     * Handles an interruption of the polling sleep in
     * {@link #sendToAll(WebsocketStore, Object, Exchange)}: logs whether the
     * producer is stopping/stopped, restores the thread's interrupt flag and
     * rethrows the exception.
     *
     * @param e        the interruption
     * @param exchange the exchange being processed
     * @throws InterruptedException always, rethrowing {@code e}
     */
    protected void handleSleepInterruptedException(InterruptedException e, Exchange exchange) throws InterruptedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sleep interrupted, are we stopping? {}", this.isStopping() || this.isStopped());
        }
        Thread.currentThread().interrupt();
        throw e;
    }
}

