package org.apache.accumulo.master.replication;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/master/replication/WorkMaker.class */
/**
 * Scans the status section of the replication table and, for every status record that still
 * requires work, writes one work entry per replication target configured on the source table.
 *
 * <p>The {@link BatchWriter} is obtained on the first {@link #run()} that needs it and reused by
 * later runs; it is dropped when the replication table is reported offline while it is being
 * opened.
 */
public class WorkMaker {
    private static final Logger log = LoggerFactory.getLogger(WorkMaker.class);
    private final AccumuloServerContext context;
    private final Connector conn;
    private BatchWriter writer;

    /**
     * @param accumuloServerContext context used to look up per-table configuration
     * @param connector connector used to reach the replication table
     */
    public WorkMaker(AccumuloServerContext accumuloServerContext, Connector connector) {
        this.context = accumuloServerContext;
        this.conn = connector;
    }

    /**
     * Performs one pass over the replication status records, creating work records where needed.
     *
     * <p>Returns immediately if the replication table is not online. Records whose status cannot
     * be parsed, that need no work, whose table configuration is unavailable, or whose table has
     * no configured targets are skipped.
     */
    public void run() {
        if (!ReplicationTable.isOnline(this.conn)) {
            log.info("Replication table is not yet online");
            return;
        }

        // The outer span gets its own variable so the per-record spans below cannot overwrite it;
        // the finally block guarantees it is stopped exactly once on every exit path.
        Span span = Trace.start("replicationWorkMaker");
        try {
            final Scanner scanner;
            try {
                scanner = ReplicationTable.getScanner(this.conn);
                if (null == this.writer) {
                    setBatchWriter(ReplicationTable.getBatchWriter(this.conn));
                }
            } catch (ReplicationTableOfflineException e) {
                log.warn("Replication table was online, but not anymore");
                // Forget the writer so the next run asks for a new one.
                this.writer = null;
                return;
            }

            // Restrict the scan to the status section of the replication table.
            ReplicationSchema.StatusSection.limit(scanner);

            // Reused across iterations; refilled from each key.
            Text file = new Text();
            Text tableId = new Text();
            for (Map.Entry<Key, Value> entry : scanner) {
                ReplicationSchema.StatusSection.getFile(entry.getKey(), file);
                ReplicationSchema.StatusSection.getTableId(entry.getKey(), tableId);
                log.info("Processing replication status record for {} on table {}", file, tableId);

                Replication.Status status;
                try {
                    status = Replication.Status.parseFrom(entry.getValue().get());
                } catch (InvalidProtocolBufferException e) {
                    log.error("Could not parse protobuf for {} from table {}", file, tableId, e);
                    continue;
                }

                if (!shouldCreateWork(status)) {
                    continue;
                }

                TableConfiguration tableConf =
                        this.context.getServerConfigurationFactory().getTableConfiguration(tableId.toString());
                if (null == tableConf) {
                    // No configuration available for this table id; nothing to derive targets from.
                    continue;
                }

                Map<String, String> replicationTargets = getReplicationTargets(tableConf);
                if (replicationTargets.isEmpty()) {
                    log.warn("No configured targets for table with ID {}", tableId);
                    continue;
                }

                Span workSpan = Trace.start("createWorkMutations");
                try {
                    addWorkRecord(file, entry.getValue(), replicationTargets, tableId.toString());
                } finally {
                    workSpan.stop();
                }
            }
        } finally {
            span.stop();
        }
    }

    /**
     * Sets the writer used to persist work records.
     *
     * @param batchWriter writer that work mutations are added to and flushed through
     */
    protected void setBatchWriter(BatchWriter batchWriter) {
        this.writer = batchWriter;
    }

    /**
     * Collects the replication targets configured on a table.
     *
     * @param tableConfiguration configuration of the source table
     * @return a new map whose keys are the property names with the
     *         {@link Property#TABLE_REPLICATION_TARGET} prefix removed and whose values are the
     *         corresponding property values; empty if no such property is set
     */
    protected Map<String, String> getReplicationTargets(TableConfiguration tableConfiguration) {
        final Map<String, String> prefixed =
                tableConfiguration.getAllPropertiesWithPrefix(Property.TABLE_REPLICATION_TARGET);
        final Map<String, String> targets = new HashMap<String, String>();
        final int prefixLength = Property.TABLE_REPLICATION_TARGET.getKey().length();
        for (Map.Entry<String, String> property : prefixed.entrySet()) {
            targets.put(property.getKey().substring(prefixLength), property.getValue());
        }
        return targets;
    }

    /**
     * Decides whether a work record should be created for the given status.
     *
     * @param status parsed replication status of a file
     * @return the result of {@link StatusUtil#isWorkRequired(Replication.Status)}
     */
    protected boolean shouldCreateWork(Replication.Status status) {
        return StatusUtil.isWorkRequired(status);
    }

    /**
     * Writes a single mutation for {@code text} containing one work entry per target, then
     * flushes the writer.
     *
     * <p>Failures are logged and not propagated. If serializing a target fails, the mutation is
     * not added, but the writer is still flushed.
     *
     * @param text row of the mutation (the file the work is for)
     * @param value value stored with every work entry
     * @param map replication targets; each key is used as the peer name and each value as the
     *        remote identifier
     * @param str id of the source table
     */
    protected void addWorkRecord(Text text, Value value, Map<String, String> map, String str) {
        log.info("Adding work records for {} to targets {}", text, map);
        try {
            Mutation mutation = new Mutation(text);

            // Scratch objects reused for every target to avoid per-entry allocation.
            ReplicationTarget target = new ReplicationTarget();
            DataOutputBuffer buffer = new DataOutputBuffer();
            Text serializedTarget = new Text();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                buffer.reset();
                target.setPeerName(entry.getKey());
                target.setRemoteIdentifier(entry.getValue());
                target.setSourceTableId(str);
                target.write(buffer);
                // Only the first getLength() bytes of the backing array are valid.
                serializedTarget.set(buffer.getData(), 0, buffer.getLength());
                ReplicationSchema.WorkSection.add(mutation, serializedTarget, value);
            }

            try {
                this.writer.addMutation(mutation);
            } catch (MutationsRejectedException e) {
                log.warn("Failed to write work mutations for replication, will retry", e);
            }
        } catch (IOException e) {
            log.warn("Failed to serialize data to Text, will retry", e);
        } finally {
            // Single flush for both the success and the failure path.
            try {
                this.writer.flush();
            } catch (MutationsRejectedException e) {
                log.warn("Failed to write work mutations for replication, will retry", e);
            }
        }
    }
}
