package io.cdap.cdap.data2.metadata.lineage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.DatasetId;
import io.cdap.cdap.proto.id.NamespacedEntityId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.proto.id.ProgramRunId;
import io.cdap.cdap.spi.data.StructuredRow;
import io.cdap.cdap.spi.data.StructuredTable;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.TableNotFoundException;
import io.cdap.cdap.spi.data.table.field.Field;
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.twill.api.RunId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/cdap/data2/metadata/lineage/LineageTable.class */
/**
 * Records and queries lineage, i.e. which program runs accessed which datasets.
 *
 * <p>Every access is written to two structured tables obtained from the supplied
 * {@link StructuredTableContext}: one whose key starts with the dataset and one whose key starts
 * with the program. Both keys contain the run's start time in inverted form (see
 * {@link #invertTime(long)}), so a larger start time produces a smaller key value.
 *
 * <p>The two tables are looked up lazily on first use and cached in plain fields without any
 * synchronization.
 */
public class LineageTable {
    private static final Logger LOG = LoggerFactory.getLogger(LineageTable.class);

    // Column names that are written as plain literals throughout this class.
    private static final String START_TIME_FIELD = "start_time";
    private static final String RUN_FIELD = "run";
    private static final String DATASET_NAMESPACE_FIELD = "namespace";
    private static final String DATASET_FIELD = "dataset";
    private static final String PROGRAM_TYPE_FIELD = "program_type";
    private static final String PROGRAM_FIELD = "program";

    private final StructuredTableContext structuredTableContext;
    private StructuredTable datasetTable;
    private StructuredTable programTable;

    /**
     * Creates a lineage table backed by the given context.
     *
     * @param structuredTableContext context used to look up the underlying structured tables
     * @return a new {@code LineageTable}
     */
    @VisibleForTesting
    public static LineageTable create(StructuredTableContext structuredTableContext) {
        return new LineageTable(structuredTableContext);
    }

    private LineageTable(StructuredTableContext structuredTableContext) {
        this.structuredTableContext = structuredTableContext;
    }

