/*
 * Decompiled with CFR 0.152.
 */
package org.distributeme.support.eventservice;

import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.anotheria.anoprise.eventservice.EventChannel;
import net.anotheria.anoprise.eventservice.EventService;
import net.anotheria.anoprise.eventservice.EventServiceFactory;
import net.anotheria.anoprise.eventservice.EventServiceListener;
import net.anotheria.anoprise.eventservice.EventTransportShell;
import net.anotheria.anoprise.eventservice.ProxyType;
import net.anotheria.anoprise.eventservice.RemoteEventChannelConsumerProxy;
import net.anotheria.anoprise.eventservice.RemoteEventChannelSupplierProxy;
import net.anotheria.anoprise.eventservice.RemoteEventChannelSupportFactory;
import net.anotheria.util.IdCodeGenerator;
import org.distributeme.core.RMIRegistryUtil;
import org.distributeme.core.RegistryUtil;
import org.distributeme.core.ServiceDescriptor;
import org.distributeme.core.util.EventServiceRegistryUtil;
import org.distributeme.support.eventservice.DiMeRemoteEventChannelConsumerProxy;
import org.distributeme.support.eventservice.DiMeRemoteEventChannelSupplierProxy;
import org.distributeme.support.eventservice.EventServiceRMIBridgeService;
import org.distributeme.support.eventservice.EventServiceRMIBridgeServiceException;
import org.distributeme.support.eventservice.RemoteConsumerWrapper;
import org.distributeme.support.eventservice.generated.EventServiceRMIBridgeServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Connects the local {@link EventService} to remote peers through
 * {@link EventServiceRMIBridgeService} stubs.
 *
 * <p>The class acts as the factory for remote consumer/supplier channel proxies and
 * listens for channel creation: whenever a local push consumer or push supplier proxy
 * is created, it registers this instance at the event service registry and then
 * registers itself at every counterpart the registry returns.
 *
 * <p>Bridge stubs are cached per {@link ServiceDescriptor} in a static map and evicted
 * when a peer is reported as broken.
 *
 * <p>NOTE(review): {@code descriptor} and {@code es} are static but are (re)assigned by
 * the instance constructor, and the executor is never shut down in this class. This
 * looks like it is meant to be used as a singleton - confirm with the creating code.
 */
