/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.lindorm.client.core;

import com.alibaba.lindorm.client.AsyncCallback;
import com.alibaba.lindorm.client.FeedStreamService;
import com.alibaba.lindorm.client.LindormClientConfig;
import com.alibaba.lindorm.client.LindormServiceProvider;
import com.alibaba.lindorm.client.TableService;
import com.alibaba.lindorm.client.core.LindormBasicService;
import com.alibaba.lindorm.client.core.LindormTableService;
import com.alibaba.lindorm.client.core.feedstreamservice.LMessage;
import com.alibaba.lindorm.client.core.feedstreamservice.LMessageScanner;
import com.alibaba.lindorm.client.core.feedstreamservice.StreamScan;
import com.alibaba.lindorm.client.core.ipc.ClientCompletableFuture;
import com.alibaba.lindorm.client.core.meta.TableCategory;
import com.alibaba.lindorm.client.core.meta.TableState;
import com.alibaba.lindorm.client.core.tableservice.LUpsert;
import com.alibaba.lindorm.client.core.utils.Bytes;
import com.alibaba.lindorm.client.core.utils.CollectionUtils;
import com.alibaba.lindorm.client.core.utils.FeedStreamUtils;
import com.alibaba.lindorm.client.dml.ColumnKey;
import com.alibaba.lindorm.client.dml.Condition;
import com.alibaba.lindorm.client.dml.ConditionFactory;
import com.alibaba.lindorm.client.dml.ConditionList;
import com.alibaba.lindorm.client.dml.QueryResults;
import com.alibaba.lindorm.client.dml.Row;
import com.alibaba.lindorm.client.dml.Select;
import com.alibaba.lindorm.client.dml.Upsert;
import com.alibaba.lindorm.client.exception.IllegalRequestException;
import com.alibaba.lindorm.client.exception.LindormException;
import com.alibaba.lindorm.client.schema.LindormPipeDescriptor;
import com.alibaba.lindorm.client.schema.LindormTableDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LindormFeedStreamService
extends LindormBasicService
implements FeedStreamService {
    /** Class-wide commons-logging logger. */
    public static final Log LOG = LogFactory.getLog(LindormFeedStreamService.class);
    /** Counter whose current value is appended to the default service name by the single-argument constructor. */
    private static AtomicInteger feedStreamServiceCount = new AtomicInteger(0);
    /** Internal table service that the pipe and message operations of this class delegate to. */
    private LindormTableService service;

    /**
     * Creates a feed stream service with a generated name of the form
     * {@code "LindormFeedStreamService" + n}, where {@code n} is taken from (and then increments)
     * the class-wide counter.
     *
     * @param config client configuration handed to the two-argument constructor
     * @throws LindormException if the two-argument constructor fails
     */
    public LindormFeedStreamService(LindormClientConfig config) throws LindormException {
        this(config, "LindormFeedStreamService" + feedStreamServiceCount.getAndIncrement());
    }

    /**
     * Creates a feed stream service with an explicit name and builds the internal table service
     * that backs it. The internal service is named {@code serviceName + "_internalTableService"}.
     *
     * @param config      client configuration, shared with the internal table service
     * @param serviceName name of this service
     * @throws LindormException if the base service or the internal table service cannot be created
     */
    public LindormFeedStreamService(LindormClientConfig config, String serviceName) throws LindormException {
        super(config, serviceName);
        // NOTE(review): the flag is read but nothing acts on it; the branch body is empty in this
        // (decompiled) source. Kept as-is so the configuration lookup still happens.
        if (config.getBoolean("lindorm.client.prefetch.routecache", false)) {
            // empty if block
        }
        String internalServiceName = serviceName + "_internalTableService";
        this.service = (LindormTableService) LindormServiceProvider.TableServiceProvider.create(config, internalServiceName);
    }

    /**
     * Exposes the internal table service used by this feed stream service.
     *
     * @return the internal table service instance (not a copy)
     */
    public TableService getInternalTableService() {
        return service;
    }

    /**
     * Closes the internal table service first and then this service via {@code super.close()}.
     * A failure while closing the internal table service is logged and ignored, so
     * {@code super.close()} is always reached.
     */
    @Override
    public void close() {
        try {
            service.close();
        } catch (Throwable closeFailure) {
            // Best effort: never let the internal service prevent closing this one.
            LOG.error("Failed closing internal table service, error ignored.", closeFailure);
        }
        super.close();
    }

    /**
     * Returns the namespace currently in use.
     *
     * @return the namespace; never {@code null}
     * @throws LindormException if no namespace has been set
     */
    @Override
    public String getNamespace() throws LindormException {
        if (this.namespace != null) {
            return this.namespace;
        }
        throw new LindormException("Namespace is not specified!");
    }

    /**
     * Sets the namespace on this service and on the internal table service.
     *
     * @param namespace namespace to use for subsequent operations
     */
    @Override
    public void useNamespace(String namespace) {
        this.namespace = namespace;
        service.useNamespace(namespace);
    }

    /**
     * Creates a pipe with 4 initial partitions.
     *
     * @param pipe descriptor of the pipe to create
     * @throws LindormException if the pipe cannot be created
     */
    @Override
    public void createPipe(LindormPipeDescriptor pipe) throws LindormException {
        createPipe(pipe, 4);
    }

    /**
     * Creates the table that backs a pipe, optionally pre-split into several partitions.
     *
     * <p>With exactly one partition the table is created without split keys. Otherwise the key
     * range from the empty key up to {@code {-1, -1, -1, -1}} is divided by {@code Bytes.split}
     * and the inner split points are passed to the table service.
     *
     * @param pipe              descriptor of the pipe to create; must not be {@code null}
     * @param initialPartitions number of partitions to create; must be at least 1
     * @throws IllegalRequestException if {@code pipe} is {@code null} or
     *                                 {@code initialPartitions} is less than 1
     * @throws LindormException        if validation of the descriptor or table creation fails
     */
    @Override
    public void createPipe(LindormPipeDescriptor pipe, int initialPartitions) throws LindormException {
        this.checkOpen();
        if (pipe == null) {
            throw new IllegalRequestException("Must specify a valid LindormPipeDescriptor, but has null.");
        }
        // Reject non-positive counts up front; otherwise a split count of (initialPartitions - 1) <= -1
        // would be handed to Bytes.split and fail in an unspecified way.
        if (initialPartitions < 1) {
            throw new IllegalRequestException("Initial partitions must be at least 1, but has " + initialPartitions + ".");
        }
        FeedStreamUtils.validatePipeDescriptor(pipe);
        LindormTableDescriptor desc = FeedStreamUtils.createPipeTableDescriptor(pipe);
        if (initialPartitions == 1) {
            this.service.createTable(desc);
        } else {
            byte[] startKey = Bytes.EMPTY_BYTE_ARRAY;
            byte[] endKey = new byte[]{-1, -1, -1, -1};
            byte[][] splits = Bytes.split(startKey, endKey, true, initialPartitions - 1);
            // Drop the first and last entries (presumably the start/end boundary keys) and keep
            // only the inner split points.
            this.service.createTable(desc, Arrays.copyOfRange(splits, 1, splits.length - 1));
        }
        LOG.info((Object)("FeedStreamPipe created, pipe=" + pipe.toString() + ", tableDesc=" + desc.toString()));
    }

    /**
     * Looks up the table with the given name and converts its descriptor into a pipe descriptor.
     *
     * @param pipeName name of the pipe
     * @return descriptor of the pipe
     * @throws IllegalRequestException if the table's attributes do not mark it as a feed stream table
     * @throws LindormException        if the name is invalid or the lookup fails
     */
    @Override
    public LindormPipeDescriptor describePipe(String pipeName) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        LindormTableDescriptor tableDesc = service.describeTable(pipeName);
        boolean backsPipe = FeedStreamUtils.isFeedStreamTable(tableDesc.getTableAttributes());
        if (!backsPipe) {
            throw new IllegalRequestException("Must describe a pipe, pipe name not found.");
        }
        LindormPipeDescriptor descriptor = FeedStreamUtils.createPipeDescriptor(tableDesc);
        LOG.info("Get pipe descriptor " + descriptor.toString());
        return descriptor;
    }

    /**
     * Takes the pipe's backing table offline.
     *
     * @param pipeName name of the pipe
     * @throws LindormException if the name is invalid or the table service call fails
     */
    @Override
    public void offlinePipe(String pipeName) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        service.offlineTable(pipeName);
    }

    /**
     * Brings the pipe's backing table online.
     *
     * @param pipeName name of the pipe
     * @throws LindormException if the name is invalid or the table service call fails
     */
    @Override
    public void onlinePipe(String pipeName) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        service.onlineTable(pipeName);
    }

    /**
     * Takes the pipe's backing table offline, forwarding the given timeout to the table service.
     *
     * @param pipeName name of the pipe
     * @param timeout  timeout passed through unchanged to {@code offlineTable}
     * @throws LindormException if the name is invalid or the table service call fails
     */
    @Override
    public void offlinePipe(String pipeName, int timeout) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        service.offlineTable(pipeName, timeout);
    }

    /**
     * Brings the pipe's backing table online, forwarding the given timeout to the table service.
     *
     * @param pipeName name of the pipe
     * @param timeout  timeout passed through unchanged to {@code onlineTable}
     * @throws LindormException if the name is invalid or the table service call fails
     */
    @Override
    public void onlinePipe(String pipeName, int timeout) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        service.onlineTable(pipeName, timeout);
    }

    /**
     * Deletes the pipe's backing table.
     *
     * @param pipeName name of the pipe
     * @throws LindormException if the name is invalid or the table service call fails
     */
    @Override
    public void deletePipe(String pipeName) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        service.deleteTable(pipeName);
    }

    /**
     * Lists the tables in the {@code STREAM_TABLES} category.
     *
     * @return names returned by the table service for that category
     * @throws LindormException if this service is not open or the listing fails
     */
    @Override
    public List<String> listPipes() throws LindormException {
        checkOpen();
        return service.listTables(TableCategory.STREAM_TABLES);
    }

    /**
     * Returns the state of the pipe's backing table.
     *
     * @param pipeName name of the pipe
     * @return state reported by the table service
     * @throws LindormException if this service is not open, the name is invalid, or the lookup fails
     */
    @Override
    public TableState getPipeState(String pipeName) throws LindormException {
        // Same open-check as every other pipe operation in this class.
        this.checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        return this.service.getTableState(pipeName);
    }

    /**
     * Truncates the pipe's backing table.
     *
     * @param pipeName name of the pipe
     * @throws LindormException if the name is invalid or the table service call fails
     */
    @Override
    public void truncatePipe(String pipeName) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        service.truncateTable(pipeName);
    }

    /**
     * Reads the message id stored in the sequence family for a stream.
     *
     * <p>Selects by the stream's hash and name, restricts the select to the sequence family, and
     * reads the message id column of the first returned row.
     *
     * @param pipeName   name of the pipe
     * @param streamName name of the stream
     * @return the stored message id, or {@code null} if the query returns no row
     * @throws LindormException if a name is invalid or the query fails
     */
    @Override
    public Long getStreamLatestMessageId(String pipeName, String streamName) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkStreamName(streamName);
        byte[] streamHash = FeedStreamUtils.computeHashByStreamName(streamName);
        ConditionList streamFilter = ConditionFactory.and(
                ConditionFactory.compare("hash", ConditionFactory.CompareOp.EQUAL, (Object)streamHash),
                ConditionFactory.compare("stream_name", ConditionFactory.CompareOp.EQUAL, (Object)streamName));
        Select query = service.select().from(pipeName).where(streamFilter);
        FeedStreamUtils.markSelectSequenceFamily(query);
        QueryResults rows = query.execute();
        Row first = rows.next();
        if (first != null) {
            return first.getColumnValue(FeedStreamUtils.SEQUENCE_FAMILY_NAME_BYTES, FeedStreamUtils.MESSAGE_ID_COLUMN_NAME_BYTES).getLong();
        }
        return null;
    }

    /**
     * Builds an upsert into the pipe's table containing the row for a single message.
     *
     * @param pipeName name of the pipe
     * @param message  message to convert into a row
     * @return the prepared (not yet executed) upsert
     * @throws LindormException if the upsert cannot be built
     */
    private Upsert generateUpsert(String pipeName, LMessage message) throws LindormException {
        Upsert single = service.upsert().into(pipeName);
        single.add(FeedStreamUtils.messageToRow(message));
        return single;
    }

    /**
     * Builds an upsert into the pipe's table containing one row per message, in list order.
     *
     * @param pipeName name of the pipe
     * @param messages messages to convert into rows
     * @return the prepared (not yet executed) upsert
     * @throws LindormException if the upsert cannot be built
     */
    private Upsert generateUpsert(String pipeName, List<LMessage> messages) throws LindormException {
        Upsert batch = service.upsert().into(pipeName);
        for (LMessage message : messages) {
            batch.add(FeedStreamUtils.messageToRow(message));
        }
        return batch;
    }

    /**
     * Appends a single message to a pipe and returns the first result of the upsert as the
     * message id.
     *
     * @param pipeName name of the pipe
     * @param m        message to append
     * @return the first element of the upsert results, cast to {@code Long}
     * @throws LindormException on any failure; non-Lindorm throwables are wrapped
     */
    @Override
    public long append(String pipeName, LMessage m) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        try {
            List<Object> ids = generateUpsert(pipeName, m).executeWithResults();
            return (Long)ids.get(0);
        } catch (LindormException lindormFailure) {
            // Already the declared exception type: rethrow untouched.
            throw lindormFailure;
        } catch (Throwable other) {
            throw new LindormException(other);
        }
    }

    /**
     * Updates a message: upserts {@code m} with attributes prepared from {@code previousMessage}.
     *
     * @param pipeName        name of the pipe
     * @param previousMessage the message being replaced
     * @param m               the new message
     * @return the first element of the upsert results, cast to {@code Long}
     * @throws LindormException on any failure, including a failed updatability check;
     *                          non-Lindorm throwables from the upsert are wrapped
     */
    @Override
    public long update(String pipeName, LMessage previousMessage, LMessage m) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkUpdatable(m, previousMessage);
        try {
            Upsert replacement = generateUpsert(pipeName, m);
            ArrayList<LMessage> previousOnly = new ArrayList<LMessage>(1);
            previousOnly.add(previousMessage);
            FeedStreamUtils.prepareAttrs(previousOnly, (LUpsert)replacement);
            List<Object> ids = replacement.executeWithResults();
            return (Long)ids.get(0);
        } catch (LindormException lindormFailure) {
            throw lindormFailure;
        } catch (Throwable other) {
            throw new LindormException(other);
        }
    }

    /**
     * Updates a batch of messages. {@code previousMessages.get(i)} is the message replaced by
     * {@code messages.get(i)}; every pair is checked for updatability before anything is written.
     *
     * @param pipeName         name of the pipe
     * @param previousMessages messages being replaced; must not be {@code null}
     * @param messages         new messages; must not be {@code null} and must match in size
     * @return the upsert results cast to {@code Long}, or an empty list when both inputs are empty
     * @throws IllegalRequestException if either list is {@code null} or the sizes differ
     * @throws LindormException        on any other failure; non-Lindorm throwables from the upsert
     *                                 are wrapped
     */
    @Override
    public List<Long> update(String pipeName, List<LMessage> previousMessages, List<LMessage> messages) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        if (previousMessages == null || messages == null) {
            throw new IllegalRequestException("Previous messages and current messages should not be null");
        }
        if (previousMessages.size() != messages.size()) {
            throw new IllegalRequestException("Previous messages and messages should be same size, previous messages " + previousMessages.size() + ", messages " + messages.size());
        }
        if (previousMessages.isEmpty()) {
            return Collections.emptyList();
        }
        for (int idx = 0; idx < previousMessages.size(); idx++) {
            LMessage replacement = messages.get(idx);
            LMessage replaced = previousMessages.get(idx);
            FeedStreamUtils.checkUpdatable(replacement, replaced);
        }
        try {
            Upsert batch = generateUpsert(pipeName, messages);
            FeedStreamUtils.prepareAttrs(previousMessages, (LUpsert)batch);
            List<Object> rawIds = batch.executeWithResults();
            ArrayList<Long> ids = new ArrayList<Long>(rawIds.size());
            for (Object rawId : rawIds) {
                ids.add((Long)rawId);
            }
            return ids;
        } catch (LindormException lindormFailure) {
            throw lindormFailure;
        } catch (Throwable other) {
            throw new LindormException(other);
        }
    }

    /**
     * Appends a batch of messages to a pipe.
     *
     * @param pipeName name of the pipe
     * @param messages messages to append; {@code null} or empty yields an empty result
     * @return the upsert results cast to {@code Long}, in the order the upsert returns them
     * @throws LindormException on any failure; non-Lindorm throwables are wrapped
     */
    @Override
    public List<Long> append(String pipeName, List<LMessage> messages) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        if (messages == null || messages.isEmpty()) {
            return Collections.emptyList();
        }
        try {
            List<Object> rawIds = generateUpsert(pipeName, messages).executeWithResults();
            ArrayList<Long> ids = new ArrayList<Long>(rawIds.size());
            for (Object rawId : rawIds) {
                ids.add((Long)rawId);
            }
            return ids;
        } catch (LindormException lindormFailure) {
            throw lindormFailure;
        } catch (Throwable other) {
            throw new LindormException(other);
        }
    }

    /**
     * Asynchronously appends a single message and exposes the outcome as a {@link Future}.
     * Delegates to the callback-based overload with a callback that completes the future.
     *
     * @param pipeName name of the pipe
     * @param m        message to append
     * @return a future completed with the result, or exceptionally with the reported error
     * @throws LindormException if the request cannot be submitted
     */
    @Override
    public Future<Long> appendAsync(String pipeName, LMessage m) throws LindormException {
        final ClientCompletableFuture<Long> pending = new ClientCompletableFuture<Long>();
        AsyncCallback<Long> bridge = new AsyncCallback<Long>(){

            @Override
            public void onComplete(Long result) {
                pending.complete(result);
            }

            @Override
            public void onError(Throwable exception) {
                pending.completeExceptionally(exception);
            }

            @Override
            public boolean shouldProcessResultInPool() {
                return false;
            }
        };
        appendAsync(pipeName, m, bridge);
        return pending;
    }

    /**
     * Asynchronously appends a batch of messages and exposes the outcome as a {@link Future}.
     * Delegates to the callback-based overload with a callback that completes the future.
     *
     * @param pipeName name of the pipe
     * @param messages messages to append
     * @return a future completed with the results, or exceptionally with the reported error
     * @throws LindormException if the request cannot be submitted
     */
    @Override
    public Future<List<Long>> appendAsync(String pipeName, List<LMessage> messages) throws LindormException {
        final ClientCompletableFuture<List<Long>> pending = new ClientCompletableFuture<List<Long>>();
        AsyncCallback<List<Long>> bridge = new AsyncCallback<List<Long>>(){

            @Override
            public void onComplete(List<Long> result) {
                pending.complete(result);
            }

            @Override
            public void onError(Throwable exception) {
                pending.completeExceptionally(exception);
            }

            @Override
            public boolean shouldProcessResultInPool() {
                return false;
            }
        };
        appendAsync(pipeName, messages, bridge);
        return pending;
    }

    /**
     * Asynchronously appends a single message and reports the outcome through {@code callback}.
     *
     * <p>The upsert's result list is adapted for the caller: a {@code null} or empty list is
     * reported as {@code null}, otherwise the first element is reported as a {@code Long}.
     * Errors, the pool-processing preference and the retry flag are forwarded to/from
     * {@code callback} unchanged.
     *
     * @param pipeName name of the pipe
     * @param m        message to append
     * @param callback receiver of the result or error
     * @throws LindormException if the request cannot be prepared or submitted
     */
    @Override
    public void appendAsync(String pipeName, LMessage m, final AsyncCallback<Long> callback) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        Upsert single = generateUpsert(pipeName, m);
        AsyncCallback<List<Object>> adapter = new AsyncCallback<List<Object>>(){

            @Override
            public void onComplete(List<Object> result) {
                boolean nothingReturned = result == null || result.isEmpty();
                if (nothingReturned) {
                    callback.onComplete(null);
                    return;
                }
                callback.onComplete((Long)result.get(0));
            }

            @Override
            public void onError(Throwable exception) {
                callback.onError(exception);
            }

            @Override
            public boolean shouldProcessResultInPool() {
                return callback.shouldProcessResultInPool();
            }

            @Override
            public boolean isRetrying() {
                return callback.isRetrying();
            }
        };
        single.executeWithResultsAsync(adapter);
    }

    /**
     * Asynchronously appends a batch of messages and reports the outcome through {@code callback}.
     *
     * <p>A {@code null} or empty {@code messages} list completes the callback immediately with an
     * empty list and submits nothing. Otherwise the upsert's result list is converted element by
     * element to {@code Long}; a {@code null} or empty result list is reported as an empty list.
     *
     * @param pipeName name of the pipe
     * @param messages messages to append
     * @param callback receiver of the results or error
     * @throws LindormException if the request cannot be prepared or submitted
     */
    @Override
    public void appendAsync(String pipeName, List<LMessage> messages, final AsyncCallback<List<Long>> callback) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        if (messages == null || messages.isEmpty()) {
            callback.onComplete(Collections.emptyList());
            return;
        }
        AsyncCallback<List<Object>> adapter = new AsyncCallback<List<Object>>(){

            @Override
            public void onComplete(List<Object> result) {
                if (result != null && !result.isEmpty()) {
                    ArrayList<Long> ids = new ArrayList<Long>(result.size());
                    for (Object rawId : result) {
                        ids.add((Long)rawId);
                    }
                    callback.onComplete(ids);
                } else {
                    callback.onComplete(Collections.emptyList());
                }
            }

            @Override
            public void onError(Throwable exception) {
                callback.onError(exception);
            }

            @Override
            public boolean shouldProcessResultInPool() {
                return callback.shouldProcessResultInPool();
            }

            @Override
            public boolean isRetrying() {
                return callback.isRetrying();
            }
        };
        Upsert batch = generateUpsert(pipeName, messages);
        batch.executeWithResultsAsync(adapter);
    }

    /**
     * Asynchronously updates a message and exposes the outcome as a {@link Future}.
     * Delegates to the callback-based overload with a callback that completes the future.
     *
     * @param pipeName        name of the pipe
     * @param previousMessage the message being replaced
     * @param m               the new message
     * @return a future completed with the result, or exceptionally with the reported error
     * @throws LindormException if the request cannot be prepared or submitted
     */
    @Override
    public Future<Long> updateAsync(String pipeName, LMessage previousMessage, LMessage m) throws LindormException {
        final ClientCompletableFuture<Long> pending = new ClientCompletableFuture<Long>();
        AsyncCallback<Long> bridge = new AsyncCallback<Long>(){

            @Override
            public void onComplete(Long result) {
                pending.complete(result);
            }

            @Override
            public void onError(Throwable exception) {
                pending.completeExceptionally(exception);
            }

            @Override
            public boolean shouldProcessResultInPool() {
                return false;
            }
        };
        updateAsync(pipeName, previousMessage, m, bridge);
        return pending;
    }

    /**
     * Asynchronously updates a message and reports the outcome through {@code callback}.
     *
     * <p>The upsert for {@code m} gets attributes prepared from {@code previousMessage} before it
     * is submitted. A {@code null} or empty result list is reported as {@code null}, otherwise
     * the first element is reported as a {@code Long}.
     *
     * @param pipeName        name of the pipe
     * @param previousMessage the message being replaced
     * @param m               the new message
     * @param callback        receiver of the result or error
     * @throws LindormException if validation fails, attribute preparation throws an
     *                          {@code IOException} (wrapped), or the request cannot be submitted
     */
    @Override
    public void updateAsync(String pipeName, LMessage previousMessage, LMessage m, final AsyncCallback<Long> callback) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkUpdatable(m, previousMessage);
        Upsert replacement = generateUpsert(pipeName, m);
        ArrayList<LMessage> previousOnly = new ArrayList<LMessage>(1);
        previousOnly.add(previousMessage);
        try {
            FeedStreamUtils.prepareAttrs(previousOnly, (LUpsert)replacement);
        } catch (IOException prepareFailure) {
            throw new LindormException(prepareFailure);
        }
        AsyncCallback<List<Object>> adapter = new AsyncCallback<List<Object>>(){

            @Override
            public void onComplete(List<Object> result) {
                boolean nothingReturned = result == null || result.isEmpty();
                if (nothingReturned) {
                    callback.onComplete(null);
                    return;
                }
                callback.onComplete((Long)result.get(0));
            }

            @Override
            public void onError(Throwable exception) {
                callback.onError(exception);
            }

            @Override
            public boolean shouldProcessResultInPool() {
                return callback.shouldProcessResultInPool();
            }

            @Override
            public boolean isRetrying() {
                return callback.isRetrying();
            }
        };
        replacement.executeWithResultsAsync(adapter);
    }

    /**
     * Fetches a single message by its message id.
     *
     * <p>Selects by the stream's hash, the stream name and {@code seq == messageId}, restricted to
     * the value family, and converts the first returned row.
     *
     * @param pipeName   name of the pipe
     * @param streamName name of the stream
     * @param messageId  id of the message
     * @return the message, or {@code null} if the query returns no row
     * @throws LindormException if an argument is invalid or the query fails
     */
    @Override
    public LMessage get(String pipeName, String streamName, long messageId) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkStreamName(streamName);
        FeedStreamUtils.validateMessageId(messageId);
        byte[] streamHash = FeedStreamUtils.computeHashByStreamName(streamName);
        ConditionList messageFilter = ConditionFactory.and(
                ConditionFactory.compare("hash", ConditionFactory.CompareOp.EQUAL, (Object)streamHash),
                ConditionFactory.compare("stream_name", ConditionFactory.CompareOp.EQUAL, (Object)streamName),
                ConditionFactory.compare("seq", ConditionFactory.CompareOp.EQUAL, (Object)messageId));
        Select query = service.select().from(pipeName).where(messageFilter);
        FeedStreamUtils.markSelectValueFamily(query);
        QueryResults rows = query.execute();
        Row first = rows.next();
        if (first != null) {
            return FeedStreamUtils.rowToMessage(streamName, first);
        }
        return null;
    }

    /**
     * Fetches several messages of one stream by message id.
     *
     * <p>The returned list has one entry per requested id, in request order; an id for which no
     * row was returned yields a {@code null} entry.
     *
     * @param pipeName   name of the pipe
     * @param streamName name of the stream
     * @param messageIds ids to fetch; {@code null} or empty yields an empty list
     * @return messages aligned with {@code messageIds}, with {@code null} for ids not found
     * @throws LindormException if an argument is invalid (including a {@code null} id) or the
     *                          query fails
     */
    @Override
    public List<LMessage> get(String pipeName, String streamName, List<Long> messageIds) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkStreamName(streamName);
        if (messageIds == null || messageIds.size() == 0) {
            return new ArrayList<LMessage>();
        }
        byte[] streamHash = FeedStreamUtils.computeHashByStreamName(streamName);
        // One "seq == id" alternative per requested id.
        ConditionList anyRequestedId = ConditionFactory.or();
        for (Long requested : messageIds) {
            if (requested == null) {
                throw new LindormException("Message Id must not be null.");
            }
            FeedStreamUtils.validateMessageId(requested);
            anyRequestedId.add(ConditionFactory.compare("seq", ConditionFactory.CompareOp.EQUAL, (Object)requested));
        }
        ConditionList filter = ConditionFactory.and(
                ConditionFactory.compare("hash", ConditionFactory.CompareOp.EQUAL, (Object)streamHash),
                ConditionFactory.compare("stream_name", ConditionFactory.CompareOp.EQUAL, (Object)streamName),
                anyRequestedId);
        Select query = service.select().from(pipeName).where(filter);
        FeedStreamUtils.markSelectValueFamily(query);
        QueryResults rows = query.execute();
        Map<Long, LMessage> foundById = resultsToMessageIdMap(rows, streamName, messageIds.size());
        ArrayList<LMessage> aligned = CollectionUtils.newArrayListWithCapacity(messageIds.size());
        if (!foundById.isEmpty()) {
            for (Long requested : messageIds) {
                aligned.add(foundById.get(requested));
            }
        } else {
            fillNull(aligned, messageIds.size());
        }
        return aligned;
    }

    /**
     * Appends {@code n} {@code null} entries to {@code list}; does nothing when {@code n <= 0}.
     *
     * @param list list to append to
     * @param n    number of {@code null} entries to add
     * @return the same {@code list} instance
     */
    private <T> List<T> fillNull(List<T> list, int n) {
        int remaining = n;
        while (remaining > 0) {
            list.add(null);
            remaining--;
        }
        return list;
    }

    /**
     * Converts every row of {@code results} into a message and indexes the messages by message id.
     * If two rows carry the same id, the later one replaces the earlier one.
     *
     * @param results    query results to drain
     * @param streamName stream name passed to the row-to-message conversion
     * @param size       expected number of entries, used to size the map
     * @return map from message id to message
     * @throws LindormException if a row cannot be converted
     */
    private Map<Long, LMessage> resultsToMessageIdMap(QueryResults results, String streamName, int size) throws LindormException {
        HashMap<Long, LMessage> byId = CollectionUtils.newHashMapWithExpectedSize(size);
        for (Row row : results) {
            LMessage converted = FeedStreamUtils.rowToMessage(streamName, row);
            byId.put(converted.getMessageId(), converted);
        }
        return byId;
    }

    /**
     * Resolves an idempotent id to the message id stored for it.
     *
     * <p>Selects by the stream's hash, the stream name, the zero column and the idempotent id,
     * restricted to the idempotent family, and reads the message id column of the first row.
     *
     * @param pipeName     name of the pipe
     * @param streamName   name of the stream
     * @param idempotentId idempotent id to look up
     * @return the stored message id, or {@code null} if the query returns no row
     * @throws LindormException if an argument is invalid or the query fails
     */
    @Override
    public Long getMessageId(String pipeName, String streamName, byte[] idempotentId) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkStreamName(streamName);
        FeedStreamUtils.validateIdempotentId(idempotentId);
        byte[] streamHash = FeedStreamUtils.computeHashByStreamName(streamName);
        ConditionList filter = ConditionFactory.and(
                ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)streamHash),
                ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)streamName),
                ConditionFactory.compare(FeedStreamUtils.ZERO_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)Bytes.ZERO_BYTE),
                ConditionFactory.compare(FeedStreamUtils.IDEMPOTENT_ID_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)idempotentId));
        Select query = service.select().from(pipeName).where(filter);
        FeedStreamUtils.markSelectIdempotentFamily(query);
        QueryResults rows = query.execute();
        Row first = rows.next();
        if (first != null) {
            return first.getColumnValue(FeedStreamUtils.IDEMPOTENT_FAMILY_NAME_BYTES, FeedStreamUtils.MESSAGE_ID_COLUMN_NAME_BYTES).getLong();
        }
        return null;
    }

    /**
     * Resolves several idempotent ids of one stream to their message ids.
     *
     * <p>The returned list has one entry per requested idempotent id, in request order; an id for
     * which no row was returned yields a {@code null} entry. Byte arrays are matched by content
     * through {@code Bytes.BYTES_COMPARATOR}.
     *
     * @param pipeName      name of the pipe
     * @param streamName    name of the stream
     * @param idempotentIds idempotent ids to look up; {@code null} or empty yields an empty list
     * @return message ids aligned with {@code idempotentIds}, with {@code null} for ids not found
     * @throws LindormException if an argument is invalid or the query fails
     */
    @Override
    public List<Long> getMessageId(String pipeName, String streamName, List<byte[]> idempotentIds) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkStreamName(streamName);
        if (idempotentIds == null || idempotentIds.size() == 0) {
            return new ArrayList<Long>();
        }
        byte[] streamHash = FeedStreamUtils.computeHashByStreamName(streamName);
        // One "idempotent id == requested" alternative per requested id.
        ConditionList anyRequestedId = ConditionFactory.or();
        for (byte[] requested : idempotentIds) {
            FeedStreamUtils.validateIdempotentId(requested);
            Condition matchesRequested = ConditionFactory.compare(FeedStreamUtils.IDEMPOTENT_ID_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)requested);
            anyRequestedId.add(matchesRequested);
        }
        ConditionList filter = ConditionFactory.and(
                ConditionFactory.compare(FeedStreamUtils.HASH_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)streamHash),
                ConditionFactory.compare(FeedStreamUtils.STREAM_NAME_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)streamName),
                ConditionFactory.compare(FeedStreamUtils.ZERO_COLUMN_NAME_BYTES, ConditionFactory.CompareOp.EQUAL, (Object)Bytes.ZERO_BYTE),
                anyRequestedId);
        Select query = service.select().from(pipeName).where(filter);
        FeedStreamUtils.markSelectIdempotentFamily(query);
        QueryResults rows = query.execute();
        TreeMap<byte[], Long> seqByIdempotentId = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
        for (Row row : rows) {
            long seq = row.getColumnValue("i", "seq").getLong();
            byte[] foundIdempotentId = row.getColumnValue("idem").getBinary();
            seqByIdempotentId.put(foundIdempotentId, seq);
        }
        ArrayList<Long> aligned = CollectionUtils.newArrayListWithCapacity(idempotentIds.size());
        if (!seqByIdempotentId.isEmpty()) {
            for (byte[] requested : idempotentIds) {
                aligned.add(seqByIdempotentId.get(requested));
            }
        } else {
            fillNull(aligned, idempotentIds.size());
        }
        return aligned;
    }

    /**
     * Fetches a message by idempotent id: first resolves the idempotent id to a message id, then
     * fetches the message with that id.
     *
     * @param pipeName     name of the pipe
     * @param streamName   name of the stream
     * @param idempotentId idempotent id of the message
     * @return the message, or {@code null} if the idempotent id or the message is not found
     * @throws LindormException if an argument is invalid or a query fails
     */
    @Override
    public LMessage get(String pipeName, String streamName, byte[] idempotentId) throws LindormException {
        checkOpen();
        Long resolvedId = getMessageId(pipeName, streamName, idempotentId);
        if (resolvedId != null) {
            return get(pipeName, streamName, resolvedId);
        }
        return null;
    }

    /**
     * Fetches several messages of one stream by idempotent id.
     *
     * <p>Works in two steps: the idempotent ids are resolved to message ids, then the messages
     * for the resolved ids are fetched. The returned list has one entry per requested idempotent
     * id, in request order; an id that cannot be resolved, or whose message is not returned by
     * the second lookup, yields a {@code null} entry. Byte arrays are matched by content through
     * {@code Bytes.BYTES_COMPARATOR}.
     *
     * @param pipeName      name of the pipe
     * @param streamName    name of the stream
     * @param idempotentIds idempotent ids to fetch; {@code null} or empty yields an empty list
     * @return messages aligned with {@code idempotentIds}, with {@code null} for ids not found
     * @throws LindormException if an argument is invalid or a query fails
     */
    @Override
    public List<LMessage> batchGet(String pipeName, String streamName, List<byte[]> idempotentIds) throws LindormException {
        this.checkOpen();
        if (idempotentIds == null || idempotentIds.size() == 0) {
            return new ArrayList<LMessage>();
        }
        List<Long> messageIds = this.getMessageId(pipeName, streamName, idempotentIds);
        ArrayList<Long> messageIdsWithoutNull = new ArrayList<Long>(idempotentIds.size());
        for (Long id : messageIds) {
            if (id == null) continue;
            messageIdsWithoutNull.add(id);
        }
        List<LMessage> messages = this.get(pipeName, streamName, messageIdsWithoutNull);
        TreeMap<byte[], LMessage> msgMap = new TreeMap<byte[], LMessage>(Bytes.BYTES_COMPARATOR);
        for (LMessage m : messages) {
            // get(pipe, stream, List<Long>) returns a null entry for every id it finds no row
            // for, so a resolved id can still come back without a message; skip it instead of
            // dereferencing null.
            if (m == null) continue;
            msgMap.put(m.getIdempotentId(), m);
        }
        ArrayList<LMessage> ret = CollectionUtils.newArrayListWithCapacity(idempotentIds.size());
        if (msgMap.isEmpty()) {
            this.fillNull(ret, idempotentIds.size());
        } else {
            for (byte[] idempId : idempotentIds) {
                ret.add(msgMap.get(idempId));
            }
        }
        return ret;
    }

    /**
     * Opens a scanner over a message id range by building a {@link StreamScan} with the given
     * start and end ids and delegating to the {@code StreamScan}-based overload.
     *
     * @param pipeName       name of the pipe
     * @param streamName     name of the stream
     * @param startMessageId start of the range
     * @param endMessageId   end of the range
     * @return scanner over the matching messages
     * @throws LindormException if an argument is invalid or the query fails
     */
    @Override
    public LMessageScanner getScanner(String pipeName, String streamName, long startMessageId, long endMessageId) throws LindormException {
        StreamScan range = new StreamScan();
        range.setStartMessageId(startMessageId);
        range.setEndMessageId(endMessageId);
        return getScanner(pipeName, streamName, range);
    }

    /**
     * Opens a scanner over a message id range with a row limit by building a {@link StreamScan}
     * and delegating to the {@code StreamScan}-based overload.
     *
     * @param pipeName       name of the pipe
     * @param streamName     name of the stream
     * @param startMessageId start of the range
     * @param endMessageId   end of the range
     * @param limit          limit set on the scan
     * @return scanner over the matching messages
     * @throws LindormException if an argument is invalid or the query fails
     */
    @Override
    public LMessageScanner getScanner(String pipeName, String streamName, long startMessageId, long endMessageId, int limit) throws LindormException {
        StreamScan limitedRange = new StreamScan();
        limitedRange.setStartMessageId(startMessageId);
        limitedRange.setEndMessageId(endMessageId);
        limitedRange.setLimit(limit);
        return getScanner(pipeName, streamName, limitedRange);
    }

    /**
     * Opens a scanner over the messages of one stream as described by {@code scan}.
     *
     * <p>The select always filters on the stream's hash and name. For a "get" scan it matches
     * {@code seq == start}; otherwise it matches {@code start <= seq < end}. A positive limit is
     * applied to the select. Unless the scan asks for all tags, the selected columns are
     * restricted to stream name, message id, body and idempotent id, plus one {@code "v"} column
     * per requested tag.
     *
     * @param pipeName   name of the pipe
     * @param streamName name of the stream
     * @param scan       scan description (range, limit, tags to read)
     * @return scanner over the query results
     * @throws IllegalRequestException if the start message id is greater than the end message id
     * @throws LindormException        if an argument is invalid or the query fails
     */
    @Override
    public LMessageScanner getScanner(String pipeName, String streamName, StreamScan scan) throws LindormException {
        checkOpen();
        FeedStreamUtils.checkPipeName(pipeName);
        FeedStreamUtils.checkStreamName(streamName);
        FeedStreamUtils.validateMessageId(scan.getStartMessageId());
        if (scan.getStartMessageId() > scan.getEndMessageId()) {
            throw new IllegalRequestException("Illegal start messageId " + scan.getStartMessageId() + ", end messageId " + scan.getEndMessageId());
        }
        Select query = service.select().from(pipeName);
        byte[] streamHash = FeedStreamUtils.computeHashByStreamName(streamName);
        ConditionList filter = ConditionFactory.and(
                ConditionFactory.compare("hash", ConditionFactory.CompareOp.EQUAL, (Object)streamHash),
                ConditionFactory.compare("stream_name", ConditionFactory.CompareOp.EQUAL, (Object)streamName));
        if (!scan.isGetScan()) {
            // Half-open range: start inclusive, end exclusive.
            filter.add(ConditionFactory.compare("seq", ConditionFactory.CompareOp.GREATER_OR_EQUAL, (Object)scan.getStartMessageId()));
            filter.add(ConditionFactory.compare("seq", ConditionFactory.CompareOp.LESS, (Object)scan.getEndMessageId()));
        } else {
            filter.add(ConditionFactory.compare("seq", ConditionFactory.CompareOp.EQUAL, (Object)scan.getStartMessageId()));
        }
        query.where(filter);
        if (scan.getLimit() > 0) {
            query.limit(scan.getLimit());
        }
        List<String> requestedTags = scan.getTagsToRead();
        // READ_ALL_TAGS / READ_NO_TAGS are compared by reference, i.e. used as sentinel instances.
        if (requestedTags != StreamScan.READ_ALL_TAGS) {
            ArrayList<ColumnKey> projection = new ArrayList<ColumnKey>();
            projection.add(FeedStreamUtils.CKV_STREAM_NAME);
            projection.add(FeedStreamUtils.CKV_MESSAGE_ID);
            projection.add(FeedStreamUtils.CKV_BODY);
            projection.add(FeedStreamUtils.CKV_IDEMPOTENT_ID);
            if (requestedTags != StreamScan.READ_NO_TAGS) {
                for (String tag : requestedTags) {
                    projection.add(new ColumnKey("v", tag));
                }
            }
            query.columns(projection);
        }
        return new LMessageScanner(query.execute(), streamName);
    }
}

