/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.wan.impl;

import com.hazelcast.config.AbstractWanPublisherConfig;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.config.WanBatchPublisherConfig;
import com.hazelcast.config.WanCustomPublisherConfig;
import com.hazelcast.config.WanReplicationConfig;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.management.events.AddWanConfigIgnoredEvent;
import com.hazelcast.internal.management.events.WanConsistencyCheckIgnoredEvent;
import com.hazelcast.internal.management.events.WanSyncIgnoredEvent;
import com.hazelcast.internal.monitor.LocalWanStats;
import com.hazelcast.internal.monitor.WanSyncState;
import com.hazelcast.internal.nio.ClassLoaderUtil;
import com.hazelcast.internal.partition.FragmentedMigrationAwareService;
import com.hazelcast.internal.partition.PartitionMigrationEvent;
import com.hazelcast.internal.partition.PartitionReplicationEvent;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.ObjectNamespace;
import com.hazelcast.internal.services.ServiceNamespace;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.ConstructorFunction;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.version.Version;
import com.hazelcast.wan.WanEventCounters;
import com.hazelcast.wan.WanMigrationAwarePublisher;
import com.hazelcast.wan.WanPublisher;
import com.hazelcast.wan.impl.AddWanConfigResult;
import com.hazelcast.wan.impl.DelegatingWanScheme;
import com.hazelcast.wan.impl.WanEventContainerReplicationOperation;
import com.hazelcast.wan.impl.WanEventCounterRegistry;
import com.hazelcast.wan.impl.WanReplicationService;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

