/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.MessageDigest;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobInputStream;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client for a BLOB server, speaking a small binary protocol over one TCP socket.
 *
 * <p>The client offers GET, PUT and DELETE operations. BLOBs are addressed either by a
 * {@link BlobKey} ("content addressable") or by a {@link JobID} plus a string key
 * ("name addressable").
 *
 * <p>Whenever an operation fails, the underlying socket is closed. After that, the GET and
 * PUT operations reject further use with an {@link IllegalStateException}.
 */
public final class BlobClient
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(BlobClient.class);

    /** Maximum number of characters accepted for a name-addressable key. */
    private static final int MAX_KEY_LENGTH = 64;

    /** Maximum number of payload bytes sent per length-prefixed chunk during PUT. */
    private static final int BUFFER_SIZE = 65536;

    // Operation codes: first byte of every request written by this client.
    // NOTE(review): these values must match what the server side expects; confirm against the server.
    private static final int PUT_OPERATION = 0;
    private static final int GET_OPERATION = 1;
    private static final int DELETE_OPERATION = 2;

    // Addressing modes: second byte of every request, selecting what identifies the BLOB.
    /** Followed by a serialized {@link BlobKey} (GET/DELETE) or by nothing (PUT). */
    private static final int CONTENT_ADDRESSABLE = 0;
    /** Followed by the job ID bytes, the key length and the key bytes. */
    private static final int NAME_ADDRESSABLE = 1;
    /** DELETE only: followed by the job ID bytes alone. */
    private static final int JOB_ID_SCOPE = 2;

    // Response codes: first byte of every response read by this client.
    private static final int RETURN_OKAY = 0;
    /** Followed by a length-prefixed, serialized {@link Throwable}. */
    private static final int RETURN_ERROR = 1;

    /** Length value written after the last data chunk of a PUT to mark the end of the payload. */
    private static final int END_OF_DATA = -1;

    private static final String NOT_CONNECTED_MESSAGE =
            "BLOB Client is not connected. Client has been shut down or encountered an error before.";

    private final Socket socket = new Socket();

    /**
     * Creates the client and connects it to the given server address.
     *
     * @param serverAddress address of the BLOB server to connect to
     * @throws IOException if the connection cannot be established; the socket is closed first
     */
    public BlobClient(InetSocketAddress serverAddress) throws IOException {
        try {
            this.socket.connect(serverAddress);
        }
        catch (IOException e) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("Could not connect to BlobServer at address " + serverAddress, e);
        }
    }

    /**
     * Closes the underlying socket.
     *
     * @throws IOException if closing the socket fails
     */
    @Override
    public void close() throws IOException {
        this.socket.close();
    }

    /**
     * Reports whether the underlying socket has been closed.
     *
     * @return {@code true} if the socket is closed
     */
    public boolean isClosed() {
        return this.socket.isClosed();
    }

    // --------------------------------------------------------------------------------------------
    //  GET
    // --------------------------------------------------------------------------------------------

    /**
     * Requests the name-addressable BLOB stored under the given job ID and key.
     *
     * @param jobID job the BLOB belongs to
     * @param key key of the BLOB within the job; at most 64 characters
     * @return a stream wrapping this client's socket input stream, positioned after the response code
     * @throws IllegalArgumentException if the key is longer than 64 characters
     * @throws IllegalStateException if the socket is already closed
     * @throws IOException if the request fails for any reason; the socket is closed in that case
     */
    public InputStream get(JobID jobID, String key) throws IOException {
        BlobClient.checkKeyLength(key);
        this.checkConnected();
        if (LOG.isDebugEnabled()) {
            // Note: the address logged here is the local end of the connection.
            LOG.debug("GET BLOB {} / \"{}\" from {}", jobID, key, this.socket.getLocalSocketAddress());
        }
        try {
            OutputStream os = this.socket.getOutputStream();
            InputStream is = this.socket.getInputStream();
            this.sendGetHeader(os, jobID, key, null);
            this.receiveAndCheckResponse(is);
            return new BlobInputStream(is, null);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("GET operation failed: " + t.getMessage(), t);
        }
    }

    /**
     * Requests the content-addressable BLOB identified by the given key.
     *
     * @param blobKey key identifying the BLOB
     * @return a stream wrapping this client's socket input stream, positioned after the response code
     * @throws IllegalStateException if the socket is already closed
     * @throws IOException if the request fails for any reason; the socket is closed in that case
     */
    public InputStream get(BlobKey blobKey) throws IOException {
        this.checkConnected();
        if (LOG.isDebugEnabled()) {
            LOG.debug("GET content addressable BLOB {} from {}", blobKey, this.socket.getLocalSocketAddress());
        }
        try {
            OutputStream os = this.socket.getOutputStream();
            InputStream is = this.socket.getInputStream();
            this.sendGetHeader(os, null, null, blobKey);
            this.receiveAndCheckResponse(is);
            return new BlobInputStream(is, blobKey);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("GET operation failed: " + t.getMessage(), t);
        }
    }

    /**
     * Writes the GET request header.
     *
     * <p>If either {@code jobID} or {@code key} is {@code null}, the request is sent in
     * content-addressable form using {@code blobKey}; otherwise in name-addressable form.
     */
    private void sendGetHeader(OutputStream outputStream, JobID jobID, String key, BlobKey blobKey) throws IOException {
        outputStream.write(GET_OPERATION);
        if (jobID == null || key == null) {
            outputStream.write(CONTENT_ADDRESSABLE);
            blobKey.writeToOutputStream(outputStream);
        } else {
            outputStream.write(NAME_ADDRESSABLE);
            BlobClient.writeJobIdAndKey(outputStream, jobID, key);
        }
    }

    /**
     * Reads the one-byte response code and fails unless it signals success.
     *
     * @throws EOFException if the stream ends before a response code arrives
     * @throws IOException if the server reported an error (its exception becomes the cause) or
     *     the response code is unknown
     */
    private void receiveAndCheckResponse(InputStream is) throws IOException {
        int response = is.read();
        if (response < 0) {
            throw new EOFException("Premature end of response");
        }
        if (response == RETURN_ERROR) {
            Throwable cause = BlobClient.readExceptionFromStream(is);
            throw new IOException("Server side error: " + cause.getMessage(), cause);
        }
        if (response != RETURN_OKAY) {
            throw new IOException("Unrecognized response");
        }
    }

    // --------------------------------------------------------------------------------------------
    //  PUT
    // --------------------------------------------------------------------------------------------

    /**
     * Uploads the whole byte array as a content-addressable BLOB.
     *
     * @param value data to upload
     * @return the key computed for the uploaded data
     * @throws IOException if the upload fails; the socket is closed in that case
     */
    public BlobKey put(byte[] value) throws IOException {
        return this.put(value, 0, value.length);
    }

    /**
     * Uploads a slice of the byte array as a content-addressable BLOB.
     *
     * @param value buffer holding the data
     * @param offset index of the first byte to upload
     * @param len number of bytes to upload
     * @return the key computed for the uploaded data
     * @throws IOException if the upload fails; the socket is closed in that case
     */
    public BlobKey put(byte[] value, int offset, int len) throws IOException {
        return this.putBuffer(null, null, value, offset, len);
    }

    /**
     * Uploads the whole byte array as a name-addressable BLOB.
     *
     * @param jobId job the BLOB belongs to
     * @param key key of the BLOB within the job; at most 64 characters
     * @param value data to upload
     * @throws IllegalArgumentException if the key is longer than 64 characters
     * @throws IOException if the upload fails; the socket is closed in that case
     */
    public void put(JobID jobId, String key, byte[] value) throws IOException {
        this.put(jobId, key, value, 0, value.length);
    }

    /**
     * Uploads a slice of the byte array as a name-addressable BLOB.
     *
     * @param jobId job the BLOB belongs to
     * @param key key of the BLOB within the job; at most 64 characters
     * @param value buffer holding the data
     * @param offset index of the first byte to upload
     * @param len number of bytes to upload
     * @throws IllegalArgumentException if the key is longer than 64 characters
     * @throws IOException if the upload fails; the socket is closed in that case
     */
    public void put(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
        BlobClient.checkKeyLength(key);
        this.putBuffer(jobId, key, value, offset, len);
    }

    /**
     * Uploads everything readable from the stream as a name-addressable BLOB.
     *
     * @param jobId job the BLOB belongs to
     * @param key key of the BLOB within the job; at most 64 characters
     * @param inputStream source of the data; read until end of stream, not closed by this method
     * @throws IllegalArgumentException if the key is longer than 64 characters
     * @throws IOException if the upload fails; the socket is closed in that case
     */
    public void put(JobID jobId, String key, InputStream inputStream) throws IOException {
        BlobClient.checkKeyLength(key);
        this.putInputStream(jobId, key, inputStream);
    }

    /**
     * Uploads everything readable from the stream as a content-addressable BLOB.
     *
     * @param inputStream source of the data; read until end of stream, not closed by this method
     * @return the key computed for the uploaded data
     * @throws IOException if the upload fails; the socket is closed in that case
     */
    public BlobKey put(InputStream inputStream) throws IOException {
        return this.putInputStream(null, null, inputStream);
    }

    /**
     * Sends the buffer slice in chunks of at most {@link #BUFFER_SIZE} bytes, each prefixed with
     * its length, followed by the end-of-data marker.
     *
     * @return the locally computed key for content-addressable uploads ({@code jobId == null}),
     *     otherwise {@code null}
     */
    private BlobKey putBuffer(JobID jobId, String key, byte[] value, int offset, int len) throws IOException {
        this.checkConnected();
        if (LOG.isDebugEnabled()) {
            if (jobId == null) {
                LOG.debug("PUT content addressable BLOB buffer ({} bytes) to {}", len, this.socket.getLocalSocketAddress());
            } else {
                LOG.debug("PUT BLOB buffer ({} bytes) under {} / \"{}\" to {}", len, jobId, key, this.socket.getLocalSocketAddress());
            }
        }
        try {
            OutputStream os = this.socket.getOutputStream();
            // A digest is only needed when the server addresses the BLOB by its content.
            MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
            this.sendPutHeader(os, jobId, key);

            int position = offset;
            int remainingBytes = len;
            while (remainingBytes > 0) {
                int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
                BlobUtils.writeLength(bytesToSend, os);
                os.write(value, position, bytesToSend);
                if (md != null) {
                    md.update(value, position, bytesToSend);
                }
                remainingBytes -= bytesToSend;
                position += bytesToSend;
            }
            BlobUtils.writeLength(END_OF_DATA, os);

            InputStream is = this.socket.getInputStream();
            return this.receivePutResponseAndCompare(is, md);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("PUT operation failed: " + t.getMessage(), t);
        }
    }

    /**
     * Streams the input to the server in length-prefixed chunks, followed by the end-of-data
     * marker.
     *
     * @return the locally computed key for content-addressable uploads ({@code jobId == null}),
     *     otherwise {@code null}
     */
    private BlobKey putInputStream(JobID jobId, String key, InputStream inputStream) throws IOException {
        this.checkConnected();
        if (LOG.isDebugEnabled()) {
            if (jobId == null) {
                LOG.debug("PUT content addressable BLOB stream to {}", this.socket.getLocalSocketAddress());
            } else {
                LOG.debug("PUT BLOB stream under {} / \"{}\" to {}", jobId, key, this.socket.getLocalSocketAddress());
            }
        }
        try {
            OutputStream os = this.socket.getOutputStream();
            MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null;
            byte[] xferBuf = new byte[BUFFER_SIZE];
            this.sendPutHeader(os, jobId, key);

            int read;
            while ((read = inputStream.read(xferBuf)) >= 0) {
                // A zero-byte read carries no data; skip it rather than sending an empty chunk.
                if (read > 0) {
                    BlobUtils.writeLength(read, os);
                    os.write(xferBuf, 0, read);
                    if (md != null) {
                        md.update(xferBuf, 0, read);
                    }
                }
            }
            BlobUtils.writeLength(END_OF_DATA, os);

            InputStream is = this.socket.getInputStream();
            return this.receivePutResponseAndCompare(is, md);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("PUT operation failed: " + t.getMessage(), t);
        }
    }

    /**
     * Reads the PUT response. For content-addressable uploads ({@code md != null}) it also reads
     * the key returned by the server and compares it with the key derived from the local digest.
     *
     * @return the local key if {@code md} is non-null, otherwise {@code null}
     * @throws EOFException if the stream ends before a response code arrives
     * @throws IOException if the keys differ, the server reported an error, or the response code
     *     is unknown
     */
    private BlobKey receivePutResponseAndCompare(InputStream is, MessageDigest md) throws IOException {
        int response = is.read();
        if (response < 0) {
            throw new EOFException("Premature end of response");
        }
        if (response == RETURN_OKAY) {
            if (md == null) {
                return null;
            }
            BlobKey remoteKey = BlobKey.readFromInputStream(is);
            BlobKey localKey = new BlobKey(md.digest());
            if (!localKey.equals(remoteKey)) {
                throw new IOException("Detected data corruption during transfer");
            }
            return localKey;
        }
        if (response == RETURN_ERROR) {
            Throwable cause = BlobClient.readExceptionFromStream(is);
            throw new IOException("Server side error: " + cause.getMessage(), cause);
        }
        throw new IOException("Unrecognized response");
    }

    /**
     * Writes the PUT request header.
     *
     * @throws IllegalArgumentException unless {@code jobID} and {@code key} are both null or both
     *     non-null
     */
    private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) throws IOException {
        if ((jobID == null) != (key == null)) {
            throw new IllegalArgumentException("Job ID and key must either both be null or both be non-null");
        }
        outputStream.write(PUT_OPERATION);
        if (jobID == null) {
            outputStream.write(CONTENT_ADDRESSABLE);
        } else {
            outputStream.write(NAME_ADDRESSABLE);
            BlobClient.writeJobIdAndKey(outputStream, jobID, key);
        }
    }

    // --------------------------------------------------------------------------------------------
    //  DELETE
    // --------------------------------------------------------------------------------------------

    /**
     * Asks the server to delete the content-addressable BLOB with the given key.
     *
     * @param key key identifying the BLOB
     * @throws IllegalArgumentException if the key is {@code null}
     * @throws IOException if the request fails; the socket is closed in that case
     */
    public void delete(BlobKey key) throws IOException {
        if (key == null) {
            throw new IllegalArgumentException("BLOB key must not be null");
        }
        this.deleteInternal(null, null, key);
    }

    /**
     * Asks the server to delete the name-addressable BLOB stored under the given job ID and key.
     *
     * @param jobId job the BLOB belongs to
     * @param key key of the BLOB within the job; at most 64 characters
     * @throws IllegalArgumentException if an argument is {@code null} or the key is too long
     * @throws IOException if the request fails; the socket is closed in that case
     */
    public void delete(JobID jobId, String key) throws IOException {
        if (jobId == null) {
            throw new IllegalArgumentException("JobID must not be null");
        }
        if (key == null) {
            throw new IllegalArgumentException("Key must not be null");
        }
        BlobClient.checkKeyLength(key);
        this.deleteInternal(jobId, key, null);
    }

    /**
     * Asks the server to delete everything scoped to the given job ID.
     *
     * @param jobId job whose BLOBs are to be deleted
     * @throws IllegalArgumentException if the job ID is {@code null}
     * @throws IOException if the request fails; the socket is closed in that case
     */
    public void deleteAll(JobID jobId) throws IOException {
        if (jobId == null) {
            throw new IllegalArgumentException("Argument jobID must not be null");
        }
        this.deleteInternal(jobId, null, null);
    }

    /**
     * Sends a DELETE request in one of three forms: by BLOB key ({@code jobId == null}), by job ID
     * and key, or by job ID alone ({@code key == null}).
     *
     * <p>Unlike GET and PUT, there is no up-front closed-socket check here; using a closed client
     * surfaces as an {@link IOException} from the socket access below.
     *
     * @throws IllegalArgumentException unless exactly one of {@code jobId} and {@code bKey} is set
     */
    private void deleteInternal(JobID jobId, String key, BlobKey bKey) throws IOException {
        if ((jobId == null) == (bKey == null)) {
            throw new IllegalArgumentException("Exactly one of job ID and BLOB key must be given");
        }
        try {
            OutputStream outputStream = this.socket.getOutputStream();
            InputStream inputStream = this.socket.getInputStream();
            outputStream.write(DELETE_OPERATION);
            if (jobId == null) {
                outputStream.write(CONTENT_ADDRESSABLE);
                bKey.writeToOutputStream(outputStream);
            } else if (key != null) {
                outputStream.write(NAME_ADDRESSABLE);
                BlobClient.writeJobIdAndKey(outputStream, jobId, key);
            } else {
                outputStream.write(JOB_ID_SCOPE);
                outputStream.write(jobId.getBytes());
            }
            // Same response handling as for GET: success, server-side error, or unknown code.
            this.receiveAndCheckResponse(inputStream);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("DELETE operation failed: " + t.getMessage(), t);
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Shared helpers
    // --------------------------------------------------------------------------------------------

    /** Rejects keys longer than {@link #MAX_KEY_LENGTH} characters. */
    private static void checkKeyLength(String key) {
        if (key.length() > MAX_KEY_LENGTH) {
            throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH);
        }
    }

    /** Rejects use of the client once its socket has been closed. */
    private void checkConnected() {
        if (this.socket.isClosed()) {
            throw new IllegalStateException(NOT_CONNECTED_MESSAGE);
        }
    }

    /** Writes the job ID bytes, then the encoded key's length, then the encoded key. */
    private static void writeJobIdAndKey(OutputStream outputStream, JobID jobID, String key) throws IOException {
        byte[] idBytes = jobID.getBytes();
        byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET);
        outputStream.write(idBytes);
        BlobUtils.writeLength(keyBytes.length, outputStream);
        outputStream.write(keyBytes);
    }

    /**
     * Reads a length-prefixed, serialized {@link Throwable} sent by the server after an error
     * response code.
     *
     * @throws IOException if the announced length is negative, the bytes cannot be read, or the
     *     exception's class cannot be resolved by the system class loader
     */
    private static Throwable readExceptionFromStream(InputStream in) throws IOException {
        int len = BlobUtils.readLength(in);
        if (len < 0) {
            // Fail with a clear message instead of a NegativeArraySizeException from the allocation below.
            throw new IOException("Invalid length of serialized error message: " + len);
        }
        byte[] bytes = new byte[len];
        BlobUtils.readFully(in, bytes, 0, len, "Error message");
        try {
            // NOTE(review): this deserializes bytes received from the network and therefore
            // trusts the peer; only connect to trusted servers.
            return (Throwable)InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
        }
        catch (ClassNotFoundException e) {
            throw new IOException("Could not transfer error message", e);
        }
    }
}

