package org.mariadb.r2dbc.client;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.ReferenceCountUtil;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.R2dbcTransientResourceException;
import io.r2dbc.spi.TransactionDefinition;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLParameters;
import org.mariadb.r2dbc.ExceptionFactory;
import org.mariadb.r2dbc.HaMode;
import org.mariadb.r2dbc.MariadbConnectionConfiguration;
import org.mariadb.r2dbc.MariadbTransactionDefinition;
import org.mariadb.r2dbc.SslMode;
import org.mariadb.r2dbc.message.ClientMessage;
import org.mariadb.r2dbc.message.Context;
import org.mariadb.r2dbc.message.ServerMessage;
import org.mariadb.r2dbc.message.client.ExecutePacket;
import org.mariadb.r2dbc.message.client.PreparePacket;
import org.mariadb.r2dbc.message.client.QueryPacket;
import org.mariadb.r2dbc.message.client.QuitPacket;
import org.mariadb.r2dbc.message.client.SslRequestPacket;
import org.mariadb.r2dbc.message.server.CompletePrepareResult;
import org.mariadb.r2dbc.message.server.ErrorPacket;
import org.mariadb.r2dbc.message.server.InitialHandshakePacket;
import org.mariadb.r2dbc.util.HostAddress;
import org.mariadb.r2dbc.util.PrepareCache;
import org.mariadb.r2dbc.util.ServerPrepareResult;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.channel.AbortedException;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:org/mariadb/r2dbc/client/SimpleClient.class */
public class SimpleClient implements Client {
    /** Class-wide logger; its trace level also decides whether a Netty LoggingHandler is installed. */
    private static final Logger logger = Loggers.getLogger(SimpleClient.class);
    /** Connection settings supplied at construction time. */
    protected final MariadbConnectionConfiguration configuration;
    /** Subscriber attached to the connection's inbound stream; routes server messages to exchanges. */
    private final ServerMessageSubscriber messageSubscriber;
    /** Inbound frame decoder, installed as a pipeline handler by the constructor. */
    private final MariadbFrameDecoder decoder;
    /** Encoder applied to every message emitted on {@link #requestSink}. */
    private final MariadbPacketEncoder encoder;
    /** Prepared-statement cache; created with size 0 when server-side prepared statements are disabled. */
    private final PrepareCache prepareCache;
    /** Allocator obtained from the connection's outbound side, handed to the context in setContext. */
    private final ByteBufAllocator byteBufAllocator;
    /** Lock supplied by the creator; taken while registering exchanges and draining buffered messages. */
    protected final ReentrantLock lock;
    /** Underlying Reactor Netty connection. */
    protected final Connection connection;
    /** Host this client is connected to; may be null (the SSL setup code checks for it). */
    protected final HostAddress hostAddress;
    /** Session context; stays null until setContext is called. */
    protected volatile Context context;
    /** Unicast sink carrying outgoing client messages to the send pipeline. */
    private final Sinks.Many<ClientMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
    /** Exchanges awaiting server responses, in submission order (capacity Queues.SMALL_BUFFER_SIZE). */
    private final Queue<Exchange> exchangeQueue = (Queue) Queues.get(Queues.SMALL_BUFFER_SIZE).get();
    /** Server messages received but not yet delivered to an exchange (capacity Queues.SMALL_BUFFER_SIZE). */
    private final Queue<ServerMessage> receiverQueue = (Queue) Queues.get(Queues.SMALL_BUFFER_SIZE).get();
    /** Set once the channel has been, or is being, closed; guards against closing twice. */
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    /** True once {@link #close()} has been called by the user. */
    private volatile boolean closeRequested = false;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/mariadb/r2dbc/client/SimpleClient$ServerMessageSubscriber.class */
    public class ServerMessageSubscriber implements CoreSubscriber<ServerMessage> {
        private Subscription upstream;
        private volatile boolean close;
        private final AtomicLong receiverDemands = new AtomicLong(0);
        private final ReentrantLock lock;
        private final Queue<Exchange> exchangeQueue;
        private final Queue<ServerMessage> receiverQueue;

        public ServerMessageSubscriber(ReentrantLock reentrantLock, Queue<Exchange> queue, Queue<ServerMessage> queue2) {
            this.lock = reentrantLock;
            this.receiverQueue = queue2;
            this.exchangeQueue = queue;
        }

        public void onSubscribe(Subscription subscription) {
            this.upstream = subscription;
        }

