package io.cdap.cdap.data2.metadata.lineage.field;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.api.lineage.field.EndPoint;
import io.cdap.cdap.api.lineage.field.Operation;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import io.cdap.cdap.proto.codec.OperationTypeAdapter;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.proto.metadata.lineage.ProgramRunOperations;
import io.cdap.cdap.spi.data.StructuredRow;
import io.cdap.cdap.spi.data.StructuredTable;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.table.field.Field;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.class */
/**
 * Accessor for the field lineage information kept in four {@link StructuredTable}s:
 * <ul>
 *   <li>the operations table, mapping a lineage checksum to the JSON-serialized set of
 *       {@link Operation}s;</li>
 *   <li>the destination fields table, mapping (checksum, endpoint) to the JSON-serialized set of
 *       field names;</li>
 *   <li>the summary fields table, mapping (checksum, direction, endpoint, field) to the
 *       JSON-serialized set of related {@link EndPointField}s;</li>
 *   <li>the endpoint checksum table, mapping (direction, endpoint, inverted run start time) to the
 *       checksum and the JSON-serialized {@link ProgramRunId} that produced it.</li>
 * </ul>
 *
 * <p>The underlying tables are looked up lazily from the {@link StructuredTableContext} on first use
 * and cached in plain (unsynchronized) instance fields.
 */
public class FieldLineageTable {
    private static final Logger LOG = LoggerFactory.getLogger(FieldLineageTable.class);
    private static final Gson GSON = new GsonBuilder()
            .registerTypeAdapter(Operation.class, new OperationTypeAdapter())
            .create();

    private static final Type SET_FIELD_TYPE = new TypeToken<HashSet<String>>() {
    }.getType();
    private static final Type SET_ENDPOINT_FIELD_TYPE = new TypeToken<HashSet<EndPointField>>() {
    }.getType();
    private static final Type SET_OPERATION_TYPE = new TypeToken<HashSet<Operation>>() {
    }.getType();

    /** Direction marker for records describing data flowing into an endpoint. */
    private static final String INCOMING_DIRECTION_MARKER = "i";
    /** Direction marker for records describing data flowing out of an endpoint. */
    private static final String OUTGOING_DIRECTION_MARKER = "o";
    /** Name of the key column holding the inverted program run start time. */
    private static final String START_TIME_FIELD = "start_time";

    private final StructuredTableContext structuredTableContext;
    private StructuredTable endpointChecksumTable;
    private StructuredTable operationsTable;
    private StructuredTable destinationFieldsTable;
    private StructuredTable summaryFieldsTable;

    private FieldLineageTable(StructuredTableContext structuredTableContext) {
        this.structuredTableContext = structuredTableContext;
    }

    /**
     * Creates a new instance backed by the given table context.
     *
     * @param structuredTableContext context used to look up the underlying tables
     * @return a new {@link FieldLineageTable}
     */
    @VisibleForTesting
    public static FieldLineageTable create(StructuredTableContext structuredTableContext) {
        return new FieldLineageTable(structuredTableContext);
    }

    private StructuredTable getEndpointChecksumTable() {
        if (endpointChecksumTable == null) {
            endpointChecksumTable =
                    structuredTableContext.getTable(StoreDefinition.FieldLineageStore.ENDPOINT_CHECKSUM_TABLE);
        }
        return endpointChecksumTable;
    }

    private StructuredTable getOperationsTable() {
        if (operationsTable == null) {
            operationsTable = structuredTableContext.getTable(StoreDefinition.FieldLineageStore.OPERATIONS_TABLE);
        }
        return operationsTable;
    }

    private StructuredTable getDestinationFieldsTable() {
        if (destinationFieldsTable == null) {
            destinationFieldsTable =
                    structuredTableContext.getTable(StoreDefinition.FieldLineageStore.DESTINATION_FIELDS_TABLE);
        }
        return destinationFieldsTable;
    }

    private StructuredTable getSummaryFieldsTable() {
        if (summaryFieldsTable == null) {
            summaryFieldsTable =
                    structuredTableContext.getTable(StoreDefinition.FieldLineageStore.SUMMARY_FIELDS_TABLE);
        }
        return summaryFieldsTable;
    }

