package com.hazelcast.jet.impl.connector;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamJmsP.class */
public class StreamJmsP<T> extends AbstractProcessor {
    public static final int PREFERRED_LOCAL_PARALLELISM = 4;
    private final Connection connection;
    private final FunctionEx<? super Connection, ? extends Session> newSessionFn;
    private final FunctionEx<? super Session, ? extends MessageConsumer> consumerFn;
    private final ConsumerEx<? super Session> flushFn;
    private final FunctionEx<? super Message, ? extends T> projectionFn;
    private final EventTimeMapper<? super T> eventTimeMapper;
    private Session session;
    private MessageConsumer consumer;
    private Traverser<Object> traverser;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamJmsP$Supplier.class */
    public static final class Supplier<T> implements ProcessorSupplier {
        static final long serialVersionUID = 1;
        private final SupplierEx<? extends Connection> newConnectionFn;
        private final FunctionEx<? super Connection, ? extends Session> sessionFn;
        private final FunctionEx<? super Session, ? extends MessageConsumer> consumerFn;
        private final ConsumerEx<? super Session> flushFn;
        private final FunctionEx<? super Message, ? extends T> projectionFn;
        private final EventTimePolicy<? super T> eventTimePolicy;
        private transient Connection connection;

        private Supplier(SupplierEx<? extends Connection> supplierEx, FunctionEx<? super Connection, ? extends Session> functionEx, FunctionEx<? super Session, ? extends MessageConsumer> functionEx2, ConsumerEx<? super Session> consumerEx, FunctionEx<? super Message, ? extends T> functionEx3, EventTimePolicy<? super T> eventTimePolicy) {
            this.newConnectionFn = supplierEx;
            this.sessionFn = functionEx;
            this.consumerFn = functionEx2;
            this.flushFn = consumerEx;
            this.projectionFn = functionEx3;
            this.eventTimePolicy = eventTimePolicy;
        }

        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void init(@Nonnull ProcessorSupplier.Context context) throws Exception {
            this.connection = this.newConnectionFn.get();
            this.connection.start();
        }

        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void close(@Nullable Throwable th) throws Exception {
            if (this.connection != null) {
                this.connection.close();
            }
        }

        @Override // com.hazelcast.jet.core.ProcessorSupplier
        @Nonnull
        public Collection<? extends Processor> get(int i) {
            return (Collection) IntStream.range(0, i).mapToObj(i2 -> {
                return new StreamJmsP(this.connection, this.sessionFn, this.consumerFn, this.flushFn, this.projectionFn, this.eventTimePolicy);
            }).collect(Collectors.toList());
        }
    }

    StreamJmsP(Connection connection, FunctionEx<? super Connection, ? extends Session> functionEx, FunctionEx<? super Session, ? extends MessageConsumer> functionEx2, ConsumerEx<? super Session> consumerEx, FunctionEx<? super Message, ? extends T> functionEx3, EventTimePolicy<? super T> eventTimePolicy) {
        this.connection = connection;
        this.newSessionFn = functionEx;
        this.consumerFn = functionEx2;
        this.flushFn = consumerEx;
        this.projectionFn = functionEx3;
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        this.eventTimeMapper.addPartitions(1);
    }

    @Nonnull
    public static <T> ProcessorSupplier supplier(@Nonnull SupplierEx<? extends Connection> supplierEx, @Nonnull FunctionEx<? super Connection, ? extends Session> functionEx, @Nonnull FunctionEx<? super Session, ? extends MessageConsumer> functionEx2, @Nonnull ConsumerEx<? super Session> consumerEx, @Nonnull FunctionEx<? super Message, ? extends T> functionEx3, @Nonnull EventTimePolicy<? super T> eventTimePolicy) {
        Util.checkSerializable(supplierEx, "newConnectionFn");
        Util.checkSerializable(functionEx, "newSessionFn");
        Util.checkSerializable(functionEx2, "consumerFn");
        Util.checkSerializable(consumerEx, "flushFn");
        Util.checkSerializable(functionEx3, "projectionFn");
        return new Supplier(supplierEx, functionEx, functionEx2, consumerEx, functionEx3, eventTimePolicy);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) {
        this.session = this.newSessionFn.apply(this.connection);
        this.consumer = this.consumerFn.apply(this.session);
        Traverser traverser = () -> {
            return (Message) Util.uncheckCall(() -> {
                return this.consumer.receiveNoWait();
            });
        };
        this.traverser = traverser.flatMap(message -> {
            return this.eventTimeMapper.flatMapEvent(this.projectionFn.apply(message), 0, handleJmsTimestamp(message));
        }).peek(obj -> {
            this.flushFn.accept(this.session);
        });
    }

    private static long handleJmsTimestamp(Message message) {
        try {
            if (message.getJMSTimestamp() == 0) {
                return Long.MIN_VALUE;
            }
            return message.getJMSTimestamp();
        } catch (JMSException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        emitFromTraverser(this.traverser);
        return false;
    }

    @Override // com.hazelcast.jet.core.Processor
    public void close() throws Exception {
        this.consumer.close();
        this.session.close();
    }
}
