/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.parse.inbound;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.store.CanalStoreException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.util.Assert;

/**
 * Ring buffer that accumulates {@link CanalEntry.Entry} items and hands them to a
 * {@link TransactionFlushCallback} in batches.
 *
 * <p>Entries are written at {@code putSequence + 1} and a flush delivers everything in
 * {@code (flushSequence, putSequence]} to the callback. A flush is triggered:
 * <ul>
 *   <li>before a {@code TRANSACTIONBEGIN} entry is buffered,</li>
 *   <li>after a {@code TRANSACTIONEND} entry is buffered,</li>
 *   <li>after a {@code ROWDATA} entry whose event type is non-null and is not
 *       INSERT/UPDATE/DELETE,</li>
 *   <li>whenever the ring is full and a new entry has to be stored.</li>
 * </ul>
 *
 * <p>No synchronization is performed in this class; NOTE(review): callers presumably
 * confine an instance to a single thread — confirm against the call sites.
 */
public class EventTransactionBuffer
extends AbstractCanalLifeCycle {
    /** Initial value of both sequences: nothing has been put or flushed yet. */
    private static final long INIT_SEQUENCE = -1L;
    /** Capacity of the ring; must be a positive power of two when {@link #start()} is called. */
    private int bufferSize = 1024;
    /** {@code bufferSize - 1}; maps a sequence to an array slot via bitwise AND. */
    private int indexMask;
    private CanalEntry.Entry[] entries;
    /** Sequence of the most recently buffered entry. */
    private AtomicLong putSequence = new AtomicLong(INIT_SEQUENCE);
    /** Sequence of the most recently flushed entry. */
    private AtomicLong flushSequence = new AtomicLong(INIT_SEQUENCE);
    private TransactionFlushCallback flushCallback;

    public EventTransactionBuffer() {
    }

    /**
     * @param flushCallback receiver of flushed batches
     */
    public EventTransactionBuffer(TransactionFlushCallback flushCallback) {
        this.flushCallback = flushCallback;
    }

    /**
     * Validates the configuration, marks the component as started and allocates the ring.
     *
     * <p>Validation runs before {@code super.start()} so that a misconfigured buffer is
     * rejected without the parent lifecycle call having been made.
     *
     * @throws IllegalArgumentException if {@code bufferSize} is not a positive power of two,
     *         or if no flush callback has been configured
     */
    public void start() throws CanalStoreException {
        // bitCount alone would accept Integer.MIN_VALUE (a single set bit), so reject
        // non-positive sizes explicitly.
        if (this.bufferSize <= 0 || Integer.bitCount(this.bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        Assert.notNull(this.flushCallback, "flush callback is null!");
        super.start();
        this.indexMask = this.bufferSize - 1;
        this.entries = new CanalEntry.Entry[this.bufferSize];
    }

    /**
     * Resets both sequences, releases the ring array and then delegates to
     * {@code super.stop()}. Entries that were buffered but not yet flushed are dropped.
     */
    public void stop() throws CanalStoreException {
        this.putSequence.set(INIT_SEQUENCE);
        this.flushSequence.set(INIT_SEQUENCE);
        this.entries = null;
        super.stop();
    }

    /**
     * Adds every entry of the list in iteration order via {@link #add(CanalEntry.Entry)}.
     *
     * @param entrys entries to buffer
     * @throws InterruptedException propagated from the flush callback
     */
    public void add(List<CanalEntry.Entry> entrys) throws InterruptedException {
        for (CanalEntry.Entry entry : entrys) {
            this.add(entry);
        }
    }

    /**
     * Buffers a single entry and flushes according to its entry type (see class comment).
     * Entry types other than TRANSACTIONBEGIN, TRANSACTIONEND and ROWDATA are ignored.
     *
     * @param entry entry to buffer
     * @throws InterruptedException propagated from the flush callback
     */
    public void add(CanalEntry.Entry entry) throws InterruptedException {
        switch (entry.getEntryType()) {
            case TRANSACTIONBEGIN: {
                // Deliver whatever is pending before the new transaction starts.
                this.flush();
                this.put(entry);
                break;
            }
            case TRANSACTIONEND: {
                this.put(entry);
                this.flush();
                break;
            }
            case ROWDATA: {
                this.put(entry);
                CanalEntry.EventType eventType = entry.getHeader().getEventType();
                // Only non-DML row data is flushed immediately; DML rows (and rows with
                // no event type) stay buffered.
                if (eventType != null && !this.isDml(eventType)) {
                    this.flush();
                }
                break;
            }
            default: {
                // Other entry types are neither buffered nor flushed.
                break;
            }
        }
    }

    /**
     * Resets both sequences to their initial value, discarding any unflushed entries.
     * The ring array itself is left in place and its slots are not cleared.
     */
    public void reset() {
        this.putSequence.set(INIT_SEQUENCE);
        this.flushSequence.set(INIT_SEQUENCE);
    }

    /**
     * Stores the entry at the next sequence, flushing first if the ring is full.
     */
    private void put(CanalEntry.Entry data) throws InterruptedException {
        long next = this.putSequence.get() + 1L;
        if (!this.checkFreeSlotAt(next)) {
            // A full ring means there are unflushed entries, so flush() advances
            // flushSequence to putSequence and the slot for 'next' becomes free.
            this.flush();
        }
        this.entries[this.getIndex(next)] = data;
        this.putSequence.set(next);
    }

    /**
     * Delivers all entries in {@code (flushSequence, putSequence]} to the callback and
     * then advances {@code flushSequence}. Does nothing when there is nothing pending.
     * If the callback throws, {@code flushSequence} is left unchanged.
     */
    private void flush() throws InterruptedException {
        long start = this.flushSequence.get() + 1L;
        long end = this.putSequence.get();
        if (start > end) {
            return;
        }
        // put() never lets more than bufferSize entries be pending, so the count fits an int.
        List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>((int)(end - start + 1L));
        for (long next = start; next <= end; ++next) {
            transaction.add(this.entries[this.getIndex(next)]);
        }
        this.flushCallback.flush(transaction);
        this.flushSequence.set(end);
    }

    /**
     * Returns whether writing at {@code sequence} would not overwrite an unflushed entry,
     * i.e. whether the slot one full lap behind has already been flushed.
     */
    private boolean checkFreeSlotAt(long sequence) {
        long wrapPoint = sequence - (long)this.bufferSize;
        return wrapPoint <= this.flushSequence.get();
    }

    /** Maps a sequence to its slot in the ring array. */
    private int getIndex(long sequence) {
        return (int)(sequence & (long)this.indexMask);
    }

    /** Returns whether the event type is INSERT, UPDATE or DELETE. */
    private boolean isDml(CanalEntry.EventType eventType) {
        return eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE || eventType == CanalEntry.EventType.DELETE;
    }

    /**
     * Sets the ring capacity. The value is validated in {@link #start()}, not here.
     *
     * @param bufferSize capacity; must be a positive power of two
     */
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /**
     * @param flushCallback receiver of flushed batches; must be non-null by the time
     *        {@link #start()} is called
     */
    public void setFlushCallback(TransactionFlushCallback flushCallback) {
        this.flushCallback = flushCallback;
    }

    /** Receiver of the batches produced by the buffer. */
    public static interface TransactionFlushCallback {
        /**
         * Called with the entries accumulated since the previous flush, in insertion order.
         *
         * @param var1 the flushed entries
         * @throws InterruptedException if the receiver is interrupted while handling the batch
         */
        public void flush(List<CanalEntry.Entry> var1) throws InterruptedException;
    }
}

