/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.protonj2.test.driver;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.qpid.protonj2.test.driver.DriverSessions;
import org.apache.qpid.protonj2.test.driver.FrameDecoder;
import org.apache.qpid.protonj2.test.driver.FrameEncoder;
import org.apache.qpid.protonj2.test.driver.ScriptedAction;
import org.apache.qpid.protonj2.test.driver.ScriptedElement;
import org.apache.qpid.protonj2.test.driver.actions.ScriptCompleteAction;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.security.SaslDescribedType;
import org.apache.qpid.protonj2.test.driver.codec.security.SaslOutcome;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
import org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat;
import org.apache.qpid.protonj2.test.driver.codec.transport.Open;
import org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType;
import org.apache.qpid.protonj2.test.driver.exceptions.UnexpectedPerformativeError;
import org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation;
import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQPTestDriver
implements Consumer<ByteBuffer> {
    private static final Logger LOG = LoggerFactory.getLogger(AMQPTestDriver.class);
    private final String driverName;
    private final FrameDecoder frameParser;
    private final FrameEncoder frameEncoder;
    private Open localOpen;
    private Open remoteOpen;
    private final DriverSessions sessions = new DriverSessions(this);
    private final Consumer<ByteBuffer> frameConsumer;
    private final Consumer<AssertionError> assertionConsumer;
    private final Supplier<NettyEventLoop> schedulerSupplier;
    private volatile List<ByteBuffer> deferredWrites = new ArrayList<ByteBuffer>();
    private volatile AssertionError failureCause;
    private int advertisedIdleTimeout = 0;
    private volatile int emptyFrameCount;
    private volatile int performativeCount;
    private volatile int saslPerformativeCount;
    private int inboundMaxFrameSize = Integer.MAX_VALUE;
    private int outboundMaxFrameSize = Integer.MAX_VALUE;
    private final Queue<ScriptedElement> script = new ArrayDeque<ScriptedElement>();

    public AMQPTestDriver(String name, Consumer<ByteBuffer> frameConsumer, Supplier<NettyEventLoop> scheduler) {
        this(name, frameConsumer, null, scheduler);
    }

    public AMQPTestDriver(String name, Consumer<ByteBuffer> frameConsumer, Consumer<AssertionError> assertionConsumer, Supplier<NettyEventLoop> scheduler) {
        this.frameConsumer = frameConsumer;
        this.assertionConsumer = assertionConsumer;
        this.schedulerSupplier = scheduler;
        this.driverName = name;
        this.frameParser = new FrameDecoder(this);
        this.frameEncoder = new FrameEncoder(this);
    }

    public DriverSessions sessions() {
        return this.sessions;
    }

    public Object getName() {
        return this.driverName;
    }

    public int getAdvertisedIdleTimeout() {
        return this.advertisedIdleTimeout;
    }

    public void setAdvertisedIdleTimeout(int advertisedIdleTimeout) {
        this.advertisedIdleTimeout = advertisedIdleTimeout;
    }

    public int getEmptyFrameCount() {
        return this.emptyFrameCount;
    }

    public int getPerformativeCount() {
        return this.performativeCount;
    }

    public int getSaslPerformativeCount() {
        return this.saslPerformativeCount;
    }

    public int getInboundMaxFrameSize() {
        return this.inboundMaxFrameSize;
    }

    public void setInboundMaxFrameSize(int maxSize) {
        this.inboundMaxFrameSize = maxSize;
    }

    public int getOutboundMaxFrameSize() {
        return this.outboundMaxFrameSize;
    }

    public void setOutboundMaxFrameSize(int maxSize) {
        this.outboundMaxFrameSize = maxSize;
    }

    @Override
    public void accept(ByteBuffer buffer) {
        LOG.trace("{} processing new inbound buffer of size: {}", (Object)this.driverName, (Object)buffer.remaining());
        try {
            while (buffer.remaining() > 0 && this.failureCause == null) {
                LOG.trace("{} ingestion of {} bytes starting now.", (Object)this.driverName, (Object)buffer.remaining());
                this.frameParser.ingest(buffer);
                LOG.trace("{} ingestion completed cycle, remaining bytes in buffer: {}", (Object)this.driverName, (Object)buffer.remaining());
            }
        }
        catch (AssertionError e) {
            this.signalFailure((Throwable)((Object)e));
        }
    }

    public Open getRemoteOpen() {
        return this.remoteOpen;
    }

    public Open getLocalOpen() {
        return this.localOpen;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleConnectedEstablished() throws AssertionError {
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            ScriptedElement peekNext = this.script.peek();
            if (peekNext instanceof ScriptedAction) {
                this.processScript(peekNext);
            }
            this.resetToExpectingAMQPHeader();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleConnectedDropped() throws AssertionError {
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            this.resetToExpectingAMQPHeader();
            this.sessions.reset();
            if (!this.script.isEmpty()) {
                do {
                    ScriptedElement scriptEntry;
                    if ((scriptEntry = this.script.peek()) instanceof ConnectionDropExpectation) {
                        this.processScript(this.script.poll());
                        return;
                    }
                    if (!scriptEntry.isOptional()) {
                        this.signalFailure((Throwable)((Object)new AssertionError((Object)String.format("Scripted elements remaining after connection dropped: %d", this.script.size()))));
                        return;
                    }
                    this.script.poll();
                } while (!this.script.isEmpty());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleHeader(AMQPHeader header) throws AssertionError {
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            ScriptedElement scriptEntry = this.script.poll();
            if (scriptEntry == null) {
                this.signalFailure((Throwable)((Object)new AssertionError((Object)"Received header when not expecting any input.")));
            }
            try {
                header.invoke(scriptEntry, this);
            }
            catch (Throwable t) {
                if (scriptEntry.isOptional()) {
                    this.handleHeader(header);
                }
                LOG.warn(t.getMessage());
                this.signalFailure(t);
                throw t;
            }
            this.processScript(scriptEntry);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handleSaslPerformative(int frameSize, SaslDescribedType sasl, int channel, ByteBuffer payload) throws AssertionError {
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            ScriptedElement scriptEntry = this.script.poll();
            if (scriptEntry == null) {
                this.signalFailure((Throwable)((Object)new AssertionError((Object)("Received performative[" + String.valueOf(sasl) + "] when not expecting any input."))));
            }
            try {
                sasl.invoke(scriptEntry, frameSize, this);
            }
            catch (UnexpectedPerformativeError e) {
                if (scriptEntry.isOptional()) {
                    this.handleSaslPerformative(frameSize, sasl, channel, payload);
                }
                this.signalFailure((Throwable)((Object)e));
                throw e;
            }
            catch (AssertionError assertion) {
                LOG.warn(((Throwable)((Object)assertion)).getMessage());
                this.signalFailure((Throwable)((Object)assertion));
                throw assertion;
            }
            this.processScript(scriptEntry);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void handlePerformative(int frameSize, PerformativeDescribedType amqp, int channel, ByteBuffer payload) throws AssertionError {
        switch (amqp.getPerformativeType()) {
            case HEARTBEAT: {
                break;
            }
            case OPEN: {
                this.remoteOpen = (Open)amqp;
            }
            default: {
                ++this.performativeCount;
            }
        }
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            ScriptedElement scriptEntry = this.script.poll();
            if (scriptEntry == null) {
                this.signalFailure((Throwable)((Object)new AssertionError((Object)("Received performative[" + String.valueOf(amqp) + "] when not expecting any input."))));
            }
            try {
                amqp.invoke(scriptEntry, frameSize, payload, channel, this);
            }
            catch (UnexpectedPerformativeError e) {
                if (scriptEntry.isOptional()) {
                    this.handlePerformative(frameSize, amqp, channel, payload);
                }
                this.signalFailure((Throwable)((Object)e));
                throw e;
            }
            catch (AssertionError assertion) {
                LOG.warn(((Throwable)((Object)assertion)).getMessage());
                this.signalFailure((Throwable)((Object)assertion));
                throw assertion;
            }
            this.processScript(scriptEntry);
        }
    }

    void handleHeartbeat(int frameSize, int channel) {
        ++this.emptyFrameCount;
        this.handlePerformative(frameSize, HeartBeat.INSTANCE, channel, null);
    }

    public synchronized void afterDelay(int delay, ScriptedAction action) {
        Objects.requireNonNull(this.schedulerSupplier, "This driver cannot schedule delayed events, no scheduler available");
        NettyEventLoop scheduler = this.schedulerSupplier.get();
        Objects.requireNonNull(scheduler, "This driver cannot schedule delayed events, no scheduler available");
        scheduler.schedule(() -> {
            LOG.trace("{} running delayed action: {}", (Object)this.driverName, (Object)action);
            action.perform(this);
        }, delay, TimeUnit.MILLISECONDS);
    }

    public void resetToExpectingAMQPHeader() {
        this.frameParser.resetToExpectingHeader();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForScriptToComplete() {
        this.checkFailed();
        ScriptCompleteAction possibleWait = null;
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            this.checkFailed();
            if (!this.script.isEmpty()) {
                possibleWait = new ScriptCompleteAction(this).queue();
            }
        }
        if (possibleWait != null) {
            try {
                possibleWait.await();
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                this.signalFailure("Interrupted while waiting for script to complete");
            }
        }
        this.checkFailed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForScriptToCompleteIgnoreErrors() {
        ScriptCompleteAction possibleWait = null;
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            if (!this.script.isEmpty()) {
                possibleWait = new ScriptCompleteAction(this).queue();
            }
        }
        if (possibleWait != null) {
            try {
                possibleWait.await();
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                this.signalFailure("Interrupted while waiting for script to complete");
            }
        }
    }

    public void waitForScriptToComplete(long timeout) {
        this.waitForScriptToComplete(timeout, TimeUnit.SECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForScriptToComplete(long timeout, TimeUnit units) {
        this.checkFailed();
        ScriptCompleteAction possibleWait = null;
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            this.checkFailed();
            if (!this.script.isEmpty()) {
                possibleWait = new ScriptCompleteAction(this).queue();
            }
        }
        if (possibleWait != null) {
            try {
                possibleWait.await(timeout, units);
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                this.signalFailure("Interrupted while waiting for script to complete");
            }
        }
        this.checkFailed();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addScriptedElement(ScriptedElement element) {
        this.checkFailed();
        Queue<ScriptedElement> queue = this.script;
        synchronized (queue) {
            this.checkFailed();
            this.script.offer(element);
        }
    }

    public final void sendAMQPFrame(int channel, DescribedType performative, ByteBuffer payload) {
        this.sendAMQPFrame(channel, performative, payload, false);
    }

    public void deferAMQPFrame(int channel, DescribedType performative, ByteBuffer payload, boolean splitWrite) {
        LOG.trace("{} Deferring write of performative: {}", (Object)this.driverName, (Object)performative);
        try {
            this.deferredWrites.add(this.frameEncoder.handleWrite(performative, channel, payload, null));
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Frame was not written due to error.", t)));
        }
    }

    public void deferSaslFrame(int channel, DescribedType performative) {
        if (performative instanceof SaslOutcome) {
            this.frameParser.resetToExpectingHeader();
        }
        LOG.trace("{} Deferring SASL performative write: {}", (Object)this.driverName, (Object)performative);
        try {
            this.deferredWrites.add(this.frameEncoder.handleWrite(performative, channel));
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Frame was not written due to error.", t)));
        }
    }

    public void deferHeader(AMQPHeader header) {
        LOG.trace("{} Deferring AMQP Header write: {}", (Object)this.driverName, (Object)header);
        try {
            this.deferredWrites.add(ByteBuffer.wrap(Arrays.copyOf(header.getBuffer(), 8)));
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Frame was not consumed due to error.", t)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendAMQPFrame(int channel, DescribedType performative, ByteBuffer payload, boolean splitWrite) {
        ByteBuffer output;
        LOG.trace("{} Sending performative: {}", (Object)this.driverName, (Object)performative);
        if (performative instanceof PerformativeDescribedType) {
            switch (((PerformativeDescribedType)performative).getPerformativeType()) {
                case OPEN: {
                    this.localOpen = (Open)performative;
                }
            }
        }
        ByteBuffer buffer = this.frameEncoder.handleWrite(performative, channel, payload, null);
        if (!this.deferredWrites.isEmpty()) {
            this.deferredWrites.add(buffer);
            try {
                output = AMQPTestDriver.composeDefferedWrites(this.deferredWrites).asReadOnlyBuffer();
            }
            finally {
                this.deferredWrites.clear();
            }
            LOG.trace("{} appending deferred buffer {} to next write.", (Object)this.driverName, (Object)output);
        } else {
            output = buffer;
        }
        try {
            LOG.trace("{} Writing out buffer {} to consumer: {}", new Object[]{this.driverName, output, this.frameConsumer});
            if (splitWrite) {
                int bufferSplitPoint = output.remaining() / 2;
                byte[] front = new byte[bufferSplitPoint - output.position()];
                byte[] rear = new byte[output.remaining() - bufferSplitPoint];
                output.get(front);
                output.get(rear);
                this.frameConsumer.accept(ByteBuffer.wrap(front));
                this.frameConsumer.accept(ByteBuffer.wrap(rear));
            } else {
                this.frameConsumer.accept(output);
            }
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Frame was not written due to error.", t)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendSaslFrame(int channel, DescribedType performative) {
        if (performative instanceof SaslOutcome) {
            this.frameParser.resetToExpectingHeader();
        }
        try {
            ByteBuffer output;
            ByteBuffer buffer = this.frameEncoder.handleWrite(performative, channel);
            if (!this.deferredWrites.isEmpty()) {
                this.deferredWrites.add(buffer);
                try {
                    output = AMQPTestDriver.composeDefferedWrites(this.deferredWrites).asReadOnlyBuffer();
                }
                finally {
                    this.deferredWrites.clear();
                }
                LOG.trace("{} appending deferred buffer {} to next write.", (Object)this.driverName, (Object)output);
            } else {
                output = buffer;
            }
            LOG.trace("{} Sending SASL performative: {}", (Object)this.driverName, (Object)performative);
            this.frameConsumer.accept(output);
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Frame was not written due to error.", t)));
        }
    }

    public void sendHeader(AMQPHeader header) {
        try {
            ByteBuffer output;
            if (!this.deferredWrites.isEmpty()) {
                LOG.trace("{} appending deferred buffer {} to next write.", (Object)this.driverName, this.deferredWrites);
                this.deferredWrites.add(ByteBuffer.wrap(header.getBuffer()).asReadOnlyBuffer());
                try {
                    output = AMQPTestDriver.composeDefferedWrites(this.deferredWrites).asReadOnlyBuffer();
                }
                finally {
                    this.deferredWrites.clear();
                }
            } else {
                output = ByteBuffer.wrap(header.getBuffer()).asReadOnlyBuffer();
            }
            LOG.trace("{} Sending AMQP Header: {}", (Object)this.driverName, (Object)header);
            this.frameConsumer.accept(output);
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Frame was not consumed due to error.", t)));
        }
    }

    public void sendEmptyFrame(int channel) {
        try {
            this.frameConsumer.accept(this.frameEncoder.handleWrite(null, channel, null, null));
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Frame was not consumed due to error.", t)));
        }
    }

    public void sendBytes(ByteBuffer buffer) {
        LOG.trace("{} Sending bytes from ByteBuffer: {}", (Object)this.driverName, (Object)buffer);
        try {
            this.frameConsumer.accept(buffer.duplicate());
        }
        catch (Throwable t) {
            this.signalFailure((Throwable)((Object)new AssertionError("Buffer was not consumed due to error.", t)));
        }
    }

    public void signalFailure(Throwable ex) throws AssertionError {
        if (this.failureCause == null) {
            if (ex instanceof AssertionError) {
                LOG.trace("{} sending failure assertion due to: ", (Object)this.driverName, (Object)ex);
                this.failureCause = (AssertionError)((Object)ex);
            } else {
                LOG.trace("{} sending failure assertion due to: ", (Object)this.driverName, (Object)ex);
                this.failureCause = new AssertionError((Object)ex);
            }
            this.searchForScriptioCompletionAndTrigger();
            if (this.assertionConsumer != null) {
                this.assertionConsumer.accept(this.failureCause);
            }
        }
    }

    public void signalFailure(String message) throws AssertionError {
        this.signalFailure((Throwable)((Object)new AssertionError((Object)message)));
    }

    private static final ByteBuffer composeDefferedWrites(List<ByteBuffer> deferredWrites) {
        int totalSize = 0;
        for (ByteBuffer component : deferredWrites) {
            totalSize += component.remaining();
        }
        ByteBuffer composite = ByteBuffer.allocate(totalSize);
        for (ByteBuffer component : deferredWrites) {
            composite.put(component);
        }
        deferredWrites.clear();
        return composite.flip().asReadOnlyBuffer();
    }

    private void searchForScriptioCompletionAndTrigger() {
        this.script.forEach(element -> {
            if (element instanceof ScriptCompleteAction) {
                ScriptCompleteAction completed = (ScriptCompleteAction)element;
                completed.perform(this);
            }
        });
    }

    private void processScript(ScriptedElement current) {
        if (current.performAfterwards() != null && this.failureCause == null) {
            current.performAfterwards().perform(this);
        }
        ScriptedElement peekNext = this.script.peek();
        do {
            if (!(peekNext instanceof ScriptedAction)) {
                return;
            }
            this.script.poll();
            ((ScriptedAction)peekNext).perform(this);
        } while ((peekNext = this.script.peek()) != null && this.failureCause == null);
    }

    private void checkFailed() {
        if (this.failureCause != null) {
            throw this.failureCause;
        }
    }
}

