/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.InputStream;
import java.util.Random;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@code FsStateBackend.FsCheckpointStateOutputStream}.
 *
 * <p>Each test writes some amount of data to the stream, closes it, and checks which kind of
 * {@link StreamStateHandle} comes back ({@link ByteStreamStateHandle} or
 * {@link FileStreamStateHandle}) and that the handle yields exactly the bytes that were written.
 */
public class FsCheckpointStateOutputStreamTest {
    /** Base directory handed to every stream under test: the JVM's {@code java.io.tmpdir}. */
    private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());

    /**
     * Constructing the stream with buffer size 4000 and threshold 5000 must be rejected with an
     * {@link IllegalArgumentException} (presumably because the threshold exceeds the buffer
     * size -- confirm against the constructor's precondition).
     */
    @Test(expected=IllegalArgumentException.class)
    public void testWrongParameters() {
        new FsStateBackend.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000);
    }

    /**
     * Closing a stream that was never written to must yield a {@link ByteStreamStateHandle}
     * whose input stream is immediately at end-of-stream.
     */
    @Test
    public void testEmptyState() throws Exception {
        FsStateBackend.FsCheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);
        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertTrue(handle instanceof ByteStreamStateHandle);
        try (InputStream inStream = (InputStream) handle.getState(ClassLoader.getSystemClassLoader())) {
            Assert.assertEquals(-1, inStream.read());
        }
    }

    /** 222 bytes with a threshold of 512: expects a {@link ByteStreamStateHandle}. */
    @Test
    public void testStateBlowMemThreshold() throws Exception {
        this.runTest(222, 999, 512, false);
    }

    /** 896 bytes with a 1024-byte buffer and a threshold of 15: expects a {@link FileStreamStateHandle}. */
    @Test
    public void testStateOneBufferAboveThreshold() throws Exception {
        this.runTest(896, 1024, 15, true);
    }

    /** 576446 bytes with a 259-byte buffer and a threshold of 17: expects a {@link FileStreamStateHandle}. */
    @Test
    public void testStateAboveMemThreshold() throws Exception {
        this.runTest(576446, 259, 17, true);
    }

    /** 16678 bytes with a threshold of 0: expects a {@link FileStreamStateHandle}. */
    @Test
    public void testZeroThreshold() throws Exception {
        this.runTest(16678, 4096, 0, true);
    }

    /**
     * Writes {@code numBytes} random bytes to a fresh stream using a random mix of single-byte
     * writes and short array writes, then verifies the type and the contents of the resulting
     * state handle.
     *
     * @param numBytes   number of random bytes to write
     * @param bufferSize buffer size passed to the stream constructor
     * @param threshold  threshold passed to the stream constructor
     * @param expectFile {@code true} if a {@link FileStreamStateHandle} is expected,
     *                   {@code false} if a {@link ByteStreamStateHandle} is expected
     * @throws Exception if writing, reading, or discarding the state fails
     */
    private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
        FsStateBackend.FsCheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream(TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold);

        // Capture the seed so that a failing run can be reproduced from the assertion message.
        long seed = new Random().nextLong();
        Random rnd = new Random(seed);
        String seedInfo = " (seed=" + seed + ")";

        byte[] original = new byte[numBytes];
        rnd.nextBytes(original);
        // Write from a copy so we can later detect whether the stream modified its input array.
        byte[] bytes = new byte[original.length];
        System.arraycopy(original, 0, bytes, 0, original.length);

        int pos = 0;
        while (pos < bytes.length) {
            if (rnd.nextBoolean()) {
                stream.write((int) bytes[pos++]);
            } else {
                // Chunk of 0..9 bytes, never reaching past the end; zero-length writes are intentional.
                int num = rnd.nextInt(Math.min(10, bytes.length - pos));
                stream.write(bytes, pos, num);
                pos += num;
            }
        }

        StreamStateHandle handle = stream.closeAndGetHandle();
        try {
            if (expectFile) {
                Assert.assertTrue("expected a file state handle" + seedInfo, handle instanceof FileStreamStateHandle);
            } else {
                Assert.assertTrue("expected a byte state handle" + seedInfo, handle instanceof ByteStreamStateHandle);
            }

            // The stream must not have altered the caller's array.
            Assert.assertArrayEquals("input array was modified" + seedInfo, original, bytes);

            byte[] validation = new byte[bytes.length];
            try (InputStream inStream = (InputStream) handle.getState(ClassLoader.getSystemClassLoader())) {
                // A single read() may legally return fewer bytes than requested, so drain in a loop.
                int bytesRead = readFully(inStream, validation);
                Assert.assertEquals("wrong number of bytes read back" + seedInfo, numBytes, bytesRead);
                Assert.assertEquals("stream has trailing data" + seedInfo, -1, inStream.read());
            }
            Assert.assertArrayEquals("state contents differ" + seedInfo, bytes, validation);
        } finally {
            // Clean up even when an assertion fails so no state is left behind in the temp dir.
            handle.discardState();
        }
    }

    /**
     * Reads from {@code in} until {@code target} is full or end-of-stream is reached.
     *
     * @param in     stream to read from
     * @param target buffer to fill, starting at index 0
     * @return total number of bytes stored in {@code target}
     * @throws java.io.IOException if the underlying read fails
     */
    private static int readFully(InputStream in, byte[] target) throws java.io.IOException {
        int total = 0;
        while (total < target.length) {
            int n = in.read(target, total, target.length - total);
            if (n < 0) {
                break;
            }
            total += n;
        }
        return total;
    }
}