    /**
     * Stores the field lineage information of a program run.
     *
     * <p>The operations, destination fields and summaries are written only when no operations row
     * exists yet for the lineage checksum. The per-endpoint reference records linking the run to the
     * checksum are always written.
     *
     * @param programRunId the program run that produced the lineage
     * @param fieldLineageInfo the lineage information to store
     * @throws IOException if reading from or writing to the underlying tables fails
     */
    public void addFieldLineageInfo(ProgramRunId programRunId, FieldLineageInfo fieldLineageInfo) throws IOException {
        long checksum = fieldLineageInfo.getChecksum();
        if (readOperations(checksum) == null) {
            writeOperation(checksum, fieldLineageInfo.getOperations());
            for (Map.Entry<EndPoint, Set<String>> entry : fieldLineageInfo.getDestinationFields().entrySet()) {
                addDestinationEntry(checksum, entry.getKey(), GSON.toJson(entry.getValue()));
            }
            addSummary(checksum, INCOMING_DIRECTION_MARKER, fieldLineageInfo.getIncomingSummary());
            addSummary(checksum, OUTGOING_DIRECTION_MARKER, fieldLineageInfo.getOutgoingSummary());
        }
        addFieldLineageInfoReferenceRecords(programRunId, fieldLineageInfo);
    }

    /**
     * Deletes every row of all four underlying tables.
     *
     * @throws IOException if a delete on an underlying table fails
     */
    @VisibleForTesting
    public void deleteAll() throws IOException {
        getEndpointChecksumTable().deleteAll(Range.all());
        getDestinationFieldsTable().deleteAll(Range.all());
        getOperationsTable().deleteAll(Range.all());
        getSummaryFieldsTable().deleteAll(Range.all());
    }

    /**
     * Reads the operations stored for a checksum.
     *
     * @return the deserialized operations, or {@code null} if no row exists for the checksum
     */
    @Nullable
    private Set<Operation> readOperations(long checksum) throws IOException {
        Optional<StructuredRow> row = getOperationsTable().read(getOperationsKey(checksum));
        if (!row.isPresent()) {
            return null;
        }
        return GSON.fromJson(row.get().getString(StoreDefinition.FieldLineageStore.OPERATIONS_FIELD),
                SET_OPERATION_TYPE);
    }

