package org.apache.camel.component.redis;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.direct.DirectConsumer;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.Topic;

/* loaded from: input_file:org/apache/camel/component/redis/RedisConsumer.class */
public class RedisConsumer extends DirectConsumer implements MessageListener {
    private final RedisConfiguration redisConfiguration;

    public RedisConsumer(RedisEndpoint redisEndpoint, Processor processor, RedisConfiguration redisConfiguration) {
        super(redisEndpoint, processor);
        this.redisConfiguration = redisConfiguration;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.redisConfiguration.getListenerContainer().addMessageListener(this, toTopics(this.redisConfiguration.getChannels()));
    }

    private Collection<Topic> toTopics(String str) {
        String[] split = str.split(",");
        ArrayList arrayList = new ArrayList();
        for (String str2 : split) {
            if (Command.PSUBSCRIBE.toString().equals(this.redisConfiguration.getCommand())) {
                arrayList.add(new PatternTopic(str2));
            } else {
                if (!Command.SUBSCRIBE.toString().equals(this.redisConfiguration.getCommand())) {
                    throw new RuntimeException("Unsupported Command");
                }
                arrayList.add(new ChannelTopic(str2));
            }
        }
        return arrayList;
    }

    public void onMessage(Message message, byte[] bArr) {
        try {
            Exchange createExchange = getEndpoint().createExchange();
            setChannel(createExchange, message.getChannel());
            setPattern(createExchange, bArr);
            setBody(createExchange, message.getBody());
            getProcessor().process(createExchange);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void setBody(Exchange exchange, byte[] bArr) {
        if (bArr != null) {
            exchange.getIn().setBody(this.redisConfiguration.getSerializer().deserialize(bArr));
        }
    }

    private void setPattern(Exchange exchange, byte[] bArr) {
        if (bArr != null) {
            exchange.getIn().setHeader(RedisConstants.PATTERN, bArr);
        }
    }

    private void setChannel(Exchange exchange, byte[] bArr) throws UnsupportedEncodingException {
        if (bArr != null) {
            exchange.getIn().setHeader(RedisConstants.CHANNEL, new String(bArr, "UTF8"));
        }
    }
}
