package com.oracle.coherence.patterns.eventdistribution.distributors;

import com.oracle.coherence.common.builders.ParameterizedBuilder;
import com.oracle.coherence.common.events.Event;
import com.oracle.coherence.common.threading.ExecutorServiceFactory;
import com.oracle.coherence.common.threading.ThreadFactories;
import com.oracle.coherence.common.tuples.Pair;
import com.oracle.coherence.configuration.parameters.Parameter;
import com.oracle.coherence.configuration.parameters.ParameterProvider;
import com.oracle.coherence.configuration.parameters.ScopedParameterProvider;
import com.oracle.coherence.environment.Environment;
import com.oracle.coherence.patterns.eventdistribution.EventChannel;
import com.oracle.coherence.patterns.eventdistribution.EventChannelController;
import com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean;
import com.oracle.coherence.patterns.eventdistribution.EventChannelNotReadyException;
import com.oracle.coherence.patterns.eventdistribution.EventDistributor;
import com.oracle.coherence.patterns.eventdistribution.EventIteratorTransformer;
import com.oracle.coherence.patterns.eventdistribution.EventTransformer;
import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntryEvent;
import com.oracle.coherence.patterns.eventdistribution.transformers.MutatingEventIteratorTransformer;
import com.tangosol.io.ExternalizableLite;
import com.tangosol.io.Serializer;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheFactory;
import com.tangosol.util.ExternalizableHelper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController.class */
public abstract class AbstractEventChannelController<T> implements EventChannelController, EventChannelControllerMBean {
    /** Logger used by every controller instance; named after this class. */
    private static Logger logger = Logger.getLogger(AbstractEventChannelController.class.getName());
    /** Environment passed to the constructor; its class loader is exposed to the builders as the "class-loader" parameter. */
    protected Environment environment;
    /** Identifier of the owning distributor; used to build the name reported by getEventDistributorName(). */
    protected EventDistributor.Identifier distributorIdentifier;
    /** Identifier of this controller; used in log messages and by getEventChannelControllerName(). */
    protected EventChannelController.Identifier controllerIdentifier;
    /** Configuration of this controller (restart delay, batch distribution delay, failure threshold, builders). */
    protected EventChannelController.Dependencies controllerDependencies;
    /** Channel realized from the dependencies' event channel builder; batches are sent through it. */
    protected EventChannel channel;
    /** Optional transformer applied to each batch before sending; null when no transformer builder is configured. */
    protected EventIteratorTransformer transformer;
    /** Single-thread scheduled executor on which every task passed to schedule(...) runs. */
    protected ScheduledExecutorService executorService;
    /** Serializer realized from the builder passed to the constructor. */
    protected Serializer serializer;
    /** Wall-clock duration (ms) of the most recent batch distribution. */
    protected long lastDistributionDurationMS;
    /** Shortest batch duration (ms) seen so far; Long.MAX_VALUE means "no batch yet". */
    protected long minimumDistributionDurationMS;
    /** Longest batch duration (ms) seen so far; Long.MIN_VALUE means "no batch yet". */
    protected long maximumDistributionDurationMS;
    /** Sum of all batch durations (ms). */
    protected long totalDistributionDurationMS;
    /** Failures since the last successful start or distribution; compared with the suspend threshold. */
    protected int consecutiveDistributionFailures;
    /** Number of batches handed to the channel. */
    protected int totalBatchesDistributed;
    /** Number of events offered for distribution (before any transformation). */
    protected int totalCandidateEvents;
    /** Sum of the counts returned by EventChannel.send(...). */
    protected int totalEventsDistributed;
    /** Current life-cycle state; normally read and written through the synchronized getState()/setState(State). */
    private State state;

    /* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController$DefaultDependencies.class */
    /**
     * Plain value-object implementation of {@link EventChannelController.Dependencies}.
     * <p>
     * Instances can be serialized both through {@link ExternalizableLite} (positional stream) and
     * through POF ({@link PortableObject}, property indexes 1 to 9). The {@code *_DEFAULT} constants
     * are offered to callers; this class never applies them on its own.
     */
    public static class DefaultDependencies implements EventChannelController.Dependencies, PortableObject, ExternalizableLite {
        /** Suggested starting mode. */
        public static final EventChannelController.Mode STARTING_MODE_DEFAULT = EventChannelController.Mode.ENABLED;

        /** Suggested delay between batches, in milliseconds. */
        public static final long BATCH_DISTRIBUTION_DELAY_DEFAULT = 1000;

        /** Suggested maximum number of events per batch. */
        public static final int BATCH_SIZE_DEFAULT = 100;

        /** Suggested delay before a failed start/distribution is retried, in milliseconds. */
        public static final long RESTART_DELAY_DEFAULT = 10000;