    /** Upserts the JSON-serialized operations under the given checksum. */
    private void writeOperation(long checksum, Set<Operation> operations) throws IOException {
        List<Field<?>> fields = getOperationsKey(checksum);
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.OPERATIONS_FIELD, GSON.toJson(operations)));
        getOperationsTable().upsert(fields);
    }

    /** Writes one summary row per entry of the given summary map. */
    private void addSummary(long checksum, String direction,
                            Map<EndPointField, Set<EndPointField>> summary) throws IOException {
        for (Map.Entry<EndPointField, Set<EndPointField>> entry : summary.entrySet()) {
            addSummaryEntry(checksum, direction, entry.getKey(), GSON.toJson(entry.getValue()));
        }
    }

    /**
     * Writes the reference records linking the run to the lineage checksum: an incoming record for
     * every destination endpoint and an outgoing record for every source endpoint.
     */
    private void addFieldLineageInfoReferenceRecords(ProgramRunId programRunId,
                                                     FieldLineageInfo fieldLineageInfo) throws IOException {
        long checksum = fieldLineageInfo.getChecksum();
        for (EndPoint destination : fieldLineageInfo.getDestinations()) {
            addOperationReferenceRecord(INCOMING_DIRECTION_MARKER, destination, programRunId, checksum);
        }
        for (EndPoint source : fieldLineageInfo.getSources()) {
            addOperationReferenceRecord(OUTGOING_DIRECTION_MARKER, source, programRunId, checksum);
        }
    }

    /** Upserts a single (direction, endpoint, inverted start time) -> (checksum, run) record. */
    private void addOperationReferenceRecord(String direction, EndPoint endPoint, ProgramRunId programRunId,
                                             long checksum) throws IOException {
        List<Field<?>> fields = getOperationReferenceRowKey(direction, endPoint, programRunId);
        fields.add(Fields.longField(StoreDefinition.FieldLineageStore.CHECKSUM_FIELD, checksum));
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.PROGRAM_RUN_FIELD,
                GSON.toJson(programRunId)));
        getEndpointChecksumTable().upsert(fields);
    }

    /** Upserts the serialized summary data for one (checksum, direction, endpoint field) key. */
    private void addSummaryEntry(long checksum, String direction, EndPointField endPointField,
                                 String data) throws IOException {
        List<Field<?>> fields = getSummaryKey(checksum, direction, endPointField);
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.DESTINATION_DATA_FIELD, data));
        getSummaryFieldsTable().upsert(fields);
    }

    /** Upserts the serialized destination field names for one (checksum, endpoint) key. */
    private void addDestinationEntry(long checksum, EndPoint endPoint, String data) throws IOException {
        List<Field<?>> fields = getDestinationKeys(checksum, endPoint);
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.DESTINATION_DATA_FIELD, data));
        getDestinationFieldsTable().upsert(fields);
    }

    /**
     * Returns the names of all fields recorded for the endpoint, both as a destination and as a
     * source, by program runs whose start time falls in the given range.
     *
     * @param endPoint the endpoint whose fields are requested
     * @param start lower end of the time range; compared against run start times stored in
     *     milliseconds
     * @param end upper end of the time range, in the same unit as {@code start}
     * @return a mutable set of field names, empty if nothing was recorded
     * @throws IOException if reading from the underlying tables fails
     */
    public Set<String> getFields(EndPoint endPoint, long start, long end) throws IOException {
        Set<String> fields = getDestinationFields(endPoint, start, end);
        fields.addAll(getSourceFields(endPoint, start, end));
        return fields;
    }

    /**
     * Collects the destination field names stored for every checksum that has an incoming reference
     * record for the endpoint in the time range. Rows whose JSON cannot be parsed are logged and
     * skipped.
     */
    private Set<String> getDestinationFields(EndPoint endPoint, long start, long end) throws IOException {
        Set<Long> checksums =
                getChecksumsWithProgramRunsInRange(INCOMING_DIRECTION_MARKER, endPoint, start, end).keySet();
        Set<String> result = new HashSet<>();
        for (long checksum : checksums) {
            Optional<StructuredRow> row = getDestinationFieldsTable().read(getDestinationKeys(checksum, endPoint));
            if (!row.isPresent()) {
                continue;
            }
            try {
                Set<String> fields = GSON.fromJson(
                        row.get().getString(StoreDefinition.FieldLineageStore.DESTINATION_DATA_FIELD),
                        SET_FIELD_TYPE);
                if (fields != null) {
                    result.addAll(fields);
                }
            } catch (JsonSyntaxException e) {
                LOG.warn("Failed to parse json from checksum {}.", checksum, e);
            }
        }
        return result;
    }

    /**
     * Collects the field names of all outgoing summary rows of the endpoint, for every checksum that
     * has an outgoing reference record for the endpoint in the time range.
     */
    private Set<String> getSourceFields(EndPoint endPoint, long start, long end) throws IOException {
        Set<Long> checksums =
                getChecksumsWithProgramRunsInRange(OUTGOING_DIRECTION_MARKER, endPoint, start, end).keySet();
        Set<String> result = new HashSet<>();
        for (long checksum : checksums) {
            Range prefixRange = Range.singleton(getSummaryPrefix(checksum, OUTGOING_DIRECTION_MARKER, endPoint));
            // try-with-resources guarantees the scanner is closed even if hasNext()/next() throws.
            try (CloseableIterator<StructuredRow> rows =
                         getSummaryFieldsTable().scan(prefixRange, Integer.MAX_VALUE)) {
                while (rows.hasNext()) {
                    result.add(rows.next().getString(StoreDefinition.FieldLineageStore.ENDPOINT_FIELD));
                }
            }
        }
        return result;
    }

    /**
     * Returns the incoming summary of the given endpoint field, merged over all lineage checksums
     * referenced by program runs in the time range.
     *
     * @param endPointField the endpoint field whose incoming summary is requested
     * @param start lower end of the time range; compared against run start times stored in
     *     milliseconds
     * @param end upper end of the time range, in the same unit as {@code start}
     * @return the merged set of endpoint fields, empty if nothing was recorded
     * @throws IOException if reading from the underlying tables fails
     */
    public Set<EndPointField> getIncomingSummary(EndPointField endPointField, long start, long end)
            throws IOException {
        return getSummary(INCOMING_DIRECTION_MARKER, endPointField, start, end);
    }

    /**
     * Returns the outgoing summary of the given endpoint field, merged over all lineage checksums
     * referenced by program runs in the time range.
     *
     * @param endPointField the endpoint field whose outgoing summary is requested
     * @param start lower end of the time range; compared against run start times stored in
     *     milliseconds
     * @param end upper end of the time range, in the same unit as {@code start}
     * @return the merged set of endpoint fields, empty if nothing was recorded
     * @throws IOException if reading from the underlying tables fails
     */
    public Set<EndPointField> getOutgoingSummary(EndPointField endPointField, long start, long end)
            throws IOException {
        return getSummary(OUTGOING_DIRECTION_MARKER, endPointField, start, end);
    }

    /**
     * Shared implementation of the summary lookups. Rows whose JSON cannot be parsed are logged and
     * skipped.
     */
    private Set<EndPointField> getSummary(String direction, EndPointField endPointField, long start, long end)
            throws IOException {
        Set<Long> checksums =
                getChecksumsWithProgramRunsInRange(direction, endPointField.getEndPoint(), start, end).keySet();
        Set<EndPointField> result = new HashSet<>();
        for (long checksum : checksums) {
            Optional<StructuredRow> row =
                    getSummaryFieldsTable().read(getSummaryKey(checksum, direction, endPointField));
            if (!row.isPresent()) {
                continue;
            }
            try {
                Set<EndPointField> endPointFields = GSON.fromJson(
                        row.get().getString(StoreDefinition.FieldLineageStore.DESTINATION_DATA_FIELD),
                        SET_ENDPOINT_FIELD_TYPE);
                if (endPointFields != null) {
                    result.addAll(endPointFields);
                }
            } catch (JsonSyntaxException e) {
                LOG.warn("Failed to parse json from checksum {}.", checksum, e);
            }
        }
        return result;
    }

    /**
     * Returns the operations of all program runs that wrote to the endpoint in the time range,
     * grouped by lineage checksum.
     *
     * @param endPoint the destination endpoint
     * @param start lower end of the time range; compared against run start times stored in
     *     milliseconds
     * @param end upper end of the time range, in the same unit as {@code start}
     * @return one {@link ProgramRunOperations} per checksum, in scan order
     * @throws IOException if reading from the underlying tables fails
     */
    public Set<ProgramRunOperations> getIncomingOperations(EndPoint endPoint, long start, long end)
            throws IOException {
        return getOperations(INCOMING_DIRECTION_MARKER, endPoint, start, end);
    }

    /**
     * Returns the operations of all program runs that read from the endpoint in the time range,
     * grouped by lineage checksum.
     *
     * @param endPoint the source endpoint
     * @param start lower end of the time range; compared against run start times stored in
     *     milliseconds
     * @param end upper end of the time range, in the same unit as {@code start}
     * @return one {@link ProgramRunOperations} per checksum, in scan order
     * @throws IOException if reading from the underlying tables fails
     */
    public Set<ProgramRunOperations> getOutgoingOperations(EndPoint endPoint, long start, long end)
            throws IOException {
        return getOperations(OUTGOING_DIRECTION_MARKER, endPoint, start, end);
    }

    /**
     * Shared implementation of the operations lookups. Checksums without an operations row, or whose
     * JSON cannot be parsed, are skipped.
     */
    private Set<ProgramRunOperations> getOperations(String direction, EndPoint endPoint, long start, long end)
            throws IOException {
        Map<Long, Set<ProgramRunId>> checksumsWithRuns =
                getChecksumsWithProgramRunsInRange(direction, endPoint, start, end);
        // Insertion-ordered so that results follow the order of the reference-record scan.
        Set<ProgramRunOperations> result = new LinkedHashSet<>();
        for (Map.Entry<Long, Set<ProgramRunId>> entry : checksumsWithRuns.entrySet()) {
            long checksum = entry.getKey();
            Optional<StructuredRow> row = getOperationsTable().read(getOperationsKey(checksum));
            if (!row.isPresent()) {
                continue;
            }
            try {
                Set<Operation> operations = GSON.fromJson(
                        row.get().getString(StoreDefinition.FieldLineageStore.OPERATIONS_FIELD),
                        SET_OPERATION_TYPE);
                if (operations != null) {
                    result.add(new ProgramRunOperations(entry.getValue(), operations));
                }
            } catch (JsonSyntaxException e) {
                LOG.warn("Failed to parse json from checksum {}. Ignoring operations.", checksum, e);
            }
        }
        return result;
    }

    /**
     * Scans the reference records of the endpoint in the given direction and groups the program runs
     * found by lineage checksum.
     *
     * @return checksum to program runs, in the order the checksums were first seen by the scan
     */
    private Map<Long, Set<ProgramRunId>> getChecksumsWithProgramRunsInRange(String direction, EndPoint endPoint,
                                                                            long start, long end)
            throws IOException {
        // Times are stored inverted, so the end time yields the scan's start key and vice versa.
        List<Field<?>> scanStartKey = getScanKey(direction, endPoint, end);
        List<Field<?>> scanEndKey = getScanKey(direction, endPoint, start);
        Range range = Range.create(scanStartKey, Range.Bound.INCLUSIVE, scanEndKey, Range.Bound.INCLUSIVE);

        Map<Long, Set<ProgramRunId>> checksumsWithRuns = new LinkedHashMap<>();
        // try-with-resources guarantees the scanner is closed even if hasNext()/next() throws.
        try (CloseableIterator<StructuredRow> rows = getEndpointChecksumTable().scan(range, Integer.MAX_VALUE)) {
            while (rows.hasNext()) {
                StructuredRow row = rows.next();
                long checksum = row.getLong(StoreDefinition.FieldLineageStore.CHECKSUM_FIELD);
                checksumsWithRuns.computeIfAbsent(checksum, key -> new HashSet<>())
                        .add(GSON.fromJson(row.getString(StoreDefinition.FieldLineageStore.PROGRAM_RUN_FIELD),
                                ProgramRunId.class));
            }
        }
        return checksumsWithRuns;
    }

    /**
     * Builds a scan boundary key (direction, endpoint, inverted time + 1) for the reference records.
     *
     * <p>NOTE(review): the "+ 1" shifts both scan bounds by one unit; together with the inclusive
     * bounds used by the caller this determines exactly which boundary timestamps match - confirm
     * the intended inclusive/exclusive semantics against the callers.
     */
    private List<Field<?>> getScanKey(String direction, EndPoint endPoint, long time) {
        List<Field<?>> fields = new ArrayList<>();
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.DIRECTION_FIELD, direction));
        addEndPoint(fields, endPoint);
        long invertedTime = invertTime(time);
        // Leave the value untouched when it already equals the maximum timestamp constant
        // (presumably to avoid overflowing on the increment).
        long boundary = invertedTime == KeyValue.LATEST_TIMESTAMP ? invertedTime : invertedTime + 1;
        fields.add(Fields.longField(START_TIME_FIELD, boundary));
        return fields;
    }

    /** Builds the (checksum, direction, endpoint) key prefix shared by all summary rows of an endpoint. */
    private List<Field<?>> getSummaryPrefix(long checksum, String direction, EndPoint endPoint) {
        List<Field<?>> fields = new ArrayList<>();
        fields.add(Fields.longField(StoreDefinition.FieldLineageStore.CHECKSUM_FIELD, checksum));
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.DIRECTION_FIELD, direction));
        addEndPoint(fields, endPoint);
        return fields;
    }

    /** Builds the (direction, endpoint, inverted run start time) key of a reference record. */
    private List<Field<?>> getOperationReferenceRowKey(String direction, EndPoint endPoint,
                                                       ProgramRunId programRunId) {
        long invertedStartTime = getInvertedStartTime(programRunId);
        List<Field<?>> fields = new ArrayList<>();
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.DIRECTION_FIELD, direction));
        addEndPoint(fields, endPoint);
        fields.add(Fields.longField(START_TIME_FIELD, invertedStartTime));
        return fields;
    }

    /** Builds the (checksum, endpoint) key of a destination fields row. */
    private List<Field<?>> getDestinationKeys(long checksum, EndPoint endPoint) {
        List<Field<?>> fields = new ArrayList<>();
        fields.add(Fields.longField(StoreDefinition.FieldLineageStore.CHECKSUM_FIELD, checksum));
        addEndPoint(fields, endPoint);
        return fields;
    }

    /** Inverts a timestamp by subtracting it from the maximum timestamp constant. */
    private long invertTime(long time) {
        return KeyValue.LATEST_TIMESTAMP - time;
    }

    /** Returns the inverted start time, in milliseconds, encoded in the run id of the program run. */
    private long getInvertedStartTime(ProgramRunId programRunId) {
        return invertTime(RunIds.getTime(RunIds.fromString(programRunId.getEntityName()), TimeUnit.MILLISECONDS));
    }

    /** Appends the namespace and name key fields of the endpoint to the given key. */
    private void addEndPoint(List<Field<?>> fields, EndPoint endPoint) {
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.ENDPOINT_NAMESPACE_FIELD,
                endPoint.getNamespace()));
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.ENDPOINT_NAME_FIELD, endPoint.getName()));
    }

    /** Builds the full (checksum, direction, endpoint, field) key of a summary row. */
    private List<Field<?>> getSummaryKey(long checksum, String direction, EndPointField endPointField) {
        List<Field<?>> fields = new ArrayList<>();
        fields.add(Fields.longField(StoreDefinition.FieldLineageStore.CHECKSUM_FIELD, checksum));
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.DIRECTION_FIELD, direction));
        addEndPoint(fields, endPointField.getEndPoint());
        fields.add(Fields.stringField(StoreDefinition.FieldLineageStore.ENDPOINT_FIELD, endPointField.getField()));
        return fields;
    }

    /** Builds the single-column (checksum) key of an operations row. */
    private List<Field<?>> getOperationsKey(long checksum) {
        List<Field<?>> fields = new ArrayList<>();
        fields.add(Fields.longField(StoreDefinition.FieldLineageStore.CHECKSUM_FIELD, checksum));
        return fields;
    }
}
