/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.lib.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.HadoopTestCase;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.hsqldb.server.Server;

public class TestDataDrivenDBInputFormat
extends HadoopTestCase {
    private static final Log LOG = LogFactory.getLog(TestDataDrivenDBInputFormat.class);
    private static final String DB_NAME = "dddbif";
    private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/dddbif";
    private static final String DRIVER_CLASS = "org.hsqldb.jdbc.JDBCDriver";
    private Server server;
    private Connection connection;
    private static final String OUT_DIR = System.getProperty("test.build.data", "/tmp") + "/dddbifout";

    public TestDataDrivenDBInputFormat() throws IOException {
        super(1, 4, 1, 1);
    }

    private void startHsqldbServer() {
        if (null == this.server) {
            this.server = new Server();
            this.server.setDatabasePath(0, System.getProperty("test.build.data", "/tmp") + "/" + DB_NAME);
            this.server.setDatabaseName(0, DB_NAME);
            this.server.start();
        }
    }

    private void createConnection(String driverClassName, String url) throws Exception {
        Class.forName(driverClassName);
        this.connection = DriverManager.getConnection(url);
        this.connection.setAutoCommit(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shutdown() {
        try {
            this.connection.commit();
            this.connection.close();
            this.connection = null;
        }
        catch (Throwable ex) {
            LOG.warn((Object)("Exception occurred while closing connection :" + StringUtils.stringifyException(ex)));
        }
        finally {
            try {
                if (this.server != null) {
                    this.server.shutdown();
                }
            }
            catch (Throwable ex) {
                LOG.warn((Object)("Exception occurred while shutting down HSQLDB :" + StringUtils.stringifyException(ex)));
            }
            this.server = null;
        }
    }

    private void initialize(String driverClassName, String url) throws Exception {
        this.startHsqldbServer();
        this.createConnection(driverClassName, url);
    }

    @Override
    public void setUp() throws Exception {
        this.initialize(DRIVER_CLASS, DB_URL);
        super.setUp();
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        this.shutdown();
    }

    public void testDateSplits() throws Exception {
        Statement s = this.connection.createStatement();
        String DATE_TABLE = "datetable";
        String COL2 = "foo";
        try {
            s.executeUpdate("DROP TABLE datetable");
        }
        catch (SQLException e) {
            // empty catch block
        }
        s.executeUpdate("CREATE TABLE datetable(foo DATE)");
        s.executeUpdate("INSERT INTO datetable VALUES('2010-04-01')");
        s.executeUpdate("INSERT INTO datetable VALUES('2010-04-02')");
        s.executeUpdate("INSERT INTO datetable VALUES('2010-05-01')");
        s.executeUpdate("INSERT INTO datetable VALUES('2011-04-01')");
        this.connection.commit();
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        LocalFileSystem fs = FileSystem.getLocal(conf);
        ((FileSystem)fs).delete(new Path(OUT_DIR), true);
        Job job = Job.getInstance(conf);
        job.setMapperClass(ValMapper.class);
        job.setReducerClass(Reducer.class);
        job.setMapOutputKeyClass(DateCol.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(DateCol.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);
        job.getConfiguration().setInt("mapreduce.map.tasks", 2);
        FileOutputFormat.setOutputPath(job, new Path(OUT_DIR));
        DBConfiguration.configureDB(job.getConfiguration(), DRIVER_CLASS, DB_URL, null, null);
        DataDrivenDBInputFormat.setInput(job, DateCol.class, "datetable", null, "foo", "foo");
        boolean ret = job.waitForCompletion(true);
        TestDataDrivenDBInputFormat.assertTrue((String)"job failed", (boolean)ret);
        TestDataDrivenDBInputFormat.assertEquals((String)"Did not get all the records", (long)4L, (long)job.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getValue());
    }

    public static class ValMapper
    extends Mapper<Object, Object, Object, NullWritable> {
        @Override
        public void map(Object k, Object v, Mapper.Context c) throws IOException, InterruptedException {
            c.write(v, NullWritable.get());
        }
    }

    public static class DateCol
    implements DBWritable,
    WritableComparable {
        Date d;

        public String toString() {
            return this.d.toString();
        }

        @Override
        public void readFields(ResultSet rs) throws SQLException {
            this.d = rs.getDate(1);
        }

        @Override
        public void write(PreparedStatement ps) {
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            long v = in.readLong();
            this.d = new Date(v);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.d.getTime());
        }

        public int hashCode() {
            return (int)this.d.getTime();
        }

        @Override
        public int compareTo(Object o) {
            if (o instanceof DateCol) {
                Long v = this.d.getTime();
                Long other = ((DateCol)o).d.getTime();
                return v.compareTo(other);
            }
            return -1;
        }
    }
}

