package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;

/* loaded from: input_file:org/apache/samza/operators/impl/PartialJoinOperatorImpl.class */
class PartialJoinOperatorImpl<K, M, OM, JM> extends OperatorImpl<M, JM> {
    private final JoinOperatorSpec<K, M, OM, JM> joinOpSpec;
    private final boolean isLeftSide;
    private final PartialJoinFunction<K, M, OM, JM> thisPartialJoinFn;
    private final PartialJoinFunction<K, OM, M, JM> otherPartialJoinFn;
    private final long ttlMs;
    private final Clock clock;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartialJoinOperatorImpl(JoinOperatorSpec<K, M, OM, JM> joinOperatorSpec, boolean z, PartialJoinFunction<K, M, OM, JM> partialJoinFunction, PartialJoinFunction<K, OM, M, JM> partialJoinFunction2, Config config, TaskContext taskContext, Clock clock) {
        this.joinOpSpec = joinOperatorSpec;
        this.isLeftSide = z;
        this.thisPartialJoinFn = partialJoinFunction;
        this.otherPartialJoinFn = partialJoinFunction2;
        this.ttlMs = joinOperatorSpec.getTtlMs();
        this.clock = clock;
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleInit(Config config, TaskContext taskContext) {
        this.thisPartialJoinFn.init(config, taskContext);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    public Collection<JM> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        try {
            KeyValueStore<K, TimestampedValue<M>> state = this.thisPartialJoinFn.getState();
            KeyValueStore<K, TimestampedValue<OM>> state2 = this.otherPartialJoinFn.getState();
            K key = this.thisPartialJoinFn.getKey(m);
            state.put(key, new TimestampedValue(m, this.clock.currentTimeMillis()));
            TimestampedValue timestampedValue = (TimestampedValue) state2.get(key);
            return (timestampedValue == null || timestampedValue.getTimestamp() <= this.clock.currentTimeMillis() - this.ttlMs) ? Collections.emptyList() : Collections.singletonList(this.thisPartialJoinFn.apply(m, timestampedValue.getValue()));
        } catch (Exception e) {
            throw new SamzaException("Error handling message in PartialJoinOperatorImpl " + getOpImplId(), e);
        }
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleClose() {
        this.thisPartialJoinFn.close();
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected OperatorSpec<M, JM> getOperatorSpec() {
        return this.joinOpSpec;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    public String getOpImplId() {
        return this.isLeftSide ? this.joinOpSpec.getLeftOpId() : this.joinOpSpec.getRightOpId();
    }
}
