package org.wso2.broker.core.store.dao;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.wso2.broker.core.BrokerException;
import org.wso2.broker.core.Message;
import org.wso2.broker.core.store.DbEventMatcher;
import org.wso2.broker.core.store.DbOperation;
import org.wso2.broker.core.store.DbWriter;
import org.wso2.broker.core.store.LogExceptionHandler;

/* loaded from: input_file:org/wso2/broker/core/store/dao/SharedMessageStore.class */
/**
 * Message store front-end that keeps not-yet-flushed messages in memory and hands
 * persistence work to an LMAX {@link Disruptor}.
 *
 * <p>Messages passed to {@link #add(Message)} are held in an in-memory map keyed by their
 * internal id until {@link #flush(long)} is called for that id. Insert, detach and delete
 * operations are published as {@link DbOperation} events to the disruptor, whose handler chain
 * is {@link DbEventMatcher}, then {@link DbWriter}, then a final step that clears the event.
 * Handlers run on threads named {@code DisruptorMessageStoreThread-%d}.
 *
 * <p>{@link #readStoredMessages(String)} bypasses the disruptor and reads directly through the
 * supplied {@link MessageDao}.
 */
public class SharedMessageStore {

    /** Translator that turns a ring-buffer slot into an "insert message" operation. */
    private static final EventTranslatorOneArg<DbOperation, Message> INSERT_MESSAGE =
            (dbOperation, sequence, message) -> dbOperation.insertMessage(message);

    /** Translator that turns a ring-buffer slot into a "detach message from queue" operation. */
    private static final EventTranslatorTwoArg<DbOperation, String, Long> DETACH_FROM_QUEUE =
            (dbOperation, sequence, queueName, messageId) -> dbOperation.detachFromQueue(queueName, messageId);

    /** Translator that turns a ring-buffer slot into a "delete message" operation. */
    private static final EventTranslatorOneArg<DbOperation, Long> DELETE_MESSAGE =
            (dbOperation, sequence, messageId) -> dbOperation.deleteMessage(messageId.longValue());

    /** Messages that have been added but not yet flushed, keyed by internal message id. */
    private final Map<Long, Message> pendingMessages = new ConcurrentHashMap<>();

    private final Disruptor<DbOperation> disruptor;

    private final MessageDao messageDao;

    /**
     * Builds the disruptor pipeline and starts it.
     *
     * @param messageDao DAO handed to the {@link DbWriter} stage and used by
     *                   {@link #readStoredMessages(String)}
     * @param i          ring buffer size for the disruptor (the Disruptor requires a power of
     *                   two); the same value is also passed to {@link DbEventMatcher}
     * @param i2         value passed as the second argument to {@link DbWriter}
     *                   (NOTE(review): presumably the maximum DB batch size - confirm against
     *                   DbWriter)
     */
    public SharedMessageStore(MessageDao messageDao, int i, int i2) {
        // Assign state before starting the disruptor so the instance is fully initialised
        // by the time any handler thread is running.
        this.messageDao = messageDao;
        this.disruptor = new Disruptor<>(
                DbOperation.getFactory(),
                i,
                new ThreadFactoryBuilder().setNameFormat("DisruptorMessageStoreThread-%d").build(),
                ProducerType.MULTI,
                new SleepingWaitStrategy());
        this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler());

        // Use the generic varargs API directly. Wrapping the handlers in raw EventHandler[]
        // arrays makes the lambda's event parameter an Object, so event.clear() cannot compile.
        this.disruptor.handleEventsWith(new DbEventMatcher(i))
                .then(new DbWriter(messageDao, i2))
                // Last stage: reset the event once the writer stage has processed it. Ring-buffer
                // entries are reused, so this presumably avoids retaining stale references.
                .then((event, sequence, endOfBatch) -> event.clear());
        this.disruptor.start();
    }

    /**
     * Registers a message as pending. A shallow copy of the message is stored under its
     * internal id, replacing any pending entry with the same id.
     *
     * @param message message to track until it is flushed
     */
    public void add(Message message) {
        long internalId = message.getMetadata().getInternalId();
        this.pendingMessages.put(internalId, message.shallowCopy());
    }

    /**
     * Records on a pending message that the given queue owns it.
     *
     * @param str queue name to add to the pending message's metadata
     * @param j   internal id of a message previously passed to {@link #add(Message)}
     * @throws BrokerException if no pending message exists for the given id
     */
    public void attach(String str, long j) throws BrokerException {
        Message pending = this.pendingMessages.get(j);
        if (pending == null) {
            throw new BrokerException("Unknown message id " + j + " cannot attach to queue " + str);
        }
        pending.getMetadata().addOwnedQueue(str);
    }

    /**
     * Publishes a delete operation for the given message id to the disruptor.
     *
     * @param messageId internal id of the message to delete
     */
    private void delete(long messageId) {
        this.disruptor.publishEvent(DELETE_MESSAGE, messageId);
    }

    /**
     * Removes the queue from the message's attached queues and publishes the matching
     * operation: a delete when no attached queues remain, otherwise a detach for this queue.
     *
     * @param str     name of the queue the message is being detached from
     * @param message message whose metadata is updated
     */
    public void detach(String str, Message message) {
        message.getMetadata().removeAttachedQueue(str);
        long internalId = message.getMetadata().getInternalId();
        if (!message.getMetadata().hasAttachedQueues()) {
            // No queue references the message any more, so remove it entirely.
            delete(internalId);
            return;
        }
        this.disruptor.publishEvent(DETACH_FROM_QUEUE, str, internalId);
    }

    /**
     * Stops tracking the pending message with the given id. If the message has attached
     * queues an insert operation is published for it; otherwise the message is released.
     * Does nothing when no pending message exists for the id.
     *
     * @param j internal id of the pending message
     */
    public void flush(long j) {
        Message pending = this.pendingMessages.remove(j);
        if (pending == null) {
            return;
        }
        if (pending.getMetadata().hasAttachedQueues()) {
            this.disruptor.publishEvent(INSERT_MESSAGE, pending);
        } else {
            // Nothing references the message, so it is released instead of being written.
            pending.release();
        }
    }

    /**
     * Reads messages for a queue directly through the {@link MessageDao}.
     *
     * @param str queue name passed to {@link MessageDao#readAll(String)}
     * @return the collection returned by the DAO
     * @throws BrokerException if the DAO read fails
     */
    public Collection<Message> readStoredMessages(String str) throws BrokerException {
        return this.messageDao.readAll(str);
    }
}
