/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.api.upload;

import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZKOperator.class);
    private final PinotHelixResourceManager _pinotHelixResourceManager;
    private final ControllerConf _controllerConf;
    private final ControllerMetrics _controllerMetrics;

    public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._controllerConf = controllerConf;
        this._controllerMetrics = controllerMetrics;
    }

    public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadDownloadClient.FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers) throws Exception {
        String segmentName = segmentMetadata.getName();
        boolean refreshOnly = Boolean.parseBoolean(headers.getHeaderString("REFRESH_ONLY"));
        ZNRecord existingSegmentMetadataZNRecord = this._pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
        if (existingSegmentMetadataZNRecord != null && this.shouldProcessAsNewSegment(tableNameWithType, segmentName, existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
            LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}", (Object)tableNameWithType, (Object)segmentName);
            Preconditions.checkState((boolean)this._pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName), (String)"Failed to remove segment ZK metadata for table: %s, segment: %s", (Object)tableNameWithType, (Object)segmentName);
            existingSegmentMetadataZNRecord = null;
        }
        if (existingSegmentMetadataZNRecord == null) {
            if (refreshOnly) {
                throw new ControllerApplicationException(LOGGER, String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType), Response.Status.GONE);
            }
            LOGGER.info("Adding new segment: {} to table: {}", (Object)segmentName, (Object)tableNameWithType);
            this.processNewSegment(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection, headers);
        } else {
            if (!allowRefresh) {
                throw new ControllerApplicationException(LOGGER, String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName, tableNameWithType), Response.Status.CONFLICT);
            }
            LOGGER.info("Segment: {} already exists in table: {}, refreshing it", (Object)segmentName, (Object)tableNameWithType);
            this.processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord, finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection, headers);
        }
    }

    private boolean shouldProcessAsNewSegment(String tableNameWithType, String segmentName, ZNRecord existingSegmentMetadataZNRecord, boolean enableParallelPushProtection) {
        long segmentUploadStartTime;
        IdealState idealState = this._pinotHelixResourceManager.getTableIdealState(tableNameWithType);
        Preconditions.checkState((idealState != null ? 1 : 0) != 0, (String)"Failed to find ideal state for table: %s", (Object)tableNameWithType);
        if (idealState.getInstanceStateMap(segmentName) != null) {
            return false;
        }
        if (enableParallelPushProtection && (segmentUploadStartTime = new SegmentZKMetadata(existingSegmentMetadataZNRecord).getSegmentUploadStartTime()) > 0L) {
            this.handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime);
        }
        return true;
    }

    private void handleParallelPush(String tableNameWithType, String segmentName, long segmentUploadStartTime) {
        assert (segmentUploadStartTime > 0L);
        if (System.currentTimeMillis() - segmentUploadStartTime <= this._controllerConf.getSegmentUploadTimeoutInMillis()) {
            throw new ControllerApplicationException(LOGGER, String.format("Another segment upload is in progress for segment: %s of table: %s, retry later", segmentName, tableNameWithType), Response.Status.CONFLICT);
        }
        LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", (Object)segmentName, (Object)tableNameWithType);
        this._controllerMetrics.addMeteredGlobalValue((AbstractMetrics.Meter)ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
    }

    private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadDownloadClient.FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) throws Exception {
        String segmentName = segmentMetadata.getName();
        int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
        SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
        long existingCrc = segmentZKMetadata.getCrc();
        this.checkCRC(headers, tableNameWithType, segmentName, existingCrc);
        if (enableParallelPushProtection) {
            long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime();
            if (segmentUploadStartTime > 0L) {
                this.handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime);
            }
            segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis());
            if (!this._pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
                throw new ControllerApplicationException(LOGGER, String.format("Failed to lock the segment: %s of table: %s, retry later", segmentName, tableNameWithType), Response.Status.CONFLICT);
            }
            ++expectedVersion;
        }
        segmentZKMetadata.setSegmentUploadStartTime(-1L);
        try {
            String customMapModifierStr = headers.getHeaderString("Pinot-SegmentZKMetadataCustomMapModifier");
            SegmentZKMetadataCustomMapModifier customMapModifier = customMapModifierStr != null ? new SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null;
            long newCrc = Long.parseLong(segmentMetadata.getCrc());
            if (newCrc == existingCrc) {
                LOGGER.info("New segment crc '{}' is the same as existing segment crc for segment '{}'. Updating ZK metadata without refreshing the segment.", (Object)newCrc, (Object)segmentName);
                segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
                segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
                if (customMapModifier != null) {
                    segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
                } else {
                    segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
                }
                if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) {
                    LOGGER.info("Updating segment download url from: {} to: {} even though crc is the same", (Object)segmentZKMetadata.getDownloadUrl(), (Object)segmentDownloadURIStr);
                    segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr);
                    if (finalSegmentLocationURI != null) {
                        this.copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, finalSegmentLocationURI);
                    }
                }
                if (!this._pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
                    throw new RuntimeException(String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", segmentName, tableNameWithType, expectedVersion));
                }
            } else {
                LOGGER.info("New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing segment {}", new Object[]{newCrc, existingCrc, segmentName});
                if (finalSegmentLocationURI != null) {
                    this.copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, finalSegmentLocationURI);
                }
                if (customMapModifier == null) {
                    segmentZKMetadata.setCustomMap(null);
                    ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, segmentDownloadURIStr, crypterName, segmentSizeInBytes);
                } else {
                    ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, segmentDownloadURIStr, crypterName, segmentSizeInBytes);
                    segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
                }
                if (!this._pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
                    throw new RuntimeException(String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", segmentName, tableNameWithType, expectedVersion));
                }
                LOGGER.info("Updated segment: {} of table: {} to property store", (Object)segmentName, (Object)tableNameWithType);
                this._pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
            }
        }
        catch (Exception e) {
            if (!this._pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
                LOGGER.error("Failed to update ZK metadata for segment: {}, table: {}, expected version: {}", new Object[]{segmentName, tableNameWithType, expectedVersion});
            }
            throw e;
        }
    }

    private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) {
        String expectedCrcStr = headers.getHeaderString("If-Match");
        if (expectedCrcStr != null) {
            long expectedCrc;
            try {
                expectedCrc = Long.parseLong(expectedCrcStr);
            }
            catch (NumberFormatException e) {
                throw new ControllerApplicationException(LOGGER, String.format("Caught exception for segment: %s of table: %s while parsing IF-MATCH CRC: \"%s\"", segmentName, tableNameWithType, expectedCrcStr), Response.Status.PRECONDITION_FAILED);
            }
            if (expectedCrc != existingCrc) {
                throw new ControllerApplicationException(LOGGER, String.format("For segment: %s of table: %s, expected CRC: %d does not match existing CRC: %d", segmentName, tableNameWithType, expectedCrc, existingCrc), Response.Status.PRECONDITION_FAILED);
            }
        }
    }

    private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadDownloadClient.FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) throws Exception {
        String segmentZKMetadataCustomMapModifierStr;
        SegmentZKMetadata newSegmentZKMetadata;
        String segmentName = segmentMetadata.getName();
        try {
            newSegmentZKMetadata = ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, segmentDownloadURIStr, crypterName, segmentSizeInBytes);
        }
        catch (IllegalArgumentException e) {
            throw new ControllerApplicationException(LOGGER, String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName, tableNameWithType, e.getMessage()), Response.Status.BAD_REQUEST);
        }
        long segmentUploadStartTime = System.currentTimeMillis();
        if (enableParallelPushProtection) {
            newSegmentZKMetadata.setSegmentUploadStartTime(segmentUploadStartTime);
        }
        String string = segmentZKMetadataCustomMapModifierStr = headers != null ? headers.getHeaderString("Pinot-SegmentZKMetadataCustomMapModifier") : null;
        if (segmentZKMetadataCustomMapModifierStr != null) {
            SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = new SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr);
            newSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap()));
        }
        if (!this._pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) {
            throw new RuntimeException(String.format("Failed to create ZK metadata for segment: %s of table: %s", segmentName, tableNameWithType));
        }
        if (finalSegmentLocationURI != null) {
            try {
                this.copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, finalSegmentLocationURI);
            }
            catch (Exception e) {
                LOGGER.error("Could not move segment {} from table {} to permanent directory", new Object[]{segmentName, tableNameWithType, e});
                this.deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime, enableParallelPushProtection);
                throw e;
            }
        }
        try {
            this._pinotHelixResourceManager.assignTableSegment(tableNameWithType, segmentMetadata.getName());
        }
        catch (Exception e) {
            LOGGER.error("Caught exception while calling assignTableSegment for adding segment: {} to table: {}", new Object[]{segmentName, tableNameWithType, e});
            this.deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime, enableParallelPushProtection);
            throw e;
        }
        if (enableParallelPushProtection) {
            newSegmentZKMetadata.setSegmentUploadStartTime(-1L);
            if (!this._pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) {
                String errorMsg = String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType);
                LOGGER.error(errorMsg);
                this.deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime, true);
                throw new RuntimeException(errorMsg);
            }
        }
    }

    private void deleteSegmentIfNeeded(String tableNameWithType, String segmentName, long currentSegmentUploadStartTime, boolean enableParallelPushProtection) {
        ZNRecord existingSegmentMetadataZNRecord = this._pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
        if (existingSegmentMetadataZNRecord == null) {
            return;
        }
        SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord);
        long existingSegmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime();
        LOGGER.info("Parallel push protection is {} for segment: {}.", (Object)(enableParallelPushProtection ? "enabled" : "disabled"), (Object)segmentName);
        if (!enableParallelPushProtection || currentSegmentUploadStartTime == existingSegmentUploadStartTime) {
            this._pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName);
            LOGGER.info("Deleted zk entry and segment {} for table {}.", (Object)segmentName, (Object)tableNameWithType);
        }
    }

    private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadDownloadClient.FileUploadType uploadType, File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI) throws Exception {
        if (uploadType == FileUploadDownloadClient.FileUploadType.METADATA) {
            this.copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI);
            LOGGER.info("Copied segment: {} of table: {} to final location: {}", new Object[]{segmentName, tableNameWithType, finalSegmentLocationURI});
        } else {
            this.copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI);
            LOGGER.info("Copied segment: {} of table: {} to final location: {}", new Object[]{segmentName, tableNameWithType, finalSegmentLocationURI});
        }
    }

    private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI) throws Exception {
        LOGGER.info("Copying segment from: {} to: {}", (Object)segmentFile.getAbsolutePath(), (Object)finalSegmentLocationURI);
        PinotFSFactory.create((String)finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile, finalSegmentLocationURI);
    }

    private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI finalSegmentLocationURI) throws Exception {
        if (sourceDownloadURI.equals(finalSegmentLocationURI)) {
            LOGGER.info("Skip copying segment as sourceDownloadURI: {} is the same as finalSegmentLocationURI", (Object)sourceDownloadURI);
        } else {
            Preconditions.checkState((boolean)sourceDownloadURI.getScheme().equals(finalSegmentLocationURI.getScheme()));
            LOGGER.info("Copying segment from: {} to: {}", (Object)sourceDownloadURI, (Object)finalSegmentLocationURI);
            PinotFSFactory.create((String)finalSegmentLocationURI.getScheme()).copy(sourceDownloadURI, finalSegmentLocationURI);
        }
    }
}

