/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import org.apache.flink.runtime.blob.BlobInputStream;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.jobgraph.JobID;

public final class BlobClient
implements Closeable {
    private Socket socket = new Socket();

    public BlobClient(InetSocketAddress serverAddress) throws IOException {
        this.socket.connect(serverAddress);
    }

    private void sendPutHeader(OutputStream outputStream, JobID jobID, String key, byte[] buf) throws IOException {
        outputStream.write(0);
        if (jobID == null || key == null) {
            outputStream.write(1);
        } else {
            outputStream.write(0);
            ByteBuffer bb = ByteBuffer.wrap(buf);
            jobID.write(bb);
            outputStream.write(buf);
            byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
            BlobServer.writeLength(keyBytes.length, buf, outputStream);
            outputStream.write(keyBytes);
        }
    }

    public BlobKey put(byte[] value) throws IOException {
        return this.put(value, 0, value.length);
    }

    public BlobKey put(byte[] value, int offset, int len) throws IOException {
        return this.putBuffer(null, null, value, offset, len);
    }

    public void put(JobID jobId, String key, byte[] value) throws IOException {
        this.put(jobId, key, value, 0, value.length);
    }

    public void put(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
        if (key.length() > 64) {
            throw new IllegalArgumentException("Keys must not be longer than 64");
        }
        this.putBuffer(jobId, key, value, offset, len);
    }

    public void put(JobID jobId, String key, InputStream inputStream) throws IOException {
        if (key.length() > 64) {
            throw new IllegalArgumentException("Keys must not be longer than 64");
        }
        this.putInputStream(jobId, key, inputStream);
    }

    public BlobKey put(InputStream inputStream) throws IOException {
        return this.putInputStream(null, null, inputStream);
    }

    public void delete(JobID jobId, String key) throws IOException {
        if (jobId == null) {
            throw new IllegalArgumentException("Argument jobID must not be null");
        }
        if (key == null) {
            throw new IllegalArgumentException("Argument key must not be null");
        }
        if (key.length() > 64) {
            throw new IllegalArgumentException("Keys must not be longer than 64");
        }
        this.deleteInternal(jobId, key);
    }

    public void deleteAll(JobID jobId) throws IOException {
        if (jobId == null) {
            throw new IllegalArgumentException("Argument jobID must not be null");
        }
        this.deleteInternal(jobId, null);
    }

    private void deleteInternal(JobID jobId, String key) throws IOException {
        OutputStream os = this.socket.getOutputStream();
        byte[] buf = new byte[16];
        os.write(2);
        ByteBuffer bb = ByteBuffer.wrap(buf);
        jobId.write(bb);
        os.write(buf);
        if (key == null) {
            os.write(0);
        } else {
            os.write(1);
            byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
            BlobServer.writeLength(keyBytes.length, buf, os);
            os.write(keyBytes);
        }
    }

    private BlobKey putBuffer(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
        BlobKey remoteKey;
        OutputStream os = this.socket.getOutputStream();
        MessageDigest md = jobId == null || key == null ? BlobUtils.createMessageDigest() : null;
        byte[] buf = new byte[16];
        this.sendPutHeader(os, jobId, key, buf);
        int remainingBytes = value.length;
        int bytesSent = 0;
        while (remainingBytes > 0) {
            int bytesToSend = Math.min(4096, remainingBytes);
            BlobServer.writeLength(bytesToSend, buf, os);
            os.write(value, offset + bytesSent, bytesToSend);
            if (md != null) {
                md.update(value, offset + bytesSent, bytesToSend);
            }
            remainingBytes -= bytesToSend;
            bytesSent += bytesToSend;
        }
        if (md == null) {
            return null;
        }
        InputStream is = this.socket.getInputStream();
        BlobKey localKey = new BlobKey(md.digest());
        if (!localKey.equals(remoteKey = BlobKey.readFromInputStream(is))) {
            throw new IOException("Detected data corruption during transfer");
        }
        return localKey;
    }

    private BlobKey putInputStream(JobID jobId, String key, InputStream inputStream) throws IOException {
        BlobKey remoteKey;
        int read;
        OutputStream os = this.socket.getOutputStream();
        MessageDigest md = jobId == null || key == null ? BlobUtils.createMessageDigest() : null;
        byte[] buf = new byte[16];
        byte[] xferBuf = new byte[4096];
        this.sendPutHeader(os, jobId, key, buf);
        while ((read = inputStream.read(xferBuf)) >= 0) {
            if (read <= 0) continue;
            BlobServer.writeLength(read, buf, os);
            os.write(xferBuf, 0, read);
            if (md == null) continue;
            md.update(xferBuf, 0, read);
        }
        if (md == null) {
            return null;
        }
        InputStream is = this.socket.getInputStream();
        BlobKey localKey = new BlobKey(md.digest());
        if (!localKey.equals(remoteKey = BlobKey.readFromInputStream(is))) {
            throw new IOException("Detected data corruption during transfer");
        }
        return localKey;
    }

    public InputStream get(JobID jobID, String key) throws IOException {
        if (key.length() > 64) {
            throw new IllegalArgumentException("Keys must not be longer than 64");
        }
        OutputStream os = this.socket.getOutputStream();
        byte[] buf = new byte[16];
        this.sendGetHeader(os, jobID, key, null, buf);
        return new BlobInputStream(this.socket.getInputStream(), null, buf);
    }

    public InputStream get(BlobKey blobKey) throws IOException {
        OutputStream os = this.socket.getOutputStream();
        byte[] buf = new byte[16];
        this.sendGetHeader(os, null, null, blobKey, buf);
        return new BlobInputStream(this.socket.getInputStream(), blobKey, buf);
    }

    private void sendGetHeader(OutputStream outputStream, JobID jobID, String key, BlobKey key2, byte[] buf) throws IOException {
        outputStream.write(1);
        if (jobID == null || key == null) {
            outputStream.write(1);
            key2.writeToOutputStream(outputStream);
        } else {
            outputStream.write(0);
            ByteBuffer bb = ByteBuffer.wrap(buf);
            jobID.write(bb);
            outputStream.write(buf);
            byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
            BlobServer.writeLength(keyBytes.length, buf, outputStream);
            outputStream.write(keyBytes);
        }
    }

    @Override
    public void close() throws IOException {
        this.socket.close();
    }
}

