Asynchronously offloading tasks to redis listeners using Spring-Data-Redis

Introduction

Creating a scalable architecture. It's fun, exciting, but above everything else, there's such a wide variety of technologies to choose from. Blatantly using all of these technologies, will leave you with a big, heavily depending application.

In this example, instead of using the more famous message queues (rabbitMQ or any other embedded or full blown queue), I tried leveraging the pubsub capabilities of a Redis server to simulate message publishing and subscribing.

How the redis pubsub configuration works

I won't go in depth here, but it's useful to know how your messages get processed using pubsub in a redis environment. There is, as the name suggests, two parts to this exchange. First of all, there's a party which can publish messages to a channel. These messages can take on any form (be it a string, object..). On the other side, there are 0, 1 or multiple parties which are subscribed to the channel. Messages are volatile, much like a chatbox. Only parties which were subscribed at the time of publishing will receive the message.

A personal use case

For a personal project, I was dealing with data that had to be imported from a public API. Because of the size of the data, and the continuous fashion in which the information had to be fetched, processed and saved, I had to come up with an architecture that was scalable. I started tinkering with little spring boot projects, which could be deployed multiple times. Think of it as nodes. A main web application had to send out calls to the nodes, in order to fetch and process the information. I wanted to create an architecture where the nodes would not know of the existence of one another.

Why not choose the full blown MQ?

At the time of writing, I had already a redis database running in production. I used it as caching implementation for the Spring 3.1 caching abstraction, as well as for storing my HTTP-Sessions, using Spring Session. After reading through the documentation, the pubsub configuration felt like it could suffice for what I needed.

The Receiving End

As this is more of a hands-on blogpost, it'll be accompanied by a basic code-example on how my application was developed. I'll start off with the receiving end, the node part in my application. This can be just one node, or multiple listeners.

The simplest of POJOs

public class ImportTarget implements Serializable {
    private Long serverId;

    public Long getServerId() {
        return serverId;
    }

    public ImportTarget setServerId(Long serverId) {
        this.serverId = serverId;
        return this;
    }
}

This doesn't do much. This POJO is just a java class which will be send broadcasted to a redis server and picked up by a listener. This can take on any form. In one of the examples spring provided, it was just a string. I used an object, as this has more value in my opinion. This is the method where you can define what happens with the object once it is received.

A listener

public class ImportTargetListener {
    public void handleMessage(ImportTarget importTarget) {
        //doStuff(importTarget);
    }
}

The listener will be the service which will handle the message. Again, this is a basic class, as it doesn't need any annotations or interfaces.

The configuration

To make our application subscribe to our redis server, we'll need some configuration.

Let's provide our listener

 @Bean
 public ImportTargetListener importTargetListener() {
    return new ImportTargetListener();
 }

A messagelistener adapter

Next up, we'll need a MessageListenerAdaptor. Here, we'll define that our listener will have to serialize the input, which will be JSON, into a java class of the type ImportTarget. If no Serializer would have been set, the MessageListenerAdaptor would try to parse the input as a string.

    @Bean
    public MessageListenerAdapter listenerAdapter(ImportTargetListener importTargetListener) {
        MessageListenerAdapter messageListenerAdapter = new 
        MessageListenerAdapter(importTargetListener);
        messageListenerAdapter
                .setSerializer(new JacksonJsonRedisSerializer<>(ImportTarget.class));
        return messageListenerAdapter;
    }

Our Listener Container

The last part of our configuration consist of the creation of a JedisConnectionFactyory, as well as a RedisMessageListenerContainer. In this example, we setup our RedisMessageListenerContainer to make our MessageListenerAdaptor listen on the topic with the pattern import. You can change this to your likings.

    @Bean
    public JedisConnectionFactory connectionFactory() {
        return new JedisConnectionFactory();
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic("import"));
        return container;
    }

The entire configuration

This is the entire configuration for the receiving end.

@Configuration
public class RedisListenersConfiguration {

    @Bean
    public JedisConnectionFactory connectionFactory() {
        return new JedisConnectionFactory();
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic("import"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(ImportTargetListener importTargetListener) {
        MessageListenerAdapter messageListenerAdapter = new        
        MessageListenerAdapter(importTargetListener);
        messageListenerAdapter
                .setSerializer(new JacksonJsonRedisSerializer<>(ImportTarget.class));
        return messageListenerAdapter;
    }

    @Bean
    public ImportTargetListener importTargetListener() {
        return new ImportTargetListener();
    }

}

The Sending Party

Our sending party - the main web applicaton in my use case - doesn't need that much of a configuration.

Just a minor configuration

@Bean
public JedisConnectionFactory connectionFactory() {
    return new JedisConnectionFactory();
}

@Bean
public RedisTemplate<String, ImportTarget> importTargetCache(RedisConnectionFactory connectionFactory) {
    final RedisTemplate<String, ImportTarget> importTargetCache = new RedisTemplate<>();
    importTargetCache.setConnectionFactory(connectionFactory);
    importTargetCache.setDefaultSerializer(new JacksonJsonRedisSerializer<>
    (ImportTarget.class));
    return importTargetCache;
}

As you can see, all we need to do is create a RedisTemplate and set the default Serializer. As we want our object (in our case, an object of type ImportTarget) to be sent to our redis server in the form of a JSON, we choose for the JacksonJsonRedisSerializer.

Sending our payload to the topic

Sending the payload to our redis server is really easy. All we need to do now is Inject the RedisTemplate into a component and send the object to the correct topic.

@Autowired
private RedisTemplate<String, ImportTarget> importTargetTemplate;

public HttpEntity<String> importForRealm(Long id) {
    importTargetTemplate.convertAndSend("import", new ImportTarget().setServerId(id));
    return new HttpEntity<>("OK");
}

Caveats

One this to be remembered is that our redis server, as said before, treats our channel as a topic, and not a queue. Therefore, all of our listeners will react to the call, not just one. If no listeners were defined, or no application was actively listening to our channel, nobody would get notified and the message would be gone.

Comments

comments powered by Disqus