package akka.persistence;

import akka.actor.ActorKilledException;
import akka.persistence.JournalProtocol;
import akka.persistence.Processor;
import akka.persistence.Recovery;
import scala.Function1;
import scala.PartialFunction;
import scala.Predef$;
import scala.StringContext;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: Processor.scala */
/* loaded from: input_file:akka/persistence/Processor$$anon$2.class */
public final class Processor$$anon$2 implements Recovery.State {
    private boolean batching;
    private final /* synthetic */ Processor $outer;

    @Override // akka.persistence.Recovery.State
    public void process(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Recovery.State.Cclass.process(this, partialFunction, obj);
    }

    @Override // akka.persistence.Recovery.State
    public void processPersistent(PartialFunction<Object, BoxedUnit> partialFunction, Persistent persistent) {
        Recovery.State.Cclass.processPersistent(this, partialFunction, persistent);
    }

    @Override // akka.persistence.Recovery.State
    public void updateLastSequenceNr(Persistent persistent) {
        Recovery.State.Cclass.updateLastSequenceNr(this, persistent);
    }

    @Override // akka.persistence.Recovery.State
    public void updateLastSequenceNr(long j) {
        Recovery.State.Cclass.updateLastSequenceNr(this, j);
    }

    @Override // akka.persistence.Recovery.State
    public void withCurrentPersistent(Persistent persistent, Function1<Persistent, BoxedUnit> function1) {
        Recovery.State.Cclass.withCurrentPersistent(this, persistent, function1);
    }

    @Override // akka.persistence.Recovery.State
    public void recordFailure(Throwable th) {
        Recovery.State.Cclass.recordFailure(this, th);
    }

    public String toString() {
        return "processing";
    }

    private boolean batching() {
        return this.batching;
    }

    private void batching_$eq(boolean z) {
        this.batching = z;
    }

    @Override // akka.persistence.Recovery.State
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        if (obj instanceof Recover) {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        if (obj instanceof JournalProtocol.ReplayedMessage) {
            processPersistent(partialFunction, (Persistent) ((JournalProtocol.ReplayedMessage) obj).persistent());
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            return;
        }
        if (obj instanceof JournalProtocol.WriteMessageSuccess) {
            processPersistent(partialFunction, (Persistent) ((JournalProtocol.WriteMessageSuccess) obj).persistent());
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
            return;
        }
        if (obj instanceof JournalProtocol.WriteMessageFailure) {
            JournalProtocol.WriteMessageFailure writeMessageFailure = (JournalProtocol.WriteMessageFailure) obj;
            PersistentRepr message = writeMessageFailure.message();
            Throwable cause = writeMessageFailure.cause();
            PersistenceFailure persistenceFailure = new PersistenceFailure(message.payload(), message.sequenceNr(), cause);
            if (!partialFunction.isDefinedAt(persistenceFailure)) {
                throw new ActorKilledException(new StringBuilder().append("Processor killed after persistence failure ").append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"(processor id = [", "], sequence nr = [", "], payload class = [", "]). "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.$outer.processorId(), BoxesRunTime.boxToLong(message.sequenceNr()), message.payload().getClass().getName()}))).append("To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. ").append("PersistenceFailure was caused by: ").append(cause).toString());
            }
            process(partialFunction, persistenceFailure);
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
            return;
        }
        if (obj instanceof JournalProtocol.LoopMessageSuccess) {
            process(partialFunction, ((JournalProtocol.LoopMessageSuccess) obj).message());
            BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
            return;
        }
        if (JournalProtocol$WriteMessagesSuccess$.MODULE$.equals(obj) ? true : obj instanceof JournalProtocol.WriteMessagesFailure) {
            if (this.$outer.akka$persistence$Processor$$processorBatch().isEmpty()) {
                batching_$eq(false);
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                journalBatch();
                boxedUnit2 = BoxedUnit.UNIT;
            }
            return;
        }
        if (obj instanceof PersistentRepr) {
            addToBatch((PersistentRepr) obj);
            if (!batching() || maxBatchSizeReached()) {
                journalBatch();
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
            return;
        }
        if (!(obj instanceof PersistentBatch)) {
            if (this.$outer.akka$persistence$Processor$$processorBatch().isEmpty()) {
                batching_$eq(false);
            } else {
                journalBatch();
            }
            this.$outer.journal().forward(new JournalProtocol.LoopMessage(obj, this.$outer.self()), this.$outer.context());
            BoxedUnit boxedUnit8 = BoxedUnit.UNIT;
            return;
        }
        PersistentBatch persistentBatch = (PersistentBatch) obj;
        if (!this.$outer.akka$persistence$Processor$$processorBatch().isEmpty()) {
            journalBatch();
        }
        addToBatch(persistentBatch);
        journalBatch();
        BoxedUnit boxedUnit9 = BoxedUnit.UNIT;
    }

    public void addToBatch(PersistentRepr persistentRepr) {
        Processor processor = this.$outer;
        Vector<PersistentRepr> akka$persistence$Processor$$processorBatch = this.$outer.akka$persistence$Processor$$processorBatch();
        String processorId = this.$outer.processorId();
        processor.akka$persistence$Processor$$processorBatch_$eq((Vector) akka$persistence$Processor$$processorBatch.$colon$plus(persistentRepr.update(Processor.Cclass.akka$persistence$Processor$$nextSequenceNr(this.$outer), processorId, persistentRepr.update$default$3(), persistentRepr.update$default$4(), persistentRepr.update$default$5(), persistentRepr.update$default$6(), persistentRepr.update$default$7(), this.$outer.sender()), Vector$.MODULE$.canBuildFrom()));
    }

    public void addToBatch(PersistentBatch persistentBatch) {
        persistentBatch.persistentReprList().foreach(new Processor$$anon$2$$anonfun$addToBatch$1(this));
    }

    public boolean maxBatchSizeReached() {
        return this.$outer.akka$persistence$Processor$$processorBatch().length() >= this.$outer.extension().settings().journal().maxMessageBatchSize();
    }

    public void journalBatch() {
        akka.actor.package$.MODULE$.actorRef2Scala(this.$outer.journal()).$bang(new JournalProtocol.WriteMessages(this.$outer.akka$persistence$Processor$$processorBatch(), this.$outer.self()), this.$outer.self());
        this.$outer.akka$persistence$Processor$$processorBatch_$eq(scala.package$.MODULE$.Vector().empty());
        batching_$eq(true);
    }

    @Override // akka.persistence.Recovery.State
    public /* synthetic */ Recovery akka$persistence$Recovery$State$$$outer() {
        return this.$outer;
    }

    public Processor$$anon$2(Processor processor) {
        if (processor == null) {
            throw null;
        }
        this.$outer = processor;
        Recovery.State.Cclass.$init$(this);
        this.batching = false;
    }
}
