/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSegmentManager;
import org.apache.druid.metadata.MetadataSegmentManagerConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.UnknownSegmentIdException;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadableInterval;
import org.skife.jdbi.v2.BaseResultSetMapper;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

@ManageLifecycle
public class SQLMetadataSegmentManager
implements MetadataSegmentManager {
    private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);
    private final ReentrantReadWriteLock startStopLock = new ReentrantReadWriteLock();
    private final Object pollLock = new Object();
    private final ObjectMapper jsonMapper;
    private final Supplier<MetadataSegmentManagerConfig> config;
    private final Supplier<MetadataStorageTablesConfig> dbTables;
    private final SQLMetadataConnector connector;
    @Nullable
    private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
    private long startCount = 0L;
    private long currentStartOrder = -1L;
    private ScheduledExecutorService exec = null;

    @Inject
    public SQLMetadataSegmentManager(ObjectMapper jsonMapper, Supplier<MetadataSegmentManagerConfig> config, Supplier<MetadataStorageTablesConfig> dbTables, SQLMetadataConnector connector) {
        this.jsonMapper = jsonMapper;
        this.config = config;
        this.dbTables = dbTables;
        this.connector = connector;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @LifecycleStart
    public void start() {
        ReentrantReadWriteLock.WriteLock lock = this.startStopLock.writeLock();
        lock.lock();
        try {
            if (this.isStarted()) {
                return;
            }
            ++this.startCount;
            long localStartOrder = this.currentStartOrder = this.startCount;
            this.exec = Execs.scheduledSingleThreaded((String)"DatabaseSegmentManager-Exec--%d");
            Duration delay = ((MetadataSegmentManagerConfig)this.config.get()).getPollDuration().toStandardDuration();
            this.exec.scheduleWithFixedDelay(this.createPollTaskForStartOrder(localStartOrder), 0L, delay.getMillis(), TimeUnit.MILLISECONDS);
        }
        finally {
            lock.unlock();
        }
    }

    private Runnable createPollTaskForStartOrder(long startOrder) {
        return () -> {
            ReentrantReadWriteLock.ReadLock lock = this.startStopLock.readLock();
            lock.lock();
            try {
                if (startOrder == this.currentStartOrder) {
                    this.poll();
                } else {
                    log.debug("startOrder = currentStartOrder = %d, skipping poll()", new Object[]{startOrder});
                }
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "uncaught exception in segment manager polling thread", new Object[0]).emit();
            }
            finally {
                lock.unlock();
            }
        };
    }

    @Override
    @LifecycleStop
    public void stop() {
        ReentrantReadWriteLock.WriteLock lock = this.startStopLock.writeLock();
        lock.lock();
        try {
            if (!this.isStarted()) {
                return;
            }
            this.dataSourcesSnapshot = null;
            this.currentStartOrder = -1L;
            this.exec.shutdownNow();
            this.exec = null;
        }
        finally {
            lock.unlock();
        }
    }

    private Pair<DataSegment, Boolean> usedPayloadMapper(int index, ResultSet resultSet, StatementContext context) throws SQLException {
        try {
            return new Pair(this.jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class), (Object)resultSet.getBoolean("used"));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(String dataSource, Interval interval) {
        return (List)this.connector.inReadOnlyTransaction((handle, status) -> ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start", (Object[])new Object[]{this.getSegmentsTable(), this.connector.getQuoteString()})).setFetchSize(this.connector.getStreamingFetchSize()).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map(this::usedPayloadMapper).list());
    }

    private List<Pair<DataSegment, Boolean>> getDataSegments(String dataSource, Collection<String> segmentIds, Handle handle) {
        return segmentIds.stream().map(segmentId -> (Pair)Optional.ofNullable(((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id", (Object[])new Object[]{this.getSegmentsTable()})).bind("dataSource", dataSource)).bind("id", segmentId)).map(this::usedPayloadMapper).first()).orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format((String)"Cannot find segment id [%s]", (Object[])new Object[]{segmentId})))).collect(Collectors.toList());
    }

    private VersionedIntervalTimeline<String, DataSegment> buildVersionedIntervalTimeline(String dataSource, Collection<Interval> intervals, Handle handle) {
        return VersionedIntervalTimeline.forSegments(intervals.stream().flatMap(interval -> ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start AND used = true", (Object[])new Object[]{this.getSegmentsTable(), this.connector.getQuoteString()})).setFetchSize(this.connector.getStreamingFetchSize()).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((i, resultSet, context) -> {
            try {
                return (DataSegment)this.jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).list().stream()).iterator());
    }

    @Override
    public boolean enableDataSource(String dataSource) {
        try {
            return this.enableSegments(dataSource, Intervals.ETERNITY) != 0;
        }
        catch (Exception e) {
            log.error((Throwable)e, "Exception enabling datasource %s", new Object[]{dataSource});
            return false;
        }
    }

    @Override
    public int enableSegments(String dataSource, Interval interval) {
        List<Pair<DataSegment, Boolean>> segments = this.getDataSegmentsOverlappingInterval(dataSource, interval);
        List<DataSegment> segmentsToEnable = segments.stream().filter(segment -> (Boolean)segment.rhs == false && interval.contains((ReadableInterval)((DataSegment)segment.lhs).getInterval())).map(segment -> (DataSegment)segment.lhs).collect(Collectors.toList());
        VersionedIntervalTimeline versionedIntervalTimeline = VersionedIntervalTimeline.forSegments(segments.stream().filter(segment -> (Boolean)segment.rhs).map(segment -> (DataSegment)segment.lhs).iterator());
        VersionedIntervalTimeline.addSegments((VersionedIntervalTimeline)versionedIntervalTimeline, segmentsToEnable.iterator());
        return this.enableSegments(segmentsToEnable, (VersionedIntervalTimeline<String, DataSegment>)versionedIntervalTimeline);
    }

    @Override
    public int enableSegments(String dataSource, Collection<String> segmentIds) {
        Pair data = (Pair)this.connector.inReadOnlyTransaction((handle, status) -> {
            List segments = this.getDataSegments(dataSource, segmentIds, handle).stream().filter(pair -> (Boolean)pair.rhs == false).map(pair -> (DataSegment)pair.lhs).collect(Collectors.toList());
            VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline = this.buildVersionedIntervalTimeline(dataSource, JodaUtils.condenseIntervals((Iterable)segments.stream().map(segment -> segment.getInterval()).collect(Collectors.toList())), handle);
            VersionedIntervalTimeline.addSegments(versionedIntervalTimeline, segments.iterator());
            return new Pair(segments, versionedIntervalTimeline);
        });
        return this.enableSegments((Collection)data.lhs, (VersionedIntervalTimeline<String, DataSegment>)((VersionedIntervalTimeline)data.rhs));
    }

    private int enableSegments(Collection<DataSegment> segments, VersionedIntervalTimeline<String, DataSegment> versionedIntervalTimeline) {
        if (segments.isEmpty()) {
            log.warn("No segments found to update!", new Object[0]);
            return 0;
        }
        return (Integer)this.connector.getDBI().withHandle(handle -> {
            Batch batch = handle.createBatch();
            segments.stream().map(segment -> segment.getId()).filter(segmentId -> !versionedIntervalTimeline.isOvershadowed(segmentId.getInterval(), (Object)segmentId.getVersion())).forEach(segmentId -> batch.add(StringUtils.format((String)"UPDATE %s SET used=true WHERE id = '%s'", (Object[])new Object[]{this.getSegmentsTable(), segmentId})));
            return batch.execute().length;
        });
    }

    @Override
    public boolean enableSegment(final String segmentId) {
        try {
            this.connector.getDBI().withHandle((HandleCallback)new HandleCallback<Void>(){

                public Void withHandle(Handle handle) {
                    ((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET used=true WHERE id = :id", (Object[])new Object[]{SQLMetadataSegmentManager.this.getSegmentsTable()})).bind("id", segmentId)).execute();
                    return null;
                }
            });
        }
        catch (Exception e) {
            log.error((Throwable)e, "Exception enabling segment %s", new Object[]{segmentId});
            return false;
        }
        return true;
    }

    @Override
    public boolean removeDataSource(String dataSource) {
        try {
            int removed = (Integer)this.connector.getDBI().withHandle(handle -> ((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET used=false WHERE dataSource = :dataSource", (Object[])new Object[]{this.getSegmentsTable()})).bind("dataSource", dataSource)).execute());
            if (removed == 0) {
                return false;
            }
        }
        catch (Exception e) {
            log.error((Throwable)e, "Error removing datasource %s", new Object[]{dataSource});
            return false;
        }
        return true;
    }

    @Override
    public boolean removeSegment(String segmentId) {
        try {
            return this.removeSegmentFromTable(segmentId);
        }
        catch (Exception e) {
            log.error((Throwable)e, e.toString(), new Object[0]);
            return false;
        }
    }

    @Override
    public long disableSegments(String dataSource, Collection<String> segmentIds) {
        if (segmentIds.isEmpty()) {
            return 0L;
        }
        long[] result = new long[1];
        try {
            this.connector.getDBI().withHandle(handle -> {
                Batch batch = handle.createBatch();
                segmentIds.forEach(segmentId -> batch.add(StringUtils.format((String)"UPDATE %s SET used=false WHERE datasource = '%s' AND id = '%s' ", (Object[])new Object[]{this.getSegmentsTable(), dataSource, segmentId})));
                int[] resultArr = batch.execute();
                result[0] = Arrays.stream(resultArr).filter(x -> x > 0).count();
                return result[0];
            });
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return result[0];
    }

    @Override
    public int disableSegments(String dataSource, Interval interval) {
        try {
            return (Integer)this.connector.getDBI().withHandle(handle -> ((Update)((Update)((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET used=false WHERE datasource = :datasource AND start >= :start AND %2$send%2$s <= :end", (Object[])new Object[]{this.getSegmentsTable(), this.connector.getQuoteString()})).bind("datasource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).execute());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private boolean removeSegmentFromTable(String segmentId) {
        int removed = (Integer)this.connector.getDBI().withHandle(handle -> ((Update)handle.createStatement(StringUtils.format((String)"UPDATE %s SET used=false WHERE id = :segmentID", (Object[])new Object[]{this.getSegmentsTable()})).bind("segmentID", segmentId)).execute());
        return removed > 0;
    }

    @Override
    public boolean isStarted() {
        ReentrantReadWriteLock.ReadLock lock = this.startStopLock.readLock();
        lock.lock();
        try {
            boolean bl = this.currentStartOrder >= 0L;
            return bl;
        }
        finally {
            lock.unlock();
        }
    }

    @Override
    @Nullable
    public ImmutableDruidDataSource getDataSource(String dataSourceName) {
        ImmutableDruidDataSource dataSource = Optional.ofNullable(this.dataSourcesSnapshot).map(m -> m.getDataSourcesMap().get(dataSourceName)).orElse(null);
        return dataSource == null ? null : dataSource;
    }

    @Override
    @Nullable
    public Collection<ImmutableDruidDataSource> getDataSources() {
        return Optional.ofNullable(this.dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null);
    }

    @Override
    @Nullable
    public Iterable<DataSegment> iterateAllSegments() {
        Collection dataSources = Optional.ofNullable(this.dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null);
        if (dataSources == null) {
            return null;
        }
        return () -> dataSources.stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
    }

    @Override
    @Nullable
    public Set<SegmentId> getOvershadowedSegments() {
        return Optional.ofNullable(this.dataSourcesSnapshot).map(m -> m.getOvershadowedSegments()).orElse(null);
    }

    @Override
    @Nullable
    public DataSourcesSnapshot getDataSourcesSnapshot() {
        return this.dataSourcesSnapshot;
    }

    @Override
    public Collection<String> getAllDataSourceNames() {
        return (Collection)this.connector.getDBI().withHandle(handle -> (List)handle.createQuery(StringUtils.format((String)"SELECT DISTINCT(datasource) FROM %s", (Object[])new Object[]{this.getSegmentsTable()})).fold(new ArrayList(), (Folder3)new Folder3<List<String>, Map<String, Object>>(){

            public List<String> fold(List<String> druidDataSources, Map<String, Object> stringObjectMap, FoldController foldController, StatementContext statementContext) {
                druidDataSources.add(MapUtils.getString(stringObjectMap, (String)"datasource"));
                return druidDataSources;
            }
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void poll() {
        Object object = this.pollLock;
        synchronized (object) {
            try {
                this.doPoll();
            }
            catch (Exception e) {
                log.makeAlert((Throwable)e, "Problem polling DB.", new Object[0]).emit();
            }
        }
    }

    private void doPoll() {
        log.debug("Starting polling of segment table", new Object[0]);
        List<DataSegment> segments = this.connector.inReadOnlyTransaction(new TransactionCallback<List<DataSegment>>(){

            public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) {
                return handle.createQuery(StringUtils.format((String)"SELECT payload FROM %s WHERE used=true", (Object[])new Object[]{SQLMetadataSegmentManager.this.getSegmentsTable()})).setFetchSize(SQLMetadataSegmentManager.this.connector.getStreamingFetchSize()).map((ResultSetMapper)new ResultSetMapper<DataSegment>(){

                    public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException {
                        try {
                            DataSegment segment = (DataSegment)SQLMetadataSegmentManager.this.jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
                            return SQLMetadataSegmentManager.this.replaceWithExistingSegmentIfPresent(segment);
                        }
                        catch (IOException e) {
                            log.makeAlert((Throwable)e, "Failed to read segment from db.", new Object[0]).emit();
                            return null;
                        }
                    }
                }).list();
            }
        });
        if (segments == null || segments.isEmpty()) {
            log.warn("No segments found in the database!", new Object[0]);
            return;
        }
        log.info("Polled and found %,d segments in the database", new Object[]{segments.size()});
        ConcurrentHashMap newDataSources = new ConcurrentHashMap();
        ImmutableMap dataSourceProperties = ImmutableMap.of((Object)"created", (Object)DateTimes.nowUtc().toString());
        segments.stream().filter(Objects::nonNull).forEach(segment -> newDataSources.computeIfAbsent(segment.getDataSource(), dsName -> new DruidDataSource((String)dsName, (Map<String, String>)dataSourceProperties)).addSegmentIfAbsent((DataSegment)segment));
        Map updatedDataSources = CollectionUtils.mapValues(newDataSources, v -> v.toImmutableDruidDataSource());
        this.dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources);
    }

    private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) {
        ImmutableDruidDataSource dataSource = Optional.ofNullable(this.dataSourcesSnapshot).map(m -> m.getDataSourcesMap().get(segment.getDataSource())).orElse(null);
        if (dataSource == null) {
            return segment;
        }
        DataSegment alreadyExistingSegment = dataSource.getSegment(segment.getId());
        return alreadyExistingSegment != null ? alreadyExistingSegment : segment;
    }

    private String getSegmentsTable() {
        return ((MetadataStorageTablesConfig)this.dbTables.get()).getSegmentsTable();
    }

    @Override
    public List<Interval> getUnusedSegmentIntervals(final String dataSource, final Interval interval, final int limit) {
        return this.connector.inReadOnlyTransaction(new TransactionCallback<List<Interval>>(){

            public List<Interval> inTransaction(Handle handle, TransactionStatus status) {
                ResultIterator iter = ((Query)((Query)((Query)handle.createQuery(StringUtils.format((String)"SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource and start >= :start and %2$send%2$s <= :end and used = false ORDER BY start, %2$send%2$s", (Object[])new Object[]{SQLMetadataSegmentManager.this.getSegmentsTable(), SQLMetadataSegmentManager.this.connector.getQuoteString()})).setFetchSize(SQLMetadataSegmentManager.this.connector.getStreamingFetchSize()).setMaxRows(limit).bind("dataSource", dataSource)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map((ResultSetMapper)new BaseResultSetMapper<Interval>(){

                    protected Interval mapInternal(int index, Map<String, Object> row) {
                        return new Interval((ReadableInstant)DateTimes.of((String)((String)row.get("start"))), (ReadableInstant)DateTimes.of((String)((String)row.get("end"))));
                    }
                }).iterator();
                ArrayList result = Lists.newArrayListWithCapacity((int)limit);
                for (int i = 0; i < limit && iter.hasNext(); ++i) {
                    try {
                        result.add(iter.next());
                        continue;
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                return result;
            }
        });
    }
}

