package org.apache.pulsar.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.Map;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

/* loaded from: input_file:org/apache/pulsar/io/cassandra/CassandraAbstractSink.class */
/**
 * Base class for Pulsar sinks that insert one key/value pair per record into a Cassandra table.
 *
 * <p>{@link #open} loads a {@link CassandraSinkConfig}, connects to the configured contact
 * points, switches to the configured keyspace and prepares a two-column {@code INSERT}.
 * {@link #write} binds the key and value produced by {@link #extractKeyValue} to that statement
 * and executes it asynchronously; the record is acked when the insert succeeds and failed when
 * it does not.
 *
 * @param <K> type of the value bound to the key column
 * @param <V> type of the value bound to the value column
 */
public abstract class CassandraAbstractSink<K, V> implements Sink<byte[]> {
    private Cluster cluster;
    private Session session;
    CassandraSinkConfig cassandraSinkConfig;
    private PreparedStatement statement;

    /**
     * Loads the sink configuration, connects to Cassandra and prepares the insert statement.
     *
     * @param map raw configuration passed to {@link CassandraSinkConfig#load}
     * @param sinkContext sink context; not used by this implementation
     * @throws IllegalArgumentException if roots, keyspace, key name, column family or column
     *     name is not set, or if the roots string contains no usable host
     * @throws Exception if connecting or preparing the statement fails; any cluster/session
     *     opened so far is closed before the exception is rethrown
     */
    @Override
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.cassandraSinkConfig = CassandraSinkConfig.load(map);
        CassandraSinkConfig config = this.cassandraSinkConfig;
        if (config.getRoots() == null
                || config.getKeyspace() == null
                || config.getKeyname() == null
                || config.getColumnFamily() == null
                || config.getColumnName() == null) {
            throw new IllegalArgumentException("Required property not set.");
        }
        try {
            createClient(config.getRoots());
            // Table and column names are identifiers and cannot be bound as '?' parameters,
            // so they are concatenated from the sink configuration; only the two values are bound.
            this.statement = this.session.prepare("INSERT INTO " + config.getColumnFamily()
                    + " (" + config.getKeyname() + ", " + config.getColumnName() + ") VALUES (?, ?)");
        } catch (Exception e) {
            // Do not leak a half-initialised cluster/session when connect, USE or prepare fails.
            try {
                close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Closes the session and then the cluster. Safe to call when {@link #open} was never called
     * or failed part-way: fields that were never assigned are skipped, and the cluster is still
     * closed if closing the session throws.
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } finally {
            if (this.cluster != null) {
                this.cluster.close();
            }
        }
    }

    /**
     * Asynchronously inserts the key/value extracted from {@code record}.
     *
     * <p>The record is acked from the success callback and failed from the failure callback.
     * Exceptions thrown synchronously (while extracting, binding or submitting) propagate to the
     * caller without touching the record.
     *
     * @param record the record to persist
     */
    @Override
    public void write(final Record<byte[]> record) {
        KeyValue<K, V> keyValue = extractKeyValue(record);
        Futures.addCallback(
                this.session.executeAsync(this.statement.bind(keyValue.getKey(), keyValue.getValue())),
                new FutureCallback<ResultSet>() {
                    @Override
                    public void onSuccess(ResultSet resultSet) {
                        record.ack();
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        record.fail();
                    }
                });
    }

    /**
     * Builds the cluster from a comma-separated list of {@code host} or {@code host:port}
     * entries, connects, and issues {@code USE <keyspace>}.
     *
     * <p>Entries and their host/port parts are trimmed; blank entries (for example from
     * {@code "a,,b"}) are skipped. Entries are split on {@code ':'}, so only the first two
     * colon-separated parts of an entry are read. Every entry that carries a port calls
     * {@code withPort} on the same builder.
     *
     * @param roots comma-separated contact points
     * @throws IllegalArgumentException if an entry has no host or no entry yields a host
     * @throws NumberFormatException if a port is not a valid integer
     */
    private void createClient(String roots) {
        Cluster.Builder builder = Cluster.builder();
        int contactPoints = 0;
        for (String rawEntry : roots.split(",")) {
            String entry = rawEntry.trim();
            if (entry.isEmpty()) {
                continue;
            }
            String[] hostAndPort = entry.split(":");
            // ":".split(":") yields an empty array, so guard the index instead of assuming [0] exists.
            String host = hostAndPort.length > 0 ? hostAndPort[0].trim() : "";
            if (host.isEmpty()) {
                throw new IllegalArgumentException("Invalid cassandra roots: missing host in '" + entry + "'");
            }
            builder.addContactPoint(host);
            if (hostAndPort.length > 1) {
                builder.withPort(Integer.parseInt(hostAndPort[1].trim()));
            }
            contactPoints++;
        }
        if (contactPoints == 0) {
            throw new IllegalArgumentException("Invalid cassandra roots");
        }
        this.cluster = builder.build();
        this.session = this.cluster.connect();
        this.session.execute("USE " + this.cassandraSinkConfig.getKeyspace());
    }

    /**
     * Converts an incoming record into the key and value that {@link #write} binds to the
     * prepared insert statement.
     *
     * @param record the record being written
     * @return the key/value pair to insert
     */
    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
}