        public void onError(Throwable th) {
            th.printStackTrace();
            if (this.close) {
                Operators.onErrorDropped(th, currentContext());
                return;
            }
            SimpleClient.this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
            SimpleClient.this.handleConnectionError(th);
            SimpleClient.this.quitOrClose().subscribe();
        }

        public void onComplete() {
            Object[] objArr = new Object[1];
            objArr[0] = SimpleClient.this.closeChannelIfNeeded() ? "unexpected error" : "error";
            close(new R2dbcNonTransientResourceException(String.format("Connection %s", objArr), "08000"));
            SimpleClient.this.quitOrClose().subscribe();
        }

        public void onNext(ServerMessage serverMessage) {
            if (this.close) {
                Operators.onNextDropped(serverMessage, currentContext());
                return;
            }
            this.receiverDemands.decrementAndGet();
            Exchange peek = this.exchangeQueue.peek();
            ReferenceCountUtil.retain(serverMessage);
            if (this.receiverQueue.isEmpty() && peek != null && peek.hasDemand()) {
                if (peek.emit(serverMessage)) {
                    this.exchangeQueue.poll();
                }
                if (peek.hasDemand() || peek.isCancelled()) {
                    requestQueueFilling();
                    return;
                }
                return;
            }
            if (this.receiverQueue.offer(serverMessage)) {
                tryDrainQueue();
                return;
            }
            serverMessage.release();
            Operators.onNextDropped(serverMessage, currentContext());
            onError(new R2dbcNonTransientResourceException("unexpected : server message queue is full"));
        }

        public void onRequest(Exchange exchange, long j) {
            exchange.incrementDemand(j);
            requestQueueFilling();
            tryDrainQueue();
        }

        private void requestQueueFilling() {
            if (this.receiverQueue.isEmpty() && this.receiverDemands.compareAndSet(0L, Queues.SMALL_BUFFER_SIZE)) {
                this.upstream.request(Queues.SMALL_BUFFER_SIZE);
            }
        }

        private void tryDrainQueue() {
            while (!this.receiverQueue.isEmpty() && this.lock.tryLock()) {
                while (!this.receiverQueue.isEmpty()) {
                    try {
                        Exchange peek = this.exchangeQueue.peek();
                        if (peek == null || !peek.hasDemand()) {
                            return;
                        }
                        ServerMessage poll = this.receiverQueue.poll();
                        if (poll == null) {
                            return;
                        }
                        if (peek.emit(poll)) {
                            this.exchangeQueue.poll();
                        }
                    } finally {
                        this.lock.unlock();
                    }
                }
                Exchange peek2 = this.exchangeQueue.peek();
                if (peek2 == null || peek2.hasDemand()) {
                    requestQueueFilling();
                }
            }
        }

        public void close(R2dbcException r2dbcException) {
            this.close = true;
            while (true) {
                Exchange poll = this.exchangeQueue.poll();
                if (poll == null) {
                    break;
                } else {
                    poll.onError(r2dbcException);
                }
            }
            while (!this.receiverQueue.isEmpty()) {
                this.receiverQueue.poll().release();
            }
        }

        public boolean isClose() {
            return this.close;
        }
    }

    /** @return the current session context, or null when none has been set yet */
    @Override
    public Context getContext() {
        return context;
    }