        /** Suggested failure threshold; the enclosing controller treats a negative value as "never suspend". */
        public static final int TOTAL_CONSECUTIVE_FAILURES_BEFORE_SUSPENDING = -1;

        private String channelName;
        private String externalName;
        private ParameterizedBuilder<EventChannel> eventChannelBuilder;
        private ParameterizedBuilder<EventIteratorTransformer> transformerBuilder;
        private EventChannelController.Mode startingMode;
        private long batchDistributionDelayMS;
        private int batchSize;
        private long restartDelay;
        private int totalConsecutiveFailuresBeforeSuspending;

        /**
         * Creates an empty instance; the fields are expected to be populated by one of the
         * {@code readExternal} methods.
         */
        public DefaultDependencies() {
        }

        /**
         * Creates a fully populated instance.
         *
         * @param channelName                              symbolic name of the channel
         * @param externalName                             external name of the channel
         * @param eventChannelBuilder                      builder that realizes the {@link EventChannel}
         * @param transformerBuilder                       builder that realizes the optional transformer (may be null)
         * @param startingMode                             mode the controller starts in
         * @param batchDistributionDelayMS                 delay between batches in milliseconds
         * @param batchSize                                maximum number of events per batch
         * @param restartDelay                             delay before a retry in milliseconds
         * @param totalConsecutiveFailuresBeforeSuspending failure threshold
         */
        public DefaultDependencies(String channelName, String externalName, ParameterizedBuilder<EventChannel> eventChannelBuilder, ParameterizedBuilder<EventIteratorTransformer> transformerBuilder, EventChannelController.Mode startingMode, long batchDistributionDelayMS, int batchSize, long restartDelay, int totalConsecutiveFailuresBeforeSuspending) {
            this.channelName = channelName;
            this.externalName = externalName;
            this.eventChannelBuilder = eventChannelBuilder;
            this.transformerBuilder = transformerBuilder;
            this.startingMode = startingMode;
            this.batchDistributionDelayMS = batchDistributionDelayMS;
            this.batchSize = batchSize;
            this.restartDelay = restartDelay;
            this.totalConsecutiveFailuresBeforeSuspending = totalConsecutiveFailuresBeforeSuspending;
        }

        /** @return the symbolic channel name */
        @Override
        public String getChannelName() {
            return channelName;
        }

        /** @return the external channel name */
        @Override
        public String getExternalName() {
            return externalName;
        }

        /** @return the builder used to realize the {@link EventChannel} */
        @Override
        public ParameterizedBuilder<EventChannel> getEventChannelBuilder() {
            return eventChannelBuilder;
        }

        /** @return the builder used to realize the transformer, or null when none is configured */
        @Override
        public ParameterizedBuilder<EventIteratorTransformer> getEventsTransformerBuilder() {
            return transformerBuilder;
        }

        /** @return the mode the controller starts in */
        @Override
        public EventChannelController.Mode getStartingMode() {
            return startingMode;
        }

        /** @return the delay between batches in milliseconds */
        @Override
        public long getBatchDistributionDelay() {
            return batchDistributionDelayMS;
        }

        /** @return the maximum number of events per batch */
        @Override
        public int getBatchSize() {
            return batchSize;
        }

        /** @return the delay before a retry in milliseconds */
        @Override
        public long getRestartDelay() {
            return restartDelay;
        }

        /** @return the failure threshold */
        @Override
        public int getTotalConsecutiveFailuresBeforeSuspending() {
            return totalConsecutiveFailuresBeforeSuspending;
        }

        /**
         * Restores all nine fields from a positional stream, in the order used by
         * {@link #writeExternal(DataOutput)}.
         *
         * @param dataInput stream to read from
         * @throws IOException when reading fails
         */
        public void readExternal(DataInput dataInput) throws IOException {
            channelName = ExternalizableHelper.readSafeUTF(dataInput);
            externalName = ExternalizableHelper.readSafeUTF(dataInput);
            eventChannelBuilder = (ParameterizedBuilder) ExternalizableHelper.readObject(dataInput);
            transformerBuilder = (ParameterizedBuilder) ExternalizableHelper.readObject(dataInput);
            // the mode travels as its enum constant name
            String modeName = ExternalizableHelper.readSafeUTF(dataInput);
            startingMode = EventChannelController.Mode.valueOf(modeName);
            batchDistributionDelayMS = ExternalizableHelper.readLong(dataInput);
            batchSize = ExternalizableHelper.readInt(dataInput);
            restartDelay = ExternalizableHelper.readLong(dataInput);
            totalConsecutiveFailuresBeforeSuspending = ExternalizableHelper.readInt(dataInput);
        }