public class DiMeRemoteEventChannelRMISupport
implements RemoteEventChannelSupportFactory,
EventServiceListener {
    /** Logger. Declared first so it is available to every other static initializer. */
    private static final Logger LOG = LoggerFactory.getLogger(DiMeRemoteEventChannelRMISupport.class);

    /** Service id under which the RMI bridge service is described. */
    private static final String BRIDGE_SERVICE_ID = "org_distributeme_support_eventservice_EventServiceRMIBridgeService";

    /** Fully qualified name of the generated stub class; loaded reflectively in {@link #getBridge}. */
    private static final String BRIDGE_STUB_CLASS_NAME = "org.distributeme.support.eventservice.generated.RemoteEventServiceRMIBridgeServiceStub";

    /** Descriptor of the local bridge service; set by the constructor and by {@link #initEventService()}. */
    private static ServiceDescriptor descriptor = null;

    /** Instance id generated once when the class is initialized. */
    private static final String INSTANCE_ID = IdCodeGenerator.generateCode(10);

    /** Cache of bridge stubs, keyed by the descriptor of the remote peer. */
    private static final ConcurrentHashMap<ServiceDescriptor, EventServiceRMIBridgeService> bridges = new ConcurrentHashMap<ServiceDescriptor, EventServiceRMIBridgeService>();

    /** Local event service; assigned in the constructor. */
    private static EventService es;

    /** Pool that runs the asynchronous registrations started by {@link #registerRemoteSupplier}. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    /**
     * Creates the support object, builds the local service descriptor and obtains the
     * event service from {@link EventServiceFactory}.
     */
    DiMeRemoteEventChannelRMISupport() {
        descriptor = createLocalDescriptor();
        es = EventServiceFactory.createEventService();
    }

    /** Builds the RMI descriptor of the local bridge service for the current registry port. */
    private static ServiceDescriptor createLocalDescriptor() {
        return RegistryUtil.createLocalServiceDescription(ServiceDescriptor.Protocol.RMI, BRIDGE_SERVICE_ID, INSTANCE_ID, RMIRegistryUtil.getRmiRegistryPort());
    }

    /**
     * Returns the descriptor that identifies this instance towards remote peers.
     *
     * @return the current local service descriptor
     */
    protected ServiceDescriptor getHomeReference() {
        return descriptor;
    }

    /**
     * Creates a new remote consumer proxy for the given channel.
     *
     * @param channelName name of the channel
     * @return a new {@link DiMeRemoteEventChannelConsumerProxy}
     */
    public RemoteEventChannelConsumerProxy createRemoteEventChannelConsumerProxy(String channelName) {
        return new DiMeRemoteEventChannelConsumerProxy(channelName);
    }

    /**
     * Creates a new remote supplier proxy for the given channel.
     *
     * @param channelName name of the channel
     * @return a new {@link DiMeRemoteEventChannelSupplierProxy}
     */
    public RemoteEventChannelSupplierProxy createRemoteEventChannelSupplierProxy(String channelName) {
        return new DiMeRemoteEventChannelSupplierProxy(channelName);
    }

    /**
     * Reacts to the creation of a local channel proxy by announcing it to the remote side.
     *
     * @param channelName name of the created channel
     * @param type        proxy type; only push consumer and push supplier proxies are handled
     * @throws AssertionError for any other proxy type
     */
    public void channelCreated(String channelName, ProxyType type) {
        switch (type) {
            case PUSH_CONSUMER_PROXY:
                localConsumerProxyCreated(channelName);
                break;
            case PUSH_SUPPLIER_PROXY:
                localSupplierProxyCreated(channelName);
                break;
            default:
                throw new AssertionError("Unknown proxy type " + type);
        }
    }

    /**
     * Returns the cached bridge stub for the given peer, creating it on first use.
     *
     * <p>The stub class is loaded reflectively. If two threads create a stub for the same
     * peer concurrently, the one stored first wins and is returned to both.
     *
     * @param target descriptor of the remote peer
     * @return the bridge stub, or {@code null} if the stub could not be instantiated
     * @throws AssertionError if the stub class or its {@code ServiceDescriptor} constructor is missing
     */
    private EventServiceRMIBridgeService getBridge(ServiceDescriptor target) {
        EventServiceRMIBridgeService cached = bridges.get(target);
        if (cached != null) {
            return cached;
        }
        try {
            Class<?> stubClass = Class.forName(BRIDGE_STUB_CLASS_NAME);
            Constructor<?> stubConstructor = stubClass.getConstructor(ServiceDescriptor.class);
            EventServiceRMIBridgeService created = (EventServiceRMIBridgeService) stubConstructor.newInstance(target);
            EventServiceRMIBridgeService previous = bridges.putIfAbsent(target, created);
            return previous == null ? created : previous;
        } catch (ClassNotFoundException e) {
            // Keep the original exception as cause so the failing class loader is diagnosable.
            throw new AssertionError("Misconfigured? can't find " + BRIDGE_STUB_CLASS_NAME, e);
        } catch (NoSuchMethodException e) {
            throw new AssertionError("Misconfigured? can't find " + BRIDGE_STUB_CLASS_NAME + " constructor", e);
        } catch (Exception e) {
            LOG.error("getBridge(" + target + ")", e);
            return null;
        }
    }

    /**
     * Registers this instance as consumer of the channel at the registry and then at every
     * supplier the registry returns, skipping this instance itself.
     *
     * @param channelName name of the channel
     */
    private void localConsumerProxyCreated(String channelName) {
        ServiceDescriptor me = getHomeReference();
        List<ServiceDescriptor> suppliers = EventServiceRegistryUtil.registerConsumerAtRegistryAndGetSuppliers(channelName, descriptor);
        if (suppliers == null) {
            // Defensive: the registry helper's contract is not visible here, so treat
            // a missing list as "no suppliers" instead of failing with an NPE.
            return;
        }
        for (ServiceDescriptor supplier : suppliers) {
            if (me.equals(supplier)) {
                LOG.debug("Skipped registering at myself");
                continue;
            }
            registerAsConsumerAtRemoteSupplier(channelName, supplier);
        }
    }

    /**
     * Registers this instance as remote consumer at a single supplier.
     *
     * <p>The supplier is reported as broken (and its cached bridge dropped) if no bridge can
     * be obtained, if the bridge answers with a different instance id than the descriptor
     * carries, or if the remote call fails.
     *
     * @param channelName name of the channel
     * @param supplier    descriptor of the remote supplier
     */
    private void registerAsConsumerAtRemoteSupplier(String channelName, ServiceDescriptor supplier) {
        EventServiceRMIBridgeService bridge = getBridge(supplier);
        if (bridge == null) {
            notifyBrokenSupplier(supplier);
            return;
        }
        try {
            String remoteInstanceId = bridge.getInstanceId();
            if (!remoteInstanceId.equals(supplier.getInstanceId())) {
                LOG.info("Instanceid mismatch, expected {}, received {} throwing away", supplier.getInstanceId(), remoteInstanceId);
                notifyBrokenSupplier(supplier);
                return;
            }
            LOG.debug("Registering @ {}", supplier);
            bridge.registerRemoteConsumer(channelName, descriptor);
        } catch (EventServiceRMIBridgeServiceException e) {
            LOG.error("can't connect to : registerAsConsumerAtRemoteSupplier(" + channelName + ", " + supplier + ")", e);
            notifyBrokenSupplier(supplier);
        } catch (RuntimeException e) {
            LOG.error("can't connect to : registerAsConsumerAtRemoteSupplier(" + channelName + ", " + supplier + ")", e);
            notifyBrokenSupplier(supplier);
        }
    }

    /**
     * Reports a supplier as unavailable to the registry (best effort) and evicts its bridge.
     *
     * @param supplier descriptor of the broken supplier
     */
    private void notifyBrokenSupplier(ServiceDescriptor supplier) {
        try {
            EventServiceRegistryUtil.notifySupplierNotAvailable(supplier);
        } catch (Exception e) {
            // A failed notification must not prevent the cache eviction below.
            LOG.warn("notifyBrokenSupplier(" + supplier + ") failed", e);
        }
        bridges.remove(supplier);
    }

    /**
     * Reports a consumer as unavailable to the registry (best effort) and evicts its bridge.
     *
     * @param consumer descriptor of the broken consumer
     */
    private void notifyBrokenConsumer(ServiceDescriptor consumer) {
        try {
            EventServiceRegistryUtil.notifyConsumerNotAvailable(consumer);
        } catch (Exception e) {
            // A failed notification must not prevent the cache eviction below.
            LOG.warn("notifyBrokenConsumer(" + consumer + ") failed ", e);
        }
        bridges.remove(consumer);
    }

    /**
     * Removes a broken remote consumer from its channel and reports it to the registry.
     *
     * @param wrapper wrapper of the consumer that turned out to be broken
     */
    void notifyBrokenConsumer(RemoteConsumerWrapper wrapper) {
        LOG.debug("NOTIFY brokenConsumer: {}", wrapper);
        EventChannel channel = es.obtainEventChannel(wrapper.getChannelName(), ProxyType.REMOTE_CONSUMER_PROXY);
        ((DiMeRemoteEventChannelConsumerProxy) channel).removeRemoteConsumer(wrapper);
        notifyBrokenConsumer(wrapper.getHomeReference());
    }

    /**
     * Registers this instance as supplier of the channel at the registry and announces it
     * to every consumer the registry returns.
     *
     * <p>A consumer without a usable bridge or with a mismatching instance id is reported as
     * broken. A failure while talking to one consumer is logged and does not stop the
     * remaining consumers from being processed.
     *
     * @param channelName name of the channel
     */
    private void localSupplierProxyCreated(String channelName) {
        ServiceDescriptor me = getHomeReference();
        List<ServiceDescriptor> consumers = EventServiceRegistryUtil.registerSupplierAtRegistryAndGetConsumers(channelName, me);
        LOG.debug("Consumers: {}", consumers);
        if (consumers == null) {
            // Defensive: the registry helper's contract is not visible here, so treat
            // a missing list as "no consumers" instead of failing with an NPE.
            return;
        }
        for (ServiceDescriptor consumer : consumers) {
            EventServiceRMIBridgeService bridge = getBridge(consumer);
            if (bridge == null) {
                notifyBrokenConsumer(consumer);
                continue;
            }
            try {
                String remoteInstanceId = bridge.getInstanceId();
                if (!remoteInstanceId.equals(consumer.getInstanceId())) {
                    LOG.debug("Instanceid mismatch, expected {}, received {} throwing away", consumer.getInstanceId(), remoteInstanceId);
                    notifyBrokenConsumer(consumer);
                    continue;
                }
                LOG.debug("Registering @ {}", consumer);
                bridge.registerRemoteSupplier(channelName, me);
            } catch (EventServiceRMIBridgeServiceException e) {
                LOG.error("localSupplierProxyCreated", e);
            } catch (RuntimeException e) {
                // An unchecked failure for one consumer (e.g. a null instance id) must
                // not abort the announcement to the remaining consumers.
                LOG.error("localSupplierProxyCreated", e);
            }
        }
    }

    /**
     * Logs the destruction of a channel; no remote clean-up is performed here.
     *
     * @param channelName name of the destroyed channel
     * @param type        proxy type of the destroyed channel (unused)
     */
    public void channelDestroyed(String channelName, ProxyType type) {
        LOG.info("Channel " + channelName + " destroyed");
    }

    /**
     * Initializes the RMI bridge server, registers the service locally and refreshes the
     * local descriptor. Any failure is logged and swallowed, leaving the descriptor as it was.
     */
    public static final void initEventService() {
        try {
            EventServiceRMIBridgeServer.init();
            EventServiceRMIBridgeServer.createServiceAndRegisterLocally();
            descriptor = createLocalDescriptor();
        } catch (Exception e) {
            LOG.error("Can't init eventservice - probably running without events", e);
        }
    }

    /**
     * Adds a remote consumer to the local remote-consumer proxy of the given channel.
     *
     * @param channelName name of the channel
     * @param myReference descriptor of the remote consumer
     */
    void registerRemoteConsumer(String channelName, ServiceDescriptor myReference) {
        EventChannel channel = es.obtainEventChannel(channelName, ProxyType.REMOTE_CONSUMER_PROXY);
        LOG.debug("REGISTER REMOTE CONSUMER @ channel {}, consumer: {}", channel, myReference);
        // NOTE(review): getBridge may return null; the wrapper then carries a null bridge.
        // Presumably the wrapper reports itself as broken on first delivery - confirm.
        RemoteConsumerWrapper wrapper = new RemoteConsumerWrapper(this, channelName, myReference, getBridge(myReference));
        ((DiMeRemoteEventChannelConsumerProxy) channel).addRemoteConsumer(wrapper);
        LOG.debug("REGISTER REMOTE CONSUMER @ channel {}, consumer: {} DONE!", channel, myReference);
    }

    /**
     * Handles the announcement of a remote supplier by registering this instance as its
     * consumer. The registration runs on the internal executor, so this method returns
     * without waiting for the remote call.
     *
     * @param channelName name of the channel
     * @param myReference descriptor of the announcing remote supplier
     */
    void registerRemoteSupplier(final String channelName, final ServiceDescriptor myReference) {
        LOG.debug("Register remote supplier {} for channel {} called.", myReference, channelName);
        this.executorService.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    DiMeRemoteEventChannelRMISupport.this.registerAsConsumerAtRemoteSupplier(channelName, myReference);
                } catch (Exception e) {
                    // Log the exception itself; without it the failure cannot be diagnosed.
                    LOG.error("Can't register as consumer at remote supplier, channel: " + channelName + ", myRef: " + myReference, e);
                }
            }
        });
    }

    /**
     * Hands an event received from a remote supplier to the local remote-supplier proxy of
     * the channel named in the shell.
     *
     * @param shell transport container holding the channel name and the event data
     */
    public void deliverEvent(EventTransportShell shell) {
        EventChannel channel = es.obtainEventChannel(shell.getChannelName(), ProxyType.REMOTE_SUPPLIER_PROXY);
        ((DiMeRemoteEventChannelSupplierProxy) channel).deliverEvent(shell.getData());
    }
}