    /**
     * Returns the dataset-keyed lineage table, looking it up on first use.
     *
     * @throws RuntimeException wrapping {@link TableNotFoundException} if the table does not exist
     */
    private StructuredTable getDatasetTable() {
        if (datasetTable == null) {
            try {
                datasetTable = structuredTableContext.getTable(StoreDefinition.LineageStore.DATASET_LINEAGE_TABLE);
            } catch (TableNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        return datasetTable;
    }

    /**
     * Returns the program-keyed lineage table, looking it up on first use.
     *
     * @throws RuntimeException wrapping {@link TableNotFoundException} if the table does not exist
     */
    private StructuredTable getProgramTable() {
        if (programTable == null) {
            try {
                programTable = structuredTableContext.getTable(StoreDefinition.LineageStore.PROGRAM_LINEAGE_TABLE);
            } catch (TableNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
        return programTable;
    }

    /**
     * Deletes every row from both lineage tables.
     *
     * @throws IOException if a delete fails
     */
    @VisibleForTesting
    public void deleteAll() throws IOException {
        getDatasetTable().deleteAll(Range.all());
        getProgramTable().deleteAll(Range.all());
    }

    /**
     * Records that a program run accessed a dataset. The same information is upserted into the
     * dataset-keyed table and then into the program-keyed table.
     *
     * @param programRunId the run that performed the access
     * @param datasetId the dataset that was accessed
     * @param accessType the kind of access
     * @param accessTime the access time stored alongside the key
     * @throws IOException if either upsert fails
     */
    public void addAccess(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType, long accessTime)
            throws IOException {
        LOG.trace("Recording access run={}, dataset={}, accessType={}, accessTime={}",
                programRunId, datasetId, accessType, accessTime);

        List<Field<?>> datasetFields = getDatasetKey(datasetId, programRunId, accessType);
        addAccessTime(datasetFields, accessTime);
        getDatasetTable().upsert(datasetFields);

        List<Field<?>> programFields = getProgramKey(programRunId, datasetId, accessType);
        addAccessTime(programFields, accessTime);
        getProgramTable().upsert(programFields);
    }

    /**
     * Returns the program and all datasets recorded for the given run.
     *
     * <p>The scan key consists of the program and the run's inverted start time only, so each
     * returned row is additionally checked against the run id.
     *
     * @param programRunId the run to look up
     * @return the program id and dataset ids found in the matching rows; empty if none match
     * @throws IOException if the scan fails
     */
    public Set<NamespacedEntityId> getEntitiesForRun(ProgramRunId programRunId) throws IOException {
        ImmutableSet.Builder<NamespacedEntityId> entities = ImmutableSet.builder();
        StructuredTable table = getProgramTable();
        Range runRange = Range.singleton(getRunScanStartKey(programRunId));
        // try-with-resources closes the iterator on every path, including a failing hasNext().
        try (CloseableIterator<StructuredRow> rows = table.scan(runRange, Integer.MAX_VALUE)) {
            while (rows.hasNext()) {
                StructuredRow row = rows.next();
                if (programRunId.getRun().equals(row.getString(RUN_FIELD))) {
                    entities.add(getProgramFromRow(row));
                    entities.add(getDatasetFromRow(row));
                }
            }
        }
        return entities.build();
    }

    /**
     * Returns the relations recorded for a dataset whose run start time falls in the scanned window.
     *
     * <p>The scanned window is built from {@code end + 1} and {@code start - 1} (see
     * {@link #getDatasetScanStartKey} and {@link #getDatasetScanEndKey}), both inclusive.
     * NOTE(review): the bounds are presumably epoch milliseconds, the unit used for the stored run
     * start time; confirm against callers.
     *
     * @param datasetId the dataset to look up
     * @param start lower time bound
     * @param end upper time bound
     * @param predicate only relations accepted by this predicate are returned
     * @return the accepted relations
     * @throws IOException if the scan fails
     */
    public Set<Relation> getRelations(DatasetId datasetId, long start, long end, Predicate<Relation> predicate)
            throws IOException {
        // Keys hold inverted time, so the upper time bound forms the scan's start key.
        return scanRelations(getDatasetTable(), getDatasetScanStartKey(datasetId, end),
                getDatasetScanEndKey(datasetId, start), predicate);
    }

    /**
     * Returns the relations recorded for a program whose run start time falls in the scanned window.
     *
     * <p>The scanned window is built from {@code end + 1} and {@code start - 1} (see
     * {@link #getProgramScanStartKey} and {@link #getProgramScanEndKey}), both inclusive.
     * NOTE(review): the bounds are presumably epoch milliseconds, the unit used for the stored run
     * start time; confirm against callers.
     *
     * @param programId the program to look up
     * @param start lower time bound
     * @param end upper time bound
     * @param predicate only relations accepted by this predicate are returned
     * @return the accepted relations
     * @throws IOException if the scan fails
     */
    public Set<Relation> getRelations(ProgramId programId, long start, long end, Predicate<Relation> predicate)
            throws IOException {
        // Keys hold inverted time, so the upper time bound forms the scan's start key.
        return scanRelations(getProgramTable(), getProgramScanStartKey(programId, end),
                getProgramScanEndKey(programId, start), predicate);
    }

    /**
     * Returns the access times stored in the program-keyed table for the given run, in scan order.
     *
     * @param programRunId the run to look up
     * @return the stored access times; empty if no row matches the run
     * @throws IOException if the scan fails
     */
    @VisibleForTesting
    public List<Long> getAccessTimesForRun(ProgramRunId programRunId) throws IOException {
        ImmutableList.Builder<Long> accessTimes = ImmutableList.builder();
        StructuredTable table = getProgramTable();
        Range runRange = Range.singleton(getRunScanStartKey(programRunId));
        // try-with-resources closes the iterator on every path, including a failing hasNext().
        try (CloseableIterator<StructuredRow> rows = table.scan(runRange, Integer.MAX_VALUE)) {
            while (rows.hasNext()) {
                StructuredRow row = rows.next();
                if (programRunId.getRun().equals(row.getString(RUN_FIELD))) {
                    accessTimes.add(row.getLong(StoreDefinition.LineageStore.ACCESS_TIME_FIELD));
                }
            }
        }
        return accessTimes.build();
    }

    /**
     * Scans {@code structuredTable} between the two keys (both inclusive), converts each row to a
     * {@link Relation} and keeps those accepted by {@code predicate}.
     */
    private Set<Relation> scanRelations(StructuredTable structuredTable, List<Field<?>> startKey,
                                        List<Field<?>> endKey, Predicate<Relation> predicate) throws IOException {
        ImmutableSet.Builder<Relation> relations = ImmutableSet.builder();
        Range range = Range.create(startKey, Range.Bound.INCLUSIVE, endKey, Range.Bound.INCLUSIVE);
        // try-with-resources closes the iterator on every path, including a failing hasNext().
        try (CloseableIterator<StructuredRow> rows = structuredTable.scan(range, Integer.MAX_VALUE)) {
            while (rows.hasNext()) {
                Relation relation = toRelation(rows.next());
                if (predicate.test(relation)) {
                    relations.add(relation);
                }
            }
        }
        return relations.build();
    }

    /** Builds the dataset-table key: dataset, inverted start time, program, run, access type. */
    private List<Field<?>> getDatasetKey(DatasetId datasetId, ProgramRunId programRunId, AccessType accessType) {
        List<Field<?>> key = new ArrayList<>();
        addDataset(key, datasetId);
        addDataKey(key, programRunId, accessType);
        return key;
    }

    /** Appends inverted start time, program, run and access type fields to {@code fields}. */
    private void addDataKey(List<Field<?>> fields, ProgramRunId programRunId, AccessType accessType) {
        fields.add(Fields.longField(START_TIME_FIELD, getInvertedStartTime(programRunId)));
        addProgram(fields, programRunId.getParent());
        fields.add(Fields.stringField(RUN_FIELD, programRunId.getEntityName()));
        fields.add(Fields.stringField(StoreDefinition.LineageStore.ACCESS_TYPE_FIELD,
                Character.toString(accessType.getType())));
    }

    /** Builds the program-table key: program, inverted start time, dataset, run, access type. */
    private List<Field<?>> getProgramKey(ProgramRunId programRunId, DatasetId datasetId, AccessType accessType) {
        long invertedStartTime = getInvertedStartTime(programRunId);
        List<Field<?>> key = new ArrayList<>();
        addProgram(key, programRunId.getParent());
        key.add(Fields.longField(START_TIME_FIELD, invertedStartTime));
        addDataset(key, datasetId);
        key.add(Fields.stringField(RUN_FIELD, programRunId.getEntityName()));
        key.add(Fields.stringField(StoreDefinition.LineageStore.ACCESS_TYPE_FIELD,
                Character.toString(accessType.getType())));
        return key;
    }

    /** Appends the access-time field to {@code fields}. */
    private void addAccessTime(List<Field<?>> fields, long accessTime) {
        fields.add(Fields.longField(StoreDefinition.LineageStore.ACCESS_TIME_FIELD, accessTime));
    }

    /** Builds a dataset-table scan key: dataset followed by the inverted form of {@code time}. */
    private List<Field<?>> getDatasetScanKey(DatasetId datasetId, long time) {
        long invertedTime = invertTime(time);
        List<Field<?>> key = new ArrayList<>();
        addDataset(key, datasetId);
        key.add(Fields.longField(START_TIME_FIELD, invertedTime));
        return key;
    }

    /**
     * Builds the dataset scan start key from {@code end + 1}, or from {@code end} itself when it
     * already equals {@code KeyValue.LATEST_TIMESTAMP}.
     */
    private List<Field<?>> getDatasetScanStartKey(DatasetId datasetId, long end) {
        return getDatasetScanKey(datasetId, end == KeyValue.LATEST_TIMESTAMP ? end : end + 1);
    }

    /** Builds the dataset scan end key from {@code start - 1}, or from {@code start} when it is 0. */
    private List<Field<?>> getDatasetScanEndKey(DatasetId datasetId, long start) {
        return getDatasetScanKey(datasetId, start == 0 ? start : start - 1);
    }

    /** Builds a program-table scan key: program followed by the inverted form of {@code time}. */
    private List<Field<?>> getProgramScanKey(ProgramId programId, long time) {
        long invertedTime = invertTime(time);
        List<Field<?>> key = new ArrayList<>();
        addProgram(key, programId);
        key.add(Fields.longField(START_TIME_FIELD, invertedTime));
        return key;
    }

    /**
     * Builds the program scan start key from {@code end + 1}, or from {@code end} itself when it
     * already equals {@code KeyValue.LATEST_TIMESTAMP}.
     */
    private List<Field<?>> getProgramScanStartKey(ProgramId programId, long end) {
        return getProgramScanKey(programId, end == KeyValue.LATEST_TIMESTAMP ? end : end + 1);
    }

    /** Builds the program scan end key from {@code start - 1}, or from {@code start} when it is 0. */
    private List<Field<?>> getProgramScanEndKey(ProgramId programId, long start) {
        return getProgramScanKey(programId, start == 0 ? start : start - 1);
    }

    /** Builds the program-table key prefix for a run: program followed by inverted start time. */
    private List<Field<?>> getRunScanStartKey(ProgramRunId programRunId) {
        List<Field<?>> key = new ArrayList<>();
        addProgram(key, programRunId.getParent());
        key.add(Fields.longField(START_TIME_FIELD, getInvertedStartTime(programRunId)));
        return key;
    }

    /** Appends the dataset's namespace and name fields to {@code fields}. */
    private void addDataset(List<Field<?>> fields, DatasetId datasetId) {
        fields.add(Fields.stringField(DATASET_NAMESPACE_FIELD, datasetId.getNamespace()));
        fields.add(Fields.stringField(DATASET_FIELD, datasetId.getEntityName()));
    }

    /** Appends the program's namespace, application, type category and name fields to {@code fields}. */
    private void addProgram(List<Field<?>> fields, ProgramId programId) {
        fields.add(Fields.stringField(StoreDefinition.LineageStore.PROGRAM_NAMESPACE_FIELD,
                programId.getNamespace()));
        fields.add(Fields.stringField(StoreDefinition.LineageStore.PROGRAM_APPLICATION_FIELD,
                programId.getParent().getEntityName()));
        fields.add(Fields.stringField(PROGRAM_TYPE_FIELD, programId.getType().getCategoryName()));
        fields.add(Fields.stringField(PROGRAM_FIELD, programId.getEntityName()));
    }

    /** Reconstructs the program id from the program columns of {@code row}. */
    private ProgramId getProgramFromRow(StructuredRow row) {
        return new ProgramId(
                row.getString(StoreDefinition.LineageStore.PROGRAM_NAMESPACE_FIELD),
                row.getString(StoreDefinition.LineageStore.PROGRAM_APPLICATION_FIELD),
                ProgramType.valueOfCategoryName(row.getString(PROGRAM_TYPE_FIELD)),
                row.getString(PROGRAM_FIELD));
    }

    /** Reconstructs the dataset id from the dataset columns of {@code row}. */
    private DatasetId getDatasetFromRow(StructuredRow row) {
        return new DatasetId(row.getString(DATASET_NAMESPACE_FIELD), row.getString(DATASET_FIELD));
    }

    /**
     * Subtracts {@code time} from {@code KeyValue.LATEST_TIMESTAMP}, so that a larger time yields a
     * smaller value.
     */
    private long invertTime(long time) {
        return KeyValue.LATEST_TIMESTAMP - time;
    }

    /** Returns the inverted form of the millisecond time encoded in the run id. */
    private long getInvertedStartTime(ProgramRunId programRunId) {
        return invertTime(RunIds.getTime(RunIds.fromString(programRunId.getRun()), TimeUnit.MILLISECONDS));
    }

    /**
     * Converts a lineage row into a {@link Relation}, reading the run id, the access type (first
     * character of the access-type column), the dataset and the program. The relation's final
     * constructor argument is always an empty set.
     */
    private Relation toRelation(StructuredRow row) {
        RunId runId = RunIds.fromString(row.getString(RUN_FIELD));
        LOG.trace("Got runId {}", runId);
        AccessType accessType =
                AccessType.fromType(row.getString(StoreDefinition.LineageStore.ACCESS_TYPE_FIELD).charAt(0));
        LOG.trace("Got access type {}", accessType);
        DatasetId dataset = getDatasetFromRow(row);
        LOG.trace("Got datasetInstance {}", dataset);
        ProgramId program = getProgramFromRow(row);
        LOG.trace("Got program {}", program);
        return new Relation(dataset, program, accessType, runId, Collections.emptySet());
    }
}
