package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobServer.class */
/**
 * Server thread that accepts connections on a {@link ServerSocket} and hands each accepted
 * socket to a newly started {@link BlobConnection}. BLOB files are kept below a local storage
 * directory whose layout is delegated to {@link BlobUtils}.
 *
 * <p>The server thread is started from the constructor and runs until {@link #shutdown()} is
 * called or accepting connections fails.
 */
public final class BlobServer extends Thread implements BlobService {
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

    // The constants below are not read inside this class; they are package-visible and
    // presumably shared with the connection/client code (verify against BlobConnection).
    static final int BUFFER_SIZE = 4096;
    static final int MAX_KEY_LENGTH = 64;
    static final byte PUT_OPERATION = 0;
    static final byte GET_OPERATION = 1;
    static final byte DELETE_OPERATION = 2;

    /** Number of bytes used by {@link #writeLength} / {@link #readLength} for one length value. */
    private static final int LENGTH_FIELD_SIZE = 4;

    private final ServerSocket serverSocket;
    private final Thread shutdownHook;
    private final File storageDir;

    /** Source of the running number embedded in temporary file names. */
    private final AtomicInteger tempFileCounter = new AtomicInteger(0);

    /** Set exactly once by the first call to {@link #shutdown()}. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /**
     * Initializes the storage directory, registers a shutdown hook, binds a server socket to an
     * automatically chosen port and starts the server thread.
     *
     * @param configuration configuration from which {@code blob.storage.directory} is read
     *     (falls back to {@code null}, which is passed on to {@link BlobUtils})
     * @throws IOException if the server socket cannot be created, or if initializing the storage
     *     directory fails with an {@code IOException}
     */
    public BlobServer(Configuration configuration) throws IOException {
        this.storageDir = BlobUtils.initStorageDirectory(configuration.getString("blob.storage.directory", (String) null));
        LOG.info("Created BLOB server storage directory {}", this.storageDir);
        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        try {
            // Port 0 lets the operating system pick any free port.
            this.serverSocket = new ServerSocket(0);
        } catch (IOException e) {
            throw new IOException("Could not create BlobServer with random port.", e);
        }
        start();
        LOG.info("Started BLOB server on port {}", Integer.valueOf(this.serverSocket.getLocalPort()));
    }

    /**
     * Returns the local port the server socket is bound to.
     *
     * @return the port number of the server socket
     */
    public int getServerPort() {
        return this.serverSocket.getLocalPort();
    }

    /**
     * Resolves the file for the given key below this server's storage directory.
     *
     * @param blobKey key of the BLOB
     * @return the location computed by {@link BlobUtils}
     */
    public File getStorageLocation(BlobKey blobKey) {
        return BlobUtils.getStorageLocation(this.storageDir, blobKey);
    }

    /**
     * Resolves the file for the given job ID and key string below this server's storage
     * directory.
     *
     * @param jobID ID of the job the BLOB belongs to
     * @param str key string of the BLOB
     * @return the location computed by {@link BlobUtils}
     */
    public File getStorageLocation(JobID jobID, String str) {
        return BlobUtils.getStorageLocation(this.storageDir, jobID, str);
    }

    /**
     * Deletes the directory associated with the given job; delegates to {@link BlobUtils}.
     *
     * @param jobID ID of the job whose directory is removed
     * @throws IOException if {@link BlobUtils} reports a failure
     */
    public void deleteJobDirectory(JobID jobID) throws IOException {
        BlobUtils.deleteJobDirectory(this.storageDir, jobID);
    }

    /**
     * Creates a fresh temporary file handle named {@code temp-NNNNNNNN} inside the incoming
     * directory. Each call consumes the next value of an atomic counter, so names handed out by
     * one server instance do not repeat until the counter wraps.
     *
     * @return a file object for the temporary file (the file itself is not created here)
     */
    public File getTemporaryFilename() {
        int sequenceNumber = this.tempFileCounter.getAndIncrement();
        String fileName = String.format("temp-%08d", Integer.valueOf(sequenceNumber));
        return new File(BlobUtils.getIncomingDirectory(this.storageDir), fileName);
    }

