/*
 * Decompiled with CFR 0.152.
 */
package org.apache.helix.manager.zk;

import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;

public class DefaultSchedulerMessageHandlerFactory
implements MessageHandlerFactory {
    /** Simple-field key on a scheduler message; when it parses to {@code true} the handler sends with {@code sendAndWait}. */
    public static final String WAIT_ALL = "WAIT_ALL";
    /** Key under which the handler stores the scheduler message id in the task result map. */
    public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
    /** Logger shared by the factory and its nested handler/callback classes; never reassigned. */
    private static final Logger _logger = Logger.getLogger(DefaultSchedulerMessageHandlerFactory.class);
    /** Manager handed to every handler this factory creates. */
    HelixManager _manager;

    /**
     * Creates a factory whose handlers will all use the given manager.
     *
     * @param manager the Helix manager passed on to each created handler
     */
    public DefaultSchedulerMessageHandlerFactory(HelixManager manager) {
        _manager = manager;
    }

    /**
     * Creates a handler for a scheduler message.
     *
     * @param message the incoming message; its type must equal {@link #getMessageType()}
     * @param context the notification context forwarded to the handler
     * @return a new {@link DefaultSchedulerMessageHandler} bound to this factory's manager
     * @throws HelixException if the message type is missing or is not the scheduler message type
     */
    @Override
    public MessageHandler createHandler(Message message, NotificationContext context) {
        String type = message.getMsgType();
        // Compare from the known non-null side so a message without a type is
        // rejected with the descriptive HelixException instead of a bare NPE.
        if (!this.getMessageType().equals(type)) {
            throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:" + message.getMsgType());
        }
        return new DefaultSchedulerMessageHandler(message, context, this._manager);
    }

    /**
     * Returns the message type this factory handles.
     *
     * @return the string form of {@code Message.MessageType.SCHEDULER_MSG}
     */
    @Override
    public String getMessageType() {
        final Message.MessageType handledType = Message.MessageType.SCHEDULER_MSG;
        return handledType.toString();
    }

    /**
     * Resets the factory. This implementation does nothing.
     */
    @Override
    public void reset() {
        // Intentionally empty: the only state held here is the manager reference,
        // which this method leaves untouched.
    }

    public static class DefaultSchedulerMessageHandler
    extends MessageHandler {
        HelixManager _manager;

        public DefaultSchedulerMessageHandler(Message message, NotificationContext context, HelixManager manager) {
            super(message, context);
            this._manager = manager;
        }

        @Override
        public HelixTaskResult handleMessage() throws InterruptedException {
            Criteria recipientCriteria;
            String type = this._message.getMsgType();
            HelixTaskResult result = new HelixTaskResult();
            if (!type.equals(Message.MessageType.SCHEDULER_MSG.toString())) {
                throw new HelixException("Unexpected msg type for message " + this._message.getMsgId() + " type:" + this._message.getMsgType());
            }
            int timeOut = -1;
            if (this._message.getRecord().getSimpleFields().containsKey("TIMEOUT")) {
                try {
                    timeOut = Integer.parseInt(this._message.getRecord().getSimpleFields().get("TIMEOUT"));
                }
                catch (Exception e) {
                    // empty catch block
                }
            }
            ZNRecord record = new ZNRecord("templateMessage");
            record.getSimpleFields().putAll(this._message.getRecord().getMapField("MessageTemplate"));
            Message messageTemplate = new Message(record);
            StringReader sr = new StringReader(this._message.getRecord().getSimpleField("Criteria"));
            ObjectMapper mapper = new ObjectMapper();
            try {
                recipientCriteria = (Criteria)mapper.readValue((Reader)sr, Criteria.class);
            }
            catch (Exception e) {
                _logger.error((Object)"", (Throwable)e);
                result.setException(e);
                result.setSuccess(false);
                return result;
            }
            _logger.info((Object)("Scheduler sending message, criteria:" + recipientCriteria));
            boolean waitAll = false;
            if (this._message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL) != null) {
                try {
                    waitAll = Boolean.parseBoolean(this._message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
                }
                catch (Exception e) {
                    _logger.warn((Object)"", (Throwable)e);
                }
            }
            int nMsgsSent = 0;
            SchedulerAsyncCallback callback = new SchedulerAsyncCallback(this._message, this._manager);
            nMsgsSent = waitAll ? this._manager.getMessagingService().sendAndWait(recipientCriteria, messageTemplate, callback, timeOut) : this._manager.getMessagingService().send(recipientCriteria, messageTemplate, callback, timeOut);
            HelixDataAccessor accessor = this._manager.getHelixDataAccessor();
            PropertyKey.Builder keyBuilder = accessor.keyBuilder();
            HashMap<String, String> sendSummary = new HashMap<String, String>();
            sendSummary.put("MessageCount", "" + nMsgsSent);
            ZNRecord statusUpdate = ((HelixProperty)accessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), this._message.getMsgId()))).getRecord();
            statusUpdate.getMapFields().put("SentMessageCount", sendSummary);
            accessor.setProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), this._message.getMsgId()), new StatusUpdate(statusUpdate));
            result.getTaskResultMap().put("ControllerResult", "msg " + this._message.getMsgId() + " from " + this._message.getMsgSrc() + " processed");
            result.getTaskResultMap().put(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID, this._message.getMsgId());
            result.setSuccess(true);
            return result;
        }

        @Override
        public void onError(Exception e, MessageHandler.ErrorCode code, MessageHandler.ErrorType type) {
            _logger.error((Object)("Message handling pipeline get an exception. MsgId:" + this._message.getMsgId()), (Throwable)e);
        }
    }

    /**
     * Callback attached to the messages sent on behalf of a scheduler message. It
     * collects the result map of every reply and, on completion or timeout, writes
     * the collected results plus a "Summary" entry into the scheduler message's
     * controller task status.
     */
    public static class SchedulerAsyncCallback
    extends AsyncCallback {
        StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
        Message _originalMessage;
        HelixManager _manager;
        /** Reply results keyed by a unique per-reply key; also receives the "Summary" entry. */
        final Map<String, Map<String, String>> _resultSummaryMap = new ConcurrentHashMap<String, Map<String, String>>();

        /**
         * @param originalMessage the scheduler message whose status is updated
         * @param manager the manager used to reach the data accessor
         */
        public SchedulerAsyncCallback(Message originalMessage, HelixManager manager) {
            this._originalMessage = originalMessage;
            this._manager = manager;
        }

        /** Logs the timeout, records a "Task timeout" error and writes the summary with Timeout=true. */
        @Override
        public void onTimeOut() {
            _logger.info("Scheduler msg timeout " + this._originalMessage.getMsgId() + " timout with " + this._timeout + " Ms");
            this._statusUpdateUtil.logError(this._originalMessage, SchedulerAsyncCallback.class, "Task timeout", this._manager.getHelixDataAccessor());
            this.addSummary(this._resultSummaryMap, this._originalMessage, this._manager, true);
        }

        /**
         * Stores the reply's result map and, once {@code isDone()} reports completion,
         * writes the summary with Timeout=false.
         *
         * @param message the reply message
         */
        @Override
        public void onReplyMessage(Message message) {
            _logger.info("Update for scheduler msg " + this._originalMessage.getMsgId() + " Message " + message.getMsgSrc() + " id " + message.getCorrelationId() + " completed");
            String key = "MessageResult " + message.getMsgSrc() + " " + UUID.randomUUID();
            Map<String, String> replyResult = message.getResultMap();
            if (replyResult == null) {
                // ConcurrentHashMap rejects null values; keep the reply counted with an empty result.
                replyResult = new HashMap<String, String>();
            }
            this._resultSummaryMap.put(key, replyResult);
            if (this.isDone()) {
                _logger.info("Scheduler msg " + this._originalMessage.getMsgId() + " completed");
                this._statusUpdateUtil.logInfo(this._originalMessage, SchedulerAsyncCallback.class, "Scheduler task completed", this._manager.getHelixDataAccessor());
                this.addSummary(this._resultSummaryMap, this._originalMessage, this._manager, false);
            }
        }

        /**
         * Adds a "Summary" entry to the given result map and merges the whole map
         * into the controller task status of the original message.
         *
         * @param resultSummaryMap collected reply results; receives the "Summary" entry
         * @param originalMessage the scheduler message whose status is updated
         * @param manager the manager providing the data accessor
         * @param timeOut whether the summary is written because of a timeout
         */
        private void addSummary(Map<String, Map<String, String>> resultSummaryMap, Message originalMessage, HelixManager manager, boolean timeOut) {
            // A "Summary" entry left by an earlier call is not a reply; exclude it from the count.
            int totalMessages = resultSummaryMap.size();
            if (resultSummaryMap.containsKey("Summary")) {
                --totalMessages;
            }
            TreeMap<String, String> summary = new TreeMap<String, String>();
            summary.put("TotalMessages:", "" + totalMessages);
            summary.put("Timeout", "" + timeOut);
            resultSummaryMap.put("Summary", summary);

            HelixDataAccessor accessor = manager.getHelixDataAccessor();
            PropertyKey statusKey = accessor.keyBuilder().controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), originalMessage.getMsgId());
            HelixProperty statusProperty = (HelixProperty)accessor.getProperty(statusKey);
            if (statusProperty == null) {
                _logger.warn("No status update found for scheduler msg " + originalMessage.getMsgId() + "; summary not recorded");
                return;
            }
            ZNRecord statusUpdate = statusProperty.getRecord();
            statusUpdate.getMapFields().putAll(resultSummaryMap);
            accessor.setProperty(statusKey, new StatusUpdate(statusUpdate));
        }
    }
}

