package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobClient.class */
/**
 * Client side of the BLOB transfer protocol.
 *
 * <p>Each instance owns one socket connected to a BLOB server and offers operations to
 * upload (PUT), download (GET) and delete BLOBs. A BLOB is addressed either by a
 * {@link JobID} plus a string key, or by a {@link BlobKey} that is computed from the
 * BLOB's content.
 *
 * <p>The class performs no synchronization of its own; all operations write to and read
 * from the same socket streams.
 */
public final class BlobClient implements Closeable {

    /** First byte of a PUT request. */
    private static final int PUT_OPERATION = 0;

    /** First byte of a GET request. */
    private static final int GET_OPERATION = 1;

    /** First byte of a DELETE request. */
    private static final int DELETE_OPERATION = 2;

    /** Second byte of a PUT/GET request when a job ID and a key follow. */
    private static final int NAME_ADDRESSABLE = 0;

    /** Second byte of a PUT/GET request when the BLOB is addressed by its content key. */
    private static final int CONTENT_ADDRESSABLE = 1;

    /** Maximum number of characters accepted for a string key. */
    private static final int MAX_KEY_LENGTH = 64;

    /**
     * Size of the scratch buffer that receives the serialized job ID (all of its bytes are
     * sent) and that is handed to {@code BlobServer.writeLength} as working space.
     */
    private static final int SCRATCH_BUFFER_SIZE = 16;

    /** The connection to the BLOB server. */
    private final Socket socket;

    /**
     * Creates a client and connects it to the given server address.
     *
     * @param serverAddress address of the BLOB server to connect to
     * @throws IOException if the connection cannot be established
     */
    public BlobClient(InetSocketAddress serverAddress) throws IOException {
        this.socket = new Socket();
        boolean connected = false;
        try {
            this.socket.connect(serverAddress);
            connected = true;
        } finally {
            if (!connected) {
                // Do not leave a half-initialized socket behind when the connect attempt fails.
                try {
                    this.socket.close();
                } catch (IOException ignored) {
                    // The failure of connect() is the more informative error; let it propagate.
                }
            }
        }
    }

    // ------------------------------------------------------------------------------------------
    //  PUT
    // ------------------------------------------------------------------------------------------

    /**
     * Uploads the complete array as a content-addressable BLOB.
     *
     * @param value the data to upload
     * @return the key computed from the data, after it matched the key reported by the server
     * @throws IOException if the transfer fails or the server reports a different key
     */
    public BlobKey put(byte[] value) throws IOException {
        return put(value, 0, value.length);
    }

    /**
     * Uploads a region of the array as a content-addressable BLOB.
     *
     * @param value  the array holding the data
     * @param offset index of the first byte to upload
     * @param len    number of bytes to upload
     * @return the key computed from the data, after it matched the key reported by the server
     * @throws IOException               if the transfer fails or the server reports a different key
     * @throws IndexOutOfBoundsException if {@code offset}/{@code len} do not describe a valid
     *                                   region of {@code value}
     */
    public BlobKey put(byte[] value, int offset, int len) throws IOException {
        return putBuffer(null, null, value, offset, len);
    }

    /**
     * Uploads the complete array as a BLOB addressed by job ID and key.
     *
     * @param jobID the job the BLOB belongs to, not {@code null}
     * @param key   the key of the BLOB, not {@code null} and at most 64 characters long
     * @param value the data to upload
     * @throws IOException              if the transfer fails
     * @throws IllegalArgumentException if {@code jobID} or {@code key} is {@code null}, or the
     *                                  key is too long
     */
    public void put(JobID jobID, String key, byte[] value) throws IOException {
        put(jobID, key, value, 0, value.length);
    }

    /**
     * Uploads a region of the array as a BLOB addressed by job ID and key.
     *
     * @param jobID  the job the BLOB belongs to, not {@code null}
     * @param key    the key of the BLOB, not {@code null} and at most 64 characters long
     * @param value  the array holding the data
     * @param offset index of the first byte to upload
     * @param len    number of bytes to upload
     * @throws IOException               if the transfer fails
     * @throws IllegalArgumentException  if {@code jobID} or {@code key} is {@code null}, or the
     *                                   key is too long
     * @throws IndexOutOfBoundsException if {@code offset}/{@code len} do not describe a valid
     *                                   region of {@code value}
     */
    public void put(JobID jobID, String key, byte[] value, int offset, int len) throws IOException {
        checkJobIdAndKey(jobID, key);
        putBuffer(jobID, key, value, offset, len);
    }

    /**
     * Uploads everything readable from the stream as a BLOB addressed by job ID and key.
     * The stream is read until it signals end-of-stream; it is not closed.
     *
     * @param jobID       the job the BLOB belongs to, not {@code null}
     * @param key         the key of the BLOB, not {@code null} and at most 64 characters long
     * @param inputStream source of the data
     * @throws IOException              if reading the stream or the transfer fails
     * @throws IllegalArgumentException if {@code jobID} or {@code key} is {@code null}, or the
     *                                  key is too long
     */
    public void put(JobID jobID, String key, InputStream inputStream) throws IOException {
        checkJobIdAndKey(jobID, key);
        putInputStream(jobID, key, inputStream);
    }

