package org.apache.samza.operators.impl;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.Context;
import org.apache.samza.context.InternalTaskContext;
import org.apache.samza.context.TaskContext;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.scheduler.CallbackScheduler;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.system.DrainMessage;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/OperatorImpl.class */
public abstract class OperatorImpl<M, RM> {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorImpl.class);
    // Metrics group under which the per-operator counter and timers are registered in init().
    private static final String METRICS_GROUP = OperatorImpl.class.getName();
    // Set to true at the end of init(); guards against double initialization and early registration.
    private boolean initialized;
    // Set to true once close() has run handleClose().
    private boolean closed;
    // Clock used to time message and timer handling; chosen by createHighResClock().
    private HighResolutionClock highResClock;
    // Incremented once per onMessageAsync() call.
    private Counter numMessage;
    // Updated with the elapsed clock time of handleMessageAsync() once its stage completes.
    private Timer handleMessageNs;
    // Updated with the elapsed clock time of handleTimer().
    private Timer handleTimerNs;
    // Name of the task this operator runs in; taken from the task model in init().
    private TaskName taskName;
    // Downstream operators, added through registerNextOperator().
    Set<OperatorImpl<RM, ?>> registeredOperators;
    // Upstream operators, added through registerPrevOperator().
    Set<OperatorImpl<?, M>> prevOperators;
    // Input streams feeding this operator, added through registerInputStream().
    Set<SystemStream> inputStreams;
    private TaskModel taskModel;
    // Shared control-message state objects fetched from the InternalTaskContext in init().
    private EndOfStreamStates eosStates;
    private DrainStates drainStates;
    private WatermarkStates watermarkStates;
    // Scheduler backing the Scheduler returned by createOperatorScheduler().
    private CallbackScheduler callbackScheduler;
    // Used to broadcast end-of-stream, drain and watermark messages to other partitions.
    private ControlMessageSender controlMessageSender;
    // Read from JobConfig in init(); consulted by shouldTaskBroadcastToOtherPartitions().
    private int elasticityFactor;
    // Input watermark of this operator; only ever advanced by onWatermark(). Starts at -1.
    private long currentWatermark = -1;
    // Output watermark of this operator; only ever advanced by propagateWatermark(). Starts at -1.
    private long outputWatermark = -1;
    // Becomes true once a registered input stream matches a partition of the task model;
    // while false, getOutputWatermark() reports Long.MAX_VALUE.
    private boolean usedInCurrentTask = false;

    /**
     * Initializes this operator: creates the operator/stream registries, registers its metrics,
     * wires up the shared control-message state and finally invokes {@link #handleInit(Context)}.
     *
     * @param internalTaskContext source of the {@link Context}, the shared state objects and the
     *                            stream metadata cache
     * @throws IllegalStateException if the operator was already initialized or already closed
     */
    public final void init(InternalTaskContext internalTaskContext) {
        final Context context = internalTaskContext.getContext();
        final String opImplId = getOpImplId();
        if (this.initialized) {
            throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opImplId));
        }
        if (this.closed) {
            throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opImplId));
        }
        this.highResClock = createHighResClock(context.getJobContext().getConfig());
        // Insertion-ordered sets; the diamond keeps the declared element types instead of raw types.
        this.registeredOperators = new LinkedHashSet<>();
        this.prevOperators = new LinkedHashSet<>();
        this.inputStreams = new LinkedHashSet<>();

        final MetricsRegistry containerMetricsRegistry = context.getContainerContext().getContainerMetricsRegistry();
        this.numMessage = containerMetricsRegistry.newCounter(METRICS_GROUP, opImplId + "-messages");
        this.handleMessageNs = containerMetricsRegistry.newTimer(METRICS_GROUP, opImplId + "-handle-message-ns");
        this.handleTimerNs = containerMetricsRegistry.newTimer(METRICS_GROUP, opImplId + "-handle-timer-ns");

        final TaskContext taskContext = context.getTaskContext();
        this.taskName = taskContext.getTaskModel().getTaskName();
        this.eosStates = (EndOfStreamStates) internalTaskContext.fetchObject(EndOfStreamStates.class.getName());
        this.watermarkStates = (WatermarkStates) internalTaskContext.fetchObject(WatermarkStates.class.getName());
        this.drainStates = (DrainStates) internalTaskContext.fetchObject(DrainStates.class.getName());
        this.controlMessageSender = new ControlMessageSender(internalTaskContext.getStreamMetadataCache());
        this.taskModel = taskContext.getTaskModel();
        this.callbackScheduler = taskContext.getCallbackScheduler();

        // Subclass hook runs before the operator is flagged as initialized.
        handleInit(context);
        this.elasticityFactor = new JobConfig(context.getJobContext().getConfig()).getElasticityFactor();
        this.initialized = true;
    }

    /**
     * Subclass initialization hook, called by {@code init} after the metrics, shared states and
     * schedulers have been set up but before the operator is marked as initialized.
     *
     * @param context the context obtained from the {@code InternalTaskContext} passed to {@code init}
     */
    protected abstract void handleInit(Context context);

    /**
     * Registers {@code operatorImpl} as a downstream operator of this one and registers this
     * operator as its upstream operator.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally package-private.
     *
     * @param operatorImpl the operator that consumes this operator's results
     * @throws IllegalStateException if this operator has not been initialized yet
     */
    public void registerNextOperator(OperatorImpl<RM, ?> operatorImpl) {
        if (!this.initialized) {
            final String message = String.format(
                    "Attempted to register next operator before initializing operator %s.", getOpImplId());
            throw new IllegalStateException(message);
        }
        // Link both directions: downstream set here, upstream set on the other operator.
        this.registeredOperators.add(operatorImpl);
        operatorImpl.registerPrevOperator(this);
    }

    /**
     * Records {@code operatorImpl} as an upstream operator of this one. Called by the upstream
     * operator's {@code registerNextOperator}.
     *
     * @param operatorImpl the operator whose results feed this operator
     */
    void registerPrevOperator(OperatorImpl<?, M> operatorImpl) {
        this.prevOperators.add(operatorImpl);
    }

    /**
     * Records {@code systemStream} as an input of this operator and flags the operator as used in
     * the current task when any partition of the task model belongs to that stream. Once set, the
     * flag is never cleared.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally package-private.
     *
     * @param systemStream the input stream to register
     */
    public void registerInputStream(SystemStream systemStream) {
        this.inputStreams.add(systemStream);
        if (this.usedInCurrentTask) {
            // Already known to be used; no need to scan the task's partitions again.
            return;
        }
        this.usedInCurrentTask = this.taskModel.getSystemStreamPartitions().stream()
                .anyMatch(ssp -> ssp.getSystemStream().equals(systemStream));
    }

    /**
     * Handles one input message and forwards every result to all registered downstream operators.
     *
     * <p>The message counter is incremented immediately; the handle-message timer is updated when
     * the stage returned by {@link #handleMessageAsync} completes. If the operator spec supplies a
     * {@link WatermarkFunction}, its output watermark is read right after the chain is built and
     * propagated once the downstream work has completed.
     *
     * @param m the input message
     * @param messageCollector collector handed to this and all downstream operators
     * @param taskCoordinator coordinator handed to this and all downstream operators
     * @return a stage that completes when this operator and all downstream operators are done
     * @throws SamzaException if a {@link ClassCastException} is thrown synchronously while the
     *                        chain is being built; the original exception is kept as the cause
     */
    public final CompletionStage<Void> onMessageAsync(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        this.numMessage.inc();
        final long startNs = this.highResClock.nanoTime();
        try {
            final CompletionStage<Void> downstreamFuture = handleMessageAsync(m, messageCollector, taskCoordinator)
                    .thenCompose(results -> {
                        this.handleMessageNs.update(this.highResClock.nanoTime() - startNs);
                        // toCompletableFuture() keeps the array store type-safe even when a
                        // downstream stage is not itself a CompletableFuture.
                        return CompletableFuture.allOf(results.stream()
                                .flatMap(result -> this.registeredOperators.stream()
                                        .map(next -> next.onMessageAsync(result, messageCollector, taskCoordinator)))
                                .map(CompletionStage::toCompletableFuture)
                                .toArray(CompletableFuture[]::new));
                    });

            final WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
            if (watermarkFn == null) {
                return downstreamFuture;
            }
            // Check whether the user function produced a new output watermark.
            final Long outputWm = watermarkFn.getOutputWatermark();
            return downstreamFuture.thenCompose(ignored -> propagateWatermark(outputWm, messageCollector, taskCoordinator));
        } catch (ClassCastException e) {
            // A ClassCastException may carry no message; avoid masking it with an NPE.
            final String castMessage = e.getMessage() == null ? "" : e.getMessage();
            // Message shape is "<actual> cannot be cast to <expected>"; strip one half each.
            final String expectedType = castMessage.replaceFirst(".* cannot be cast to ", "");
            final String actualType = castMessage.replaceFirst(" cannot be cast to .*", "");
            throw new SamzaException(String.format("Error applying operator %s (created at %s) to its input message. Expected input message to be of type %s, but found it to be of type %s. Are Serdes for the inputs to this operator configured correctly?", getOpImplId(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
        }
    }

    /**
     * Processes a single input message. Called by {@code onMessageAsync}, which forwards every
     * element of the resulting collection to the registered downstream operators.
     *
     * @param m the input message
     * @param messageCollector the collector passed to {@code onMessageAsync}
     * @param taskCoordinator the coordinator passed to {@code onMessageAsync}
     * @return a stage yielding the results produced for {@code m}
     */
    protected abstract CompletionStage<Collection<RM>> handleMessageAsync(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator);

    /**
     * Fires the timer on this operator, forwards any results of {@link #handleTimer} to the
     * registered downstream operators as messages, and then fires the timer on each downstream
     * operator once those messages have been handled.
     *
     * @param messageCollector collector handed to this and all downstream operators
     * @param taskCoordinator coordinator handed to this and all downstream operators
     * @return a stage that completes when this operator and all downstream operators are done
     */
    public final CompletionStage<Void> onTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        final long startNs = this.highResClock.nanoTime();
        final Collection<RM> timerOutput = handleTimer(messageCollector, taskCoordinator);
        this.handleTimerNs.update(this.highResClock.nanoTime() - startNs);

        // toCompletableFuture() keeps the array store type-safe even when a downstream stage is
        // not itself a CompletableFuture.
        final CompletableFuture<Void> emitFuture = CompletableFuture.allOf(timerOutput.stream()
                .flatMap(result -> this.registeredOperators.stream()
                        .map(next -> next.onMessageAsync(result, messageCollector, taskCoordinator)))
                .map(CompletionStage::toCompletableFuture)
                .toArray(CompletableFuture[]::new));

        // Downstream timers fire only after the emitted messages have been processed.
        return emitFuture.thenCompose(ignored -> CompletableFuture.allOf(this.registeredOperators.stream()
                .map(next -> next.onTimer(messageCollector, taskCoordinator))
                .map(CompletionStage::toCompletableFuture)
                .toArray(CompletableFuture[]::new)));
    }

    /**
     * Timer hook invoked by {@code onTimer}; the returned results are forwarded to the registered
     * downstream operators. This default implementation produces no results.
     *
     * @param messageCollector the collector passed to {@code onTimer}
     * @param taskCoordinator the coordinator passed to {@code onTimer}
     * @return the results to emit; empty by default
     */
    protected Collection<RM> handleTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    /**
     * Decides whether this task may broadcast a control message received on
     * {@code systemStreamPartition} to the other partitions.
     *
     * <p>With an elasticity factor of at most 1 the answer is always {@code true}. Otherwise the
     * task must own a partition with the same system stream and partition whose key bucket is 0.
     *
     * @param systemStreamPartition the partition the control message arrived on
     * @return {@code true} if this task should perform the broadcast
     */
    private boolean shouldTaskBroadcastToOtherPartitions(SystemStreamPartition systemStreamPartition) {
        if (this.elasticityFactor <= 1) {
            return true;
        }
        // anyMatch stops at the first owned key-bucket-0 partition instead of counting them all.
        return this.taskModel.getSystemStreamPartitions().stream().anyMatch(owned ->
                systemStreamPartition.getSystemStream().equals(owned.getSystemStream())
                        && systemStreamPartition.getPartition().equals(owned.getPartition())
                        && owned.getKeyBucket() == 0);
    }

    /**
     * Records an end-of-stream message for {@code systemStreamPartition} and, once the whole
     * stream has reached its end, optionally broadcasts an end-of-stream message to the other
     * partitions and runs end-of-stream handling through the operator chain. When every input
     * stream has ended, a commit and a shutdown are requested for the current task.
     *
     * @param endOfStreamMessage the received control message
     * @param systemStreamPartition the partition the message arrived on
     * @param messageCollector collector used for broadcasting and downstream handling
     * @param taskCoordinator coordinator used to request commit and shutdown
     * @return a stage that completes when end-of-stream handling is done
     */
    public final CompletionStage<Void> aggregateEndOfStream(EndOfStreamMessage endOfStreamMessage, SystemStreamPartition systemStreamPartition, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.info("Received end-of-stream message from task {} in {}", endOfStreamMessage.getTaskName(), systemStreamPartition);
        this.eosStates.update(endOfStreamMessage, systemStreamPartition);

        final SystemStream endedStream = systemStreamPartition.getSystemStream();
        if (!this.eosStates.isEndOfStream(endedStream)) {
            // The stream as a whole has not ended yet; nothing more to do.
            return CompletableFuture.completedFuture(null);
        }

        LOG.info("Input {} reaches the end for task {}", endedStream.toString(), this.taskName.getTaskName());
        final boolean hasSenderTask = endOfStreamMessage.getTaskName() != null;
        if (hasSenderTask && shouldTaskBroadcastToOtherPartitions(systemStreamPartition)) {
            this.controlMessageSender.broadcastToOtherPartitions(new EndOfStreamMessage(), systemStreamPartition, messageCollector);
        }

        return onEndOfStream(messageCollector, taskCoordinator).thenAccept(ignored -> {
            if (!this.eosStates.allEndOfStream()) {
                return;
            }
            LOG.info("All input streams have reached the end for task {}", this.taskName.getTaskName());
            taskCoordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
            taskCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
        });
    }

    /**
     * Runs end-of-stream handling on this operator and then on every registered downstream
     * operator, but only once all of this operator's input streams have reached their end.
     * Results of {@link #handleEndOfStream} are forwarded downstream as messages first.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally private; it is kept
     * public here so existing callers are unaffected.
     *
     * @param messageCollector collector handed to this and all downstream operators
     * @param taskCoordinator coordinator handed to this and all downstream operators
     * @return a stage that completes when this operator and all downstream operators are done;
     *         already completed if some input stream has not ended yet
     */
    public CompletionStage<Void> onEndOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        final boolean allInputsEnded = this.inputStreams.stream()
                .allMatch(stream -> this.eosStates.isEndOfStream(stream));
        if (!allInputsEnded) {
            return CompletableFuture.completedFuture(null);
        }

        final Collection<RM> finalOutput = handleEndOfStream(messageCollector, taskCoordinator);
        // toCompletableFuture() keeps the array store type-safe even when a downstream stage is
        // not itself a CompletableFuture.
        final CompletableFuture<Void> emitFuture = CompletableFuture.allOf(finalOutput.stream()
                .flatMap(result -> this.registeredOperators.stream()
                        .map(next -> next.onMessageAsync(result, messageCollector, taskCoordinator)))
                .map(CompletionStage::toCompletableFuture)
                .toArray(CompletableFuture[]::new));

        // Downstream operators see end-of-stream only after the final results were processed.
        return emitFuture.thenCompose(ignored -> CompletableFuture.allOf(this.registeredOperators.stream()
                .map(next -> next.onEndOfStream(messageCollector, taskCoordinator))
                .map(CompletionStage::toCompletableFuture)
                .toArray(CompletableFuture[]::new)));
    }

    /**
     * End-of-stream hook invoked by {@code onEndOfStream} once all input streams of this operator
     * have ended; the returned results are forwarded to the registered downstream operators.
     * This default implementation produces no results.
     *
     * @param messageCollector the collector passed to {@code onEndOfStream}
     * @param taskCoordinator the coordinator passed to {@code onEndOfStream}
     * @return the results to emit; empty by default
     */
    protected Collection<RM> handleEndOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    /**
     * Drain hook invoked by {@code onDrainOfStream} once all input streams of this operator have
     * been drained; the returned results are forwarded to the registered downstream operators.
     * This default implementation produces no results.
     *
     * @param messageCollector the collector passed to {@code onDrainOfStream}
     * @param taskCoordinator the coordinator passed to {@code onDrainOfStream}
     * @return the results to emit; empty by default
     */
    protected Collection<RM> handleDrain(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    /**
     * Records a drain message for {@code systemStreamPartition} and, once the whole stream is
     * drained, optionally broadcasts a drain message (carrying the same run id) to the other
     * partitions and runs drain handling through the operator chain. When every input stream has
     * been drained, a commit and a shutdown are requested for the current task.
     *
     * <p>NOTE(review): unlike the end-of-stream and watermark paths, the broadcast here is not
     * gated by {@code shouldTaskBroadcastToOtherPartitions} - confirm this is intentional.
     *
     * @param drainMessage the received control message
     * @param systemStreamPartition the partition the message arrived on
     * @param messageCollector collector used for broadcasting and downstream handling
     * @param taskCoordinator coordinator used to request commit and shutdown
     * @return a stage that completes when drain handling is done
     */
    public final CompletionStage<Void> aggregateDrainMessages(DrainMessage drainMessage, SystemStreamPartition systemStreamPartition, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.info("Received drain message from task {} in {}", drainMessage.getTaskName(), systemStreamPartition);
        this.drainStates.update(drainMessage, systemStreamPartition);

        final SystemStream drainedStream = systemStreamPartition.getSystemStream();
        if (!this.drainStates.isDrained(drainedStream)) {
            // The stream as a whole has not been drained yet; nothing more to do.
            return CompletableFuture.completedFuture(null);
        }

        LOG.info("Input {} is drained for task {}", drainedStream.toString(), this.taskName.getTaskName());
        if (drainMessage.getTaskName() != null) {
            final DrainMessage broadcastMessage = new DrainMessage(drainMessage.getRunId());
            this.controlMessageSender.broadcastToOtherPartitions(broadcastMessage, systemStreamPartition, messageCollector);
        }

        return onDrainOfStream(messageCollector, taskCoordinator).thenAccept(ignored -> {
            if (!this.drainStates.areAllStreamsDrained()) {
                return;
            }
            LOG.info("All input streams have been drained for task {}. Requesting shutdown.", this.taskName.getTaskName());
            taskCoordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
            taskCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
        });
    }

    /**
     * Runs drain handling on this operator and then on every registered downstream operator, but
     * only once all of this operator's input streams have been drained. Results of
     * {@link #handleDrain} are forwarded downstream as messages first.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally private; it is kept
     * public here so existing callers are unaffected.
     *
     * @param messageCollector collector handed to this and all downstream operators
     * @param taskCoordinator coordinator handed to this and all downstream operators
     * @return a stage that completes when this operator and all downstream operators are done;
     *         already completed if some input stream has not been drained yet
     */
    public CompletionStage<Void> onDrainOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        final boolean allInputsDrained = this.inputStreams.stream()
                .allMatch(stream -> this.drainStates.isDrained(stream));
        if (!allInputsDrained) {
            return CompletableFuture.completedFuture(null);
        }

        final Collection<RM> drainOutput = handleDrain(messageCollector, taskCoordinator);
        // toCompletableFuture() keeps the array store type-safe even when a downstream stage is
        // not itself a CompletableFuture.
        final CompletableFuture<Void> emitFuture = CompletableFuture.allOf(drainOutput.stream()
                .flatMap(result -> this.registeredOperators.stream()
                        .map(next -> next.onMessageAsync(result, messageCollector, taskCoordinator)))
                .map(CompletionStage::toCompletableFuture)
                .toArray(CompletableFuture[]::new));

        // Downstream operators are drained only after the emitted results were processed.
        return emitFuture.thenCompose(ignored -> CompletableFuture.allOf(this.registeredOperators.stream()
                .map(next -> next.onDrainOfStream(messageCollector, taskCoordinator))
                .map(CompletionStage::toCompletableFuture)
                .toArray(CompletableFuture[]::new)));
    }

    /**
     * Records a watermark message for {@code systemStreamPartition} and, if the resulting stream
     * watermark is ahead of this operator's input watermark, optionally broadcasts it to the other
     * partitions, pushes it through the operator chain and then updates the aggregate watermark
     * metric.
     *
     * @param watermarkMessage the received control message
     * @param systemStreamPartition the partition the message arrived on
     * @param messageCollector collector used for broadcasting and downstream handling
     * @param taskCoordinator coordinator handed to the operator chain
     * @return a stage that completes when watermark handling is done
     */
    public final CompletionStage<Void> aggregateWatermark(WatermarkMessage watermarkMessage, SystemStreamPartition systemStreamPartition, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.debug("Received watermark {} from {}", watermarkMessage.getTimestamp(), systemStreamPartition);
        this.watermarkStates.update(watermarkMessage, systemStreamPartition);

        final long streamWatermark = this.watermarkStates.getWatermark(systemStreamPartition.getSystemStream());
        if (this.currentWatermark >= streamWatermark) {
            // The stream watermark did not move past what this operator has already seen.
            return CompletableFuture.completedFuture(null);
        }

        LOG.debug("Got watermark {} from stream {}", streamWatermark, systemStreamPartition.getSystemStream());
        final boolean hasSenderTask = watermarkMessage.getTaskName() != null;
        if (hasSenderTask && shouldTaskBroadcastToOtherPartitions(systemStreamPartition)) {
            this.controlMessageSender.broadcastToOtherPartitions(new WatermarkMessage(streamWatermark), systemStreamPartition, messageCollector);
        }

        return onWatermark(streamWatermark, messageCollector, taskCoordinator)
                .thenAccept(ignored -> this.watermarkStates.updateAggregateMetric(systemStreamPartition, streamWatermark));
    }

    /**
     * Advances this operator's input watermark and, if it moved, emits any resulting messages
     * downstream and then propagates the output watermark.
     *
     * <p>For an operator without upstream operators the candidate is {@code j}; otherwise it is
     * the minimum output watermark of all upstream operators. If the spec provides a
     * {@link WatermarkFunction}, it processes the watermark and supplies the output watermark;
     * otherwise {@link #handleWatermark} is used and the input watermark is passed on unchanged.
     *
     * @param j the watermark received from upstream or from the input source
     * @param messageCollector collector handed to this and all downstream operators
     * @param taskCoordinator coordinator handed to this and all downstream operators
     * @return a stage that completes when watermark handling is done; already completed if the
     *         input watermark did not advance
     */
    private CompletionStage<Void> onWatermark(long j, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        final long inputWatermarkMin;
        if (this.prevOperators.isEmpty()) {
            inputWatermarkMin = j;
        } else {
            // Non-empty by the check above, so the minimum is always present.
            inputWatermarkMin = this.prevOperators.stream()
                    .mapToLong(prev -> prev.getOutputWatermark())
                    .min()
                    .getAsLong();
        }

        if (this.currentWatermark >= inputWatermarkMin) {
            return CompletableFuture.completedFuture(null);
        }

        this.currentWatermark = inputWatermarkMin;
        LOG.trace("Advance input watermark to {} in operator {}", this.currentWatermark, getOpImplId());

        final Collection<RM> output;
        final Long outputWm;
        final WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
        if (watermarkFn != null) {
            output = watermarkFn.processWatermark(this.currentWatermark);
            outputWm = watermarkFn.getOutputWatermark();
        } else {
            output = handleWatermark(this.currentWatermark, messageCollector, taskCoordinator);
            outputWm = this.currentWatermark;
        }

        CompletionStage<Void> emitFuture = CompletableFuture.completedFuture(null);
        if (!output.isEmpty()) {
            // toCompletableFuture() keeps the array store type-safe even when a downstream stage
            // is not itself a CompletableFuture.
            emitFuture = CompletableFuture.allOf(output.stream()
                    .flatMap(result -> this.registeredOperators.stream()
                            .map(next -> next.onMessageAsync(result, messageCollector, taskCoordinator)))
                    .map(CompletionStage::toCompletableFuture)
                    .toArray(CompletableFuture[]::new));
        }
        // The watermark travels downstream only after the emitted messages were processed.
        return emitFuture.thenCompose(ignored -> propagateWatermark(outputWm, messageCollector, taskCoordinator));
    }

    /**
     * Advances this operator's output watermark to {@code l} and passes it to every registered
     * downstream operator. A {@code null} or unchanged value is ignored silently; a smaller value
     * is ignored with a warning.
     *
     * @param l the candidate output watermark, possibly {@code null}
     * @param messageCollector collector handed to the downstream operators
     * @param taskCoordinator coordinator handed to the downstream operators
     * @return a stage that completes when all downstream operators handled the watermark, or an
     *         already completed stage if nothing was propagated
     */
    private CompletionStage<Void> propagateWatermark(Long l, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        if (l == null) {
            return CompletableFuture.completedFuture(null);
        }

        final long candidate = l;
        if (this.outputWatermark < candidate) {
            this.outputWatermark = candidate;
            LOG.debug("Advance output watermark to {} in operator {}", this.outputWatermark, getOpImplId());
            return CompletableFuture.allOf(this.registeredOperators.stream()
                    .map(next -> next.onWatermark(this.outputWatermark, messageCollector, taskCoordinator))
                    .toArray(CompletableFuture[]::new));
        }

        if (this.outputWatermark > candidate) {
            LOG.warn("Ignore watermark {} that is smaller than the previous watermark {}.", l, this.outputWatermark);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Watermark hook used by {@code onWatermark} when the operator spec provides no
     * {@code WatermarkFunction}; the returned results are forwarded to the registered downstream
     * operators. This default implementation produces no results.
     *
     * @param j the new input watermark of this operator
     * @param messageCollector the collector passed to {@code onWatermark}
     * @param taskCoordinator the coordinator passed to {@code onWatermark}
     * @return the results to emit; empty by default
     */
    protected Collection<RM> handleWatermark(long j, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    /**
     * Returns the current input watermark of this operator.
     *
     * @return the input watermark; {@code -1} until {@code onWatermark} advances it
     */
    final long getInputWatermark() {
        return this.currentWatermark;
    }

    /**
     * Returns the output watermark of this operator as seen by downstream operators.
     *
     * @return the current output watermark if this operator is used in the current task,
     *         otherwise {@link Long#MAX_VALUE} so that it never holds back a downstream minimum
     */
    final long getOutputWatermark() {
        return this.usedInCurrentTask ? this.outputWatermark : Long.MAX_VALUE;
    }

    /**
     * Creates a {@link Scheduler} backed by this operator's {@link CallbackScheduler}.
     *
     * <p>When a scheduled callback fires, the spec's {@link ScheduledFunction} is invoked with the
     * key and time that were originally scheduled, its results are forwarded to all registered
     * downstream operators, and the callback blocks until those operators have finished.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally package-private.
     *
     * @param <K> type of the scheduling key
     * @return a scheduler that schedules and deletes callbacks for this operator
     */
    public <K> Scheduler<K> createOperatorScheduler() {
        return new Scheduler<K>() {
            public void schedule(K k, long j) {
                OperatorImpl.this.callbackScheduler.scheduleCallback(k, j, (ignoredKey, messageCollector, taskCoordinator) -> {
                    final ScheduledFunction scheduledFn = OperatorImpl.this.getOperatorSpec().getScheduledFn();
                    if (scheduledFn == null) {
                        throw new SamzaException(String.format("Operator %s id %s (created at %s) must implement ScheduledFunction to use system timer.", OperatorImpl.this.getOperatorSpec().getOpCode().name(), OperatorImpl.this.getOpImplId(), OperatorImpl.this.getOperatorSpec().getSourceLocation()));
                    }
                    // The spec hands back an untyped function; its results are this operator's
                    // output messages, which is what the downstream operators expect.
                    @SuppressWarnings("unchecked")
                    final Collection<RM> callbackOutput = scheduledFn.onCallback(k, j);
                    if (callbackOutput.isEmpty()) {
                        return;
                    }
                    // toCompletableFuture() keeps the array store type-safe even when a
                    // downstream stage is not itself a CompletableFuture.
                    CompletableFuture.allOf(callbackOutput.stream()
                            .flatMap(result -> OperatorImpl.this.registeredOperators.stream()
                                    .map(next -> next.onMessageAsync(result, messageCollector, taskCoordinator)))
                            .map(CompletionStage::toCompletableFuture)
                            .toArray(CompletableFuture[]::new)).join();
                });
            }

            public void delete(K k) {
                OperatorImpl.this.callbackScheduler.deleteCallback(k);
            }
        };
    }

    /**
     * Closes this operator by invoking {@link #handleClose()} and marking it closed. The closed
     * flag is only set after {@code handleClose()} returns normally.
     *
     * @throws IllegalStateException if the operator has already been closed
     */
    public void close() {
        if (this.closed) {
            final String message = String.format("Attempted to close Operator %s more than once.", getOpImplId());
            throw new IllegalStateException(message);
        }
        handleClose();
        this.closed = true;
    }

    /**
     * Subclass cleanup hook, called by {@code close} before the operator is marked as closed.
     */
    protected abstract void handleClose();

    /**
     * Returns the spec this operator was built from. This class reads the operator id, op code,
     * source location, watermark function and scheduled function from it.
     *
     * @return the operator spec
     */
    protected abstract OperatorSpec<M, RM> getOperatorSpec();

    /**
     * Returns the identifier of this operator, which is the op id of its spec. It is used in
     * metric names, log messages and exception messages.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally protected.
     *
     * @return the operator id
     */
    public String getOpImplId() {
        return getOperatorSpec().getOpId();
    }

    /**
     * Blocking variant of {@code onMessageAsync} for tests: waits until the returned stage
     * completes.
     *
     * @param m the input message
     * @param messageCollector collector passed through to {@code onMessageAsync}
     * @param taskCoordinator coordinator passed through to {@code onMessageAsync}
     */
    @VisibleForTesting
    final void onMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        onMessageAsync(m, messageCollector, taskCoordinator).toCompletableFuture().join();
    }

    /**
     * Blocking variant of {@code handleMessageAsync} for tests: waits for the stage to complete
     * and returns its results. Nothing is forwarded to downstream operators.
     *
     * @param m the input message
     * @param messageCollector collector passed through to {@code handleMessageAsync}
     * @param taskCoordinator coordinator passed through to {@code handleMessageAsync}
     * @return the results produced for {@code m}
     */
    @VisibleForTesting
    final Collection<RM> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return handleMessageAsync(m, messageCollector, taskCoordinator).toCompletableFuture().join();
    }

    /**
     * Chooses the clock used for the operator timers.
     *
     * @param config job configuration, read through {@link MetricsConfig}
     * @return a clock backed by {@link System#nanoTime()} when both the metrics timer and the
     *         metrics timer debug settings are enabled; otherwise a clock that always reports 0
     */
    private HighResolutionClock createHighResClock(Config config) {
        final MetricsConfig metricsConfig = new MetricsConfig(config);
        final boolean timingEnabled = metricsConfig.getMetricsTimerEnabled() && metricsConfig.getMetricsTimerDebugEnabled();
        if (timingEnabled) {
            return System::nanoTime;
        }
        return () -> 0L;
    }
}