    /**
     * Accept loop: starts one {@link BlobConnection} per accepted socket until shutdown has been
     * requested. Any failure that is not caused by a requested shutdown is logged and triggers
     * {@link #shutdown()}.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!this.shutdownRequested.get()) {
            try {
                new BlobConnection(this.serverSocket.accept(), this).start();
            } catch (Throwable t) {
                // Closing the socket in shutdown() makes accept() fail; that case is expected.
                if (!this.shutdownRequested.get()) {
                    LOG.error("BLOB server stopped working. Shutting down", t);
                    shutdown();
                }
                return;
            }
        }
    }

    /**
     * Stops the server: closes the server socket, waits for the server thread to end, deletes the
     * storage directory and unregisters the shutdown hook. Only the first call has an effect.
     *
     * <p>May be called from the server thread itself (as {@link #run()} does on failure); in
     * that case the wait is skipped, because a thread joining itself would block forever.
     */
    @Override // org.apache.flink.runtime.blob.BlobService
    public void shutdown() {
        if (!this.shutdownRequested.compareAndSet(false, true)) {
            return;
        }

        try {
            this.serverSocket.close();
        } catch (IOException e) {
            LOG.debug("Error while closing the server socket.", e);
        }

        boolean interrupted = false;
        if (Thread.currentThread() != this) {
            try {
                join();
            } catch (InterruptedException e) {
                LOG.debug("Error while waiting for this thread to die.", e);
                interrupted = true;
            }
        }

        try {
            FileUtils.deleteDirectory(this.storageDir);
        } catch (IOException e) {
            LOG.error("BLOB server failed to properly clean up its storage directory.", e);
        }

        // When running inside the hook itself there is nothing to unregister.
        if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
            } catch (IllegalStateException ignored) {
                // The JVM is already shutting down; hooks can no longer be removed.
            } catch (Throwable t) {
                LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t);
            }
        }

        if (interrupted) {
            // Restore the interrupt status for the caller once the clean-up has been attempted.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns a URL pointing at the local file that stores the given BLOB.
     *
     * @param blobKey key of the requested BLOB, must not be {@code null}
     * @return URL of the local file
     * @throws IllegalArgumentException if {@code blobKey} is {@code null}
     * @throws FileNotFoundException if the file does not exist
     * @throws IOException if the URL or the canonical path cannot be computed
     */
    @Override // org.apache.flink.runtime.blob.BlobService
    public URL getURL(BlobKey blobKey) throws IOException {
        if (blobKey == null) {
            throw new IllegalArgumentException("Required BLOB cannot be null.");
        }
        File storageLocation = BlobUtils.getStorageLocation(this.storageDir, blobKey);
        if (!storageLocation.exists()) {
            throw new FileNotFoundException("File " + storageLocation.getCanonicalPath() + " does not exist.");
        }
        return storageLocation.toURI().toURL();
    }

    /**
     * Deletes the local file of the given BLOB if it exists. Deletion is best effort: a failure
     * is logged as a warning and not reported to the caller.
     *
     * @param blobKey key of the BLOB to delete
     * @throws IOException declared by the interface; not thrown by the code visible here
     */
    @Override // org.apache.flink.runtime.blob.BlobService
    public void delete(BlobKey blobKey) throws IOException {
        File storageLocation = BlobUtils.getStorageLocation(this.storageDir, blobKey);
        // Re-check existence after a failed delete so a concurrent removal is not reported.
        if (storageLocation.exists() && !storageLocation.delete() && storageLocation.exists()) {
            LOG.warn("Failed to delete BLOB file {}", storageLocation.getAbsolutePath());
        }
    }

    /**
     * Returns the port of this service; same value as {@link #getServerPort()}.
     *
     * @return the port number of the server socket
     */
    @Override // org.apache.flink.runtime.blob.BlobService
    public int getPort() {
        return getServerPort();
    }

    /**
     * Writes {@code i} as four bytes in little-endian order (least significant byte first).
     *
     * @param i value to write
     * @param bArr scratch buffer with at least four bytes; its first four bytes are overwritten
     * @param outputStream stream the four bytes are written to
     * @throws IOException if writing to the stream fails
     */
    public static void writeLength(int i, byte[] bArr, OutputStream outputStream) throws IOException {
        bArr[0] = (byte) (i & 255);
        bArr[1] = (byte) ((i >> 8) & 255);
        bArr[2] = (byte) ((i >> 16) & 255);
        bArr[3] = (byte) ((i >> 24) & 255);
        outputStream.write(bArr, 0, LENGTH_FIELD_SIZE);
    }

    /**
     * Reads four bytes from the stream and decodes them as a little-endian {@code int}, the
     * inverse of {@link #writeLength(int, byte[], OutputStream)}.
     *
     * @param bArr scratch buffer with at least four bytes; its first four bytes are overwritten
     * @param inputStream stream to read from
     * @return the decoded value
     * @throws EOFException if the stream ends before four bytes were read
     * @throws IOException if reading from the stream fails
     */
    public static int readLength(byte[] bArr, InputStream inputStream) throws IOException {
        readFully(inputStream, bArr, 0, LENGTH_FIELD_SIZE);
        return (bArr[0] & 255)
                | ((bArr[1] & 255) << 8)
                | ((bArr[2] & 255) << 16)
                | ((bArr[3] & 255) << 24);
    }

    /**
     * Reads exactly {@code i2} bytes from the stream into {@code bArr}, starting at offset
     * {@code i}, repeating partial reads until the requested amount has arrived.
     *
     * @param inputStream stream to read from
     * @param bArr destination buffer
     * @param i offset in {@code bArr} at which the first byte is stored
     * @param i2 number of bytes to read; nothing is read if it is not positive
     * @throws EOFException if the stream ends before {@code i2} bytes were read
     * @throws IOException if reading from the stream fails
     */
    public static void readFully(InputStream inputStream, byte[] bArr, int i, int i2) throws IOException {
        int bytesRead = 0;
        while (bytesRead < i2) {
            int read = inputStream.read(bArr, i + bytesRead, i2 - bytesRead);
            if (read < 0) {
                throw new EOFException();
            }
            bytesRead += read;
        }
    }
}
