package io.druid.segment.realtime.firehose;

import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.guice.annotations.RemoteChatHandler;
import io.druid.server.DruidNode;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;

/* loaded from: input_file:io/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.class */
/**
 * {@link ChatHandlerProvider} that keeps an in-memory registry of chat handlers keyed by service
 * name and, when asked to, announces each registered service through a {@link ServiceAnnouncer}.
 *
 * <p>Announced services are advertised under their own service name combined with the host and
 * port of the {@link DruidNode} injected into this provider.
 */
public class ServiceAnnouncingChatHandlerProvider implements ChatHandlerProvider {
    private static final Logger log = new Logger(ServiceAnnouncingChatHandlerProvider.class);

    /** Node whose host and port are reused for every service announced by this provider. */
    private final DruidNode node;

    /** Used to announce and unannounce services. */
    private final ServiceAnnouncer serviceAnnouncer;

    /** Currently registered handlers, keyed by service name. */
    private final ConcurrentMap<String, ChatHandler> handlers = Maps.newConcurrentMap();

    /** Names of services that were announced by {@link #register} and not yet unregistered. */
    private final ConcurrentSkipListSet<String> announcements = new ConcurrentSkipListSet<>();

    /**
     * @param druidNode        node supplying the host and port used in announcements
     * @param serviceAnnouncer announcer used to publish and withdraw services
     */
    @Inject
    public ServiceAnnouncingChatHandlerProvider(@RemoteChatHandler DruidNode druidNode, ServiceAnnouncer serviceAnnouncer) {
        this.node = druidNode;
        this.serviceAnnouncer = serviceAnnouncer;
    }

    /**
     * Registers {@code chatHandler} under the service name {@code str} and announces it.
     * Equivalent to {@code register(str, chatHandler, true)}.
     *
     * @param str         service name to register the handler under
     * @param chatHandler handler to register
     * @throws ISE if a handler is already registered under {@code str}
     */
    @Override // io.druid.segment.realtime.firehose.ChatHandlerProvider
    public void register(String str, ChatHandler chatHandler) {
        register(str, chatHandler, true);
    }

    /**
     * Registers {@code chatHandler} under the service name {@code str}, optionally announcing it.
     *
     * <p>Announcement is best-effort: if announcing fails for any reason, the failure is logged at
     * warn level, the handler registration is rolled back, and no exception reaches the caller.
     *
     * @param str         service name to register the handler under
     * @param chatHandler handler to register
     * @param z           whether to announce the service after registering the handler
     * @throws ISE if a handler is already registered under {@code str}
     */
    @Override // io.druid.segment.realtime.firehose.ChatHandlerProvider
    public void register(String str, ChatHandler chatHandler, boolean z) {
        log.info("Registering Eventhandler[%s]", str);
        if (this.handlers.putIfAbsent(str, chatHandler) != null) {
            throw new ISE("handler already registered for service[%s]", str);
        }
        if (!z) {
            return;
        }
        try {
            this.serviceAnnouncer.announce(makeDruidNode(str));
            if (!this.announcements.add(str)) {
                // Thrown inside the try on purpose: it is handled by the catch below, so it is
                // logged and rolled back like any other announcement failure.
                throw new ISE("announcements already has an entry for service[%s]", str);
            }
        } catch (Exception e) {
            log.warn(e, "Failed to register service[%s]", str);
            // Two-argument remove: only drop the mapping if it still points at our handler.
            this.handlers.remove(str, chatHandler);
        }
    }

    /**
     * Removes the handler registered under {@code str}, unannouncing the service first if this
     * provider had announced it. Does nothing beyond logging a warning when no handler is
     * registered under that name. Failures while unannouncing are logged and otherwise ignored.
     *
     * @param str service name to unregister
     */
    @Override // io.druid.segment.realtime.firehose.ChatHandlerProvider
    public void unregister(String str) {
        log.info("Unregistering chat handler[%s]", str);
        ChatHandler chatHandler = this.handlers.get(str);
        if (chatHandler == null) {
            log.warn("handler[%s] not currently registered, ignoring.", str);
            return;
        }
        // remove() both tests and clears the entry in one atomic step, so concurrent unregister
        // calls for the same service cannot both proceed to unannounce it.
        if (this.announcements.remove(str)) {
            try {
                this.serviceAnnouncer.unannounce(makeDruidNode(str));
            } catch (Exception e) {
                log.warn(e, "Failed to unregister service[%s]", str);
            }
        }
        this.handlers.remove(str, chatHandler);
    }

    /**
     * Looks up the handler registered under {@code str}.
     *
     * @param str service name to look up
     * @return the registered handler, or an absent {@link Optional} if there is none
     */
    @Override // io.druid.segment.realtime.firehose.ChatHandlerProvider
    public Optional<ChatHandler> get(String str) {
        return Optional.fromNullable(this.handlers.get(str));
    }

    /** Builds the node to announce for {@code str}: that service name on this provider's host and port. */
    private DruidNode makeDruidNode(String str) {
        return new DruidNode(str, this.node.getHost(), this.node.getPort());
    }
}