    /**
     * Uploads everything readable from the stream as a content-addressable BLOB.
     * The stream is read until it signals end-of-stream; it is not closed.
     *
     * @param inputStream source of the data
     * @return the key computed from the data, after it matched the key reported by the server
     * @throws IOException if reading the stream or the transfer fails, or the server reports a
     *                     different key
     */
    public BlobKey put(InputStream inputStream) throws IOException {
        return putInputStream(null, null, inputStream);
    }

    /**
     * Sends a PUT request whose payload is a region of a byte array.
     *
     * <p>The payload is sent in chunks of at most {@code DefaultMemoryManager.MIN_PAGE_SIZE}
     * bytes, each preceded by its length.
     *
     * @return the verified content key for content-addressable uploads, {@code null} when the
     *         BLOB is addressed by job ID and key
     */
    private BlobKey putBuffer(JobID jobID, String key, byte[] value, int offset, int len) throws IOException {
        // Reject an invalid region before anything is written, so that a bad call cannot
        // leave a half-sent request on the connection.
        if (offset < 0 || len < 0 || len > value.length - offset) {
            throw new IndexOutOfBoundsException(
                    "offset=" + offset + ", len=" + len + ", array length=" + value.length);
        }

        OutputStream os = this.socket.getOutputStream();
        MessageDigest md = isContentAddressable(jobID, key) ? BlobUtils.createMessageDigest() : null;
        byte[] buf = new byte[SCRATCH_BUFFER_SIZE];
        sendPutHeader(os, jobID, key, buf);

        // Only the requested region [offset, offset + len) is transferred.
        int bytesSent = 0;
        while (bytesSent < len) {
            int chunk = Math.min(DefaultMemoryManager.MIN_PAGE_SIZE, len - bytesSent);
            BlobServer.writeLength(chunk, buf, os);
            os.write(value, offset + bytesSent, chunk);
            if (md != null) {
                md.update(value, offset + bytesSent, chunk);
            }
            bytesSent += chunk;
        }

        return md == null ? null : receiveAndVerifyKey(md);
    }

    /**
     * Sends a PUT request whose payload is read from a stream.
     *
     * <p>Every non-empty read of up to {@code DefaultMemoryManager.MIN_PAGE_SIZE} bytes is
     * forwarded as one chunk, preceded by its length.
     *
     * @return the verified content key for content-addressable uploads, {@code null} when the
     *         BLOB is addressed by job ID and key
     */
    private BlobKey putInputStream(JobID jobID, String key, InputStream inputStream) throws IOException {
        OutputStream os = this.socket.getOutputStream();
        MessageDigest md = isContentAddressable(jobID, key) ? BlobUtils.createMessageDigest() : null;
        byte[] buf = new byte[SCRATCH_BUFFER_SIZE];
        byte[] chunk = new byte[DefaultMemoryManager.MIN_PAGE_SIZE];
        sendPutHeader(os, jobID, key, buf);

        int read;
        while ((read = inputStream.read(chunk)) >= 0) {
            if (read == 0) {
                // Nothing was delivered by this read; do not emit an empty chunk.
                continue;
            }
            BlobServer.writeLength(read, buf, os);
            os.write(chunk, 0, read);
            if (md != null) {
                md.update(chunk, 0, read);
            }
        }

        return md == null ? null : receiveAndVerifyKey(md);
    }

    /**
     * Writes the PUT header: the operation code, the addressing flag and, for BLOBs addressed
     * by name, the job ID followed by the length-prefixed key.
     */
    private void sendPutHeader(OutputStream os, JobID jobID, String key, byte[] buf) throws IOException {
        os.write(PUT_OPERATION);
        if (isContentAddressable(jobID, key)) {
            os.write(CONTENT_ADDRESSABLE);
            return;
        }
        os.write(NAME_ADDRESSABLE);
        writeJobId(os, jobID, buf);
        writeKey(os, key, buf);
    }

    /**
     * Reads the key reported by the server and compares it with the locally computed one.
     *
     * @param md digest that has been fed with every payload byte that was sent
     * @return the locally computed key
     * @throws IOException if the keys differ or the server's key cannot be read
     */
    private BlobKey receiveAndVerifyKey(MessageDigest md) throws IOException {
        InputStream is = this.socket.getInputStream();
        BlobKey localKey = new BlobKey(md.digest());
        BlobKey remoteKey = BlobKey.readFromInputStream(is);
        if (!localKey.equals(remoteKey)) {
            throw new IOException("Detected data corruption during transfer");
        }
        return localKey;
    }

    // ------------------------------------------------------------------------------------------
    //  DELETE
    // ------------------------------------------------------------------------------------------

