/*
 * Decompiled with CFR 0.152.
 */
package reactor.net;

import com.gs.collections.impl.list.mutable.FastList;
import java.util.Collection;
import java.util.Iterator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.registry.CachingRegistry;
import reactor.event.registry.Registration;
import reactor.event.registry.Registry;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.util.Assert;

public abstract class AbstractNetPeer<IN, OUT> {
    private final Registry<NetChannel<IN, OUT>> netChannels = new CachingRegistry();
    private final Event<AbstractNetPeer<IN, OUT>> selfEvent = Event.wrap((Object)this);
    private final Selector open = Selectors.$();
    private final Selector close = Selectors.$();
    private final Selector start = Selectors.$();
    private final Selector shutdown = Selectors.$();
    private final Environment env;
    private final Reactor reactor;
    private final Codec<Buffer, IN, OUT> codec;
    private final Collection<Consumer<NetChannel<IN, OUT>>> consumers;

    protected AbstractNetPeer(@Nonnull Environment env, @Nonnull Reactor reactor, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> consumers) {
        this.env = env;
        this.reactor = reactor;
        this.codec = codec;
        this.consumers = consumers;
        for (final Consumer<NetChannel<IN, OUT>> consumer : consumers) {
            reactor.on(this.open, new Consumer<Event<NetChannel<IN, OUT>>>(){

                public void accept(Event<NetChannel<IN, OUT>> ev) {
                    consumer.accept(ev.getData());
                }
            });
        }
    }

    public Promise<Boolean> close() {
        Deferred d = Promises.defer((Environment)this.env, (Dispatcher)this.reactor.getDispatcher());
        this.close(d);
        return (Promise)d.compose();
    }

    public void close(@Nullable Consumer<Boolean> onClose) {
        for (Registration reg : this.getChannels()) {
            if (reg.isCancelled()) continue;
            this.doCloseChannel((NetChannel)reg.getObject());
        }
        if (null != onClose) {
            this.reactor.schedule(onClose, (Object)true);
        }
    }

    public Iterator<NetChannel<IN, OUT>> iterator() {
        FastList channels = FastList.newList();
        for (Registration reg : this.getChannels()) {
            channels.add(reg.getObject());
        }
        return channels.iterator();
    }

    protected <C> Registration<? extends NetChannel<IN, OUT>> register(@Nonnull C ioChannel, @Nonnull NetChannel<IN, OUT> netChannel) {
        Assert.notNull(ioChannel, (String)"Channel cannot be null.");
        Assert.notNull(netChannel, (String)"NetChannel cannot be null.");
        return this.netChannels.register(Selectors.$(ioChannel), netChannel);
    }

    protected <C> NetChannel<IN, OUT> select(@Nonnull C ioChannel) {
        Assert.notNull(ioChannel, (String)"Channel cannot be null.");
        Iterator channs = this.netChannels.select(ioChannel).iterator();
        if (channs.hasNext()) {
            return (NetChannel)((Registration)channs.next()).getObject();
        }
        NetChannel<IN, OUT> conn = this.createChannel(ioChannel);
        this.register(ioChannel, conn);
        this.notifyOpen(conn);
        return conn;
    }

    protected <C> void close(@Nonnull C channel) {
        Assert.notNull(channel, (String)"Channel cannot be null");
        for (Registration reg : this.netChannels.select(channel)) {
            NetChannel chann = (NetChannel)reg.getObject();
            reg.cancel();
            this.notifyClose(chann);
        }
    }

    protected abstract <C> NetChannel<IN, OUT> createChannel(C var1);

    protected void notifyStart(Runnable started) {
        this.getReactor().notify(this.start.getObject(), this.selfEvent);
        if (null != started) {
            this.getReactor().schedule((Consumer)new Consumer<Runnable>(){

                public void accept(Runnable r) {
                    r.run();
                }
            }, (Object)started);
        }
    }

    protected void notifyError(@Nonnull Throwable error) {
        Assert.notNull((Object)error, (String)"Error cannot be null.");
        this.reactor.notify(error.getClass(), Event.wrap((Object)error));
    }

    protected void notifyOpen(@Nonnull NetChannel<IN, OUT> channel) {
        this.reactor.notify(this.open.getObject(), Event.wrap(channel));
    }

    protected void notifyClose(@Nonnull NetChannel<IN, OUT> channel) {
        this.reactor.notify(this.close.getObject(), Event.wrap(channel));
    }

    protected void notifyShutdown() {
        this.getReactor().notify(this.shutdown.getObject(), this.selfEvent);
    }

    @Nullable
    protected Codec<Buffer, IN, OUT> getCodec() {
        return this.codec;
    }

    @Nonnull
    protected Environment getEnvironment() {
        return this.env;
    }

    @Nonnull
    protected Reactor getReactor() {
        return this.reactor;
    }

    @Nonnull
    protected Collection<Consumer<NetChannel<IN, OUT>>> getConsumers() {
        return this.consumers;
    }

    @Nonnull
    protected Registry<NetChannel<IN, OUT>> getChannels() {
        return this.netChannels;
    }

    protected void doClose(@Nullable Consumer<Boolean> onClose) {
        this.getReactor().schedule(onClose, (Object)true);
    }

    protected void doCloseChannel(NetChannel<IN, OUT> channel) {
        channel.close();
    }
}

