/*
 * Decompiled with CFR 0.152.
 */
package com.basho.riak.client.core;

import com.basho.riak.client.core.StreamingRiakFuture;
import com.basho.riak.client.core.operations.PBFutureOperation;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import shaded.com.google.protobuf.GeneratedMessage;
import shaded.com.google.protobuf.Parser;

public abstract class PBStreamingFutureOperation<ReturnType, ResponseType, QueryInfoType>
extends PBFutureOperation<ReturnType, ResponseType, QueryInfoType>
implements StreamingRiakFuture<ReturnType, QueryInfoType> {
    private final TransferQueue<ReturnType> responseQueue;
    private boolean streamResults;

    protected PBStreamingFutureOperation(byte reqMessageCode, byte respMessageCode, GeneratedMessage.Builder<?> reqBuilder, Parser<ResponseType> respParser, boolean streamResults) {
        super(reqMessageCode, respMessageCode, reqBuilder, respParser);
        this.streamResults = streamResults;
        this.responseQueue = streamResults ? new LinkedTransferQueue() : null;
    }

    @Override
    protected void processMessage(ResponseType decodedMessage) {
        if (!this.streamResults) {
            super.processMessage(decodedMessage);
            return;
        }
        ReturnType r = this.processStreamingChunk(decodedMessage);
        assert (this.responseQueue != null);
        boolean chunkAdded = this.responseQueue.offer(r);
        assert (chunkAdded);
    }

    protected abstract ReturnType processStreamingChunk(ResponseType var1);

    @Override
    public final TransferQueue<ReturnType> getResultsQueue() {
        assert (this.responseQueue != null);
        return this.responseQueue;
    }
}