        /**
         * Writes all nine fields to a positional stream.
         *
         * @param dataOutput stream to write to
         * @throws IOException when writing fails
         */
        public void writeExternal(DataOutput dataOutput) throws IOException {
            ExternalizableHelper.writeSafeUTF(dataOutput, channelName);
            ExternalizableHelper.writeSafeUTF(dataOutput, externalName);
            ExternalizableHelper.writeObject(dataOutput, eventChannelBuilder);
            ExternalizableHelper.writeObject(dataOutput, transformerBuilder);
            // the mode travels as its enum constant name
            String modeName = startingMode.name();
            ExternalizableHelper.writeSafeUTF(dataOutput, modeName);
            ExternalizableHelper.writeLong(dataOutput, batchDistributionDelayMS);
            ExternalizableHelper.writeInt(dataOutput, batchSize);
            ExternalizableHelper.writeLong(dataOutput, restartDelay);
            ExternalizableHelper.writeInt(dataOutput, totalConsecutiveFailuresBeforeSuspending);
        }

        /**
         * Restores all nine fields from POF properties 1 to 9.
         *
         * @param pofReader reader to read from
         * @throws IOException when reading fails
         */
        public void readExternal(PofReader pofReader) throws IOException {
            channelName = pofReader.readString(1);
            externalName = pofReader.readString(2);
            eventChannelBuilder = (ParameterizedBuilder) pofReader.readObject(3);
            transformerBuilder = (ParameterizedBuilder) pofReader.readObject(4);
            String modeName = pofReader.readString(5);
            startingMode = EventChannelController.Mode.valueOf(modeName);
            batchDistributionDelayMS = pofReader.readLong(6);
            batchSize = pofReader.readInt(7);
            restartDelay = pofReader.readLong(8);
            totalConsecutiveFailuresBeforeSuspending = pofReader.readInt(9);
        }

        /**
         * Writes all nine fields as POF properties 1 to 9.
         *
         * @param pofWriter writer to write to
         * @throws IOException when writing fails
         */
        public void writeExternal(PofWriter pofWriter) throws IOException {
            pofWriter.writeString(1, channelName);
            pofWriter.writeString(2, externalName);
            pofWriter.writeObject(3, eventChannelBuilder);
            pofWriter.writeObject(4, transformerBuilder);
            String modeName = startingMode.name();
            pofWriter.writeString(5, modeName);
            pofWriter.writeLong(6, batchDistributionDelayMS);
            pofWriter.writeInt(7, batchSize);
            pofWriter.writeLong(8, restartDelay);
            pofWriter.writeInt(9, totalConsecutiveFailuresBeforeSuspending);
        }

        /** @return a single-line dump of every field, intended for diagnostics */
        public String toString() {
            return String.format("AbstractEventChannelController.Dependencies{channelName=%s, externalName=%s, eventChannelBuilder=%s, transformerBuilder=%s, startingMode=%s, batchDistributionDelayMS=%d, batchSize=%d, restartDelay=%d, totalConsecutiveFailuresBeforeSuspended=%d}", channelName, externalName, eventChannelBuilder, transformerBuilder, startingMode, batchDistributionDelayMS, batchSize, restartDelay, totalConsecutiveFailuresBeforeSuspending);
        }
    }

    /* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/AbstractEventChannelController$State.class */
    /**
     * Life-cycle states of a controller. The comments below describe how the enclosing class
     * uses each constant.
     */
    public enum State {
        /** Set by disable(); onDisable() then moves the controller to DISABLED. */
        DISABLING,
        /** Initial state for starting mode DISABLED, and the state left behind by onDisable(). */
        DISABLED,
        /** Initial state for starting mode SUSPENDED; also set by suspend() and after too many consecutive failures. drain() is only honoured in this state. */
        SUSPENDED,
        /** Initial state for starting mode ENABLED; also set while a failed start or distribution waits for its retry. */
        PAUSED,
        /** Set by start() (and by retry tasks) before onStart() runs. */
        STARTING,
        /** A distribution task has been queued on the executor. */
        SCHEDULED,
        /** onDistribute() is looping over batches. */
        DISTRIBUTING,
        /** The last poll found no events; the next poll is queued after the batch distribution delay and can be brought forward by preempt(). */
        WAITING,
        /** Set by stop(); onStop() then moves the controller to STOPPED. */
        STOPPING,
        /** Set by onStop(); schedule(...) ignores tasks once this state is reached. */
        STOPPED
    }