public class WanReplicationServiceImpl
implements WanReplicationService,
FragmentedMigrationAwareService,
ManagedService {
    private final Node node;
    private final WanEventCounterRegistry receivedWanEventCounters = new WanEventCounterRegistry();
    private final WanEventCounterRegistry sentWanEventCounters = new WanEventCounterRegistry();
    private final ConcurrentMap<String, DelegatingWanScheme> wanReplications = MapUtil.createConcurrentHashMap(1);
    private final ConstructorFunction<String, DelegatingWanScheme> publisherDelegateConstructor;

    public WanReplicationServiceImpl(Node node) {
        this.node = node;
        this.publisherDelegateConstructor = name -> {
            WanReplicationConfig wanReplicationConfig = node.getConfig().getWanReplicationConfig((String)name);
            if (wanReplicationConfig == null) {
                return null;
            }
            List<WanBatchPublisherConfig> batchPublisherConfigs = wanReplicationConfig.getBatchPublisherConfigs();
            if (!batchPublisherConfigs.isEmpty()) {
                throw new InvalidConfigurationException("Built-in batching WAN replication implementation is only available in Hazelcast enterprise edition.");
            }
            return new DelegatingWanScheme((String)name, this.createPublishers(wanReplicationConfig));
        };
    }

    @Override
    public DelegatingWanScheme getWanReplicationPublishers(String wanReplicationScheme) {
        if (!this.wanReplications.containsKey(wanReplicationScheme) && this.node.getConfig().getWanReplicationConfig(wanReplicationScheme) == null) {
            return null;
        }
        return ConcurrencyUtil.getOrPutSynchronized(this.wanReplications, wanReplicationScheme, this, this.publisherDelegateConstructor);
    }

    private ConcurrentMap<String, WanPublisher> createPublishers(WanReplicationConfig wanReplicationConfig) {
        List<WanCustomPublisherConfig> customPublisherConfigs = wanReplicationConfig.getCustomPublisherConfigs();
        int publisherCount = customPublisherConfigs.size();
        if (publisherCount == 0) {
            return MapUtil.createConcurrentHashMap(1);
        }
        ConcurrentMap<String, WanPublisher> publishers = MapUtil.createConcurrentHashMap(publisherCount);
        Map publisherConfigs = MapUtil.createHashMap(publisherCount);
        customPublisherConfigs.forEach(publisherConfig -> {
            String publisherId = WanReplicationServiceImpl.getWanPublisherId(publisherConfig);
            if (publishers.containsKey(publisherId)) {
                throw new InvalidConfigurationException("Detected duplicate publisher ID '" + publisherId + "' for a single WAN replication config");
            }
            WanPublisher publisher = this.createPublisher((AbstractWanPublisherConfig)publisherConfig);
            publishers.put(publisherId, publisher);
            publisherConfigs.put(publisherId, publisherConfig);
        });
        for (Map.Entry publisherEntry : publishers.entrySet()) {
            String publisherId = (String)publisherEntry.getKey();
            WanPublisher publisher = (WanPublisher)publisherEntry.getValue();
            this.node.getSerializationService().getManagedContext().initialize(publisher);
            publisher.init(wanReplicationConfig, (AbstractWanPublisherConfig)publisherConfigs.get(publisherId));
        }
        return publishers;
    }

    private WanPublisher createPublisher(AbstractWanPublisherConfig publisherConfig) {
        WanPublisher publisher = ClassLoaderUtil.getOrCreate(publisherConfig.getImplementation(), this.node.getConfigClassLoader(), publisherConfig.getClassName());
        if (publisher == null) {
            throw new InvalidConfigurationException("Either 'implementation' or 'className' attribute need to be set in the WAN publisher configuration for publisher " + publisherConfig);
        }
        return publisher;
    }

    @Nonnull
    public static String getWanPublisherId(AbstractWanPublisherConfig publisherConfig) {
        String publisherId = null;
        if (!StringUtil.isNullOrEmptyAfterTrim(publisherConfig.getPublisherId())) {
            publisherId = publisherConfig.getPublisherId();
        } else if (publisherConfig instanceof WanBatchPublisherConfig) {
            publisherId = ((WanBatchPublisherConfig)publisherConfig).getClusterName();
        }
        if (publisherId == null) {
            throw new InvalidConfigurationException("Publisher ID or group name is not specified for " + publisherConfig);
        }
        return publisherId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        WanReplicationServiceImpl wanReplicationServiceImpl = this;
        synchronized (wanReplicationServiceImpl) {
            for (DelegatingWanScheme delegate : this.wanReplications.values()) {
                for (WanPublisher publisher : delegate.getPublishers()) {
                    if (publisher == null) continue;
                    publisher.shutdown();
                }
            }
            this.wanReplications.clear();
        }
    }

    @Override
    public void pause(String wanReplicationName, String wanPublisherId) {
        throw new UnsupportedOperationException("Pausing WAN replication is not supported.");
    }

    @Override
    public void stop(String wanReplicationName, String wanPublisherId) {
        throw new UnsupportedOperationException("Stopping WAN replication is not supported");
    }

    @Override
    public void resume(String wanReplicationName, String wanPublisherId) {
        throw new UnsupportedOperationException("Resuming WAN replication is not supported");
    }

    @Override
    public UUID syncMap(String wanReplicationName, String wanPublisherId, String mapName) {
        this.node.getManagementCenterService().log(WanSyncIgnoredEvent.enterpriseOnly(wanReplicationName, wanPublisherId, mapName));
        throw new UnsupportedOperationException("WAN sync for map is not supported.");
    }

    @Override
    public UUID syncAllMaps(String wanReplicationName, String wanPublisherId) {
        this.node.getManagementCenterService().log(WanSyncIgnoredEvent.enterpriseOnly(wanReplicationName, wanPublisherId, null));
        throw new UnsupportedOperationException("WAN sync is not supported.");
    }

    @Override
    public UUID consistencyCheck(String wanReplicationName, String wanPublisherId, String mapName) {
        this.node.getManagementCenterService().log(new WanConsistencyCheckIgnoredEvent(wanReplicationName, wanPublisherId, mapName, "Consistency check is supported for enterprise clusters only."));
        throw new UnsupportedOperationException("Consistency check is not supported.");
    }

    @Override
    public void removeWanEvents(String wanReplicationName, String wanPublisherId) {
        throw new UnsupportedOperationException("Clearing WAN replication queues is not supported.");
    }

    @Override
    public AddWanConfigResult addWanReplicationConfig(WanReplicationConfig wanReplicationConfig) {
        this.node.getManagementCenterService().log(AddWanConfigIgnoredEvent.enterpriseOnly(wanReplicationConfig.getName()));
        throw new UnsupportedOperationException("Adding new WAN config is not supported.");
    }

    @Override
    public void addWanReplicationConfigLocally(WanReplicationConfig wanReplicationConfig) {
        throw new UnsupportedOperationException("Adding new WAN config is not supported.");
    }

    @Override
    public Map<String, LocalWanStats> getStats() {
        return null;
    }

    @Override
    public WanSyncState getWanSyncState() {
        return null;
    }

    @Override
    public WanEventCounters getReceivedEventCounters(String serviceName) {
        return this.receivedWanEventCounters.getWanEventCounter("", "", serviceName);
    }

    @Override
    public WanEventCounters getSentEventCounters(String wanReplicationName, String wanPublisherId, String serviceName) {
        return this.sentWanEventCounters.getWanEventCounter(wanReplicationName, wanPublisherId, serviceName);
    }

    @Override
    public void removeWanEventCounters(String serviceName, String objectName) {
        this.receivedWanEventCounters.removeCounter(serviceName, objectName);
        this.sentWanEventCounters.removeCounter(serviceName, objectName);
    }

    @Override
    public List<Version> getSupportedWanProtocolVersions() {
        return Collections.emptyList();
    }

    @Override
    public void beforeMigration(PartitionMigrationEvent event) {
        this.notifyMigrationAwarePublishers(p -> p.onMigrationStart(event));
    }

    @Override
    public void commitMigration(PartitionMigrationEvent event) {
        this.notifyMigrationAwarePublishers(p -> p.onMigrationCommit(event));
    }

    @Override
    public void rollbackMigration(PartitionMigrationEvent event) {
        this.notifyMigrationAwarePublishers(p -> p.onMigrationRollback(event));
    }

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
    }

    @Override
    public void reset() {
        Collection publishers = this.wanReplications.values();
        for (DelegatingWanScheme publisherDelegate : publishers) {
            for (WanPublisher publisher : publisherDelegate.getPublishers()) {
                publisher.reset();
            }
        }
    }

    @Override
    public void shutdown(boolean terminate) {
        this.reset();
    }

    @Override
    public Collection<ServiceNamespace> getAllServiceNamespaces(PartitionReplicationEvent event) {
        if (this.wanReplications.isEmpty()) {
            return Collections.emptyList();
        }
        HashSet<ServiceNamespace> namespaces = new HashSet<ServiceNamespace>();
        for (DelegatingWanScheme publisher : this.wanReplications.values()) {
            publisher.collectAllServiceNamespaces(event, namespaces);
        }
        return namespaces;
    }

    @Override
    public boolean isKnownServiceNamespace(ServiceNamespace namespace) {
        String serviceName = namespace.getServiceName();
        return namespace instanceof ObjectNamespace && "hz:impl:mapService".equals(serviceName);
    }

    @Override
    public Operation prepareReplicationOperation(PartitionReplicationEvent event, Collection<ServiceNamespace> namespaces) {
        if (this.wanReplications.isEmpty() || namespaces.isEmpty()) {
            return null;
        }
        Map<String, Map<String, Object>> eventContainers = MapUtil.createHashMap(this.wanReplications.size());
        for (Map.Entry wanReplicationEntry : this.wanReplications.entrySet()) {
            String replicationScheme = (String)wanReplicationEntry.getKey();
            DelegatingWanScheme delegate = (DelegatingWanScheme)wanReplicationEntry.getValue();
            Map<String, Object> publisherEventContainers = delegate.prepareEventContainerReplicationData(event, namespaces);
            if (publisherEventContainers.isEmpty()) continue;
            eventContainers.put(replicationScheme, publisherEventContainers);
        }
        if (eventContainers.isEmpty()) {
            return null;
        }
        return new WanEventContainerReplicationOperation(Collections.emptyList(), eventContainers, event.getPartitionId(), event.getReplicaIndex());
    }

    @Override
    public Operation prepareReplicationOperation(PartitionReplicationEvent event) {
        return this.prepareReplicationOperation(event, this.getAllServiceNamespaces(event));
    }

    @Override
    public WanPublisher getPublisherOrFail(String wanReplicationName, String wanPublisherId) {
        WanPublisher publisher = this.getPublisherOrNull(wanReplicationName, wanPublisherId);
        if (publisher == null) {
            throw new InvalidConfigurationException("WAN Replication Config doesn't exist with WAN configuration name " + wanReplicationName + " and publisher ID " + wanPublisherId);
        }
        return publisher;
    }

    @Override
    public void appendWanReplicationConfig(WanReplicationConfig newConfig) {
    }

    private WanPublisher getPublisherOrNull(String wanReplicationName, String wanPublisherId) {
        DelegatingWanScheme publisherDelegate = this.getWanReplicationPublishers(wanReplicationName);
        return publisherDelegate != null ? publisherDelegate.getPublisher(wanPublisherId) : null;
    }

    private void notifyMigrationAwarePublishers(Consumer<WanMigrationAwarePublisher> publisherConsumer) {
        for (DelegatingWanScheme wanReplication : this.wanReplications.values()) {
            for (WanPublisher publisher : wanReplication.getPublishers()) {
                if (!(publisher instanceof WanMigrationAwarePublisher)) continue;
                publisherConsumer.accept((WanMigrationAwarePublisher)((Object)publisher));
            }
        }
    }
}

