/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.BatchlogManagerMBean;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchlogManager
implements BatchlogManagerMBean {
    private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
    private static final int VERSION = 6;
    private static final long TIMEOUT = 2L * DatabaseDescriptor.getWriteRpcTimeout();
    private static final ByteBuffer WRITTEN_AT = BatchlogManager.columnName("written_at");
    private static final ByteBuffer DATA = BatchlogManager.columnName("data");
    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
    public static final BatchlogManager instance = new BatchlogManager();
    private final AtomicLong totalBatchesReplayed = new AtomicLong();
    private final AtomicBoolean isReplaying = new AtomicBoolean();

    public void start() {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        WrappedRunnable runnable = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws ExecutionException, InterruptedException {
                BatchlogManager.this.replayAllFailedBatches();
            }
        };
        StorageService.optionalTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, 600000L, TimeUnit.MILLISECONDS);
    }

    @Override
    public int countAllBatches() {
        int count = 0;
        for (Row row : BatchlogManager.getRangeSlice(new NamesQueryFilter((SortedSet<ByteBuffer>)ImmutableSortedSet.of()))) {
            if (row.cf == null || row.cf.isMarkedForDelete()) continue;
            ++count;
        }
        return count;
    }

    @Override
    public long getTotalBatchesReplayed() {
        return this.totalBatchesReplayed.longValue();
    }

    @Override
    public void forceBatchlogReplay() {
        WrappedRunnable runnable = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws ExecutionException, InterruptedException {
                BatchlogManager.this.replayAllFailedBatches();
            }
        };
        StorageService.optionalTasks.execute(runnable);
    }

    public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid) {
        long timestamp = FBUtilities.timestampMicros();
        ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000L);
        ByteBuffer data = BatchlogManager.serializeRowMutations(mutations);
        ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCF);
        cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
        cf.addColumn(new Column(DATA, data, timestamp));
        RowMutation rm = new RowMutation("system", UUIDType.instance.decompose(uuid));
        rm.add(cf);
        return rm;
    }

    private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations) {
        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        try {
            dos.writeInt(mutations.size());
            for (RowMutation rm : mutations) {
                RowMutation.serializer.serialize(rm, (DataOutput)dos, 6);
            }
        }
        catch (IOException e) {
            throw new AssertionError();
        }
        return ByteBuffer.wrap(bos.toByteArray());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void replayAllFailedBatches() throws ExecutionException, InterruptedException {
        if (!this.isReplaying.compareAndSet(false, true)) {
            return;
        }
        try {
            logger.debug("Started replayAllFailedBatches");
            for (Row row : BatchlogManager.getRangeSlice(new NamesQueryFilter(WRITTEN_AT))) {
                IColumn writtenAt;
                if (row.cf == null || row.cf.isMarkedForDelete() || (writtenAt = row.cf.getColumn(WRITTEN_AT)) != null && System.currentTimeMillis() <= LongType.instance.compose(writtenAt.value()) + TIMEOUT) continue;
                this.replayBatch(row.key);
            }
            this.cleanup();
        }
        finally {
            this.isReplaying.set(false);
        }
        logger.debug("Finished replayAllFailedBatches");
    }

    private void replayBatch(DecoratedKey key) {
        UUID uuid = UUIDType.instance.compose(key.key);
        logger.debug("Replaying batch {}", (Object)uuid);
        ColumnFamilyStore store = Table.open("system").getColumnFamilyStore("batchlog");
        QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath("batchlog"));
        ColumnFamily batch = store.getColumnFamily(filter);
        if (batch == null || batch.isMarkedForDelete()) {
            return;
        }
        IColumn dataColumn = batch.getColumn(DATA);
        try {
            if (dataColumn != null) {
                BatchlogManager.writeHintsForSerializedMutations(dataColumn.value());
            }
        }
        catch (IOException e) {
            logger.warn("Skipped batch replay of {} due to {}", (Object)uuid, (Object)e);
        }
        BatchlogManager.deleteBatch(key);
        this.totalBatchesReplayed.incrementAndGet();
    }

    private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException {
        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
        int size = in.readInt();
        for (int i = 0; i < size; ++i) {
            BatchlogManager.writeHintsForMutation(RowMutation.serializer.deserialize(in, 6));
        }
    }

    private static void writeHintsForMutation(RowMutation mutation) throws IOException {
        String table = mutation.getTable();
        Object tk = StorageService.getPartitioner().getToken(mutation.key());
        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, (RingPosition)tk);
        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor((Token)tk, table);
        for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints)) {
            if (target.equals(FBUtilities.getBroadcastAddress())) {
                mutation.apply();
                continue;
            }
            StorageProxy.writeHintForMutation(mutation, target);
        }
    }

    private static void deleteBatch(DecoratedKey key) {
        RowMutation rm = new RowMutation("system", key.key);
        rm.delete(new QueryPath("batchlog"), FBUtilities.timestampMicros());
        rm.apply();
    }

    private static ByteBuffer columnName(String name) {
        ByteBuffer raw = UTF8Type.instance.decompose(name);
        return CFMetaData.BatchlogCF.getCfDef().getColumnNameBuilder().add(raw).build();
    }

    private static List<Row> getRangeSlice(IDiskAtomFilter columnFilter) {
        ColumnFamilyStore store = Table.open("system").getColumnFamilyStore("batchlog");
        IPartitioner partitioner = StorageService.getPartitioner();
        Token.KeyBound minPosition = ((Token)partitioner.getMinimumToken()).minKeyBound();
        Range<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
        return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
    }

    private void cleanup() throws ExecutionException, InterruptedException {
        ColumnFamilyStore cfs = Table.open("system").getColumnFamilyStore("batchlog");
        cfs.forceBlockingFlush();
        ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>();
        for (SSTableReader sstr : cfs.getSSTables()) {
            descriptors.add(sstr.descriptor);
        }
        if (!descriptors.isEmpty()) {
            CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
        }
    }
}

