/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.sqlserver.jdbc.spark;

import com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$;
import com.microsoft.sqlserver.jdbc.spark.ColumnMetadata;
import com.microsoft.sqlserver.jdbc.spark.DataIOStrategy;
import com.microsoft.sqlserver.jdbc.spark.FailureInjection$;
import com.microsoft.sqlserver.jdbc.spark.SQLServerBulkJdbcOptions;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.collection.IndexedSeq;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayOps;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

public final class ReliableSingleInstanceStrategy$
extends DataIOStrategy
implements Logging {
    public static ReliableSingleInstanceStrategy$ MODULE$;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new ReliableSingleInstanceStrategy$();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    @Override
    public void write(Dataset<Row> df, ColumnMetadata[] dfColMetaData, SQLServerBulkJdbcOptions options, String appId) {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "write : reliable write to single instance called");
        Connection conn = (Connection)JdbcUtils$.MODULE$.createConnectionFactory((JDBCOptions)options).apply();
        IndexedSeq<String> stagingTableList = this.getStagingTableNames(appId, df.rdd().getNumPartitions());
        this.cleanupStagingTables(conn, stagingTableList, options);
        this.createStagingTables(conn, stagingTableList, options);
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "write : Starting Phase 1 - Insert to Staging tables");
        boolean phase1Success = this.writeToStagingTables(df, dfColMetaData, options, appId);
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "write : Starting Phase 2 - Union Staging tables");
        boolean bl = phase1Success;
        if (bl) {
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "*** write : Initiating unionStagingTables");
            if (stagingTableList.length() > 0) {
                this.unionStagingTables(conn, stagingTableList, dfColMetaData, options);
            }
        } else {
            if (!bl) {
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "*** write : Dropping Phase 2 due to Phase 1 failure");
                this.cleanupStagingTables(conn, stagingTableList, options);
                throw new SQLException("Failed dues to non-transient error. No records written ");
            }
            throw new MatchError((Object)BoxesRunTime.boxToBoolean((boolean)bl));
        }
        this.cleanupStagingTables(conn, stagingTableList, options);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "write : Finished");
    }

    private boolean writeToStagingTables(Dataset<Row> df, ColumnMetadata[] dfColMetadata, SQLServerBulkJdbcOptions options, String appId) {
        Object object;
        boolean allSuccess = true;
        try {
            object = df.rdd().mapPartitionsWithIndex((Function2 & Serializable & scala.Serializable)(index, iterator) -> ReliableSingleInstanceStrategy$.$anonfun$writeToStagingTables$1(appId, options, dfColMetadata, BoxesRunTime.unboxToInt((Object)index), iterator), df.rdd().mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.Int()).collect();
        }
        catch (Exception ex) {
            allSuccess = false;
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("writeToStagingTables: Executor failed write to table: ").append(ex.getMessage()).toString());
            object = BoxedUnit.UNIT;
        }
        return allSuccess;
    }

    private void idempotentInsertToTable(Iterator<Row> iterator, String tableName, ColumnMetadata[] dfColMetaData, SQLServerBulkJdbcOptions options) {
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "idempotentInsertToTable : Started");
        Connection conn = (Connection)JdbcUtils$.MODULE$.createConnectionFactory((JDBCOptions)options).apply();
        try {
            BulkCopyUtils$.MODULE$.mssqlTruncateTable(conn, tableName);
        }
        catch (SQLException ex) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("idempotentInsertToTable : Exception during drop table:").append(ex.getMessage()).toString());
        }
        BulkCopyUtils$.MODULE$.savePartition(iterator, tableName, dfColMetaData, options);
        FailureInjection$.MODULE$.simulateRandomRestart(options);
    }

    private void unionStagingTables(Connection conn, IndexedSeq<String> stagingTableList, ColumnMetadata[] dfColMetadata, SQLServerBulkJdbcOptions options) {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "unionStagingTables: insert to final table");
        String insertStmt = this.stmtInsertWithUnion(stagingTableList, dfColMetadata, options);
        Connection conn2 = (Connection)JdbcUtils$.MODULE$.createConnectionFactory((JDBCOptions)options).apply();
        BulkCopyUtils$.MODULE$.executeUpdate(conn2, insertStmt);
    }

    /*
     * WARNING - void declaration
     */
    private IndexedSeq<String> getStagingTableNames(String appId, int nrOfPartitions) {
        void var3_3;
        scala.collection.immutable.IndexedSeq stagingTableList = (scala.collection.immutable.IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), nrOfPartitions).map((Function1 & Serializable & scala.Serializable)index -> ReliableSingleInstanceStrategy$.MODULE$.getStagingTableName(appId, BoxesRunTime.unboxToInt((Object)index)), IndexedSeq$.MODULE$.canBuildFrom());
        return var3_3;
    }

    private String getStagingTableName(String appId, int index) {
        return new StringBuilder(3).append("##").append(appId).append("_").append(index).toString();
    }

    private String stmtInsertWithUnion(IndexedSeq<String> stagingTableList, ColumnMetadata[] dfColMetadata, SQLServerBulkJdbcOptions options) {
        String string;
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("stmtInsertWithUnion: Staging tables to union are ").append(stagingTableList.mkString(",")).toString());
        String unionStr = ((TraversableOnce)stagingTableList.map((Function1 & Serializable & scala.Serializable)item -> new StringBuilder(14).append("SELECT * from ").append((String)item).toString(), scala.collection.IndexedSeq$.MODULE$.canBuildFrom())).mkString(" UNION ALL ");
        String colStr = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])dfColMetadata)).map((Function1 & Serializable & scala.Serializable)item -> item.getName(), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class))))).mkString(",");
        boolean bl = options.tableLock();
        if (bl) {
            string = new StringBuilder(28).append("INSERT INTO ").append(options.dbtable()).append(" WITH (TABLOCK) ").append(unionStr).toString();
        } else if (!bl) {
            string = new StringBuilder(13).append("INSERT INTO ").append(options.dbtable()).append(" ").append(unionStr).toString();
        } else {
            throw new MatchError((Object)BoxesRunTime.boxToBoolean((boolean)bl));
        }
        return string;
    }

    private void cleanupStagingTables(Connection conn, IndexedSeq<String> stagingTableList, SQLServerBulkJdbcOptions options) {
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("cleanupStagingTables: Tables to cleanup are ").append(stagingTableList.mkString(",")).toString());
        stagingTableList.map((Function1 & Serializable & scala.Serializable)item -> {
            ReliableSingleInstanceStrategy$.$anonfun$cleanupStagingTables$2(conn, options, item);
            return BoxedUnit.UNIT;
        }, scala.collection.IndexedSeq$.MODULE$.canBuildFrom());
    }

    private void createStagingTable(Connection conn, String tableName, SQLServerBulkJdbcOptions options) {
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("createStagingTable : Creating table ").append(tableName).append(" as schema copy of ").append(options.dbtable()).toString());
        String createTableStr = new StringBuilder(30).append("SELECT * INTO ").append(tableName).append(" From ").append(options.dbtable()).append(" WHERE 1=0").toString();
        BulkCopyUtils$.MODULE$.executeUpdate(conn, createTableStr);
    }

    private void createStagingTables(Connection conn, IndexedSeq<String> stagingTableList, SQLServerBulkJdbcOptions options) {
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("createStagingTables: Tables to create are ").append(stagingTableList.mkString(",")).toString());
        stagingTableList.map((Function1 & Serializable & scala.Serializable)item -> {
            ReliableSingleInstanceStrategy$.$anonfun$createStagingTables$2(conn, options, item);
            return BoxedUnit.UNIT;
        }, scala.collection.IndexedSeq$.MODULE$.canBuildFrom());
    }

    public static final /* synthetic */ Iterator $anonfun$writeToStagingTables$1(String appId$1, SQLServerBulkJdbcOptions options$1, ColumnMetadata[] dfColMetadata$1, int index, Iterator iterator) {
        String table_name = MODULE$.getStagingTableName(appId$1, index);
        MODULE$.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(56).append("writeToStagingTables: Writing partition index ").append(index).append(" to Table ").append(table_name).toString());
        SQLServerBulkJdbcOptions newOptions = new SQLServerBulkJdbcOptions((Map<String, String>)options$1.parameters().$plus(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"tableLock"), (Object)"true")));
        MODULE$.idempotentInsertToTable((Iterator<Row>)iterator, table_name, dfColMetadata$1, newOptions);
        MODULE$.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(61).append("writeToStagingTables: Successfully wrote partition ").append(index).append(" to Table ").append(table_name).toString());
        return package$.MODULE$.Iterator().apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{1}));
    }

    public static final /* synthetic */ void $anonfun$cleanupStagingTables$2(Connection conn$1, SQLServerBulkJdbcOptions options$2, String item) {
        try {
            JdbcUtils$.MODULE$.dropTable(conn$1, item, (JDBCOptions)options$2);
        }
        catch (SQLException ex) {
            MODULE$.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("cleanupStagingTables: Exception while dropping table ").append(item).append(" :").append(ex.getMessage()).toString());
        }
    }

    public static final /* synthetic */ void $anonfun$createStagingTables$2(Connection conn$2, SQLServerBulkJdbcOptions options$4, String item) {
        try {
            MODULE$.createStagingTable(conn$2, item, options$4);
        }
        catch (SQLException ex) {
            MODULE$.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("createStagingTables: Exception while creating table ").append(item).append(" : ").append(ex.getMessage()).toString());
            throw ex;
        }
    }

    private ReliableSingleInstanceStrategy$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
    }
}