    /**
     * Wires a freshly established connection: installs the frame decoder, optionally an SSL
     * tunnel handler and a trace logging handler, then subscribes the inbound and outbound pipelines.
     *
     * @param connection established Reactor Netty connection
     * @param mariadbConnectionConfiguration connection settings
     * @param hostAddress target host, or null when unknown
     * @param reentrantLock lock shared with the owner of this client
     */
    protected SimpleClient(Connection connection, MariadbConnectionConfiguration mariadbConnectionConfiguration, HostAddress hostAddress, ReentrantLock reentrantLock) {
        this.connection = connection;
        this.configuration = mariadbConnectionConfiguration;
        this.hostAddress = hostAddress;
        this.lock = reentrantLock;

        boolean cachePrepared = this.configuration.useServerPrepStmts();
        this.prepareCache = new PrepareCache(cachePrepared ? this.configuration.getPrepareCacheSize() : 0, this);
        this.decoder = new MariadbFrameDecoder(this.exchangeQueue, this, mariadbConnectionConfiguration);
        this.encoder = new MariadbPacketEncoder();
        this.byteBufAllocator = connection.outbound().alloc();
        this.messageSubscriber = new ServerMessageSubscriber(this.lock, this.exchangeQueue, this.receiverQueue);

        connection.addHandlerFirst(this.decoder);

        if (SslMode.TUNNEL == mariadbConnectionConfiguration.getSslConfig().getSslMode()) {
            try {
                SslContext tunnelContext = mariadbConnectionConfiguration.getSslConfig().getSslContext();
                SSLEngine engine;
                if (hostAddress == null) {
                    engine = tunnelContext.newEngine(connection.channel().alloc());
                } else {
                    engine = tunnelContext.newEngine(connection.channel().alloc(), hostAddress.getHost(), hostAddress.getPort());
                    SSLParameters engineParameters = engine.getSSLParameters();
                    if (!mariadbConnectionConfiguration.getSslConfig().tunnelHostVerificationDisabled()) {
                        engineParameters.setEndpointIdentificationAlgorithm("HTTPS");
                    }
                    engine.setSSLParameters(engineParameters);
                }
                connection.addHandlerFirst(new SslHandler(engine));
            } catch (SSLException sslFailure) {
                handleConnectionError(sslFailure);
            }
        }

        if (logger.isTraceEnabled()) {
            connection.addHandlerFirst(LoggingHandler.class.getSimpleName(), new LoggingHandler(SimpleClient.class, LogLevel.TRACE));
        }

        // Inbound: decoded server messages go to the message subscriber.
        connection.inbound()
            .receiveObject()
            .cast(ServerMessage.class)
            .onErrorResume(this::receiveResumeError)
            .subscribe(this.messageSubscriber);

        // Outbound: encode queued client messages and send them one at a time.
        this.requestSink.asFlux()
            .map(this.encoder::encodeFlux)
            .flatMap(encoded -> connection.outbound().send(encoded), 1)
            .onErrorResume(this::sendResumeError)
            .doAfterTerminate(this::closeChannelIfNeeded)
            .subscribe();
    }

