package org.apache.samza.operators.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.Counter;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/PartialJoinOperatorImpl.class */
class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class);
    private final PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec;
    private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
    private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
    private final long ttlMs;
    private final Clock clock;
    private Counter keysRemoved;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOperatorSpec, Config config, TaskContext taskContext, Clock clock) {
        this.partialJoinOpSpec = partialJoinOperatorSpec;
        this.thisPartialJoinFn = partialJoinOperatorSpec.getThisPartialJoinFn();
        this.otherPartialJoinFn = partialJoinOperatorSpec.getOtherPartialJoinFn();
        this.ttlMs = partialJoinOperatorSpec.getTtlMs();
        this.clock = clock;
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleInit(Config config, TaskContext taskContext) {
        this.keysRemoved = taskContext.getMetricsRegistry().newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed");
        this.thisPartialJoinFn.init(config, taskContext);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    public Collection<RM> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        K key = this.thisPartialJoinFn.getKey(m);
        this.thisPartialJoinFn.getState().put(key, new PartialJoinFunction.PartialJoinMessage(m, this.clock.currentTimeMillis()));
        PartialJoinFunction.PartialJoinMessage partialJoinMessage = (PartialJoinFunction.PartialJoinMessage) this.otherPartialJoinFn.getState().get(key);
        return (partialJoinMessage == null || partialJoinMessage.getReceivedTimeMs() <= this.clock.currentTimeMillis() - this.ttlMs) ? Collections.emptyList() : Collections.singletonList(this.thisPartialJoinFn.apply(m, partialJoinMessage.getMessage()));
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    public Collection<RM> handleTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        long currentTimeMillis = this.clock.currentTimeMillis();
        KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<M>> state = this.thisPartialJoinFn.getState();
        KeyValueIterator all = state.all();
        ArrayList arrayList = new ArrayList();
        while (all.hasNext()) {
            Entry entry = (Entry) all.next();
            if (((PartialJoinFunction.PartialJoinMessage) entry.getValue()).getReceivedTimeMs() >= currentTimeMillis - this.ttlMs) {
                break;
            }
            arrayList.add(entry.getKey());
        }
        all.close();
        state.deleteAll(arrayList);
        this.keysRemoved.inc(arrayList.size());
        return Collections.emptyList();
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected OperatorSpec<RM> getOperatorSpec() {
        return this.partialJoinOpSpec;
    }
}
