package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.InstructionResult;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import java.lang.invoke.MethodHandles;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/impl/AbstractIncomingInstructionStream.class */
/**
 * Base class for a flow-controlled stream that receives instructions of type {@code IN} and writes
 * acknowledgements (and, via the reply channel handed to handlers, further messages) of type {@code OUT}
 * to the outbound stream captured in {@link #beforeStart(ClientCallStreamObserver)}.
 *
 * <p>For every incoming instruction the stream looks up a handler through {@link #getHandler(Object)}:
 * <ul>
 *   <li>when a handler exists, a success acknowledgement is sent (only if the instruction carries a non-empty
 *       id) and the instruction is passed to the handler together with a {@code ForwardingReplyChannel};</li>
 *   <li>when no handler exists, the instruction is marked as consumed and an
 *       {@code UNSUPPORTED_INSTRUCTION} acknowledgement is sent (again only if the instruction carries a
 *       non-empty id).</li>
 * </ul>
 *
 * <p>When the inbound side completes or fails, and {@link #unregisterOutboundStream(CallStreamObserver)} reports
 * {@code true}, the disconnect handler is notified and the outbound stream is completed.
 *
 * @param <IN>  type of the instructions received on this stream
 * @param <OUT> type of the messages written to the outbound stream
 */
public abstract class AbstractIncomingInstructionStream<IN, OUT> extends FlowControlledStream<IN, OUT> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Negative acknowledgement sent when no handler is available for an instruction that carries an id. */
    private static final InstructionAck NO_HANDLER_FOR_INSTRUCTION = InstructionAck.newBuilder().setSuccess(false).setError(ErrorMessage.newBuilder().setErrorCode(ErrorCategory.UNSUPPORTED_INSTRUCTION.errorCode()).setMessage("No handler for instruction").m121build()).m215build();

    /** Positive acknowledgement sent before an instruction that carries an id is passed to its handler. */
    private static final InstructionAck SUCCESS_ACK = InstructionAck.newBuilder().setSuccess(true).m215build();

    /** Notified with the cause when the stream completes or fails and the outbound stream was unregistered. */
    private final Consumer<Throwable> disconnectHandler;

    /** Invoked at the end of {@link #beforeStart(ClientCallStreamObserver)} with the outbound stream. */
    private final Consumer<CallStreamObserver<OUT>> beforeStartHandler;

    /** Client identifier handed to every {@code ForwardingReplyChannel} created by this stream. */
    private final String clientId;

    /** Outbound stream wrapper; assigned in {@link #beforeStart(ClientCallStreamObserver)}. */
    private CallStreamObserver<OUT> instructionsForPlatform;

    /**
     * Creates the stream.
     *
     * @param str       client identifier; forwarded to the superclass constructor and attached to reply channels
     * @param i         forwarded unchanged to the {@code FlowControlledStream} constructor
     *                  (NOTE(review): presumably a flow-control setting - confirm against that class)
     * @param i2        forwarded unchanged to the {@code FlowControlledStream} constructor
     *                  (NOTE(review): presumably a flow-control setting - confirm against that class)
     * @param consumer  handler notified with the cause when this stream disconnects
     * @param consumer2 handler invoked with the outbound stream once {@code beforeStart} has run
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractIncomingInstructionStream(String str, int i, int i2, Consumer<Throwable> consumer, Consumer<CallStreamObserver<OUT>> consumer2) {
        super(str, i, i2);
        this.clientId = str;
        this.disconnectHandler = consumer;
        this.beforeStartHandler = consumer2;
    }

    /**
     * Dispatches an incoming instruction to its handler, acknowledging it first when it carries an id.
     *
     * <p>If no handler is registered, the instruction is logged, marked as consumed, and - when it carries an
     * id - answered with an "unsupported instruction" acknowledgement. Otherwise consumption is left to the
     * reply channel, which receives {@code markConsumed} as a callback.
     *
     * @param in the instruction received on this stream
     */
    public void onNext(IN in) {
        InstructionHandler<IN, OUT> handler = getHandler(in);
        // Resolve the id once and reuse it for both the acknowledgement decision and the reply channel.
        String instructionId = getInstructionId(in);
        boolean ackExpected = carriesInstructionId(instructionId);

        if (handler == null) {
            logger.debug("Unsupported instruction received: {}", in);
            // Nobody will process this instruction, so it is marked as consumed right away.
            markConsumed();
            if (ackExpected) {
                this.instructionsForPlatform.onNext(buildAckMessage(NO_HANDLER_FOR_INSTRUCTION));
            }
            return;
        }

        if (ackExpected) {
            this.instructionsForPlatform.onNext(buildAckMessage(SUCCESS_ACK));
        }
        // NOTE(review): raw ForwardingReplyChannel is kept as found; its generic declaration is not visible here.
        handler.handle(in, new ForwardingReplyChannel(instructionId, this.clientId, this.instructionsForPlatform, this::buildResultMessage, this::markConsumed));
    }

    /**
     * Wraps the given acknowledgement in a message that can be written to the outbound stream.
     *
     * @param instructionAck the acknowledgement to wrap
     * @return the outbound message carrying the acknowledgement
     */
    protected abstract OUT buildAckMessage(InstructionAck instructionAck);

    /**
     * Wraps the given instruction result in an outbound message. This default implementation returns an empty
     * {@link Optional}; the method is handed to each {@code ForwardingReplyChannel} as a callback.
     *
     * @param instructionResult the result to wrap
     * @return the outbound message, or an empty {@code Optional} in this default implementation
     */
    protected Optional<OUT> buildResultMessage(InstructionResult instructionResult) {
        return Optional.empty();
    }

    /**
     * Extracts the instruction id from the given instruction.
     *
     * @param in the instruction to inspect
     * @return the instruction id; {@code null} or an empty string means no acknowledgement is sent
     */
    protected abstract String getInstructionId(IN in);

    /**
     * Looks up the handler for the given instruction.
     *
     * @param in the instruction to find a handler for
     * @return the handler, or {@code null} when the instruction is not supported
     */
    protected abstract InstructionHandler<IN, OUT> getHandler(IN in);

    /**
     * Handles completion of the inbound stream. When the outbound stream could be unregistered, the disconnect
     * handler receives a {@code StreamUnexpectedlyCompletedException} and the outbound stream is completed.
     */
    public void onCompleted() {
        logger.debug("Stream completed from server side");
        if (unregisterOutboundStream(this.instructionsForPlatform)) {
            // The exception is only created once we know a disconnect notification will be sent.
            notifyDisconnectAndCloseOutbound(new StreamUnexpectedlyCompletedException("Stream unexpectedly completed by server"));
        }
    }

    /**
     * Handles an error on the inbound stream. When the outbound stream could be unregistered, the disconnect
     * handler receives the error and the outbound stream is completed.
     *
     * @param th the error reported on the inbound stream
     */
    public void onError(Throwable th) {
        logger.debug("Error received", th);
        if (unregisterOutboundStream(this.instructionsForPlatform)) {
            notifyDisconnectAndCloseOutbound(th);
        }
    }

    /**
     * Wraps the outbound observer in a {@code SynchronizedRequestStream}, passes the wrapper to the superclass,
     * stores it for sending acknowledgements, and finally invokes the before-start handler with
     * {@link #getInstructionsForPlatform()}.
     *
     * @param clientCallStreamObserver the outbound stream observer supplied for this call
     */
    @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
    public void beforeStart(ClientCallStreamObserver<OUT> clientCallStreamObserver) {
        // NOTE(review): raw SynchronizedRequestStream is kept as found; its generic declaration is not visible here.
        SynchronizedRequestStream synchronizedRequestStream = new SynchronizedRequestStream(clientCallStreamObserver);
        super.beforeStart(synchronizedRequestStream);
        this.instructionsForPlatform = synchronizedRequestStream;
        this.beforeStartHandler.accept(getInstructionsForPlatform());
    }

    /**
     * Returns the outbound stream as exposed by the superclass through {@code outboundStream()}.
     *
     * @return the outbound stream of this call
     */
    public ClientCallStreamObserver<OUT> getInstructionsForPlatform() {
        return outboundStream();
    }

    /**
     * Unregisters the given outbound stream.
     *
     * @param callStreamObserver the outbound stream to unregister
     * @return {@code true} when the disconnect handler should be notified and the outbound stream completed
     */
    protected abstract boolean unregisterOutboundStream(CallStreamObserver<OUT> callStreamObserver);

    /** Tells whether the given instruction id is present, i.e. neither {@code null} nor empty. */
    private static boolean carriesInstructionId(String instructionId) {
        return instructionId != null && !instructionId.isEmpty();
    }

    /** Logs the disconnect, notifies the disconnect handler with the cause and completes the outbound stream. */
    private void notifyDisconnectAndCloseOutbound(Throwable cause) {
        logger.debug("Instruction stream disconnected. Scheduling reconnect");
        this.disconnectHandler.accept(cause);
        this.instructionsForPlatform.onCompleted();
    }
}
