/*
 * Decompiled with CFR 0.152.
 */
package org.apache.helix.messaging.handling;

import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.messaging.handling.GroupMessageHandler;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;

/**
 * Executes one {@link Message} through its {@link MessageHandler} and then performs the
 * follow-up work visible in {@link #call()}: status-update logging, optional timeout
 * scheduling, removing the message, reporting state-transition statistics, sending a reply,
 * and notifying the handler of any error.
 */
public class HelixTask
implements Callable<HelixTaskResult> {
    private static final Logger logger = Logger.getLogger(HelixTask.class);
    private final Message _message;
    private final MessageHandler _handler;
    private final NotificationContext _notificationContext;
    private final HelixManager _manager;
    StatusUpdateUtil _statusUpdateUtil;
    HelixTaskExecutor _executor;
    // Written by TimeoutCancelTask (timer thread) and read by call(); hence volatile.
    volatile boolean _isTimeout = false;

    /**
     * Creates a task for the given message.
     *
     * @param message the message to execute
     * @param notificationContext context the message arrived with; its manager is used for
     *        data access and instance information
     * @param handler the handler that performs the actual work
     * @param executor the executor used for retries, cancellation and completion reporting
     * @throws Exception declared for compatibility with existing callers
     */
    public HelixTask(Message message, NotificationContext notificationContext, MessageHandler handler, HelixTaskExecutor executor) throws Exception {
        this._notificationContext = notificationContext;
        this._message = message;
        this._handler = handler;
        this._manager = notificationContext.getManager();
        this._statusUpdateUtil = new StatusUpdateUtil();
        this._executor = executor;
    }

    /**
     * Runs the handler for the message and performs post-processing.
     *
     * <p>If the message has a positive execution timeout, a daemon {@link Timer} is armed
     * with a {@link TimeoutCancelTask} before the handler runs and is always cancelled once
     * the handler returns or throws. When the task was interrupted because of a timeout and
     * the message still has retries left, the message is rescheduled and the method returns
     * early without removing the message or sending a reply.
     *
     * @return the handler's result, or a failure result built here when the handler threw
     *         or returned {@code null}
     */
    @Override
    public HelixTaskResult call() {
        Timer timer = null;
        if (this._message.getExecutionTimeout() > 0) {
            timer = new Timer(true);
            timer.schedule(new TimeoutCancelTask(this._executor, this._message, this._notificationContext), this._message.getExecutionTimeout());
            logger.info("Message starts with timeout " + this._message.getExecutionTimeout() + " MsgId:" + this._message.getMsgId());
        } else {
            logger.info("Message does not have timeout. MsgId:" + this._message.getMsgId() + "/" + this._message.getPartitionName());
        }
        HelixTaskResult taskResult = new HelixTaskResult();
        Exception exception = null;
        MessageHandler.ErrorType type = MessageHandler.ErrorType.INTERNAL;
        MessageHandler.ErrorCode code = MessageHandler.ErrorCode.ERROR;
        long start = System.currentTimeMillis();
        logger.info("msg:" + this._message.getMsgId() + " handling task begin, at: " + start);
        HelixDataAccessor accessor = this._manager.getHelixDataAccessor();
        this._statusUpdateUtil.logInfo(this._message, HelixTask.class, "Message handling task begin execute", accessor);
        this._message.setExecuteStartTimeStamp(new Date().getTime());
        try {
            HelixTaskResult handlerResult = this._handler.handleMessage();
            if (handlerResult == null) {
                // Keep the non-null default result so the failure path and the cleanup
                // below can run instead of failing with a NullPointerException.
                exception = new IllegalStateException("Message handler returned a null task result");
                this.recordHandlerFailure(taskResult, exception, accessor);
            } else {
                taskResult = handlerResult;
                exception = taskResult.getException();
            }
        }
        catch (InterruptedException e) {
            // NOTE(review): the thread's interrupt status is intentionally left as-is here;
            // the post-processing below still runs on this thread. Confirm before changing.
            this._statusUpdateUtil.logError(this._message, HelixTask.class, e, "State transition interrupted, timeout:" + this._isTimeout, accessor);
            logger.info("Message " + this._message.getMsgId() + " is interrupted");
            taskResult.setInterrupted(true);
            taskResult.setException(e);
            exception = e;
        }
        catch (Exception e) {
            this.recordHandlerFailure(taskResult, e, accessor);
            exception = e;
        }
        finally {
            // Disarm the timeout task on every exit path, including Errors thrown by the handler.
            if (timer != null) {
                timer.cancel();
            }
        }
        if (taskResult.isSucess()) {
            this._statusUpdateUtil.logInfo(this._message, this._handler.getClass(), "Message handling task completed successfully", accessor);
            logger.info("Message " + this._message.getMsgId() + " completed.");
        } else if (taskResult.isInterrupted()) {
            logger.info("Message " + this._message.getMsgId() + " is interrupted");
            code = this._isTimeout ? MessageHandler.ErrorCode.TIMEOUT : MessageHandler.ErrorCode.CANCEL;
            if (this._isTimeout) {
                int retryCount = this._message.getRetryCount();
                logger.info("Message timeout, retry count: " + retryCount + " MSGID:" + this._message.getMsgId());
                this._statusUpdateUtil.logInfo(this._message, this._handler.getClass(), "Message handling task timeout, retryCount:" + retryCount, accessor);
                if (retryCount > 0) {
                    // Retry: reschedule with one fewer retry and skip the cleanup below.
                    this._message.setRetryCount(retryCount - 1);
                    this._executor.scheduleTask(this._message, this._handler, this._notificationContext);
                    return taskResult;
                }
            }
        } else {
            String errorMsg = "Message execution failed. msgId: " + this._message.getMsgId() + ", errorMsg: " + taskResult.getMessage();
            if (exception != null) {
                errorMsg = errorMsg + ", exception: " + exception;
            }
            logger.error(errorMsg, exception);
            this._statusUpdateUtil.logError(this._message, this._handler.getClass(), errorMsg, accessor);
        }
        try {
            if (!this._message.getGroupMessageMode()) {
                this.finishMessage(accessor, taskResult);
            } else {
                GroupMessageHandler.GroupMessageInfo info = this._executor._groupMsgHandler.onCompleteSubMessage(this._message);
                // A null info skips the merge and the cleanup for this sub-message.
                if (info != null) {
                    Map<PropertyKey, CurrentState> curStateMap = info.merge();
                    for (Map.Entry<PropertyKey, CurrentState> entry : curStateMap.entrySet()) {
                        accessor.updateProperty(entry.getKey(), (HelixProperty)entry.getValue());
                    }
                    this.finishMessage(accessor, taskResult);
                }
            }
            this._executor.reportCompletion(this._message);
        }
        catch (Exception e) {
            String errorMessage = "Exception after executing a message, msgId: " + this._message.getMsgId() + e;
            logger.error(errorMessage, e);
            this._statusUpdateUtil.logError(this._message, HelixTask.class, errorMessage, accessor);
            exception = e;
            type = MessageHandler.ErrorType.FRAMEWORK;
            code = MessageHandler.ErrorCode.ERROR;
        }
        finally {
            long end = System.currentTimeMillis();
            logger.info("msg:" + this._message.getMsgId() + " handling task completed, results:" + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));
            // Give the handler a chance to react to any error from handling or post-processing.
            if (exception != null) {
                this._handler.onError(exception, code, type);
            }
        }
        return taskResult;
    }

    /**
     * Logs a handler failure and marks {@code taskResult} as failed with the given cause.
     */
    private void recordHandlerFailure(HelixTaskResult taskResult, Exception e, HelixDataAccessor accessor) {
        String errorMessage = "Exception while executing a message. " + e + " msgId: " + this._message.getMsgId() + " type: " + this._message.getMsgType();
        logger.error(errorMessage, e);
        this._statusUpdateUtil.logError(this._message, HelixTask.class, e, errorMessage, accessor);
        taskResult.setSuccess(false);
        taskResult.setException(e);
        taskResult.setMessage(e.getMessage());
    }

    /**
     * Removes the message, reports its statistics and sends the reply, in that order.
     */
    private void finishMessage(HelixDataAccessor accessor, HelixTaskResult taskResult) {
        this.removeMessageFromZk(accessor, this._message);
        this.reportMessageStat(this._manager, this._message, taskResult);
        this.sendReply(accessor, this._message, taskResult);
    }

    /**
     * Removes the message through the accessor, using the controller message key when the
     * target name is "controller" (case-insensitive) and this instance's message key otherwise.
     */
    private void removeMessageFromZk(HelixDataAccessor accessor, Message message) {
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        if (message.getTgtName().equalsIgnoreCase("controller")) {
            accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
        } else {
            accessor.removeProperty(keyBuilder.message(this._manager.getInstanceName(), message.getMsgId()));
        }
    }

    /**
     * Sends a reply carrying the task result when the message has a correlation id and is
     * not itself a TASK_REPLY. The reply is written to the source participant's message key
     * or to the controller message key, depending on the source instance type.
     */
    private void sendReply(HelixDataAccessor accessor, Message message, HelixTaskResult taskResult) {
        if (this._message.getCorrelationId() != null && !message.getMsgType().equals(Message.MessageType.TASK_REPLY.toString())) {
            logger.info("Sending reply for message " + message.getCorrelationId());
            this._statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
            taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSucess());
            taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
            if (!taskResult.isSucess()) {
                taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
            }
            Message replyMessage = Message.createReplyMessage(this._message, this._manager.getInstanceName(), taskResult.getTaskResultMap());
            replyMessage.setSrcInstanceType(this._manager.getInstanceType());
            if (message.getSrcInstanceType() == InstanceType.PARTICIPANT) {
                PropertyKey.Builder keyBuilder = accessor.keyBuilder();
                accessor.setProperty(keyBuilder.message(message.getMsgSrc(), replyMessage.getMsgId()), replyMessage);
            } else if (message.getSrcInstanceType() == InstanceType.CONTROLLER) {
                PropertyKey.Builder keyBuilder = accessor.keyBuilder();
                accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), replyMessage);
            }
            // NOTE(review): this status line is written even when the source instance type
            // matched neither branch above and no reply was stored.
            this._statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to " + replyMessage.getTgtName(), accessor);
        }
    }

    /**
     * Reports timing statistics for STATE_TRANSITION messages to the participant monitor.
     * Other message types are ignored. Nothing is reported unless both the read and
     * execution-start timestamps are set and both computed delays are positive.
     */
    private void reportMessageStat(HelixManager manager, Message message, HelixTaskResult taskResult) {
        if (!message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) {
            return;
        }
        long now = new Date().getTime();
        long msgReadTime = message.getReadTimeStamp();
        long msgExecutionStartTime = message.getExecuteStartTimeStamp();
        if (msgReadTime != 0L && msgExecutionStartTime != 0L) {
            long totalDelay = now - msgReadTime;
            long executionDelay = now - msgExecutionStartTime;
            if (totalDelay > 0L && executionDelay > 0L) {
                String fromState = message.getFromState();
                String toState = message.getToState();
                String transition = fromState + "--" + toState;
                StateTransitionContext cxt = new StateTransitionContext(manager.getClusterName(), manager.getInstanceName(), message.getResourceName(), transition);
                StateTransitionDataPoint data = new StateTransitionDataPoint(totalDelay, executionDelay, taskResult.isSucess());
                this._executor.getParticipantMonitor().reportTransitionStat(cxt, data);
            }
        } else {
            logger.warn("message read time and start execution time not recorded.");
        }
    }

    /**
     * Timer task that marks the enclosing {@link HelixTask} as timed out, notifies its
     * handler via {@code onTimeout()}, and asks the executor to cancel the task.
     */
    public class TimeoutCancelTask
    extends TimerTask {
        HelixTaskExecutor _executor;
        Message _message;
        NotificationContext _context;

        /**
         * @param executor executor asked to cancel the task when the timeout fires
         * @param message the message whose execution is being timed
         * @param context notification context passed through to the cancel call
         */
        public TimeoutCancelTask(HelixTaskExecutor executor, Message message, NotificationContext context) {
            this._executor = executor;
            this._message = message;
            this._context = context;
        }

        @Override
        public void run() {
            // Set the flag first so call() can distinguish a timeout from a plain cancel.
            HelixTask.this._isTimeout = true;
            logger.warn("Message time out, canceling. id:" + this._message.getMsgId() + " timeout : " + this._message.getExecutionTimeout());
            HelixTask.this._handler.onTimeout();
            this._executor.cancelTask(this._message, this._context);
        }
    }
}

