package org.apache.accumulo.master.replication;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/master/replication/FinishedWorkUpdater.class */
/**
 * Periodic task that reads the work section of the replication table and, for every row, records
 * in the status section how far each source table has been replicated.
 *
 * <p>For each row the progress written for a source table is the minimum {@code begin} offset
 * found across all of that table's work entries in the row. Rows that cannot be decoded, or that
 * contain a work entry whose status cannot be deserialized, are logged and skipped.
 */
public class FinishedWorkUpdater implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(FinishedWorkUpdater.class);

    /** Number of query threads requested for the batch scanner. */
    private static final int SCAN_THREADS = 4;

    /** Priority at which the {@link WholeRowIterator} is attached to the scan. */
    private static final int WHOLE_ROW_ITERATOR_PRIORITY = 50;

    private static final String WRITE_ERROR_MESSAGE =
            "Error writing mutations to update replication Status messages in StatusSection, will retry";

    private final Connector conn;

    /**
     * @param connector connection used to open the scanner and writer on the replication table
     */
    public FinishedWorkUpdater(Connector connector) {
        this.conn = connector;
    }

    /**
     * Performs one pass over the work section and writes the resulting status updates.
     *
     * <p>Returns without doing anything when the replication table is not online. The scanner and
     * the writer are always closed before this method returns, including when acquiring the writer
     * or configuring the scanner fails.
     */
    @Override
    public void run() {
        log.debug("Looking for finished replication work");
        if (!ReplicationTable.isOnline(this.conn)) {
            log.debug("Replication table is not yet online, will retry");
            return;
        }
        try {
            BatchScanner scanner = ReplicationTable.getBatchScanner(this.conn, SCAN_THREADS);
            try {
                // Acquired inside the scanner's try/finally so the scanner is released even when
                // the writer cannot be obtained.
                BatchWriter writer = ReplicationTable.getBatchWriter(this.conn);
                try {
                    updateStatuses(scanner, writer);
                } finally {
                    log.debug("Finished updating files with completed replication work");
                    closeWriter(writer);
                }
            } finally {
                scanner.close();
            }
        } catch (ReplicationTableOfflineException e) {
            log.debug("Table is no longer online, will retry");
        }
    }

    /**
     * Configures the scanner for whole-row reads of the work section, then processes each row.
     * Stops at the first rejected mutation; the error is logged and the remaining rows are left
     * for a later run.
     */
    private static void updateStatuses(BatchScanner scanner, BatchWriter writer) {
        scanner.addScanIterator(new IteratorSetting(WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class));
        ReplicationSchema.WorkSection.limit(scanner);
        scanner.setRanges(Collections.singleton(new Range()));

        for (Map.Entry<Key, Value> serializedRow : scanner) {
            Key rowKey = serializedRow.getKey();
            SortedMap<Key, Value> columns;
            try {
                columns = WholeRowIterator.decodeRow(rowKey, serializedRow.getValue());
            } catch (IOException e) {
                log.warn("Could not deserialize whole row with key {}", rowKey.toStringNoTruncate(), e);
                continue;
            }
            log.debug("Processing work progress for {} with {} columns", rowKey.getRow(), columns.size());

            try {
                writeProgress(writer, rowKey, minimumProgressBySourceTable(columns));
            } catch (MutationsRejectedException e) {
                log.error(WRITE_ERROR_MESSAGE, e);
                return;
            }
        }
    }

    /**
     * Computes, per source table id, the smallest {@code begin} offset among the row's work entries.
     *
     * @return the per-table minimum, or an empty map when any entry's status cannot be deserialized
     *         (so that nothing is written for the row)
     */
    private static Map<String, Long> minimumProgressBySourceTable(SortedMap<Key, Value> columns) {
        Map<String, Long> progress = new HashMap<>();
        Text qualifier = new Text();
        for (Map.Entry<Key, Value> column : columns.entrySet()) {
            Replication.Status status;
            try {
                status = Replication.Status.parseFrom(column.getValue().get());
            } catch (InvalidProtocolBufferException e) {
                log.warn("Could not deserialize protobuf for {}", column.getKey(), e);
                // One unreadable entry invalidates the whole row; no point parsing the rest.
                return Collections.emptyMap();
            }

            // The column qualifier of a work entry encodes its replication target.
            column.getKey().getColumnQualifier(qualifier);
            String sourceTableId = ReplicationTarget.from(qualifier).getSourceTableId();

            long begin = status.getBegin();
            Long known = progress.get(sourceTableId);
            progress.put(sourceTableId, known == null ? begin : Math.min(known, begin));
        }
        return progress;
    }

    /**
     * Queues one status mutation per source table whose progress is non-zero.
     *
     * @throws MutationsRejectedException if the writer rejects a mutation
     */
    private static void writeProgress(BatchWriter writer, Key rowKey, Map<String, Long> progress)
            throws MutationsRejectedException {
        for (Map.Entry<String, Long> tableProgress : progress.entrySet()) {
            long replicatedThrough = tableProgress.getValue();
            if (replicatedThrough == 0L) {
                // A minimum of zero is skipped: no status update is written for this table.
                continue;
            }

            Text row = rowKey.getRow();
            log.debug("For {}, source table ID {} has replicated through {}",
                    new Object[] {row, tableProgress.getKey(), tableProgress.getValue()});

            Mutation mutation = new Mutation(row);
            Replication.Status updatedStatus = StatusUtil.replicated(replicatedThrough);
            ReplicationSchema.StatusSection.add(mutation, new Text(tableProgress.getKey()),
                    ProtobufUtil.toValue(updatedStatus));

            log.debug("Updating replication status entry for {} with {}", row, updatedStatus);
            writer.addMutation(mutation);
        }
    }

    /** Closes the writer, logging (rather than propagating) rejected mutations reported on close. */
    private static void closeWriter(BatchWriter writer) {
        try {
            writer.close();
        } catch (MutationsRejectedException e) {
            log.error(WRITE_ERROR_MESSAGE, e);
        }
    }
}
