/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol.impl;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.util.Collections;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.apache.nifi.io.socket.multicast.MulticastServicesBroadcaster;
import org.apache.nifi.io.socket.multicast.MulticastUtils;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterServicesBroadcaster
implements MulticastServicesBroadcaster {
    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class));
    private final Set<DiscoverableService> services = new CopyOnWriteArraySet<DiscoverableService>();
    private final InetSocketAddress multicastAddress;
    private final MulticastConfiguration multicastConfiguration;
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final int broadcastDelayMs;
    private Timer broadcaster;
    private MulticastSocket multicastSocket;

    public ClusterServicesBroadcaster(InetSocketAddress multicastAddress, MulticastConfiguration multicastConfiguration, ProtocolContext<ProtocolMessage> protocolContext, String broadcastDelay) {
        if (multicastAddress == null) {
            throw new IllegalArgumentException("Multicast address may not be null.");
        }
        if (!multicastAddress.getAddress().isMulticastAddress()) {
            throw new IllegalArgumentException("Multicast group address is not a Class D IP address.");
        }
        if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        if (multicastConfiguration == null) {
            throw new IllegalArgumentException("Multicast configuration may not be null.");
        }
        this.services.addAll(this.services);
        this.multicastAddress = multicastAddress;
        this.multicastConfiguration = multicastConfiguration;
        this.protocolContext = protocolContext;
        this.broadcastDelayMs = (int)FormatUtils.getTimeDuration((String)broadcastDelay, (TimeUnit)TimeUnit.MILLISECONDS);
    }

    public void start() throws IOException {
        if (this.isRunning()) {
            throw new IllegalStateException("Instance is already started.");
        }
        this.multicastSocket = MulticastUtils.createMulticastSocket((MulticastConfiguration)this.multicastConfiguration);
        this.broadcaster = new Timer("Cluster Services Broadcaster", true);
        this.broadcaster.schedule(new TimerTask(){

            @Override
            public void run() {
                for (DiscoverableService service : ClusterServicesBroadcaster.this.services) {
                    try {
                        InetSocketAddress serviceAddress = service.getServiceAddress();
                        logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
                        ServiceBroadcastMessage msg = new ServiceBroadcastMessage();
                        msg.setServiceName(service.getServiceName());
                        msg.setAddress(serviceAddress.getHostName());
                        msg.setPort(serviceAddress.getPort());
                        ProtocolMessageMarshaller<ServiceBroadcastMessage> marshaller = ClusterServicesBroadcaster.this.protocolContext.createMarshaller();
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        marshaller.marshal(msg, baos);
                        byte[] packetBytes = baos.toByteArray();
                        DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, ClusterServicesBroadcaster.this.multicastAddress);
                        ClusterServicesBroadcaster.this.multicastSocket.send(packet);
                    }
                    catch (Exception ex) {
                        logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), (Throwable)ex);
                    }
                }
            }
        }, 0L, (long)this.broadcastDelayMs);
    }

    public boolean isRunning() {
        return this.broadcaster != null;
    }

    public void stop() {
        if (!this.isRunning()) {
            throw new IllegalStateException("Instance is already stopped.");
        }
        this.broadcaster.cancel();
        this.broadcaster = null;
        MulticastUtils.closeQuietly((MulticastSocket)this.multicastSocket);
    }

    public int getBroadcastDelayMs() {
        return this.broadcastDelayMs;
    }

    public Set<DiscoverableService> getServices() {
        return Collections.unmodifiableSet(this.services);
    }

    public InetSocketAddress getMulticastAddress() {
        return this.multicastAddress;
    }

    public boolean addService(DiscoverableService service) {
        return this.services.add(service);
    }

    public boolean removeService(String serviceName) {
        for (DiscoverableService service : this.services) {
            if (!service.getServiceName().equals(serviceName)) continue;
            return this.services.remove(service);
        }
        return false;
    }
}

