/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration test checking that blobs uploaded to one {@link BlobServer} can be
 * fetched from a second {@link BlobServer} that is configured with the same
 * recovery storage directory.
 */
public class BlobRecoveryITCase {
    /** Offset into the random payload used for the partial uploads. */
    private static final int PARTIAL_OFFSET = 32;

    /** Number of payload bytes sent by the partial uploads. */
    private static final int PARTIAL_LENGTH = 256;

    /** Directory handed to the blob servers as their recovery storage directory. */
    private File recoveryDir;

    /**
     * Creates the recovery directory below the system temp directory.
     *
     * @throws IllegalStateException if the directory does not exist and cannot be created
     */
    @Before
    public void setUp() throws Exception {
        this.recoveryDir = new File(FileUtils.getTempDirectory(), "BlobRecoveryITCaseDir");
        if (!this.recoveryDir.exists() && !this.recoveryDir.mkdirs()) {
            throw new IllegalStateException("Failed to create temp directory for test");
        }
    }

    /** Deletes the recovery directory (and anything left inside it) after each test. */
    @After
    public void cleanUp() throws Exception {
        if (this.recoveryDir != null) {
            FileUtils.deleteDirectory(this.recoveryDir);
        }
    }

    /**
     * Starts two blob servers sharing one recovery directory, uploads four blobs
     * through a client connected to the first server, then reconnects to the second
     * server and verifies that every blob can be read back with the expected content.
     * After both servers have been shut down the recovery directory must be empty.
     */
    @Test
    public void testBlobServerRecovery() throws Exception {
        Random rand = new Random();
        BlobServer[] server = new BlobServer[2];
        InetSocketAddress[] serverAddress = new InetSocketAddress[2];
        BlobClient client = null;
        try {
            Configuration config = new Configuration();
            config.setString("recovery.mode", "ZOOKEEPER");
            config.setString("state.backend", "FILESYSTEM");
            config.setString("recovery.zookeeper.storageDir", this.recoveryDir.getPath());
            for (int i = 0; i < server.length; ++i) {
                server[i] = new BlobServer(config);
                serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
            }

            client = new BlobClient(serverAddress[0]);

            // Random payload shared by all four uploads.
            byte[] expected = new byte[1024];
            rand.nextBytes(expected);

            // Requests 1 and 2: content-addressed uploads (full payload, then a slice).
            BlobKey[] keys = new BlobKey[]{
                client.put(expected),
                client.put(expected, PARTIAL_OFFSET, PARTIAL_LENGTH)
            };

            // Requests 3 and 4: uploads addressed by job ID and string key.
            JobID[] jobId = new JobID[]{new JobID(), new JobID()};
            String[] testKey = new String[]{"test-key-1", "test-key-2"};
            client.put(jobId[0], testKey[0], expected);
            client.put(jobId[1], testKey[1], expected, PARTIAL_OFFSET, PARTIAL_LENGTH);

            // Drop the connection to the first server and talk to the second one instead.
            client.close();
            client = new BlobClient(serverAddress[1]);

            // Verify request 1.
            try (InputStream is = client.get(keys[0])) {
                assertBlobContent(is, expected, 0, expected.length);
            }
            // Verify request 2.
            try (InputStream is = client.get(keys[1])) {
                assertBlobContent(is, expected, PARTIAL_OFFSET, PARTIAL_LENGTH);
            }
            // Verify request 3.
            try (InputStream is = client.get(jobId[0], testKey[0])) {
                assertBlobContent(is, expected, 0, expected.length);
            }
            // Verify request 4.
            try (InputStream is = client.get(jobId[1], testKey[1])) {
                assertBlobContent(is, expected, PARTIAL_OFFSET, PARTIAL_LENGTH);
            }
        } finally {
            try {
                for (BlobServer s : server) {
                    if (s != null) {
                        s.shutdown();
                    }
                }
            } finally {
                // Close the client even when a server shutdown throws.
                if (client != null) {
                    client.close();
                }
            }
        }

        // Verify that nothing is left behind in the recovery directory.
        File[] recoveryFiles = this.recoveryDir.listFiles();
        // listFiles() returns null when the path is not a directory or an I/O error occurs.
        Assert.assertNotNull("Could not list recovery directory: " + this.recoveryDir, recoveryFiles);
        Assert.assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0L, (long) recoveryFiles.length);
    }

    /**
     * Reads {@code length} bytes from {@code in} and asserts that they equal
     * {@code expected[offset .. offset + length)}.
     *
     * <p>The whole range is compared, so every byte that was read is verified.
     *
     * @param in stream to read the blob content from; not closed by this method
     * @param expected payload the blob was created from
     * @param offset index in {@code expected} of the first byte of the blob
     * @param length number of bytes to read and compare
     */
    private static void assertBlobContent(InputStream in, byte[] expected, int offset, int length) throws Exception {
        byte[] actual = new byte[length];
        BlobUtils.readFully(in, actual, 0, length, null);
        Assert.assertArrayEquals(Arrays.copyOfRange(expected, offset, offset + length), actual);
    }
}

