package org.apache.ignite.internal.processors.streamer;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.streamer.StreamerConfiguration;
import org.apache.ignite.streamer.StreamerContext;
import org.apache.ignite.streamer.StreamerEventRouter;
import org.apache.ignite.streamer.StreamerFailureListener;
import org.apache.ignite.streamer.StreamerMetrics;
import org.apache.ignite.streamer.StreamerStage;
import org.apache.ignite.streamer.StreamerWindow;
import org.apache.ignite.streamer.router.StreamerLocalEventRouter;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jdk8.backport.ConcurrentHashMap8;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.class */
public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
    private static final long serialVersionUID = 0;
    private static final int SEND_RETRY_COUNT = 3;
    private static final int SEND_RETRY_DELAY = 1000;
    private static final int CANCELLED_FUTS_HISTORY_SIZE = 4096;
    private IgniteLogger log;
    private GridKernalContext ctx;
    private StreamerContext streamerCtx;
    private GridSpinReadWriteLock lock;
    private boolean stopping;
    private StreamerConfiguration c;
    private String name;

    @GridToStringInclude
    private Map<String, StreamerStageWrapper> stages;

    @GridToStringInclude
    private Map<String, StreamerWindow> winMap;
    private StreamerWindow dfltWin;
    private String firstStage;
    private StreamerEventRouter router;
    private boolean atLeastOnce;
    private volatile StreamerMetricsHolder streamerMetrics;
    private Object topic;
    private ConcurrentMap<IgniteUuid, GridStreamerStageExecutionFuture> stageFuts;
    private ConcurrentMap<IgniteUuid, BatchExecutionFuture> batchFuts;
    private ExecutorService execSvc;
    private Semaphore sem;
    private Class<?> depCls;
    private int execSvcCap;
    static final /* synthetic */ boolean $assertionsDisabled;
    private Collection<StreamerFailureListener> failureLsnrs = new ConcurrentLinkedQueue();
    private Collection<IgniteUuid> cancelledFutIds = new GridBoundedConcurrentLinkedHashSet(4096);
    private final GridSpinReadWriteLock winLock = new GridSpinReadWriteLock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl$BatchExecutionFuture.class */
    public static class BatchExecutionFuture extends GridCompoundFuture<Object, Object> {
        private static final long serialVersionUID = 0;
        private BatchWorker w;
        static final /* synthetic */ boolean $assertionsDisabled;

        private BatchExecutionFuture() {
        }

        @Override // org.apache.ignite.internal.util.future.GridCompoundFuture, org.apache.ignite.internal.util.future.GridFutureAdapter, org.apache.ignite.internal.IgniteInternalFuture
        public boolean cancel() throws IgniteCheckedException {
            if (!$assertionsDisabled && this.w == null) {
                throw new AssertionError();
            }
            if (!super.cancel()) {
                return false;
            }
            this.w.cancel();
            return true;
        }

        public void setWorker(BatchWorker batchWorker) {
            if (!$assertionsDisabled && batchWorker == null) {
                throw new AssertionError();
            }
            this.w = batchWorker;
        }

        static {
            $assertionsDisabled = !IgniteStreamerImpl.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl$BatchWorker.class */
    public class BatchWorker extends GridWorker {
        private GridStreamerExecutionBatch batch;
        private StreamerStageWrapper stageWrapper;
        private StreamerMetricsHolder streamerHolder;
        private long schedTs;
        private BatchExecutionFuture fut;
        static final /* synthetic */ boolean $assertionsDisabled;

        private BatchWorker(GridStreamerExecutionBatch gridStreamerExecutionBatch, StreamerStageWrapper streamerStageWrapper, StreamerMetricsHolder streamerMetricsHolder) {
            super(IgniteStreamerImpl.this.ctx.gridName(), "streamer-batch-worker-" + gridStreamerExecutionBatch.stageName(), log);
            this.fut = new BatchExecutionFuture();
            if (!$assertionsDisabled && streamerStageWrapper == null) {
                throw new AssertionError();
            }
            this.batch = gridStreamerExecutionBatch;
            this.stageWrapper = streamerStageWrapper;
            this.streamerHolder = streamerMetricsHolder;
            this.schedTs = U.currentTimeMillis();
            this.fut.setWorker(this);
        }

        public BatchExecutionFuture completionFuture() {
            return this.fut;
        }

        /* JADX WARN: Finally extract failed */
        @Override // org.apache.ignite.internal.util.worker.GridWorker
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            GridStreamerContextDelegate gridStreamerContextDelegate;
            try {
                long currentTimeMillis = U.currentTimeMillis();
                this.streamerHolder.onStageExecutionStarted(this.stageWrapper.index(), currentTimeMillis - this.schedTs);
                try {
                    try {
                        if (log.isDebugEnabled()) {
                            log.debug("Running streamer stage [stage=" + this.stageWrapper.name() + ", futId=" + this.batch.futureId() + ']');
                        }
                        gridStreamerContextDelegate = new GridStreamerContextDelegate(IgniteStreamerImpl.this.context(), this.stageWrapper.nextStageName());
                        IgniteStreamerImpl.this.winLock.readLock();
                    } catch (Throwable th) {
                        this.streamerHolder.onStageExecutionFinished(this.stageWrapper.index(), (0 == 0 ? U.currentTimeMillis() : 0L) - currentTimeMillis);
                        throw th;
                    }
                } catch (IgniteCheckedException e) {
                    if (!IgniteStreamerImpl.this.atLeastOnce) {
                        IgniteStreamerImpl.this.notifyFailure(this.batch.stageName(), this.batch.events(), e);
                        this.streamerHolder.onStageFailure(this.stageWrapper.index());
                    }
                    this.fut.onDone((Throwable) e);
                    this.streamerHolder.onStageExecutionFinished(this.stageWrapper.index(), (0 == 0 ? U.currentTimeMillis() : 0L) - currentTimeMillis);
                }
                try {
                    Map<String, Collection<?>> run = this.stageWrapper.run(gridStreamerContextDelegate, this.batch.events());
                    IgniteStreamerImpl.this.winLock.readUnlock();
                    GridDeployment deployment = this.batch.deployment();
                    if (deployment != null && deployment.obsolete()) {
                        IgniteStreamerImpl.this.unwindUndeploys(deployment.classLoader(), false);
                    }
                    if (run != null) {
                        for (Map.Entry<String, Collection<?>> entry : run.entrySet()) {
                            if (entry.getKey() == null) {
                                throw new IgniteCheckedException("Failed to pass events to next stage (stage name cannot be null).");
                            }
                            GridStreamerStageExecutionFuture addEvents0 = IgniteStreamerImpl.this.addEvents0(this.batch.executionId(), 0, this.batch.executionStartTimeStamp(), this.batch.futureId(), this.batch.executionNodeIds(), entry.getKey(), entry.getValue());
                            if (IgniteStreamerImpl.this.atLeastOnce) {
                                this.fut.add(addEvents0);
                            }
                        }
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("Finished pipeline execution [stage=" + this.stageWrapper.name() + ", futId=" + this.batch.futureId() + ']');
                        }
                        r13 = U.currentTimeMillis();
                        this.streamerHolder.onPipelineCompleted(r13 - this.batch.executionStartTimeStamp(), this.batch.executionNodeIds().size());
                    }
                    if (r13 == 0) {
                        r13 = U.currentTimeMillis();
                    }
                    this.streamerHolder.onStageExecutionFinished(this.stageWrapper.index(), r13 - currentTimeMillis);
                } catch (Throwable th2) {
                    IgniteStreamerImpl.this.winLock.readUnlock();
                    throw th2;
                }
            } finally {
                this.fut.markInitialized();
            }
        }

        static {
            $assertionsDisabled = !IgniteStreamerImpl.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl$StreamerPda.class */
    public class StreamerPda implements GridPeerDeployAware {
        private static final long serialVersionUID = 0;
        private Class<?> cls;
        private ClassLoader ldr;
        private Collection<Object> objs;
        static final /* synthetic */ boolean $assertionsDisabled;

        private StreamerPda(Collection<Object> collection) {
            this.objs = collection;
        }

        @Override // org.apache.ignite.internal.util.lang.GridPeerDeployAware
        public Class<?> deployClass() {
            if (this.cls == null) {
                Class<?> cls = null;
                if (IgniteStreamerImpl.this.depCls != null) {
                    cls = IgniteStreamerImpl.this.depCls;
                } else {
                    Iterator<Object> it = this.objs.iterator();
                    while (true) {
                        if ((cls == null || U.isJdk(cls)) && it.hasNext()) {
                            cls = U.detectClass(it.next());
                        }
                    }
                    if (cls == null || U.isJdk(cls)) {
                        cls = IgniteStreamerImpl.class;
                    }
                }
                if (!$assertionsDisabled && cls == null) {
                    throw new AssertionError("Failed to detect deploy class [objs=" + this.objs + ']');
                }
                this.cls = cls;
            }
            return this.cls;
        }

        @Override // org.apache.ignite.internal.util.lang.GridPeerDeployAware
        public ClassLoader classLoader() {
            if (this.ldr == null) {
                ClassLoader classLoader = deployClass().getClassLoader();
                if (classLoader == null) {
                    classLoader = U.gridClassLoader();
                }
                if (!$assertionsDisabled && classLoader == null) {
                    throw new AssertionError("Failed to detect classloader [objs=" + this.objs + ']');
                }
                this.ldr = classLoader;
            }
            return this.ldr;
        }

        static {
            $assertionsDisabled = !IgniteStreamerImpl.class.desiredAssertionStatus();
        }
    }

    public IgniteStreamerImpl() {
    }

    public IgniteStreamerImpl(GridKernalContext gridKernalContext, StreamerConfiguration streamerConfiguration) {
        this.ctx = gridKernalContext;
        this.log = gridKernalContext.log(IgniteStreamerImpl.class);
        this.atLeastOnce = streamerConfiguration.isAtLeastOnce();
        this.name = streamerConfiguration.getName();
        this.router = streamerConfiguration.getRouter();
        this.c = streamerConfiguration;
        if (this.atLeastOnce && streamerConfiguration.getMaximumConcurrentSessions() > 0) {
            this.sem = new Semaphore(streamerConfiguration.getMaximumConcurrentSessions());
        }
        this.topic = this.name == null ? GridTopic.TOPIC_STREAM : GridTopic.TOPIC_STREAM.topic(this.name);
        this.lock = new GridSpinReadWriteLock();
        this.stageFuts = new ConcurrentHashMap8();
        this.batchFuts = new ConcurrentHashMap8();
        this.streamerCtx = new GridStreamerContextImpl(gridKernalContext, streamerConfiguration, this);
    }

    public void start() throws IgniteCheckedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Starting streamer: " + this.name);
        }
        if (F.isEmpty((Collection<?>) this.c.getStages())) {
            throw new IgniteCheckedException("Streamer should have at least one stage configured (fix configuration and restart): " + this.name);
        }
        if (F.isEmpty((Collection<?>) this.c.getWindows())) {
            throw new IgniteCheckedException("Streamer should have at least one window configured (fix configuration and restart): " + this.name);
        }
        prepareResources();
        U.startLifecycleAware(lifecycleAwares());
        this.stages = U.newLinkedHashMap(this.c.getStages().size());
        int i = 0;
        StreamerStageWrapper streamerStageWrapper = null;
        for (StreamerStage streamerStage : this.c.getStages()) {
            String name = streamerStage.name();
            if (F.isEmpty(name)) {
                throw new IgniteCheckedException("Streamer stage should have non-empty name [streamerName=" + this.name + ", stage=" + streamerStage + ']');
            }
            if (this.stages.containsKey(name)) {
                throw new IgniteCheckedException("Streamer stages have duplicate names (all names should be unique) [streamerName=" + this.name + ", stage=" + streamerStage + ", stageName=" + name + ']');
            }
            if (this.firstStage == null) {
                this.firstStage = name;
            }
            StreamerStageWrapper streamerStageWrapper2 = new StreamerStageWrapper(streamerStage, i);
            this.stages.put(name, streamerStageWrapper2);
            if (streamerStageWrapper != null) {
                streamerStageWrapper.nextStageName(streamerStage.name());
            }
            streamerStageWrapper = streamerStageWrapper2;
            i++;
        }
        this.winMap = new LinkedHashMap();
        for (StreamerWindow streamerWindow : this.c.getWindows()) {
            String name2 = streamerWindow.name();
            if (F.isEmpty(name2)) {
                throw new IgniteCheckedException("Streamer window should have non-empty name [streamerName=" + this.name + ", window=" + streamerWindow + ']');
            }
            if (this.winMap.containsKey(name2)) {
                throw new IgniteCheckedException("Streamer windows have duplicate names (all names should be unique). If you use two or more windows of the same type you need to assign their names explicitly [streamer=" + this.name + ", windowName=" + name2 + ']');
            }
            this.winMap.put(name2, streamerWindow);
            if (this.dfltWin == null) {
                this.dfltWin = streamerWindow;
            }
        }
        this.execSvc = new IgniteThreadPoolExecutor(this.ctx.gridName(), this.c.getThreadPoolSize(), this.c.getThreadPoolSize(), 0L, new LinkedBlockingQueue());
        this.execSvcCap = this.c.getThreadPoolSize();
        resetMetrics();
        if (this.router == null) {
            this.router = new StreamerLocalEventRouter();
        }
        this.ctx.io().addMessageListener(this.topic, new GridMessageListener() { // from class: org.apache.ignite.internal.processors.streamer.IgniteStreamerImpl.1
            @Override // org.apache.ignite.internal.managers.communication.GridMessageListener
            public void onMessage(UUID uuid, Object obj) {
                if (IgniteStreamerImpl.this.log.isDebugEnabled()) {
                    IgniteStreamerImpl.this.log.debug("Received message [nodeId=" + uuid + ", msg=" + obj + ']');
                }
                IgniteStreamerImpl.this.processStreamerMessage(uuid, obj);
            }
        });
        this.ctx.event().addLocalEventListener(new GridLocalEventListener() { // from class: org.apache.ignite.internal.processors.streamer.IgniteStreamerImpl.2
            @Override // org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener
            public void onEvent(Event event) {
                DiscoveryEvent discoveryEvent = (DiscoveryEvent) event;
                Iterator it = IgniteStreamerImpl.this.stageFuts.values().iterator();
                while (it.hasNext()) {
                    ((GridStreamerStageExecutionFuture) it.next()).onNodeLeft(discoveryEvent.eventNode().id());
                }
            }
        }, 11, 12);
    }

    private void prepareResources() throws IgniteCheckedException {
        Iterator<StreamerStage> it = this.c.getStages().iterator();
        while (it.hasNext()) {
            this.ctx.resource().injectGeneric(it.next());
        }
        if (this.router == null) {
            this.router = new StreamerLocalEventRouter();
        }
        this.ctx.resource().injectGeneric(this.router);
        Iterator<StreamerWindow> it2 = this.c.getWindows().iterator();
        while (it2.hasNext()) {
            this.ctx.resource().injectGeneric(it2.next());
        }
    }

    public void onKernalStop(boolean z) {
        this.lock.writeLock();
        try {
            this.stopping = true;
            this.lock.writeUnlock();
            if (z) {
                for (BatchExecutionFuture batchExecutionFuture : this.batchFuts.values()) {
                    try {
                        batchExecutionFuture.cancel();
                    } catch (IgniteCheckedException e) {
                        U.warn(this.log, "Failed to cancel batch execution future on node stop (will ignore) [execFut=" + batchExecutionFuture + ", err=" + e + ']');
                    }
                }
            } else {
                for (GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture : this.stageFuts.values()) {
                    try {
                        if (gridStreamerStageExecutionFuture.rootExecution()) {
                            if (this.log.isDebugEnabled()) {
                                this.log.debug("Waiting root execution future on kernal stop: " + gridStreamerStageExecutionFuture);
                            }
                            gridStreamerStageExecutionFuture.get();
                        }
                    } catch (IgniteCheckedException e2) {
                    }
                }
                for (BatchExecutionFuture batchExecutionFuture2 : this.batchFuts.values()) {
                    try {
                        batchExecutionFuture2.get();
                    } catch (IgniteCheckedException e3) {
                        if (!e3.hasCause(IgniteInterruptedCheckedException.class)) {
                            U.warn(this.log, "Failed to wait for batch execution future completion (will ignore) [execFut=" + batchExecutionFuture2 + ", e=" + e3 + ']');
                        }
                    }
                }
            }
            for (StreamerStageWrapper streamerStageWrapper : this.stages.values()) {
                try {
                    this.ctx.resource().cleanupGeneric(streamerStageWrapper.unwrap());
                } catch (IgniteCheckedException e4) {
                    U.error(this.log, "Failed to cleanup stage [stage=" + streamerStageWrapper + ", streamer=" + this + ']', e4);
                }
            }
        } catch (Throwable th) {
            this.lock.writeUnlock();
            throw th;
        }
    }

    public void stop(boolean z) {
        this.ctx.io().removeMessageListener(this.topic);
        this.execSvc.shutdownNow();
        U.stopLifecycleAware(this.log, lifecycleAwares());
    }

    private Iterable<Object> lifecycleAwares() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(configuration().getStages());
        arrayList.addAll(configuration().getWindows());
        arrayList.add(this.router);
        return arrayList;
    }

    @Override // org.apache.ignite.IgniteStreamer
    @Nullable
    public String name() {
        return this.name;
    }

    @Override // org.apache.ignite.IgniteStreamer
    public StreamerConfiguration configuration() {
        return this.c;
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void addEvent(Object obj, Object... objArr) {
        A.notNull(obj, "evt");
        if (F.isEmpty(objArr)) {
            addEvents(Collections.singleton(obj));
        } else {
            addEvents(F.concat(false, obj, (Collection<Object>) Arrays.asList(objArr)));
        }
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void addEventToStage(String str, Object obj, Object... objArr) {
        A.notNull(str, "stageName");
        A.notNull(obj, "evt");
        if (F.isEmpty(objArr)) {
            addEventsToStage(str, Collections.singleton(obj));
        } else {
            addEventsToStage(str, F.concat(false, obj, (Collection<Object>) Arrays.asList(objArr)));
        }
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void addEvents(Collection<?> collection) {
        A.ensure(!F.isEmpty(collection), "evts cannot be null or empty");
        addEventsToStage(this.firstStage, collection);
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void addEventsToStage(String str, Collection<?> collection) {
        A.notNull(str, "stageName");
        A.ensure(!F.isEmpty(collection), "evts cannot be empty or null");
        this.ctx.gateway().readLock();
        try {
            try {
                addEvents0(null, 0, U.currentTimeMillis(), null, Collections.singleton(this.ctx.localNodeId()), str, collection);
                this.ctx.gateway().readUnlock();
            } catch (IgniteCheckedException e) {
                throw U.convertException(e);
            }
        } catch (Throwable th) {
            this.ctx.gateway().readUnlock();
            throw th;
        }
    }

    @Override // org.apache.ignite.IgniteStreamer
    public StreamerContext context() {
        return this.streamerCtx;
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void addStreamerFailureListener(StreamerFailureListener streamerFailureListener) {
        this.failureLsnrs.add(streamerFailureListener);
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void removeStreamerFailureListener(StreamerFailureListener streamerFailureListener) {
        this.failureLsnrs.remove(streamerFailureListener);
    }

    @Override // org.apache.ignite.IgniteStreamer
    public StreamerMetrics metrics() {
        StreamerMetricsAdapter streamerMetricsAdapter = new StreamerMetricsAdapter(this.streamerMetrics);
        this.streamerMetrics.sampleCurrentStages();
        return streamerMetricsAdapter;
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void reset() {
        this.winLock.writeLock();
        try {
            Iterator<StreamerWindow> it = this.winMap.values().iterator();
            while (it.hasNext()) {
                it.next().reset();
            }
            this.streamerCtx.localSpace().clear();
            this.winLock.writeUnlock();
        } catch (Throwable th) {
            this.winLock.writeUnlock();
            throw th;
        }
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void resetMetrics() {
        StreamerStageMetricsHolder[] streamerStageMetricsHolderArr = new StreamerStageMetricsHolder[this.c.getStages().size()];
        int i = 0;
        Iterator<StreamerStage> it = this.c.getStages().iterator();
        while (it.hasNext()) {
            streamerStageMetricsHolderArr[i] = new StreamerStageMetricsHolder(it.next().name());
            i++;
        }
        StreamerWindowMetricsHolder[] streamerWindowMetricsHolderArr = new StreamerWindowMetricsHolder[this.c.getWindows().size()];
        int i2 = 0;
        Iterator<StreamerWindow> it2 = this.c.getWindows().iterator();
        while (it2.hasNext()) {
            streamerWindowMetricsHolderArr[i2] = new StreamerWindowMetricsHolder(it2.next());
            i2++;
        }
        this.streamerMetrics = new StreamerMetricsHolder(streamerStageMetricsHolderArr, streamerWindowMetricsHolderArr, this.execSvcCap);
    }

    @Override // org.apache.ignite.IgniteStreamer
    public void deployClass(Class<?> cls) {
        this.depCls = cls;
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public <E> StreamerWindow<E> window() {
        return this.dfltWin;
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public <E> StreamerWindow<E> window(String str) {
        return this.winMap.get(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean atLeastOnce() {
        return this.atLeastOnce;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int stageFutureMapSize() {
        return this.stageFuts.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int batchFutureMapSize() {
        return this.batchFuts.size();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public GridStreamerStageExecutionFuture addEvents0(@Nullable IgniteUuid igniteUuid, int i, long j, @Nullable IgniteUuid igniteUuid2, @Nullable Collection<UUID> collection, String str, Collection<?> collection2) throws IgniteInterruptedCheckedException {
        if (!$assertionsDisabled && F.isEmpty(collection2)) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && F.isEmpty(str)) {
            throw new AssertionError();
        }
        GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture = new GridStreamerStageExecutionFuture(this, igniteUuid, i, j, igniteUuid2, collection, str, collection2);
        if (this.atLeastOnce && gridStreamerStageExecutionFuture.rootExecution()) {
            StreamerMetricsHolder streamerMetricsHolder = this.streamerMetrics;
            streamerMetricsHolder.onSessionStarted();
            gridStreamerStageExecutionFuture.metrics(streamerMetricsHolder);
        }
        if (this.atLeastOnce && gridStreamerStageExecutionFuture.rootExecution() && gridStreamerStageExecutionFuture.failoverAttemptCount() == 0) {
            try {
                if (this.sem != null) {
                    this.sem.acquire();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IgniteInterruptedCheckedException(e);
            }
        }
        gridStreamerStageExecutionFuture.map();
        if (!this.atLeastOnce && gridStreamerStageExecutionFuture.isFailed()) {
            notifyFailure(gridStreamerStageExecutionFuture.stageName(), gridStreamerStageExecutionFuture.events(), gridStreamerStageExecutionFuture.error());
        }
        for (UUID uuid : gridStreamerStageExecutionFuture.executionNodeIds()) {
            if (!this.ctx.discovery().alive(uuid)) {
                gridStreamerStageExecutionFuture.onNodeLeft(uuid);
            }
        }
        return gridStreamerStageExecutionFuture;
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public GridKernalContext kernalContext() {
        return this.ctx;
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public void onFutureMapped(GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture) {
        if (this.atLeastOnce) {
            GridStreamerStageExecutionFuture putIfAbsent = this.stageFuts.putIfAbsent(gridStreamerStageExecutionFuture.id(), gridStreamerStageExecutionFuture);
            if (!$assertionsDisabled && putIfAbsent != null) {
                throw new AssertionError("Streamer execution future should be mapped only once: " + putIfAbsent);
            }
        }
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public void onFutureCompleted(GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture) {
        if (this.atLeastOnce) {
            if (gridStreamerStageExecutionFuture.rootExecution() && !gridStreamerStageExecutionFuture.isFailed() && this.sem != null) {
                this.sem.release();
            }
            GridStreamerStageExecutionFuture remove = this.stageFuts.remove(gridStreamerStageExecutionFuture.id());
            if (!$assertionsDisabled && gridStreamerStageExecutionFuture != remove) {
                throw new AssertionError("Invalid future in map [fut=" + gridStreamerStageExecutionFuture + ", old=" + remove + ']');
            }
            if (gridStreamerStageExecutionFuture.isFailed() || gridStreamerStageExecutionFuture.isCancelled()) {
                cancelChildStages(gridStreamerStageExecutionFuture);
            }
            if (gridStreamerStageExecutionFuture.rootExecution() && gridStreamerStageExecutionFuture.isFailed()) {
                failover(gridStreamerStageExecutionFuture);
            }
        }
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public StreamerEventRouter eventRouter() {
        return this.router;
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public void scheduleExecutions(GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture, Map<UUID, GridStreamerExecutionBatch> map) throws IgniteCheckedException {
        for (Map.Entry<UUID, GridStreamerExecutionBatch> entry : map.entrySet()) {
            UUID key = entry.getKey();
            GridStreamerExecutionBatch value = entry.getValue();
            if (this.ctx.localNodeId().equals(key)) {
                scheduleLocal(value);
            } else {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Sending batch execution request to remote node [nodeId=" + key + ", futId=" + value.futureId() + ", stageName=" + value.stageName() + ']');
                }
                sendWithRetries(key, createExecutionRequest(value));
                if (!this.ctx.discovery().alive(key)) {
                    gridStreamerStageExecutionFuture.onNodeLeft(key);
                }
            }
        }
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public void onUndeploy(ClassLoader classLoader) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Processing undeployment event undeployedLdr=" + classLoader + ']');
        }
        unwindUndeploys(classLoader, true);
    }

    @Override // org.apache.ignite.internal.processors.streamer.IgniteStreamerEx
    public void onQueryCompleted(long j, int i) {
        this.streamerMetrics.onQueryCompleted(j, i);
    }

    private void scheduleLocal(final GridStreamerExecutionBatch gridStreamerExecutionBatch) throws IgniteCheckedException {
        final IgniteUuid futureId = gridStreamerExecutionBatch.futureId();
        this.lock.readLock();
        try {
            if (this.stopping) {
                throw new IgniteCheckedException("Failed to schedule local batch execution (grid is stopping): " + gridStreamerExecutionBatch);
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Scheduling local batch execution [futId=" + futureId + ", stageName=" + gridStreamerExecutionBatch.stageName() + ']');
            }
            StreamerStageWrapper streamerStageWrapper = this.stages.get(gridStreamerExecutionBatch.stageName());
            if (streamerStageWrapper == null) {
                completeParentStage(this.ctx.localNodeId(), gridStreamerExecutionBatch.futureId(), new IgniteCheckedException("Failed to process streamer batch (stage was not found): " + gridStreamerExecutionBatch.stageName() + ']'));
                this.lock.readUnlock();
                return;
            }
            StreamerMetricsHolder streamerMetricsHolder = this.streamerMetrics;
            BatchWorker batchWorker = new BatchWorker(gridStreamerExecutionBatch, streamerStageWrapper, streamerMetricsHolder);
            BatchExecutionFuture completionFuture = batchWorker.completionFuture();
            BatchExecutionFuture putIfAbsent = this.batchFuts.putIfAbsent(futureId, completionFuture);
            if (!$assertionsDisabled && putIfAbsent != null) {
                throw new AssertionError("Duplicate batch execution future [old=" + putIfAbsent + ", batchFut=" + completionFuture + ']');
            }
            if (cancelled(futureId)) {
                this.batchFuts.remove(futureId, completionFuture);
                this.lock.readUnlock();
            } else {
                streamerMetricsHolder.onStageScheduled();
                this.execSvc.submit(batchWorker);
                completionFuture.listen(new CI1<IgniteInternalFuture<Object>>() { // from class: org.apache.ignite.internal.processors.streamer.IgniteStreamerImpl.3
                    @Override // org.apache.ignite.lang.IgniteInClosure
                    public void apply(IgniteInternalFuture<Object> igniteInternalFuture) {
                        BatchExecutionFuture batchExecutionFuture = (BatchExecutionFuture) igniteInternalFuture;
                        if (IgniteStreamerImpl.this.log.isDebugEnabled()) {
                            IgniteStreamerImpl.this.log.debug("Completed batch execution future: " + batchExecutionFuture);
                        }
                        IgniteStreamerImpl.this.batchFuts.remove(futureId, batchExecutionFuture);
                        if (batchExecutionFuture.isCancelled() || !IgniteStreamerImpl.this.atLeastOnce) {
                            return;
                        }
                        IgniteStreamerImpl.this.completeParentStage(IgniteStreamerImpl.this.ctx.localNodeId(), gridStreamerExecutionBatch.futureId(), batchExecutionFuture.error());
                    }
                });
                this.lock.readUnlock();
            }
        } catch (Throwable th) {
            this.lock.readUnlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completeParentStage(UUID uuid, IgniteUuid igniteUuid, @Nullable Throwable th) {
        this.lock.readLock();
        try {
            if (this.stopping && !this.atLeastOnce) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to notify parent stage completion (node is stopping) [futId=" + igniteUuid + ", err=" + th + ']');
                }
                return;
            }
            UUID globalId = igniteUuid.globalId();
            if (this.ctx.localNodeId().equals(globalId)) {
                GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture = this.stageFuts.get(igniteUuid);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Notifying local execution future [completeNodeId=" + uuid + ", stageFut=" + gridStreamerStageExecutionFuture + ", err=" + th + ']');
                }
                if (gridStreamerStageExecutionFuture != null) {
                    gridStreamerStageExecutionFuture.onExecutionCompleted(uuid, th);
                }
            } else {
                try {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Sending completion response to remote node [nodeId=" + globalId + ", futId=" + igniteUuid + ", err=" + th + ']');
                    }
                    sendWithRetries(globalId, new GridStreamerResponse(igniteUuid, th != null ? this.ctx.config().getMarshaller().marshal(th) : null));
                } catch (IgniteCheckedException e) {
                    if (!e.hasCause(ClusterTopologyCheckedException.class)) {
                        this.log.error("Failed to complete parent stage [futId=" + igniteUuid + ", err=" + e + ']');
                    }
                }
            }
            this.lock.readUnlock();
        } finally {
            this.lock.readUnlock();
        }
    }

    private void cancelChildStages(GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture) {
        Iterator<UUID> it = gridStreamerStageExecutionFuture.childExecutions().keySet().iterator();
        while (it.hasNext()) {
            cancelChildStage(it.next(), gridStreamerStageExecutionFuture.id());
        }
    }

    private void cancelChildStage(UUID uuid, IgniteUuid igniteUuid) {
        if (!$assertionsDisabled && !this.atLeastOnce) {
            throw new AssertionError();
        }
        if (!uuid.equals(this.ctx.localNodeId())) {
            try {
                sendWithRetries(uuid, new GridStreamerCancelRequest(igniteUuid));
                return;
            } catch (IgniteCheckedException e) {
                if (e.hasCause(ClusterTopologyCheckedException.class)) {
                    return;
                }
                this.log.error("Failed to send streamer cancel request to remote node [nodeId=" + uuid + ", cancelledFutId=" + igniteUuid + ']', e);
                return;
            }
        }
        this.cancelledFutIds.add(igniteUuid);
        Iterator<Map.Entry<IgniteUuid, BatchExecutionFuture>> it = this.batchFuts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<IgniteUuid, BatchExecutionFuture> next = it.next();
            if (next.getKey().equals(igniteUuid)) {
                BatchExecutionFuture value = next.getValue();
                try {
                    value.cancel();
                } catch (IgniteCheckedException e2) {
                    this.log.warning("Failed to cancel batch execution future [cancelledFutId=" + igniteUuid + ", batchFut=" + value + ']', e2);
                }
                it.remove();
            }
        }
    }

    private void failover(GridStreamerStageExecutionFuture gridStreamerStageExecutionFuture) {
        if (!$assertionsDisabled && !gridStreamerStageExecutionFuture.rootExecution()) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && gridStreamerStageExecutionFuture.error() == null) {
            throw new AssertionError();
        }
        if (gridStreamerStageExecutionFuture.failoverAttemptCount() >= this.c.getMaximumFailoverAttempts() || this.stopping) {
            if (this.sem != null) {
                this.sem.release();
            }
            notifyFailure(gridStreamerStageExecutionFuture.stageName(), gridStreamerStageExecutionFuture.events(), gridStreamerStageExecutionFuture.error());
        } else {
            try {
                addEvents0(null, gridStreamerStageExecutionFuture.failoverAttemptCount() + 1, 0L, null, Collections.singleton(this.ctx.localNodeId()), gridStreamerStageExecutionFuture.stageName(), gridStreamerStageExecutionFuture.events());
            } catch (IgniteInterruptedCheckedException e) {
                e.printStackTrace();
                if (!$assertionsDisabled) {
                    throw new AssertionError("Failover submit should never attempt to acquire semaphore: " + gridStreamerStageExecutionFuture + ']');
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyFailure(String str, Collection<Object> collection, Throwable th) {
        Iterator<StreamerFailureListener> it = this.failureLsnrs.iterator();
        while (it.hasNext()) {
            it.next().onFailure(str, collection, th);
        }
    }

    public boolean cancelled(IgniteUuid igniteUuid) {
        return this.cancelledFutIds.contains(igniteUuid);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processStreamerMessage(UUID uuid, Object obj) {
        if (obj instanceof GridStreamerExecutionRequest) {
            try {
                GridStreamerExecutionBatch executionBatch = executionBatch(uuid, (GridStreamerExecutionRequest) obj);
                try {
                    scheduleLocal(executionBatch);
                    return;
                } catch (IgniteCheckedException e) {
                    completeParentStage(this.ctx.localNodeId(), executionBatch.futureId(), e);
                    return;
                }
            } catch (IgniteCheckedException e2) {
                U.error(this.log, "Failed to unmarshal execution batch (was class undeployed?) [sndNodeId=" + uuid + ", msg=" + obj + ']', e2);
                return;
            }
        }
        if (obj instanceof GridStreamerCancelRequest) {
            cancelChildStage(this.ctx.localNodeId(), ((GridStreamerCancelRequest) obj).cancelledFutureId());
            return;
        }
        if (obj instanceof GridStreamerResponse) {
            GridStreamerResponse gridStreamerResponse = (GridStreamerResponse) obj;
            if (!$assertionsDisabled && !gridStreamerResponse.futureId().globalId().equals(this.ctx.localNodeId())) {
                throw new AssertionError("Wrong message received [res=" + gridStreamerResponse + ", sndNodeId=" + uuid + ", locNodeId=" + this.ctx.localNodeId() + ']');
            }
            Throwable th = null;
            if (gridStreamerResponse.errorBytes() != null) {
                try {
                    th = (Throwable) this.ctx.config().getMarshaller().unmarshal(gridStreamerResponse.errorBytes(), (ClassLoader) null);
                } catch (IgniteCheckedException e3) {
                    U.error(this.log, "Failed to unmarshal response.", e3);
                }
            }
            completeParentStage(uuid, gridStreamerResponse.futureId(), th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void unwindUndeploys(ClassLoader classLoader, boolean z) {
        if (!$assertionsDisabled && classLoader == null) {
            throw new AssertionError();
        }
        int i = 0;
        Iterator<E> it = this.streamerCtx.window().iterator();
        while (it.hasNext()) {
            if (classLoader.equals(it.next().getClass().getClassLoader())) {
                it.remove();
                i++;
            }
        }
        int i2 = 0;
        Iterator it2 = this.streamerCtx.localSpace().entrySet().iterator();
        while (it2.hasNext()) {
            Map.Entry entry = (Map.Entry) it2.next();
            if (classLoader.equals(entry.getKey().getClass().getClassLoader()) || classLoader.equals(entry.getValue().getClass().getClassLoader())) {
                it2.remove();
                i2++;
            }
        }
        if (z && this.log.isInfoEnabled()) {
            if (i > 0 || i2 > 0) {
                this.log.info("Undeployed all streamer events (if any) for obsolete class loader [undeployedClsLdr=" + classLoader + ", undeployWindowCnt=" + i + ", undeploySpaceCnt=" + i2 + ']');
            }
        }
    }

    private Message createExecutionRequest(GridStreamerExecutionBatch gridStreamerExecutionBatch) throws IgniteCheckedException {
        boolean enabled = this.ctx.deploy().enabled();
        byte[] marshal = this.ctx.config().getMarshaller().marshal(gridStreamerExecutionBatch);
        if (!enabled) {
            return new GridStreamerExecutionRequest(true, marshal, null, null, null, null, null);
        }
        StreamerPda streamerPda = new StreamerPda(gridStreamerExecutionBatch.events());
        GridDeployment deploy = this.ctx.deploy().deploy(streamerPda.deployClass(), streamerPda.classLoader());
        if (deploy == null) {
            throw new IgniteCheckedException("Failed to get deployment for batch request [batch=" + gridStreamerExecutionBatch + ", pda=" + streamerPda + ']');
        }
        return new GridStreamerExecutionRequest(false, marshal, deploy.deployMode(), deploy.sampleClassName(), deploy.userVersion(), deploy.participants(), deploy.classLoaderId());
    }

    private GridStreamerExecutionBatch executionBatch(UUID uuid, GridStreamerExecutionRequest gridStreamerExecutionRequest) throws IgniteCheckedException {
        GridDeployment gridDeployment = null;
        if (!gridStreamerExecutionRequest.forceLocalDeployment()) {
            gridDeployment = this.ctx.deploy().getGlobalDeployment(gridStreamerExecutionRequest.deploymentMode(), gridStreamerExecutionRequest.sampleClassName(), gridStreamerExecutionRequest.sampleClassName(), gridStreamerExecutionRequest.userVersion(), uuid, gridStreamerExecutionRequest.classLoaderId(), gridStreamerExecutionRequest.loaderParticipants(), null);
            if (gridDeployment == null) {
                throw new IgniteCheckedException("Failed to obtain global deployment based on deployment metadata [nodeId=" + uuid + ", req=" + gridStreamerExecutionRequest + ']');
            }
        }
        GridStreamerExecutionBatch gridStreamerExecutionBatch = (GridStreamerExecutionBatch) this.ctx.config().getMarshaller().unmarshal(gridStreamerExecutionRequest.batchBytes(), gridDeployment != null ? gridDeployment.classLoader() : null);
        gridStreamerExecutionBatch.deployment(gridDeployment);
        return gridStreamerExecutionBatch;
    }

    private void sendWithRetries(UUID uuid, Message message) throws IgniteCheckedException {
        for (int i = 0; i < 3; i++) {
            try {
                this.ctx.io().send(uuid, this.topic, message, GridIoPolicy.SYSTEM_POOL);
                return;
            } catch (IgniteCheckedException e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Failed to send message to remote node (will retry) [dstNodeId=" + uuid + ", msg=" + message + ", err=" + e + ']');
                }
                if (!this.ctx.discovery().alive(uuid)) {
                    throw new ClusterTopologyCheckedException("Failed to send message (destination node left grid): " + uuid);
                }
                if (i == 2) {
                    throw e;
                }
                U.sleep(1000L);
            }
        }
    }

    @Override // java.io.Externalizable
    public void writeExternal(ObjectOutput objectOutput) throws IOException {
        objectOutput.writeObject(this.ctx);
        U.writeString(objectOutput, this.name);
    }

    @Override // java.io.Externalizable
    public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
        this.ctx = (GridKernalContext) objectInput.readObject();
        this.name = U.readString(objectInput);
    }

    protected Object readResolve() {
        return this.ctx.stream().streamer(this.name);
    }

    public String toString() {
        return S.toString(IgniteStreamerImpl.class, this);
    }

    static {
        $assertionsDisabled = !IgniteStreamerImpl.class.desiredAssertionStatus();
    }
}