    /**
     * Requests deletion of the BLOB addressed by the given job ID and key.
     *
     * @param jobID the job the BLOB belongs to, not {@code null}
     * @param key   the key of the BLOB, not {@code null} and at most 64 characters long
     * @throws IOException              if the request cannot be sent
     * @throws IllegalArgumentException if an argument is {@code null} or the key is too long
     */
    public void delete(JobID jobID, String key) throws IOException {
        checkJobIdAndKey(jobID, key);
        deleteInternal(jobID, key);
    }

    /**
     * Requests deletion of all BLOBs that belong to the given job.
     *
     * @param jobID the job whose BLOBs are to be deleted, not {@code null}
     * @throws IOException              if the request cannot be sent
     * @throws IllegalArgumentException if {@code jobID} is {@code null}
     */
    public void deleteAll(JobID jobID) throws IOException {
        if (jobID == null) {
            throw new IllegalArgumentException("Argument jobID must not be null");
        }
        deleteInternal(jobID, null);
    }

    /**
     * Writes a DELETE request: the operation code, the job ID and a flag byte that is
     * {@code 0} when no key follows and {@code 1} when the length-prefixed key follows.
     */
    private void deleteInternal(JobID jobID, String key) throws IOException {
        OutputStream os = this.socket.getOutputStream();
        byte[] buf = new byte[SCRATCH_BUFFER_SIZE];
        os.write(DELETE_OPERATION);
        writeJobId(os, jobID, buf);
        if (key == null) {
            os.write(0);
            return;
        }
        os.write(1);
        writeKey(os, key, buf);
    }

    // ------------------------------------------------------------------------------------------
    //  GET
    // ------------------------------------------------------------------------------------------

    /**
     * Requests the BLOB addressed by the given job ID and key.
     *
     * @param jobID the job the BLOB belongs to, not {@code null}
     * @param key   the key of the BLOB, not {@code null} and at most 64 characters long
     * @return a stream over the server's response, backed by this client's socket
     * @throws IOException              if the request cannot be sent
     * @throws IllegalArgumentException if an argument is {@code null} or the key is too long
     */
    public InputStream get(JobID jobID, String key) throws IOException {
        // Validate before writing: a failure after the first header bytes would leave a
        // partial request on the connection.
        checkJobIdAndKey(jobID, key);
        byte[] buf = new byte[SCRATCH_BUFFER_SIZE];
        sendGetHeader(this.socket.getOutputStream(), jobID, key, null, buf);
        return new BlobInputStream(this.socket.getInputStream(), null, buf);
    }

    /**
     * Requests the content-addressable BLOB with the given key.
     *
     * @param blobKey the key of the BLOB, not {@code null}
     * @return a stream over the server's response, backed by this client's socket
     * @throws IOException              if the request cannot be sent
     * @throws IllegalArgumentException if {@code blobKey} is {@code null}
     */
    public InputStream get(BlobKey blobKey) throws IOException {
        if (blobKey == null) {
            throw new IllegalArgumentException("Argument blobKey must not be null");
        }
        byte[] buf = new byte[SCRATCH_BUFFER_SIZE];
        sendGetHeader(this.socket.getOutputStream(), null, null, blobKey, buf);
        return new BlobInputStream(this.socket.getInputStream(), blobKey, buf);
    }

    /**
     * Writes the GET header: the operation code, the addressing flag and then either the
     * content key or the job ID followed by the length-prefixed key.
     */
    private void sendGetHeader(OutputStream os, JobID jobID, String key, BlobKey blobKey, byte[] buf)
            throws IOException {
        os.write(GET_OPERATION);
        if (isContentAddressable(jobID, key)) {
            os.write(CONTENT_ADDRESSABLE);
            blobKey.writeToOutputStream(os);
            return;
        }
        os.write(NAME_ADDRESSABLE);
        writeJobId(os, jobID, buf);
        writeKey(os, key, buf);
    }

    // ------------------------------------------------------------------------------------------
    //  Shared helpers
    // ------------------------------------------------------------------------------------------

    /** Returns whether a request without a complete (job ID, key) pair is being built. */
    private static boolean isContentAddressable(JobID jobID, String key) {
        return jobID == null || key == null;
    }

    /** Validates the (job ID, key) pair of an operation that addresses a BLOB by name. */
    private static void checkJobIdAndKey(JobID jobID, String key) {
        if (jobID == null) {
            throw new IllegalArgumentException("Argument jobID must not be null");
        }
        if (key == null) {
            throw new IllegalArgumentException("Argument key must not be null");
        }
        if (key.length() > MAX_KEY_LENGTH) {
            throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
        }
    }

    /** Serializes the job ID into the scratch buffer and sends the entire buffer. */
    private static void writeJobId(OutputStream os, JobID jobID, byte[] buf) throws IOException {
        jobID.write(ByteBuffer.wrap(buf));
        os.write(buf);
    }

    /** Sends the key as its encoded length followed by its bytes in the default BLOB charset. */
    private static void writeKey(OutputStream os, String key, byte[] buf) throws IOException {
        byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
        BlobServer.writeLength(keyBytes.length, buf, os);
        os.write(keyBytes);
    }

    /**
     * Closes the connection to the BLOB server.
     *
     * @throws IOException if closing the socket fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.socket.close();
    }
}
