package org.springframework.cassandra.core.session;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/springframework/cassandra/core/session/DefaultBridgedReactiveSession.class */
public class DefaultBridgedReactiveSession implements ReactiveSession {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Session session;
    private final Scheduler scheduler;

    /* loaded from: input_file:org/springframework/cassandra/core/session/DefaultBridgedReactiveSession$DefaultReactiveResultSet.class */
    private static class DefaultReactiveResultSet implements ReactiveResultSet {
        private final ResultSet resultSet;
        private final Scheduler scheduler;

        DefaultReactiveResultSet(ResultSet resultSet, Scheduler scheduler) {
            this.resultSet = resultSet;
            this.scheduler = scheduler;
        }

        @Override // org.springframework.cassandra.core.session.ReactiveResultSet
        public Flux<Row> rows() {
            return Flux.fromIterable(this.resultSet).subscribeOn(this.scheduler).publishOn(Schedulers.immediate(), Math.max(1, this.resultSet.getAvailableWithoutFetching()));
        }

        @Override // org.springframework.cassandra.core.session.ReactiveResultSet
        public ColumnDefinitions getColumnDefinitions() {
            return this.resultSet.getColumnDefinitions();
        }

        @Override // org.springframework.cassandra.core.session.ReactiveResultSet
        public boolean wasApplied() {
            return this.resultSet.wasApplied();
        }

        @Override // org.springframework.cassandra.core.session.ReactiveResultSet
        public ExecutionInfo getExecutionInfo() {
            return this.resultSet.getExecutionInfo();
        }

        @Override // org.springframework.cassandra.core.session.ReactiveResultSet
        public List<ExecutionInfo> getAllExecutionInfo() {
            return this.resultSet.getAllExecutionInfo();
        }
    }

    public DefaultBridgedReactiveSession(Session session, Scheduler scheduler) {
        Assert.notNull(session, "Session must not be null");
        Assert.notNull(scheduler, "Scheduler must not be null");
        this.session = session;
        this.scheduler = scheduler;
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public Mono<ReactiveResultSet> execute(String str) {
        Assert.hasText(str, "Query must not be empty");
        return execute((Statement) new SimpleStatement(str));
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public Mono<ReactiveResultSet> execute(String str, Object... objArr) {
        Assert.hasText(str, "Query must not be empty");
        return execute((Statement) new SimpleStatement(str, objArr));
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public Mono<ReactiveResultSet> execute(String str, Map<String, Object> map) {
        Assert.hasText(str, "Query must not be empty");
        return execute((Statement) new SimpleStatement(str, map));
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public Mono<ReactiveResultSet> execute(Statement statement) {
        Assert.notNull(statement, "Statement must not be null");
        return Mono.defer(() -> {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Executing Statement [{}]", statement);
                }
                CompletableFuture completableFuture = new CompletableFuture();
                ResultSetFuture executeAsync = this.session.executeAsync(statement);
                executeAsync.addListener(() -> {
                    if (executeAsync.isDone()) {
                        try {
                            completableFuture.complete(new DefaultReactiveResultSet(executeAsync.getUninterruptibly(), this.scheduler));
                        } catch (Exception e) {
                            completableFuture.completeExceptionally(e);
                        }
                    }
                }, (v0) -> {
                    v0.run();
                });
                return Mono.fromFuture(completableFuture);
            } catch (Exception e) {
                return Mono.error(e);
            }
        }).subscribeOn(this.scheduler);
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public Mono<PreparedStatement> prepare(String str) {
        Assert.hasText(str, "Query must not be empty");
        return prepare((RegularStatement) new SimpleStatement(str));
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public Mono<PreparedStatement> prepare(RegularStatement regularStatement) {
        Assert.notNull(regularStatement, "Statement must not be null");
        return Mono.defer(() -> {
            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Preparing Statement [{}]", regularStatement);
                }
                CompletableFuture completableFuture = new CompletableFuture();
                ListenableFuture prepareAsync = this.session.prepareAsync(regularStatement);
                prepareAsync.addListener(() -> {
                    if (prepareAsync.isDone()) {
                        try {
                            completableFuture.complete(prepareAsync.get());
                        } catch (Exception e) {
                            completableFuture.completeExceptionally(e);
                        }
                    }
                }, (v0) -> {
                    v0.run();
                });
                return Mono.fromFuture(completableFuture);
            } catch (Exception e) {
                return Mono.error(e);
            }
        }).subscribeOn(this.scheduler);
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.session.close();
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public boolean isClosed() {
        return this.session.isClosed();
    }

    @Override // org.springframework.cassandra.core.session.ReactiveSession
    public Cluster getCluster() {
        return this.session.getCluster();
    }
}
