/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.ToResultSetFunction;
import com.hazelcast.jet.impl.util.Util;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.annotation.Nonnull;

public final class ReadJdbcP<T>
extends AbstractProcessor {
    private final SupplierEx<? extends Connection> newConnectionFn;
    private final ToResultSetFunction resultSetFn;
    private final FunctionEx<? super ResultSet, ? extends T> mapOutputFn;
    private Connection connection;
    private ResultSet resultSet;
    private Traverser traverser;
    private int parallelism;
    private int index;

    private ReadJdbcP(@Nonnull SupplierEx<? extends Connection> newConnectionFn, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet, ? extends T> mapOutputFn) {
        this.newConnectionFn = newConnectionFn;
        this.resultSetFn = resultSetFn;
        this.mapOutputFn = mapOutputFn;
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    public static <T> ProcessorMetaSupplier supplier(@Nonnull SupplierEx<? extends Connection> newConnectionFn, @Nonnull ToResultSetFunction resultSetFn, @Nonnull FunctionEx<? super ResultSet, ? extends T> mapOutputFn) {
        Util.checkSerializable(newConnectionFn, "newConnectionFn");
        Util.checkSerializable(resultSetFn, "resultSetFn");
        Util.checkSerializable(mapOutputFn, "mapOutputFn");
        return ProcessorMetaSupplier.preferLocalParallelismOne(() -> new ReadJdbcP(newConnectionFn, resultSetFn, mapOutputFn));
    }

    public static <T> ProcessorMetaSupplier supplier(@Nonnull String connectionURL, @Nonnull String query, @Nonnull FunctionEx<? super ResultSet, ? extends T> mapOutputFn) {
        Util.checkSerializable(mapOutputFn, "mapOutputFn");
        return ProcessorMetaSupplier.forceTotalParallelismOne(ProcessorSupplier.of(() -> new ReadJdbcP(() -> DriverManager.getConnection(connectionURL), (connection, parallelism, index) -> {
            PreparedStatement statement = connection.prepareStatement(query);
            try {
                return statement.executeQuery();
            }
            catch (SQLException e) {
                statement.close();
                throw e;
            }
        }, mapOutputFn)));
    }

    @Override
    protected void init(@Nonnull Processor.Context context) {
        this.connection = this.newConnectionFn.get();
        this.parallelism = context.totalParallelism();
        this.index = context.globalProcessorIndex();
    }

    @Override
    public boolean complete() {
        if (this.traverser == null) {
            this.resultSet = Util.uncheckCall(() -> this.resultSetFn.createResultSet(this.connection, this.parallelism, this.index));
            this.traverser = ((Traverser<ResultSet>)() -> Util.uncheckCall(() -> this.resultSet.next() ? this.resultSet : null)).map(this.mapOutputFn);
        }
        return this.emitFromTraverser(this.traverser);
    }

    @Override
    public void close() throws Exception {
        Exception resultSetException = null;
        Exception statementException = null;
        if (this.resultSet != null) {
            Statement statement = this.resultSet.getStatement();
            resultSetException = ReadJdbcP.close(this.resultSet);
            if (statement != null) {
                statementException = ReadJdbcP.close(statement);
            }
        }
        if (this.connection != null) {
            this.connection.close();
        }
        if (resultSetException != null) {
            throw resultSetException;
        }
        if (statementException != null) {
            throw statementException;
        }
    }

    private static Exception close(AutoCloseable closeable) {
        try {
            closeable.close();
        }
        catch (Exception e) {
            return e;
        }
        return null;
    }
}