    /**
     * Builds a controller: realizes the channel, the optional transformer and the serializer,
     * creates the single-thread scheduler, derives the initial state from the configured starting
     * mode and resets all statistics.
     *
     * @param identifier           identifier of the owning distributor
     * @param identifier2          identifier of this controller
     * @param dependencies         configuration of this controller
     * @param environment          environment whose class loader is offered to the builders
     * @param parameterProvider    parent parameters for the builders
     * @param parameterizedBuilder builder of the {@link Serializer}
     */
    public AbstractEventChannelController(EventDistributor.Identifier identifier, EventChannelController.Identifier identifier2, EventChannelController.Dependencies dependencies, Environment environment, ParameterProvider parameterProvider, ParameterizedBuilder<Serializer> parameterizedBuilder) {
        distributorIdentifier = identifier;
        controllerIdentifier = identifier2;
        controllerDependencies = dependencies;
        this.environment = environment;

        // builders see the caller's parameters plus the environment's class loader
        ScopedParameterProvider builderScope = new ScopedParameterProvider(parameterProvider);
        builderScope.addParameter(new Parameter("class-loader", environment.getClassLoader()));

        channel = (EventChannel) dependencies.getEventChannelBuilder().realize(builderScope);
        if (dependencies.getEventsTransformerBuilder() == null) {
            transformer = null;
        } else {
            transformer = (EventIteratorTransformer) dependencies.getEventsTransformerBuilder().realize(builderScope);
        }

        ThreadGroup threadGroup = new ThreadGroup("EventChannelController");
        executorService = ExecutorServiceFactory.newSingleThreadScheduledExecutor(ThreadFactories.newThreadFactory(true, "EventChannelController", threadGroup));
        serializer = (Serializer) parameterizedBuilder.realize(builderScope);

        // an ENABLED controller starts out PAUSED: nothing runs until start() is invoked
        switch (dependencies.getStartingMode()) {
            case SUSPENDED:
                state = State.SUSPENDED;
                break;
            case DISABLED:
                state = State.DISABLED;
                break;
            case ENABLED:
                state = State.PAUSED;
                break;
        }

        // MIN_VALUE / MAX_VALUE act as "no batch recorded yet" sentinels for the max / min durations
        minimumDistributionDurationMS = Long.MAX_VALUE;
        maximumDistributionDurationMS = Long.MIN_VALUE;
        lastDistributionDurationMS = 0L;
        totalDistributionDurationMS = 0L;
        totalEventsDistributed = 0;
        totalCandidateEvents = 0;
        totalBatchesDistributed = 0;
        consecutiveDistributionFailures = 0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the serializer realized when this controller was constructed
     */
    public Serializer getSerializer() {
        return serializer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Replaces the current life-cycle state while holding this controller's monitor.
     *
     * @param state the new state
     */
    public synchronized void setState(State state) {
        this.state = state;
    }

    /**
     * Reads the current life-cycle state while holding this controller's monitor.
     *
     * @return the current state
     */
    protected synchronized State getState() {
        return this.state;
    }

    /**
     * @return the owning distributor rendered as {@code "symbolicName (externalName)"}
     */
    @Override
    public String getEventDistributorName() {
        final EventDistributor.Identifier id = distributorIdentifier;
        return String.format("%s (%s)", id.getSymbolicName(), id.getExternalName());
    }

    /**
     * @return this controller rendered as {@code "symbolicName (externalName)"}
     */
    @Override
    public String getEventChannelControllerName() {
        final EventChannelController.Identifier id = controllerIdentifier;
        return String.format("%s (%s)", id.getSymbolicName(), id.getExternalName());
    }

    /**
     * Reports the current life-cycle state as text.
     * <p>
     * The state is read through the synchronized {@link #getState()} accessor rather than from the
     * field directly, so a management thread observes the value most recently published by
     * {@link #setState(State)}.
     *
     * @return the name of the current {@link State}
     */
    @Override
    public String getEventChannelControllerState() {
        return getState().toString();
    }

    /**
     * @return duration in milliseconds of the most recently distributed batch (0 before the first batch)
     */
    @Override
    public long getLastBatchDistributionDuration() {
        return lastDistributionDurationMS;
    }

    /**
     * @return the longest batch duration in milliseconds, or 0 while the "no batch yet" sentinel
     *         ({@code Long.MIN_VALUE}) is still in place
     */
    @Override
    public long getMaximumBatchDistributionDuration() {
        return maximumDistributionDurationMS == Long.MIN_VALUE ? 0L : maximumDistributionDurationMS;
    }

    /**
     * @return the shortest batch duration in milliseconds, or 0 while the "no batch yet" sentinel
     *         ({@code Long.MAX_VALUE}) is still in place
     */
    @Override
    public long getMinimumBatchDistributionDuration() {
        return minimumDistributionDurationMS == Long.MAX_VALUE ? 0L : minimumDistributionDurationMS;
    }

    /**
     * @return the accumulated duration in milliseconds of all distributed batches
     */
    @Override
    public long getTotalDistributionDuration() {
        return totalDistributionDurationMS;
    }

    /**
     * @return the number of failures recorded since the last successful start or distribution
     */
    @Override
    public int getConsecutiveEventChannelFailures() {
        return consecutiveDistributionFailures;
    }

    /**
     * @return the number of batches handed to the channel so far
     */
    @Override
    public int getEventBatchesDistributedCount() {
        return totalBatchesDistributed;
    }

    /**
     * @return the number of events offered for distribution so far, counted before transformation
     */
    @Override
    public int getCandidateEventCount() {
        return totalCandidateEvents;
    }

    /**
     * @return the sum of the event counts reported by the channel for every batch sent
     */
    @Override
    public int getEventsDistributedCount() {
        return totalEventsDistributed;
    }

    /**
     * Requests an asynchronous start. The request is honoured only from PAUSED, DISABLED or
     * SUSPENDED; in any other state the call does nothing. The state observed here is passed on
     * to {@code onStart} so it knows what the controller was doing before.
     */
    @Override
    public void start() {
        final State previous = getState();
        boolean startable = previous == State.PAUSED || previous == State.DISABLED || previous == State.SUSPENDED;
        if (!startable) {
            return;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Scheduled EventChannelController {0} to start.", controllerIdentifier);
        }
        setState(State.STARTING);
        Runnable startTask = new Runnable() {
            @Override
            public void run() {
                AbstractEventChannelController.this.onStart(previous);
            }
        };
        schedule(startTask, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Requests an asynchronous stop. Does nothing when a stop is already in progress or complete;
     * otherwise moves to STOPPING and queues {@code onStop}.
     */
    @Override
    public void stop() {
        State current = getState();
        boolean stillRunning = current != State.STOPPING && current != State.STOPPED;
        if (stillRunning) {
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Scheduled EventChannelController {0} to stop.", controllerIdentifier);
            }
            setState(State.STOPPING);
            Runnable stopTask = new Runnable() {
                @Override
                public void run() {
                    AbstractEventChannelController.this.onStop();
                }
            };
            schedule(stopTask, 0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Brings the next distribution forward: when the controller is WAITING for its next poll, it
     * is marked SCHEDULED and a distribution task is queued without delay. Ignored in every other
     * state.
     */
    @Override
    public void preempt() {
        if (getState() != State.WAITING) {
            return;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Premptively scheduled EventChannelController {0} to commence event distribution.", controllerIdentifier);
        }
        setState(State.SCHEDULED);
        Runnable distributeTask = new Runnable() {
            @Override
            public void run() {
                AbstractEventChannelController.this.onDistribute();
            }
        };
        schedule(distributeTask, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Subclass hook invoked by onDisable() on the executor thread while the controller is
     * DISABLING. A RuntimeException thrown here is logged by the caller and the controller still
     * ends up DISABLED.
     */
    protected abstract void internalDisable();

    /**
     * Requests an asynchronous disable. Honoured only from SCHEDULED, WAITING, PAUSED or
     * DISTRIBUTING; the check and the transition to DISABLING happen under this controller's
     * monitor.
     */
    @Override
    public void disable() {
        synchronized (this) {
            State current = getState();
            boolean disableable = current == State.SCHEDULED || current == State.WAITING || current == State.PAUSED || current == State.DISTRIBUTING;
            if (!disableable) {
                return;
            }
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Scheduled EventChannelController {0} to become disabled.", controllerIdentifier);
            }
            setState(State.DISABLING);
            Runnable disableTask = new Runnable() {
                @Override
                public void run() {
                    AbstractEventChannelController.this.onDisable();
                }
            };
            schedule(disableTask, 0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Suspends the controller synchronously. Honoured from DISTRIBUTING, SCHEDULED, PAUSED,
     * DISABLED or WAITING. When the controller was DISABLED, {@link #internalEnable()} is invoked
     * after the state has been switched to SUSPENDED.
     */
    @Override
    public void suspend() {
        synchronized (this) {
            State previous = getState();
            boolean suspendable = previous == State.DISTRIBUTING || previous == State.SCHEDULED || previous == State.PAUSED || previous == State.DISABLED || previous == State.WAITING;
            if (!suspendable) {
                return;
            }
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Suspended EventChannelController {0}.", controllerIdentifier);
            }
            setState(State.SUSPENDED);
            if (previous == State.DISABLED) {
                internalEnable();
            }
        }
    }

    /**
     * Subclass hook invoked by drain() while the controller is SUSPENDED and the caller holds this
     * controller's monitor.
     */
    protected abstract void internalDrain();

    /**
     * Drains pending events by delegating to {@link #internalDrain()}. Only permitted while the
     * controller is SUSPENDED; in any other state a warning is logged and nothing is drained.
     * <p>
     * The progress messages are emitted at {@code FINE}, and the guards test that same level, so
     * the messages are not lost when the logger is configured for FINE but not FINER.
     */
    @Override
    public void drain() {
        synchronized (this) {
            if (getState() != State.SUSPENDED) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "Can't drain events for {0} as it is not Suspended. Please suspend first before attempting to drain.", this.controllerIdentifier);
                }
                return;
            }
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "Commenced Draining events for EventChannelController {0}", this.controllerIdentifier);
            }
            internalDrain();
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "Completed Draining events for EventChannelController {0}", this.controllerIdentifier);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Queues a task on this controller's executor, unless the controller is already STOPPED, in
     * which case the task is silently dropped.
     *
     * @param runnable task to run
     * @param j        delay before the task runs
     * @param timeUnit unit of {@code j}
     */
    public void schedule(Runnable runnable, long j, TimeUnit timeUnit) {
        if (getState() == State.STOPPED) {
            return;
        }
        executorService.schedule(runnable, j, timeUnit);
    }

    /**
     * Subclass hook invoked when a controller that was DISABLED is brought back: from suspend()
     * (under this controller's monitor) and from onStart() before internalStart().
     */
    protected abstract void internalEnable();

    /**
     * Subclass hook invoked by onStart() on the executor thread.
     *
     * @throws EventChannelNotReadyException when the channel cannot be started yet; onStart() then
     *         pauses the controller and retries after the configured restart delay
     */
    protected abstract void internalStart() throws EventChannelNotReadyException;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Executor-side half of {@link #start()}: performs the actual start and queues the first
     * distribution. Does nothing unless the controller is STARTING.
     * <ul>
     * <li>Success: the failure counter is reset, the state becomes SCHEDULED and
     *     {@code onDistribute} is queued.</li>
     * <li>{@link EventChannelNotReadyException}: the state becomes PAUSED and a retry is queued
     *     after the restart delay; the failure counter is not touched.</li>
     * <li>Any other {@link RuntimeException}: the failure counter is incremented; the controller
     *     is SUSPENDED once a non-negative threshold is reached, otherwise it is PAUSED and a
     *     retry is queued.</li>
     * </ul>
     * A queued retry only proceeds if the controller is still PAUSED when it fires, so a
     * stop/suspend/disable/start issued during the delay is not overridden.
     *
     * @param state the state the controller was in before the start was requested
     */
    public void onStart(final State state) {
        if (getState() != State.STARTING) {
            return;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Starting EventChannelController {0}", this.controllerIdentifier);
        }

        // shared by both failure paths below
        final Runnable retry = new Runnable() {
            @Override
            public void run() {
                synchronized (AbstractEventChannelController.this) {
                    // somebody changed the state during the delay: leave their decision alone
                    if (AbstractEventChannelController.this.getState() != State.PAUSED) {
                        return;
                    }
                    AbstractEventChannelController.this.setState(State.STARTING);
                }
                AbstractEventChannelController.this.onStart(state);
            }
        };

        try {
            if (state == State.DISABLED) {
                internalEnable();
            }
            internalStart();
            this.consecutiveDistributionFailures = 0;
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Scheduled EventChannelController {0} to commence event distribution.", this.controllerIdentifier);
            }
            setState(State.SCHEDULED);
            schedule(new Runnable() {
                @Override
                public void run() {
                    AbstractEventChannelController.this.onDistribute();
                }
            }, 0L, TimeUnit.MILLISECONDS);
        } catch (EventChannelNotReadyException e) {
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "The {0} is not ready to start.  Deferring (re)start for {1} ms", new Object[]{this.controllerIdentifier, Long.valueOf(this.controllerDependencies.getRestartDelay())});
            }
            setState(State.PAUSED);
            schedule(retry, this.controllerDependencies.getRestartDelay(), TimeUnit.MILLISECONDS);
        } catch (RuntimeException e2) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Failed while attempting to start {0}", this.controllerIdentifier);
            }
            if (logger.isLoggable(Level.INFO)) {
                logger.log(Level.INFO, "EventChannel Exception was as follows", (Throwable) e2);
            }
            this.consecutiveDistributionFailures++;
            int threshold = this.controllerDependencies.getTotalConsecutiveFailuresBeforeSuspending();
            if (threshold >= 0 && this.consecutiveDistributionFailures >= threshold) {
                if (logger.isLoggable(Level.WARNING)) {
                    // java.util.logging uses MessageFormat placeholders, so the count is {1}, not %d
                    logger.log(Level.WARNING, "Suspending {0} as there have been too many ({1}) consecutive failures ", new Object[]{this.controllerIdentifier, Integer.valueOf(this.consecutiveDistributionFailures)});
                }
                setState(State.SUSPENDED);
            } else {
                if (logger.isLoggable(Level.FINER)) {
                    logger.log(Level.FINER, "Scheduled another attempt to start EventChannelController {0}.", this.controllerIdentifier);
                }
                setState(State.PAUSED);
                schedule(retry, this.controllerDependencies.getRestartDelay(), TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Subclass hook invoked by onStop() on the executor thread while the controller is STOPPING.
     * A RuntimeException thrown here is logged by the caller and the controller still ends up
     * STOPPED.
     */
    protected abstract void internalStop();

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Executor-side half of {@link #stop()}: runs {@link #internalStop()} and always leaves the
     * controller STOPPED, whether the hook returns normally, throws a RuntimeException (logged
     * and swallowed) or throws anything else (propagated). Does nothing unless the controller is
     * STOPPING.
     */
    public void onStop() {
        if (getState() != State.STOPPING) {
            return;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Stopping EventChannelController {0}", controllerIdentifier);
        }
        try {
            internalStop();
        } catch (RuntimeException failure) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Failed while attempting to stop {0}", controllerIdentifier);
            }
            if (logger.isLoggable(Level.INFO)) {
                logger.log(Level.INFO, "EventChannel Exception was as follows", (Throwable) failure);
            }
        } finally {
            // reached on every path, including Errors that keep propagating
            setState(State.STOPPED);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Executor-side half of {@link #disable()}: runs {@link #internalDisable()} and always leaves
     * the controller DISABLED, whether the hook returns normally, throws a RuntimeException
     * (logged and swallowed) or throws anything else (propagated). Does nothing unless the
     * controller is DISABLING.
     */
    public void onDisable() {
        if (getState() != State.DISABLING) {
            return;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Disabling EventChannelController {0}", controllerIdentifier);
        }
        try {
            internalDisable();
        } catch (RuntimeException failure) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, "Failed while attempting to disable EventChannelController {0}", controllerIdentifier);
            }
            if (logger.isLoggable(Level.INFO)) {
                logger.log(Level.INFO, "EventChannelController Exception was as follows", (Throwable) failure);
            }
        } finally {
            // reached on every path, including Errors that keep propagating
            setState(State.DISABLED);
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "Disabled EventChannelController {0}", controllerIdentifier);
            }
        }
    }

    /**
     * Subclass hook that supplies the next batch. Called repeatedly by onDistribute() while the
     * controller is DISTRIBUTING.
     *
     * @return a pair whose X is the batch of events (an empty list makes the controller wait for
     *         the batch distribution delay) and whose Y is a subclass-defined token that is handed
     *         back unchanged to acknowledgeDistributedEvents(...)
     */
    protected abstract Pair<List<Event>, T> getEventsToDistribute();

    /**
     * Subclass hook called by onDistribute() after a batch has been sent through the channel.
     *
     * @param list the batch that was returned as X by getEventsToDistribute()
     * @param t    the token that was returned as Y by getEventsToDistribute()
     */
    protected abstract void acknowledgeDistributedEvents(List<Event> list, T t);

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Executor-side distribution loop.
     * <p>
     * Moves a SCHEDULED or WAITING controller to DISTRIBUTING and then, for as long as it stays
     * DISTRIBUTING, fetches a batch via {@link #getEventsToDistribute()} and:
     * <ul>
     * <li>empty batch: switches to WAITING and queues the next poll after the batch distribution
     *     delay;</li>
     * <li>non-empty batch: attaches the backing-map context to every
     *     {@link DistributableEntryEvent}, applies the optional transformer, sends the result
     *     through the channel, updates the statistics, acknowledges the batch and resets the
     *     failure counter. With a positive batch distribution delay the next batch is queued
     *     (state SCHEDULED); otherwise the loop continues immediately.</li>
     * </ul>
     * The whole iteration (fetch, send and acknowledge) is covered by the failure handler: any
     * RuntimeException increments the failure counter and either suspends the controller (a
     * non-negative threshold has been reached) or pauses it and queues a restart after the restart
     * delay. A queued restart only proceeds if the controller is still PAUSED when it fires.
     */
    public void onDistribute() {
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Commenced distribution with EventChannel {0}", this.controllerIdentifier);
        }
        synchronized (this) {
            State state = getState();
            if (state == State.SCHEDULED || state == State.WAITING) {
                setState(State.DISTRIBUTING);
            }
        }
        while (getState() == State.DISTRIBUTING) {
            try {
                Pair<List<Event>, T> eventsToDistribute = getEventsToDistribute();
                List<Event> events = eventsToDistribute.getX();
                T token = eventsToDistribute.getY();

                if (events.isEmpty()) {
                    // nothing to do: poll again later, unless the state was changed meanwhile
                    synchronized (this) {
                        if (getState() == State.DISTRIBUTING) {
                            setState(State.WAITING);
                            schedule(new Runnable() {
                                @Override
                                public void run() {
                                    AbstractEventChannelController.this.onDistribute();
                                }
                            }, this.controllerDependencies.getBatchDistributionDelay(), TimeUnit.MILLISECONDS);
                        }
                    }
                } else {
                    int candidateCount = events.size();
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Commencing distributing {1} event(s) with EventChannel {0}", new Object[]{this.controllerIdentifier, Integer.valueOf(candidateCount)});
                    }
                    long startedAt = System.currentTimeMillis();

                    // entry events need the backing-map context of their cache before being sent
                    Iterator<Event> contextualized = new MutatingEventIteratorTransformer(new EventTransformer() {
                        public Event transform(Event event) {
                            if (!(event instanceof DistributableEntryEvent)) {
                                return event;
                            }
                            DistributableEntryEvent distributableEntryEvent = (DistributableEntryEvent) event;
                            distributableEntryEvent.m33getEntry().setContext(CacheFactory.getCache(distributableEntryEvent.getCacheName()).getCacheService().getBackingMapManager().getContext());
                            return distributableEntryEvent;
                        }
                    }).transform(events.iterator());
                    Iterator<Event> outgoing = this.transformer == null ? contextualized : this.transformer.transform(contextualized);
                    int sentCount = this.channel.send(outgoing);

                    this.totalBatchesDistributed++;
                    this.totalCandidateEvents += candidateCount;
                    this.totalEventsDistributed += sentCount;
                    long elapsed = System.currentTimeMillis() - startedAt;
                    this.lastDistributionDurationMS = elapsed;
                    this.maximumDistributionDurationMS = Math.max(this.maximumDistributionDurationMS, elapsed);
                    this.minimumDistributionDurationMS = Math.min(this.minimumDistributionDurationMS, elapsed);
                    this.totalDistributionDurationMS += elapsed;
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Completed distributing {1} Event(s) with EventChannel {0}", new Object[]{this.controllerIdentifier, Integer.valueOf(events.size())});
                    }

                    acknowledgeDistributedEvents(events, token);
                    this.consecutiveDistributionFailures = 0;

                    // with no delay configured the loop simply carries on with the next batch
                    synchronized (this) {
                        if (getState() == State.DISTRIBUTING && this.controllerDependencies.getBatchDistributionDelay() > 0) {
                            setState(State.SCHEDULED);
                            schedule(new Runnable() {
                                @Override
                                public void run() {
                                    AbstractEventChannelController.this.onDistribute();
                                }
                            }, this.controllerDependencies.getBatchDistributionDelay(), TimeUnit.MILLISECONDS);
                        }
                    }
                }
            } catch (RuntimeException e) {
                if (logger.isLoggable(Level.SEVERE)) {
                    logger.log(Level.SEVERE, "EventChannelController {0} failed to distribute events", this.controllerIdentifier);
                }
                if (logger.isLoggable(Level.INFO)) {
                    logger.log(Level.INFO, "Exception was as follows:", (Throwable) e);
                }
                this.consecutiveDistributionFailures++;
                int threshold = this.controllerDependencies.getTotalConsecutiveFailuresBeforeSuspending();
                if (threshold < 0 || this.consecutiveDistributionFailures < threshold) {
                    // leaving DISTRIBUTING ends the loop; the restart task picks up later
                    setState(State.PAUSED);
                    schedule(new Runnable() {
                        @Override
                        public void run() {
                            synchronized (AbstractEventChannelController.this) {
                                // somebody changed the state during the delay: leave their decision alone
                                if (AbstractEventChannelController.this.getState() != State.PAUSED) {
                                    return;
                                }
                                AbstractEventChannelController.this.setState(State.STARTING);
                            }
                            AbstractEventChannelController.this.onStart(State.DISTRIBUTING);
                        }
                    }, this.controllerDependencies.getRestartDelay(), TimeUnit.MILLISECONDS);
                } else {
                    if (logger.isLoggable(Level.WARNING)) {
                        logger.log(Level.WARNING, "Suspending distribution for {0} as there have been too many ({1}) consecutive failures ", new Object[]{this.controllerIdentifier, Integer.valueOf(this.consecutiveDistributionFailures)});
                    }
                    setState(State.SUSPENDED);
                }
            }
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.log(Level.FINER, "Completed distribution with {0}", this.controllerIdentifier);
        }
    }
}
