package com.aliyun.openservices.iot.api.message.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.iot.api.Profile;
import com.aliyun.openservices.iot.api.exception.IotClientException;
import com.aliyun.openservices.iot.api.http2.IotHttp2Client;
import com.aliyun.openservices.iot.api.http2.callback.AbstractHttp2StreamDataReceiver;
import com.aliyun.openservices.iot.api.http2.connection.Connection;
import com.aliyun.openservices.iot.api.http2.connection.ConnectionListener;
import com.aliyun.openservices.iot.api.http2.connection.ConnectionStatus;
import com.aliyun.openservices.iot.api.http2.entity.Http2Response;
import com.aliyun.openservices.iot.api.http2.entity.StreamData;
import com.aliyun.openservices.iot.api.message.api.MessageClient;
import com.aliyun.openservices.iot.api.message.callback.ConnectionCallback;
import com.aliyun.openservices.iot.api.message.callback.MessageCallback;
import com.aliyun.openservices.iot.api.message.entity.Message;
import com.aliyun.openservices.iot.api.message.entity.MessageToken;
import com.aliyun.openservices.iot.api.message.entity.SubscribeInfo;
import com.aliyun.openservices.iot.api.util.StringUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

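/**
 * {@link MessageClient} implementation built on {@link IotHttp2Client}.
 *
 * <p>The client authenticates each connection, hands incoming stream data to the registered
 * {@link MessageCallback}s on a dedicated executor, acknowledges QoS 1/2 messages whose callback
 * returns {@code CommitSuccess}, and publishes messages with scheduled retries.
 *
 * <p>Decompiler note: loaded from input_file:com/aliyun/openservices/iot/api/message/impl/MessageClientImpl.class
 */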
public class MessageClientImpl extends AbstractHttp2StreamDataReceiver implements ConnectionListener, MessageClient {
    private static final Logger log = LoggerFactory.getLogger(MessageClientImpl.class);

    // Header carrying the server-assigned message id (read on receive, sent back on ack).
    private static final String MESSAGE_ID = "x-message-id";

    // Request paths; the pub/sub/unsub paths are concatenated with the topic by their callers.
    private static final String PATH_MESSAGE_ACK = "/message/ack";
    private static final String PATH_MESSAGE_PUB = "/message/pub";
    private static final String PATH_MESSAGE_SUB = "/message/sub";
    private static final String PATH_MESSAGE_UNSUB = "/message/unsub";
    // Path used for the authentication request sent on every new connection.
    private static final String PATH_CONNECT = "/message/echo/success";

    // Topic level separator and the wildcard tokens understood by isMatch.
    private static final String SLASH = "/";
    private static final String PLUS_SIGN = "+";
    private static final String CROSSHATCH = "#";

    // Not referenced by the code visible in this class.
    private static final String IOT_ID = "x-iot-id";
    // Optional headers read from incoming messages.
    private static final String TOPIC = "x-topic";
    private static final String QOS = "x-qos";
    private static final String GENERATE_TIME = "x-generate-time";
    // Topics starting with this prefix get their JSON payload rewritten on receive.
    private static final String AS_STATUS_PREFIX = "/as/mqtt/status/";

    private final Profile profile;
    // Default callback, used when no topic-specific listener matches.
    private MessageCallback messageCallback;
    // Runs message callbacks and the initial onConnected notification; created in connect().
    private ExecutorService messageCallbackExecutorService;
    // Schedules delayed publish retries; created in connect().
    private ScheduledExecutorService publishRetryExecutorService;
    // Underlying HTTP/2 client; created in connect() and set to null by disconnect().
    private IotHttp2Client client;
    // Topic-specific listeners, consulted in insertion order (first match wins).
    private final List<SubscribeInfo> subscriptionInfo = new CopyOnWriteArrayList<>();
    // True between a connect() call and the matching disconnect().
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Connection state reported by isConnected().
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    // Source of the local ids assigned to published messages.
    private final AtomicLong localMessageId = new AtomicLong(0);
    // Optional listener for connection established / lost notifications.
    private ConnectionCallback connectionCallback;

    /**
     * Creates a client bound to the given profile. No connection or thread pool is created
     * here; that happens in {@link #connect(MessageCallback)}.
     *
     * @param profile configuration later read for the clean-session flag, the callback thread
     *                pool sizes and the multi-connection setting
     */
    public MessageClientImpl(Profile profile) {
        this.profile = profile;
    }

    /**
     * Sends the authentication request on the given connection.
     *
     * <p>The request uses the client's auth headers with the connect path and an
     * {@code x-clear-session} header of {@code "1"} when the profile asks for a clean session,
     * {@code "0"} otherwise. No body is sent.
     *
     * @param connection connection to authenticate
     * @return future completed with the server's response
     */
    private CompletableFuture<Http2Response> sendAuth(Connection connection) {
        Http2Headers headers = this.client.authHeader();
        headers.path(PATH_CONNECT);

        final String clearSessionFlag;
        if (this.profile.isCleanSession()) {
            clearSessionFlag = "1";
        } else {
            clearSessionFlag = "0";
        }
        headers.add("x-clear-session", clearSessionFlag);

        return this.client.sendRequest(connection, headers, (byte[]) null);
    }

    /**
     * Opens the first connection, authenticates it synchronously (15 second timeout) and, on
     * success, registers this client as connection listener and notifies the connection
     * callback with {@code onConnected(false)} on the callback executor.
     *
     * <p>If authentication fails, times out or is interrupted, the half-open connection is
     * closed before the exception is propagated, and the interrupt flag is restored when the
     * wait was interrupted.
     *
     * @throws IotClientException if the server rejects the request or the wait fails
     */
    private void doConnect() {
        Connection newConnection = null;
        boolean authorized = false;
        try {
            newConnection = this.client.newConnection();
            newConnection.setDefaultStreamListener(this);
            Http2Response http2Response = sendAuth(newConnection).get(15L, TimeUnit.SECONDS);
            if (http2Response.getStatus() != HttpResponseStatus.OK) {
                throw new IotClientException("connect to server failed", http2Response);
            }
            newConnection.setStatus(ConnectionStatus.AUTHORIZED);
            authorized = true;
            // The listener is registered only after the first connection is authorized, so
            // onStatusChange is not invoked for this initial transition.
            this.client.addConnectionListener(this);
            this.isConnected.set(true);
            ConnectionCallback callback = this.connectionCallback;
            if (callback != null) {
                this.messageCallbackExecutorService.execute(() -> callback.onConnected(false));
            }
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IotClientException(e);
        } catch (TimeoutException e) {
            throw new IotClientException(e);
        } catch (ExecutionException e) {
            throw new IotClientException(e.getCause());
        } finally {
            // Do not leave an unauthenticated connection behind on any failure path.
            if (!authorized && newConnection != null) {
                newConnection.close();
            }
        }
    }

    /**
     * Converts a successful HTTP/2 response into a {@link Message}.
     *
     * <p>The message id header is mandatory. Topic, qos and generate-time headers are optional
     * and default to {@code null}, {@code 0} and {@code 0}. For topics under the status prefix
     * the JSON payload is rewritten: the {@code meta} field is removed and, when the topic has
     * more than five levels, {@code productKey} and {@code deviceName} are filled in from topic
     * levels 4 and 5. Payload text is always decoded and encoded as UTF-8.
     *
     * @param http2Response response or pushed stream data to convert
     * @return the converted message
     * @throws IOException if the status is not OK or the message id header is missing
     */
    private Message convertStreamData2Message(Http2Response http2Response) throws IOException {
        byte[] content = http2Response.getContent();
        if (!HttpResponseStatus.OK.equals(http2Response.getStatus())) {
            String body = content == null ? "" : new String(content, StandardCharsets.UTF_8);
            throw new IOException("status is not success, code: " + http2Response.getStatus() + ", content: " + body);
        }

        Http2Headers headers = http2Response.getHeaders();
        CharSequence rawMessageId = headers.get(MESSAGE_ID);
        if (rawMessageId == null) {
            throw new IOException("response is missing the " + MESSAGE_ID + " header");
        }
        String messageId = rawMessageId.toString();

        CharSequence rawTopic = headers.get(TOPIC);
        String topic = rawTopic == null ? null : rawTopic.toString();

        // getInt/getLong yield null both for an absent header and for one that cannot be parsed.
        Integer rawQos = headers.getInt(QOS);
        int qos = rawQos == null ? 0 : rawQos;
        Long rawGenerateTime = headers.getLong(GENERATE_TIME);
        long generateTime = rawGenerateTime == null ? 0L : rawGenerateTime;

        if (isStatusCallback(topic) && content != null) {
            JSONObject status = JSON.parseObject(new String(content, StandardCharsets.UTF_8));
            // An empty payload yields no object; leave the payload untouched in that case.
            if (status != null) {
                status.remove("meta");
                String[] levels = topic.split(SLASH);
                if (levels.length > 5) {
                    status.put("productKey", levels[4]);
                    status.put("deviceName", levels[5]);
                }
                content = status.toJSONString().getBytes(StandardCharsets.UTF_8);
            }
        }
        return new Message(content, topic, messageId, qos, generateTime);
    }

    /**
     * Tells whether the topic is a status topic.
     *
     * @param str topic to inspect, may be {@code null}
     * @return {@code true} when the topic is non-empty and starts with the status prefix
     */
    private boolean isStatusCallback(String str) {
        if (!StringUtil.isNotEmpty(str)) {
            return false;
        }
        return str.startsWith(AS_STATUS_PREFIX);
    }

    /**
     * Tells whether a received message must be acknowledged.
     *
     * @param message received message
     * @return {@code true} when the message qos is 1 or 2
     */
    private boolean needAck(Message message) {
        int qos = message.getQos();
        return qos == 1 || qos == 2;
    }

    /**
     * Logs an error reported for a stream. The exception is passed to the logger as the
     * trailing argument so that its stack trace is recorded, not only its message.
     *
     * @param connection  connection the stream belongs to (unused)
     * @param http2Stream stream that failed (unused)
     * @param iOException the reported error
     */
    public void onStreamError(Connection connection, Http2Stream http2Stream, IOException iOException) {
        log.error("message receive error, {}", iOException.getMessage(), iOException);
    }

    /**
     * Handles data pushed by the server: converts it into a {@link Message}, runs the matching
     * callback on the callback executor and, for messages that need an ack, acknowledges them
     * when the callback returns {@code CommitSuccess}.
     *
     * <p>Conversion failures and a callback executor that rejects the task are logged and the
     * message is dropped without an ack.
     *
     * @param connection  connection the data arrived on; also used for the ack
     * @param http2Stream stream the data arrived on (unused)
     * @param streamData  headers and body of the pushed message
     */
    public void onDataRead(Connection connection, Http2Stream http2Stream, StreamData streamData) {
        final Message message;
        try {
            message = convertStreamData2Message(new Http2Response(streamData.getHeaders(), streamData.readAllData()));
        } catch (IOException e) {
            log.error("message receive error,{}, {}", e.getMessage(), streamData);
            return;
        }

        MessageToken messageToken = new MessageToken(message, connection, this.client);
        byte[] payload = message.getPayload();
        log.info("receive msg, messageId:{}, data size: {}", message.getMessageId(), payload == null ? 0 : payload.length);

        final CompletableFuture<MessageCallback.Action> consumed;
        try {
            consumed = CompletableFuture.supplyAsync(() -> dispatchToCallback(message, messageToken), this.messageCallbackExecutorService);
        } catch (RejectedExecutionException e) {
            // The callback queue is bounded; a full queue must not blow up the receiving thread.
            log.error("message callback executor rejected message, messageId:{}", message.getMessageId(), e);
            return;
        }

        if (needAck(message)) {
            consumed.whenComplete((action, th) -> {
                if (th != null) {
                    log.error("consume message {}, occurs error: ", message, th);
                }
                log.info("consume message: {} , result: {}", message, action == null ? "null" : action.name());
                if (action == MessageCallback.Action.CommitSuccess) {
                    ack(messageToken);
                }
            });
        }
    }

    /**
     * Picks the callback for a message (first topic listener whose filter matches, otherwise
     * the default callback) and invokes it. Never throws: any failure is logged and reported
     * as {@code CommitFailure}.
     */
    private MessageCallback.Action dispatchToCallback(Message message, MessageToken messageToken) {
        try {
            String topic = message.getTopic();
            // A message without a topic cannot match a topic listener; it goes to the default callback.
            MessageCallback callback = this.subscriptionInfo.stream()
                .filter(info -> topic != null && isMatch(topic, info.getTopic()))
                .findFirst()
                .map(SubscribeInfo::getCallback)
                .orElse(this.messageCallback);
            if (callback == null) {
                log.warn("no message callback for {}", topic);
                return MessageCallback.Action.CommitFailure;
            }
            return callback.consume(messageToken);
        } catch (Throwable th) {
            log.error("message consume error, messageId:{}", message.getMessageId(), th);
            return MessageCallback.Action.CommitFailure;
        }
    }

    /**
     * Intentionally empty: received HTTP/2 settings are ignored by this client.
     *
     * @param connection    connection the settings were received on (unused)
     * @param http2Settings the received settings (unused)
     */
    public void onSettingReceive(Connection connection, Http2Settings http2Settings) {
    }

    /**
     * Reacts to a connection status change.
     *
     * <p>A newly created connection is authenticated asynchronously and closed if that fails.
     * The connected flag is then set to whether any connection is currently authorized, so it
     * becomes {@code true} again after a reconnect. The connection callback, if any, receives
     * {@code onConnected(true)} when a connection becomes authorized and
     * {@code onConnectionLost()} when the last authorized connection goes away.
     *
     * @param connectionStatus new status of the connection
     * @param connection       connection whose status changed
     */
    public void onStatusChange(ConnectionStatus connectionStatus, Connection connection) {
        if (connectionStatus == ConnectionStatus.CREATED) {
            connection.setDefaultStreamListener(this);
            sendAuth(connection).whenComplete((http2Response, th) -> {
                if (th != null) {
                    log.error("failed to auth connection {}, {}", connection, th.getMessage());
                    connection.close();
                    return;
                }
                if (http2Response.getStatus() == HttpResponseStatus.OK) {
                    connection.setStatus(ConnectionStatus.AUTHORIZED);
                    return;
                }
                byte[] content = http2Response.getContent();
                String body = content == null ? "" : new String(content, StandardCharsets.UTF_8);
                log.error("failed to auth connection {}, code: {}, content: {}, request id: {}",
                    connection, http2Response.getStatus().code(), body, http2Response.getRequestId());
                connection.close();
            });
        }

        boolean anyAuthorized = this.client.allConnections().stream().anyMatch(Connection::isAuthorized);
        // Track both directions; only clearing the flag would leave it false after a reconnect
        // and suppress every later onConnectionLost notification.
        boolean wasConnected = this.isConnected.getAndSet(anyAuthorized);

        ConnectionCallback callback = this.connectionCallback;
        if (callback == null) {
            return;
        }
        if (connectionStatus == ConnectionStatus.AUTHORIZED) {
            callback.onConnected(true);
        } else if (!anyAuthorized && wasConnected) {
            callback.onConnectionLost();
        }
    }

    /**
     * Checks whether a topic matches a topic filter, comparing them level by level.
     *
     * <p>A filter level of {@code +} or {@code #} matches any topic level at that position.
     * The topic may have more levels than the filter only when the filter's last level is
     * {@code #}. A {@code null} topic or filter never matches.
     *
     * @param str  concrete topic of a message
     * @param str2 topic filter of a listener
     * @return {@code true} when the topic matches the filter
     */
    private boolean isMatch(String str, String str2) {
        if (str == null || str2 == null) {
            return false;
        }
        String[] topicLevels = str.split(SLASH);
        String[] filterLevels = str2.split(SLASH);
        if (topicLevels.length < filterLevels.length) {
            return false;
        }
        if (topicLevels.length != filterLevels.length) {
            // Extra topic levels are only allowed behind a trailing '#'. The length check also
            // protects against a filter such as "/" that splits into zero levels.
            boolean trailingWildcard = filterLevels.length > 0
                && CROSSHATCH.equals(filterLevels[filterLevels.length - 1]);
            if (!trailingWildcard) {
                return false;
            }
        }
        for (int i = 0; i < filterLevels.length; i++) {
            String filterLevel = filterLevels[i];
            boolean wildcard = CROSSHATCH.equals(filterLevel) || PLUS_SIGN.equals(filterLevel);
            if (!wildcard && !topicLevels[i].equals(filterLevel)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Starts the client: stores the default callback, creates the callback and retry executors
     * and the HTTP/2 client, then opens and authenticates the first connection.
     *
     * <p>If any of these steps fails, everything created so far is shut down and the client
     * goes back to the not-started state, so {@code connect} can be called again.
     *
     * @param messageCallback default callback for messages without a matching topic listener
     * @throws IotClientException if the client is already started or the connection fails
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public void connect(MessageCallback messageCallback) {
        if (!this.started.compareAndSet(false, true)) {
            throw new IotClientException("client is already connected!");
        }
        try {
            setMessageCallback(messageCallback);
            this.messageCallbackExecutorService = new ThreadPoolExecutor(
                this.profile.getCallbackThreadCorePoolSize(),
                this.profile.getCallbackThreadMaximumPoolSize(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(this.profile.getCallbackThreadBlockingQueueSize()),
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("iot-message-client-receiver-%d").build());
            this.publishRetryExecutorService = new ScheduledThreadPoolExecutor(
                1, new ThreadFactoryBuilder().setNameFormat("iot-message-client-schedule-thread").build());
            this.client = new IotHttp2Client(this.profile, this.profile.isMultiConnection() ? -1 : 1);
            doConnect();
        } catch (RuntimeException e) {
            try {
                releaseAfterFailedConnect();
            } catch (RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /** Shuts down whatever connect() created before failing and clears the started flag. */
    private void releaseAfterFailedConnect() {
        try {
            if (this.client != null) {
                this.client.shutdown();
                this.client = null;
            }
            if (this.messageCallbackExecutorService != null) {
                this.messageCallbackExecutorService.shutdown();
                this.messageCallbackExecutorService = null;
            }
            if (this.publishRetryExecutorService != null) {
                // This pool uses non-daemon threads; leaving it running would keep the JVM alive.
                this.publishRetryExecutorService.shutdown();
                this.publishRetryExecutorService = null;
            }
        } finally {
            this.started.set(false);
        }
    }

    /**
     * Stops the client: unregisters the connection listener, shuts down the HTTP/2 client and
     * both executors, forgets all topic listeners and marks the client as neither started nor
     * connected.
     *
     * @throws IotClientException if the client has not been started
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public void disconnect() {
        checkStarted();
        // The compare-and-set lets exactly one caller perform the teardown and already leaves
        // the flag false, so no further reset is needed.
        if (this.started.compareAndSet(true, false)) {
            this.client.removeConnectionListener(this);
            this.client.shutdown();
            this.messageCallbackExecutorService.shutdown();
            this.publishRetryExecutorService.shutdown();
            this.client = null;
            this.messageCallbackExecutorService = null;
            this.subscriptionInfo.clear();
            // The listener is removed above, so no status change will clear this flag for us.
            this.isConnected.set(false);
        }
    }

    /**
     * Replaces the default message callback, which is used for messages whose topic matches no
     * topic-specific listener.
     *
     * @param messageCallback new default callback; may be {@code null}
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public void setMessageListener(MessageCallback messageCallback) {
        this.messageCallback = messageCallback;
    }

    /**
     * Registers, replaces or removes the listener for a topic filter.
     *
     * <p>A non-null callback is stored for the filter, replacing any existing one. A
     * {@code null} callback removes the existing entry; when there is none, nothing is added,
     * so that an entry without a callback cannot shadow later matching listeners.
     *
     * @param str             topic filter; must not be empty
     * @param messageCallback callback for the filter, or {@code null} to remove it
     * @throws IllegalArgumentException if the topic filter is empty
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public void setMessageListener(String str, MessageCallback messageCallback) {
        if (StringUtil.isEmpty(str)) {
            throw new IllegalArgumentException("topic can't be null");
        }
        Optional<SubscribeInfo> existing = this.subscriptionInfo.stream()
            .filter(info -> str.equals(info.getTopic()))
            .findAny();
        if (existing.isPresent()) {
            if (messageCallback == null) {
                this.subscriptionInfo.remove(existing.get());
            } else {
                existing.get().setCallback(messageCallback);
            }
        } else if (messageCallback != null) {
            this.subscriptionInfo.add(new SubscribeInfo(str, messageCallback));
        }
    }

    /**
     * Sends a subscribe request for the given topic.
     *
     * @param str topic to subscribe to; appended to the subscribe path as-is
     * @return future completed with {@code true} on success, exceptionally otherwise
     * @throws IotClientException if the client has not been started
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public CompletableFuture<Boolean> subscribe(String str) {
        checkStarted();
        Http2Headers headers = new DefaultHttp2Headers();
        headers.path(PATH_MESSAGE_SUB + str);
        String failureMessage = "failed to subscribe " + str;
        return sendWithResult(headers, null, failureMessage);
    }

    /**
     * Sends a request on an authorized connection and maps the outcome to a boolean future.
     *
     * <p>The future completes with {@code true} for an OK response and exceptionally for a
     * transport error, a non-OK response or an exception thrown while sending. A missing
     * authorized connection is reported by throwing directly, not through the future.
     *
     * @param http2Headers request headers
     * @param bArr         request body, may be {@code null}
     * @param str          message used for logging and for the resulting exception
     * @return future describing the outcome of the request
     */
    private CompletableFuture<Boolean> sendWithResult(Http2Headers http2Headers, byte[] bArr, String str) {
        Connection target = getConnection();
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        try {
            this.client.sendRequest(target, http2Headers, bArr).whenComplete((response, failure) -> {
                if (failure != null) {
                    log.error(str, failure);
                    outcome.completeExceptionally(failure);
                    return;
                }
                if (!HttpResponseStatus.OK.equals(response.getStatus())) {
                    IotClientException rejected = new IotClientException(str, response);
                    log.error(rejected.getMessage());
                    outcome.completeExceptionally(rejected);
                    return;
                }
                outcome.complete(true);
            });
        } catch (Exception sendFailure) {
            outcome.completeExceptionally(new IotClientException(str, sendFailure));
        }
        return outcome;
    }

    /**
     * Registers a listener for the topic and then sends the subscribe request.
     *
     * <p>The listener is stored before the request is sent, so it stays registered even when
     * the returned future later fails.
     *
     * @param str             topic to subscribe to
     * @param messageCallback listener for messages on that topic
     * @return future completed with {@code true} on success, exceptionally otherwise
     * @throws IotClientException if the client has not been started
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public CompletableFuture<Boolean> subscribe(String str, MessageCallback messageCallback) {
        checkStarted();
        setMessageListener(str, messageCallback);
        return subscribe(str);
    }

    /**
     * Sends an unsubscribe request for the given topic. A listener registered for the topic is
     * not removed by this method.
     *
     * @param str topic to unsubscribe from; appended to the unsubscribe path as-is
     * @return future completed with {@code true} on success, exceptionally otherwise
     * @throws IotClientException if the client has not been started
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public CompletableFuture<Boolean> unsubscribe(String str) {
        checkStarted();
        Http2Headers headers = new DefaultHttp2Headers();
        headers.path(PATH_MESSAGE_UNSUB + str);
        String failureMessage = "failed to unsubscribe " + str;
        return sendWithResult(headers, null, failureMessage);
    }

    /**
     * Publishes a message to a topic.
     *
     * <p>The message gets the next local message id and is sent asynchronously; the outcome is
     * delivered through the returned token's publish future.
     *
     * @param str     topic to publish to; appended to the publish path as-is
     * @param message message to publish; its qos is sent as a header
     * @return token tracking the publish
     * @throws IotClientException if the client has not been started
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public MessageToken publish(String str, Message message) {
        checkStarted();

        Http2Headers headers = new DefaultHttp2Headers();
        headers.path(PATH_MESSAGE_PUB + str);
        headers.add(QOS, String.valueOf(message.getQos()));

        MessageToken token = new MessageToken(message, null, this.client);
        long nextLocalId = this.localMessageId.getAndIncrement();
        token.setLocalMessageId(nextLocalId);

        doPublish(token, headers);
        return token;
    }

    /**
     * Acknowledges a received message on the connection it arrived on.
     *
     * <p>The outcome is logged and also reported through the returned future, which is never
     * {@code null}: it completes with {@code true} for an OK response and exceptionally for a
     * transport error, a non-OK response or an exception thrown while sending.
     *
     * @param messageToken token of the message to acknowledge
     * @return future describing the outcome of the ack
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public CompletableFuture<Boolean> ack(MessageToken messageToken) {
        String messageId = messageToken.getMessage().getMessageId();
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        try {
            DefaultHttp2Headers defaultHttp2Headers = new DefaultHttp2Headers();
            defaultHttp2Headers.set(MESSAGE_ID, messageId);
            defaultHttp2Headers.path(PATH_MESSAGE_ACK);
            this.client.sendRequest(messageToken.getConnection(), defaultHttp2Headers, (byte[]) null).whenComplete((http2Response, th) -> {
                if (th != null) {
                    log.error("ack failed, messageId {}, error {}", messageId, th.getMessage());
                    result.completeExceptionally(th);
                } else if (HttpResponseStatus.OK.equals(http2Response.getStatus())) {
                    log.debug("ack success, messageId {}", messageId);
                    result.complete(true);
                } else {
                    IotClientException rejected = new IotClientException("ack message: " + messageId + " failed", http2Response);
                    log.error(rejected.getMessage());
                    result.completeExceptionally(rejected);
                }
            });
        } catch (Exception e) {
            // Mirrors sendWithResult: failures while sending are reported through the future.
            log.error("ack failed, messageId {}, error {}", messageId, e.getMessage());
            result.completeExceptionally(new IotClientException("ack message: " + messageId + " failed", e));
        }
        return result;
    }

    /**
     * Returns the current value of the connected flag maintained by this client.
     *
     * @return {@code true} when the flag is set
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public boolean isConnected() {
        return this.isConnected.get();
    }

    /**
     * Sets the callback notified about established and lost connections.
     *
     * @param connectionCallback callback to notify; {@code null} disables notifications
     */
    @Override // com.aliyun.openservices.iot.api.message.api.MessageClient
    public void setConnectionCallback(ConnectionCallback connectionCallback) {
        this.connectionCallback = connectionCallback;
    }

    /**
     * Sends the token's message on an authorized connection.
     *
     * <p>On an OK response the token's publish future is completed with the converted
     * response. Every failure (transport error, null or non-OK response, a response that
     * cannot be converted, or an exception thrown while sending) is handed to
     * {@link #retryPublish} together with a non-null exception describing it.
     *
     * @param messageToken token of the message being published
     * @param http2Headers request headers, reused unchanged for retries
     */
    private void doPublish(MessageToken messageToken, Http2Headers http2Headers) {
        long localId = messageToken.getLocalMessageId();
        log.info("publish message {}", localId);
        try {
            this.client.sendRequest(getConnection(), http2Headers, messageToken.getMessage().getPayload()).whenComplete((http2Response, th) -> {
                Exception failure;
                if (th != null) {
                    failure = new IotClientException("failed to publish, message id: " + localId, th);
                } else if (http2Response == null) {
                    failure = new IotClientException("failed to publish, response is null");
                } else if (!HttpResponseStatus.OK.equals(http2Response.getStatus())) {
                    failure = new IotClientException("failed to publish", http2Response);
                } else {
                    try {
                        messageToken.getPublishFuture().complete(convertStreamData2Message(http2Response));
                        return;
                    } catch (Exception e) {
                        log.error("failed to receive response, error: {}", e.getMessage(), e);
                        // retryPublish dereferences the exception, so it must never be null.
                        failure = new IotClientException("failed to receive publish response, message id: " + localId, e);
                    }
                }
                retryPublish(messageToken, http2Headers, failure, false);
            });
        } catch (Exception e) {
            retryPublish(messageToken, http2Headers, new IotClientException("failed to publish message, error msg:" + e.getMessage(), e), true);
        }
    }

    /**
     * Handles a failed publish attempt: gives up and fails the publish future when the token
     * says to stop, otherwise schedules another attempt after the token's computed delay.
     *
     * <p>If the retry cannot be scheduled because the retry executor rejects it, the publish
     * future is failed as well, so that it does not stay pending.
     *
     * @param messageToken token of the message being published
     * @param http2Headers request headers to reuse for the retry
     * @param exc          failure of the last attempt
     * @param z            flag forwarded to {@code MessageToken.shouldStop}; callers pass
     *                     {@code true} when sending itself threw
     */
    private void retryPublish(MessageToken messageToken, Http2Headers http2Headers, Exception exc, boolean z) {
        long localId = messageToken.getLocalMessageId();
        Exception cause = exc != null ? exc : new IotClientException("failed to publish, unknown error");
        log.error("failed to publish message {}, error: {}", localId, cause.getMessage());
        if (messageToken.shouldStop(z)) {
            log.info("give up publishing, message id: {}", localId);
            messageToken.getPublishFuture().completeExceptionally(cause);
            return;
        }
        messageToken.increaseAttemptCount();
        // Compute the delay once so the logged value is the one actually used.
        long delayMs = messageToken.computeSleepTime();
        try {
            this.publishRetryExecutorService.schedule(() -> doPublish(messageToken, http2Headers), delayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            log.error("retry of message {} rejected, giving up", localId, e);
            messageToken.getPublishFuture().completeExceptionally(cause);
            return;
        }
        log.info("message {} will be delivery after {} ms", localId, delayMs);
    }

    /**
     * Picks a random authorized connection from the client.
     *
     * @return an authorized connection
     * @throws IotClientException if no authorized connection exists
     */
    private Connection getConnection() {
        Optional<Connection> candidate = this.client.randomConnection(Connection::isAuthorized);
        return candidate.orElseThrow(() -> new IotClientException("fail to publish, no connection exists"));
    }

    /**
     * Guards operations that need a started client.
     *
     * @throws IotClientException if {@code connect} has not been called, or the client has
     *                            been disconnected since
     */
    private void checkStarted() {
        if (this.started.get()) {
            return;
        }
        throw new IotClientException("client is not connected, please connect first");
    }

    /**
     * Returns the default message callback.
     *
     * @return the default callback, or {@code null} when none is set
     */
    public MessageCallback getMessageCallback() {
        return this.messageCallback;
    }

    /**
     * Sets the default message callback; writes the same field as
     * {@link #setMessageListener(MessageCallback)}.
     *
     * @param messageCallback new default callback; may be {@code null}
     */
    public void setMessageCallback(MessageCallback messageCallback) {
        this.messageCallback = messageCallback;
    }
}