    /**
     * Opens a TCP connection to the given address and wraps it in a {@link SimpleClient}.
     *
     * @param connectionProvider Reactor Netty connection provider
     * @param socketAddress remote address to connect to
     * @param hostAddress host description handed to the client (may be null)
     * @param mariadbConnectionConfiguration connection settings
     * @param reentrantLock lock handed to the created client
     * @return a Mono emitting the client once the connection is established
     */
    public static Mono<SimpleClient> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, HostAddress hostAddress, MariadbConnectionConfiguration mariadbConnectionConfiguration, ReentrantLock reentrantLock) {
        TcpClient baseClient = TcpClient.create(connectionProvider)
            .remoteAddress(() -> socketAddress)
            .runOn(mariadbConnectionConfiguration.loopResources());
        TcpClient tunedClient = setSocketOption(mariadbConnectionConfiguration, baseClient);
        return tunedClient.connect()
            .flatMap(established -> Mono.just(new SimpleClient(established, mariadbConnectionConfiguration, hostAddress, reentrantLock)));
    }

    /**
     * Applies the socket-level settings of the configuration (connect timeout, keep-alive,
     * abortive close) to a TCP client.
     *
     * @param mariadbConnectionConfiguration connection settings
     * @param tcpClient client to configure
     * @return the client with the applicable options set
     */
    public static TcpClient setSocketOption(MariadbConnectionConfiguration mariadbConnectionConfiguration, TcpClient tcpClient) {
        TcpClient configured = tcpClient;
        if (mariadbConnectionConfiguration.getConnectTimeout() != null) {
            int timeoutMillis = Math.toIntExact(mariadbConnectionConfiguration.getConnectTimeout().toMillis());
            configured = configured.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(timeoutMillis));
        }
        if (mariadbConnectionConfiguration.isTcpKeepAlive()) {
            configured = configured.option(ChannelOption.SO_KEEPALIVE, Boolean.valueOf(mariadbConnectionConfiguration.isTcpKeepAlive()));
        }
        if (mariadbConnectionConfiguration.isTcpAbortiveClose()) {
            // SO_LINGER = 0
            configured = configured.option(ChannelOption.SO_LINGER, 0);
        }
        return configured;
    }

    /**
     * Fails all pending exchanges with an error derived from the given throwable and, unless the
     * failure is a connection reset on an already closed client, closes the channel.
     *
     * @param th the connection-level failure
     */
    @Override
    public void handleConnectionError(Throwable th) {
        if (AbortedException.isConnectionReset(th) && !isConnected()) {
            this.messageSubscriber.close(new R2dbcNonTransientResourceException("Cannot execute command since connection is already closed", "08000", th));
            return;
        }
        R2dbcNonTransientResourceException reported;
        if (th instanceof SSLHandshakeException) {
            // keep the handshake failure message
            reported = new R2dbcNonTransientResourceException(th.getMessage(), "08000", th);
        } else {
            reported = new R2dbcNonTransientResourceException("Connection error", "08000", th);
        }
        this.messageSubscriber.close(reported);
        closeChannelIfNeeded();
    }

    /**
     * Error fallback of the outbound pipeline: reports the failure, completes the request sink and
     * returns the Mono that closes the connection.
     */
    private Mono<Void> sendResumeError(Throwable th) {
        handleConnectionError(th);
        requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        Mono<Void> closing = quitOrClose();
        return closing;
    }

    /** Error fallback of the inbound pipeline: same handling as for send errors, then completes empty. */
    private Mono<ServerMessage> receiveResumeError(Throwable th) {
        Mono<Void> closing = sendResumeError(th);
        return closing.then(Mono.empty());
    }

    /**
     * Closes the channel if this client has not been closed yet, failing pending exchanges.
     *
     * @return true when this call performed the close, false when the client was already closed
     */
    @Override
    public boolean closeChannelIfNeeded() {
        if (this.isClosed.compareAndSet(false, true)) {
            Channel channel = this.connection.channel();
            this.messageSubscriber.close(new R2dbcNonTransientResourceException("Connection unexpectedly closed", "08000"));
            if (channel.isOpen()) {
                this.connection.dispose();
            }
            return true;
        }
        return false;
    }

    /**
     * Records that the user asked for the connection to be closed and returns the closing Mono.
     *
     * @return a Mono completing when the connection has been disposed
     */
    @Override
    public Mono<Void> close() {
        closeRequested = true;
        Mono<Void> closing = quitOrClose();
        return closing;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the connection once. Pending exchanges are failed first; then, if the channel is still
     * open, a QUIT packet is written before the connection is disposed. When the channel is already
     * closed, or no context exists yet to encode the packet with, the connection is just disposed.
     *
     * @return a Mono completing when the connection is disposed, or immediately when the client was
     *     already closed
     */
    public Mono<Void> quitOrClose() {
        return Mono.defer(() -> {
            this.messageSubscriber.close(new R2dbcNonTransientResourceException(this.closeRequested ? "Connection has been closed" : "Connection closed", "08000"));
            if (!this.isClosed.compareAndSet(false, true)) {
                return Mono.empty();
            }
            Channel channel = this.connection.channel();
            Context currentContext = this.context;
            // QUIT can only be written on an open channel, and only encoded once a context exists.
            if (!channel.isOpen() || currentContext == null) {
                this.connection.dispose();
                return this.connection.onDispose();
            }
            return Flux.just(QuitPacket.INSTANCE)
                .doOnNext(quitPacket -> channel.writeAndFlush(quitPacket.encode(currentContext, currentContext.getByteBufAllocator())))
                .then()
                .doOnSuccess(ignored -> this.connection.dispose())
                .then(this.connection.onDispose());
        });
    }

    /**
     * Sends the SSL request packet and installs the SSL handler at the head of the pipeline.
     * Host name verification is enabled on the engine when the SSL mode is VERIFY_FULL and the host
     * is known.
     *
     * @param sslRequestPacket packet announcing the switch to SSL
     * @param mariadbConnectionConfiguration connection settings providing the SSL context
     * @return an empty Mono on success, or an error Mono (after closing the channel) on failure
     */
    @Override
    public Mono<Void> sendSslRequest(SslRequestPacket sslRequestPacket, MariadbConnectionConfiguration mariadbConnectionConfiguration) {
        try {
            SslContext sslContext = mariadbConnectionConfiguration.getSslConfig().getSslContext();
            SslHandler sslHandler;
            if (this.hostAddress == null) {
                sslHandler = sslContext.newHandler(this.connection.channel().alloc());
            } else {
                sslHandler = sslContext.newHandler(this.connection.channel().alloc(), this.hostAddress.getHost(), this.hostAddress.getPort());
                if (SslMode.VERIFY_FULL == mariadbConnectionConfiguration.getSslConfig().getSslMode()) {
                    SSLEngine sslEngine = sslHandler.engine();
                    SSLParameters engineParameters = sslEngine.getSSLParameters();
                    engineParameters.setEndpointIdentificationAlgorithm("HTTPS");
                    sslEngine.setSSLParameters(engineParameters);
                }
            }
            // The request is queued before the handler is added to the pipeline.
            this.requestSink.emitNext(sslRequestPacket, Sinks.EmitFailureHandler.FAIL_FAST);
            this.connection.addHandlerFirst(sslHandler);
            return Mono.empty();
        } catch (Throwable failure) {
            closeChannelIfNeeded();
            return Mono.error(failure);
        }
    }

    /**
     * Creates a Flux that, on subscription, runs the given consumer under the client lock, or
     * fails immediately when the client is not connected.
     *
     * @param consumer logic registering the exchange and sending the command
     * @return the Flux of server messages answering the command
     */
    private Flux<ServerMessage> execute(Consumer<FluxSink<ServerMessage>> consumer) {
        return Flux.create(sink -> {
            if (isConnected()) {
                try {
                    this.lock.lock();
                    consumer.accept(sink);
                } finally {
                    this.lock.unlock();
                }
            } else {
                sink.error(new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
            }
        });
    }

    /** @return the thread id reported by the current context */
    @Override
    public long getThreadId() {
        long threadId = context.getThreadId();
        return threadId;
    }

    /**
     * Starts a transaction with {@code BEGIN}. Nothing is sent when no command is pending and the
     * server status already reports an active transaction.
     *
     * @return a Mono completing when the server has answered (or immediately when skipped)
     */
    @Override
    public Mono<Void> beginTransaction() {
        try {
            this.lock.lock();
            return execute(sink -> {
                    if (!this.exchangeQueue.isEmpty() || (this.context.getServerStatus() & 1) == 0) {
                        Exchange exchange = new Exchange(sink, DecoderState.QUERY_RESPONSE, "BEGIN");
                        if (this.exchangeQueue.offer(exchange)) {
                            this.requestSink.emitNext(new QueryPacket("BEGIN"), Sinks.EmitFailureHandler.FAIL_FAST);
                            sink.onRequest(demand -> this.messageSubscriber.onRequest(exchange, demand));
                        } else {
                            sink.error(new R2dbcTransientResourceException("Request queue limit reached"));
                        }
                    } else {
                        logger.debug("Skipping start transaction because already in transaction");
                        sink.complete();
                    }
                })
                .handle(ExceptionFactory.withSql("BEGIN")::handleErrorResponse)
                .then();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Starts a transaction with {@code START TRANSACTION}, adding {@code READ ONLY} and/or
     * {@code WITH CONSISTENT SNAPSHOT} when the definition sets those attributes to true. Nothing
     * is sent when no command is pending and a transaction is already active.
     *
     * @param transactionDefinition requested transaction attributes
     * @return a Mono completing when the server has answered (or immediately when skipped)
     */
    @Override
    public Mono<Void> beginTransaction(TransactionDefinition transactionDefinition) {
        StringBuilder command = new StringBuilder("START TRANSACTION");
        boolean needsSeparator = false;
        if (Boolean.TRUE.equals(transactionDefinition.getAttribute(TransactionDefinition.READ_ONLY))) {
            command.append(" READ ONLY");
            needsSeparator = true;
        }
        if (Boolean.TRUE.equals(transactionDefinition.getAttribute(MariadbTransactionDefinition.WITH_CONSISTENT_SNAPSHOT))) {
            if (needsSeparator) {
                command.append(",");
            }
            command.append(" WITH CONSISTENT SNAPSHOT");
        }
        String sql = command.toString();
        try {
            this.lock.lock();
            return execute(sink -> {
                    if (!this.exchangeQueue.isEmpty() || (this.context.getServerStatus() & 1) == 0) {
                        Exchange exchange = new Exchange(sink, DecoderState.QUERY_RESPONSE, sql);
                        if (this.exchangeQueue.offer(exchange)) {
                            this.requestSink.emitNext(new QueryPacket(sql), Sinks.EmitFailureHandler.FAIL_FAST);
                            sink.onRequest(demand -> this.messageSubscriber.onRequest(exchange, demand));
                        } else {
                            sink.error(new R2dbcTransientResourceException("Request queue limit reached"));
                        }
                    } else {
                        logger.debug("Skipping start transaction because already in transaction");
                        sink.complete();
                    }
                })
                .handle(ExceptionFactory.withSql(sql)::handleErrorResponse)
                .then();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Commits the current transaction; skipped when no transaction is active and nothing is pending.
     *
     * @return a Mono completing when the server has answered (or immediately when skipped)
     */
    @Override
    public Mono<Void> commitTransaction() {
        try {
            this.lock.lock();
            return execute(sink -> executeWhenTransaction(sink, "COMMIT"))
                .handle(ExceptionFactory.withSql("COMMIT")::handleErrorResponse)
                .then();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Sends a transaction-ending command unless it would be a no-op: when no command is pending and
     * the server status shows no active transaction, the sink is completed without sending.
     *
     * <p>Failures are not printed here; they propagate to the caller.
     *
     * @param fluxSink sink receiving the server response
     * @param str SQL command to send
     */
    private void executeWhenTransaction(FluxSink<ServerMessage> fluxSink, String str) {
        if (this.exchangeQueue.isEmpty() && (this.context.getServerStatus() & 1) <= 0) {
            logger.debug("Skipping '{}' because no active transaction", str);
            fluxSink.complete();
            return;
        }
        // Acquire before the try so the finally block never unlocks a lock that was not taken.
        this.lock.lock();
        try {
            Exchange exchange = new Exchange(fluxSink, DecoderState.QUERY_RESPONSE, str);
            if (this.exchangeQueue.offer(exchange)) {
                fluxSink.onRequest(demand -> this.messageSubscriber.onRequest(exchange, demand));
                this.requestSink.emitNext(new QueryPacket(str), Sinks.EmitFailureHandler.FAIL_FAST);
            } else {
                fluxSink.error(new R2dbcTransientResourceException("Request queue limit reached"));
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Rolls back the current transaction; skipped when no transaction is active and nothing is
     * pending.
     *
     * @return a Mono completing when the server has answered (or immediately when skipped)
     */
    @Override
    public Mono<Void> rollbackTransaction() {
        try {
            this.lock.lock();
            return execute(sink -> executeWhenTransaction(sink, "ROLLBACK"))
                .handle(ExceptionFactory.withSql("ROLLBACK")::handleErrorResponse)
                .then();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Rolls back to the named savepoint. The name is quoted with backticks, embedded backticks
     * being doubled.
     *
     * @param str savepoint name
     * @return a Mono completing when the server has answered (or immediately when skipped)
     */
    @Override
    public Mono<Void> rollbackTransactionToSavepoint(String str) {
        try {
            this.lock.lock();
            String escapedName = str.replace("`", "``");
            String sql = String.format("ROLLBACK TO SAVEPOINT `%s`", escapedName);
            return execute(sink -> executeWhenTransaction(sink, sql))
                .handle(ExceptionFactory.withSql(sql)::handleErrorResponse)
                .then();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Sets the session autocommit mode. Nothing is sent when no command is pending and the server
     * status already reports the requested mode.
     *
     * <p>The statement text is handed to the exception factory, as the other commands of this
     * client do. Failures inside the callback are not printed; they propagate to the caller.
     *
     * @param z requested autocommit state
     * @return a Mono completing when the server has answered (or immediately when skipped)
     */
    @Override
    public Mono<Void> setAutoCommit(boolean z) {
        String sql = "SET autocommit=" + (z ? '1' : '0');
        // Acquire before the try so the finally block never unlocks a lock that was not taken.
        this.lock.lock();
        try {
            return execute(sink -> {
                    if (this.exchangeQueue.isEmpty() && z == isAutoCommit()) {
                        logger.debug("Skipping autocommit since already in that state");
                        sink.complete();
                        return;
                    }
                    Exchange exchange = new Exchange(sink, DecoderState.QUERY_RESPONSE, sql);
                    if (this.exchangeQueue.offer(exchange)) {
                        sink.onRequest(demand -> this.messageSubscriber.onRequest(exchange, demand));
                        this.requestSink.emitNext(new QueryPacket(sql), Sinks.EmitFailureHandler.FAIL_FAST);
                    } else {
                        sink.error(new R2dbcTransientResourceException("Request queue limit reached"));
                    }
                })
                .handle(ExceptionFactory.withSql(sql)::handleErrorResponse)
                .then();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Registers an exchange that only receives server messages (no command is sent), decoded in
     * the given state.
     *
     * @param decoderState decoder state to use for the expected messages
     * @return the Flux of received server messages
     */
    public Flux<ServerMessage> receive(DecoderState decoderState) {
        return Flux.create(sink -> {
            try {
                this.lock.lock();
                Exchange exchange = new Exchange(sink, decoderState);
                sink.onRequest(demand -> this.messageSubscriber.onRequest(exchange, demand));
                boolean queued = this.exchangeQueue.offer(exchange);
                if (!queued) {
                    sink.error(new R2dbcTransientResourceException("Request queue limit reached during handshake"));
                }
            } finally {
                this.lock.unlock();
            }
        });
    }

    /**
     * Builds the session context from the server's initial handshake and shares it with the decoder
     * and encoder. A RedoContext is used when an HA mode other than NONE is combined with
     * transaction replay; otherwise a SimpleContext.
     *
     * @param initialHandshakePacket handshake packet received from the server
     * @param j value forwarded unchanged to the context constructor
     */
    @Override
    public void setContext(InitialHandshakePacket initialHandshakePacket, long j) {
        boolean replayEnabled = !HaMode.NONE.equals(this.configuration.getHaMode()) && this.configuration.isTransactionReplay();
        if (replayEnabled) {
            this.context = new RedoContext(initialHandshakePacket.getServerVersion(), initialHandshakePacket.getThreadId(), initialHandshakePacket.getCapabilities(), initialHandshakePacket.getServerStatus(), initialHandshakePacket.isMariaDBServer(), j, this.configuration.getDatabase(), this.byteBufAllocator, this.configuration.getIsolationLevel());
        } else {
            this.context = new SimpleContext(initialHandshakePacket.getServerVersion(), initialHandshakePacket.getThreadId(), initialHandshakePacket.getCapabilities(), initialHandshakePacket.getServerStatus(), initialHandshakePacket.isMariaDBServer(), j, this.configuration.getDatabase(), this.byteBufAllocator, this.configuration.getIsolationLevel());
        }
        this.decoder.setContext(this.context);
        this.encoder.setContext(this.context);
    }

    /** @return true when bit 0x02 of the server status is set */
    @Override
    public boolean isAutoCommit() {
        int autoCommitBit = context.getServerStatus() & 2;
        return autoCommitBit != 0;
    }

    /** @return true when bit 0x01 of the server status is set */
    @Override
    public boolean isInTransaction() {
        int inTransactionBit = context.getServerStatus() & 1;
        return inTransactionBit != 0;
    }

    /** @return true when bit 0x200 (512) of the server status is set */
    @Override
    public boolean noBackslashEscapes() {
        int noBackslashBit = context.getServerStatus() & 512;
        return noBackslashBit != 0;
    }

    /** @return the server version from the context, or UNKNOWN_VERSION while no context is set */
    @Override
    public ServerVersion getVersion() {
        if (this.context == null) {
            return ServerVersion.UNKNOWN_VERSION;
        }
        return this.context.getVersion();
    }

    /** @return true when the client has not been closed and the channel is still open */
    @Override
    public boolean isConnected() {
        return !this.isClosed.get() && this.connection.channel().isOpen();
    }

    /** @return true once {@link #close()} has been called */
    @Override
    public boolean isCloseRequested() {
        return closeRequested;
    }

    /**
     * Queues a message for sending without registering an exchange for a response.
     *
     * @param clientMessage message to send
     */
    @Override
    public void sendCommandWithoutResult(ClientMessage clientMessage) {
        try {
            lock.lock();
            requestSink.emitNext(clientMessage, Sinks.EmitFailureHandler.FAIL_FAST);
        } finally {
            lock.unlock();
        }
    }

    /** Sends a command expecting a query response, with no SQL text attached to the exchange. */
    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage clientMessage, boolean z) {
        DecoderState expectedState = DecoderState.QUERY_RESPONSE;
        return sendCommand(clientMessage, expectedState, null, z);
    }

    /** Sends a command decoded in the given state, with no SQL text attached to the exchange. */
    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage clientMessage, DecoderState decoderState, boolean z) {
        String noSql = null;
        return sendCommand(clientMessage, decoderState, noSql, z);
    }

    /**
     * Registers an exchange for the command and queues the command for sending. The returned Flux
     * fails immediately when the client is closed, or when the exchange queue is full.
     *
     * @param clientMessage command to send
     * @param decoderState decoder state to use for the response
     * @param str SQL text associated with the exchange, may be null
     * @param z not used by this implementation
     * @return the Flux of server messages answering the command
     */
    @Override
    public Flux<ServerMessage> sendCommand(ClientMessage clientMessage, DecoderState decoderState, String str, boolean z) {
        return Flux.create(sink -> {
            if (!isConnected() || this.messageSubscriber.isClose()) {
                sink.error(new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            try {
                this.lock.lock();
                Exchange exchange = new Exchange(sink, decoderState, str);
                if (!this.exchangeQueue.offer(exchange)) {
                    sink.error(new R2dbcTransientResourceException("Request queue limit reached"));
                } else {
                    if (clientMessage instanceof PreparePacket) {
                        // let the decoder know which statement the prepare response belongs to
                        this.decoder.addPrepare(((PreparePacket) clientMessage).getSql());
                    }
                    sink.onRequest(demand -> this.messageSubscriber.onRequest(exchange, demand));
                    this.requestSink.emitNext(clientMessage, Sinks.EmitFailureHandler.FAIL_FAST);
                }
            } catch (Throwable failure) {
                sink.error(failure);
            } finally {
                this.lock.unlock();
            }
        });
    }

    /**
     * Sends a prepare command and extracts the resulting server-side prepared statement.
     *
     * @param clientMessage prepare command
     * @param exceptionFactory factory converting a server error packet into an exception
     * @param str SQL text being prepared
     * @return a Mono emitting the prepare result, empty when none was received, or failing with the
     *     server error
     */
    @Override
    public Mono<ServerPrepareResult> sendPrepare(ClientMessage clientMessage, ExceptionFactory exceptionFactory, String str) {
        Flux<ServerMessage> responses = sendCommand(clientMessage, DecoderState.PREPARE_RESPONSE, str, true);
        return responses
            .handle((message, sink) -> {
                if (message instanceof ErrorPacket) {
                    ErrorPacket errorPacket = (ErrorPacket) message;
                    sink.error(exceptionFactory.from(errorPacket));
                    return;
                }
                if (message instanceof CompletePrepareResult) {
                    CompletePrepareResult prepared = (CompletePrepareResult) message;
                    sink.next(prepared.getPrepare());
                }
                if (message.ending()) {
                    sink.complete();
                }
            })
            .cast(ServerPrepareResult.class)
            .singleOrEmpty();
    }

    /**
     * Sends a prepare command immediately followed by its execute command, sharing one exchange
     * for both responses. The returned Flux fails immediately when the client is closed, or when
     * the exchange queue is full.
     *
     * <p>Failures while registering or sending are forwarded to the subscriber rather than printed.
     *
     * @param preparePacket prepare command
     * @param executePacket execute command sent right after the prepare
     * @param z not used by this implementation
     * @return the Flux of server messages answering both commands
     */
    @Override
    public Flux<ServerMessage> sendCommand(PreparePacket preparePacket, ExecutePacket executePacket, boolean z) {
        return Flux.create(sink -> {
            if (!isConnected() || this.messageSubscriber.isClose()) {
                sink.error(new R2dbcNonTransientResourceException("Connection is close. Cannot send anything"));
                return;
            }
            // Acquire before the try so the lock is released exactly once, and only if it was taken.
            this.lock.lock();
            try {
                Exchange exchange = new Exchange(sink, DecoderState.PREPARE_AND_EXECUTE_RESPONSE, preparePacket.getSql());
                if (this.exchangeQueue.offer(exchange)) {
                    sink.onRequest(demand -> this.messageSubscriber.onRequest(exchange, demand));
                    this.decoder.addPrepare(preparePacket.getSql());
                    this.requestSink.emitNext(preparePacket, Sinks.EmitFailureHandler.FAIL_FAST);
                    this.requestSink.emitNext(executePacket, Sinks.EmitFailureHandler.FAIL_FAST);
                } else {
                    sink.error(new R2dbcTransientResourceException("Request queue limit reached"));
                }
            } catch (Throwable failure) {
                sink.error(failure);
            } finally {
                this.lock.unlock();
            }
        });
    }

    /** @return the host this client was created for, possibly null */
    @Override
    public HostAddress getHostAddress() {
        return hostAddress;
    }

    /** @return the prepared-statement cache owned by this client */
    @Override
    public PrepareCache getPrepareCache() {
        return prepareCache;
    }

    /** @return a short description containing the closed flag and the current context */
    public String toString() {
        StringBuilder description = new StringBuilder("Client{isClosed=");
        description.append(this.isClosed);
        description.append(", context=");
        description.append(this.context);
        description.append('}');
        return description.toString();
    }
}
