Ad-Hockery

ad-hockery: /ad·hok'@r·ee/, n.
Gratuitous assumptions... which lead to the appearance of semi-intelligent behavior but are in fact entirely arbitrary. Jargon File

Simple background polling with RxJava

I’ve done odd bits of playing around with RxJava before but never dived into it very deeply. Today I wanted to re-implement an unreliable background polling operation and Tomás Lin suggested I look at using RxJava. This may be an obvious implementation to some but I had to do a bit of experimentation (and searching Stack Overflow) to come up with it.

Imagine we’re polling a web-service regularly. The service returns a JSON array of independent items (think events since last poll, Tweets with a particular hashtag or something similar). We want to do some processing (filtering, parsing to a type) on each item and then pass them off to a handler.

The first thing we need is a heartbeat so:

Observable.interval(30, TimeUnit.SECONDS)

That will return an Observable<Long> that emits a value every 30 seconds. The values are clock ticks starting at zero.

At each tick we want to poll a remote service. For the sake of simplicity let’s assume we’re using a REST client like Retrofit that does the HTTP transport and JSON parsing for us so we have an interface like this:

public interface MessageService {
  // annotations are ommitted but this method makes a REST call
  List<Message> getRecentMessages();
}

public interface Message {
  boolean isFor(Recipient recipient);
}

How do we translate our tick sequence into calls to getRecentMessages? We can think of it as a map operation – just one in which we don’t care about the original value.

Observable.interval(30, TimeUnit.SECONDS)
          .map((tick) -> messageService.getRecentMessages())

Now we have an Observable<List<Message>>. So far so good. However, that map operation is doing blocking I/O so we should shift it onto an appropriate scheduler (yes, Retrofit can return rx.Observable so we could handle this in a non-blocking way but I’ll skip that for the purposes of this discussion).

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())

For this example we’ll consume the messages by just subscribing and logging what we get.

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())
          .subscribe(messages -> {
            // the subscriber is called with List<Message> every time
            // the remote service is polled
            for (Message message : messages) {
              log.info(message.toString())
            }
          })

Of course the call to the remote service may fail – the service may be down or rate limiting us for example. We need to ensure our subscription doesn’t end because the call threw an exception. We could add a try/catch around the remote call but RxJava has a mechanism for retrying that is a little neater.

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())
          .retry()
          .subscribe(messages -> {
            for (Message message : messages) {
              log.info(message.toString())
            }
          })

Now any exceptions are swallowed and the poll will get repeated at the next interval. We could get more sophisticated here and use a back-off policy but the key point is the subscriber only has to deal with messages that were successfully retrieved.

That said, we probably want to know the remote service is giving us trouble so let’s add some logging.

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())
          // The error handler is called with any exception raised upstream
          .doOnError(err -> log.error("Error retrieving messages", err))
          // Errors do not propagate past retry so they must be handled before
          .retry()
          .subscribe(messages -> {
            for (Message message : messages) {
              log.info(message.toString())
            }
          })

Now the next thing we need to do is filter out any messages not intended for us. That’s pretty easy with another map operation on the List<Message> emitted every time we poll.

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())
          .doOnError(err -> log.error("Error retrieving messages", err))
          .retry()
          .map(messages ->
            // a 2nd map operation is called with the List<Message> emitted upstream
            messages.stream()
                    // filter to a sub-list containing only "our" messages
                    .filter(message -> message.isFor(recipient))
                    // turn sub-list stream back into a List<Message>
                    .collect(Collectors.toList()))
          .subscribe(messages -> {
            for (Message message : messages) {
              log.info(message.toString())
            }
          })

At this point everything is working but notice that we’re always dealing with List<Message>. The grouping is just an artifact of the way the messages are retrieved from the remote service. We really should be dealing with a stream of individual messages. I scratched my head on this one for a little while until Jon Schneider and Tim Yates provided me with an obvious and simple solution.

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())
          .doOnError(err -> log.error("Error retrieving messages", err))
          .retry()
          // turns each List<Message> into an Observable<Message> that flatMap
          // will join into a single stream
          .flatMap(Observable::from)
          .filter(message -> message.isFor(recipient))
          .subscribe(message -> log.info(message.toString()))

The flatMap method allows you to produce a new Observable for each value emitted from upstream and coalesceses them into a single downstream Observable.

This is much neater. Now the downstream operations are free to deal with individual messages – we can handle the recipient filtering with a simple filter operation instead of mapping the original list to a sub-list and we no longer need to loop over multiple messages in the subscriber.

One last thing is that perhaps our remote service will give us some duplicate messages on subsequent calls. An ideal way to handle that would be to send a last message id or an HTTP If-Modified-Since header with the request but if that’s not an option Rx can help us.

Observable.interval(30, TimeUnit.SECONDS, Schedulers.io())
          .map(tick -> messageService.getRecentMessages())
          .doOnError(err -> log.error("Error retrieving messages", err))
          .retry()
          .flatMap(Observable::from)
          // filter out any previously seen messages
          .distinct()
          .filter(message -> message.isFor(recipient))
          .subscribe(message -> log.info(message.toString()))

Obviously using distinct assumes that our Message objects are unique and implement hashCode properly.

We now have a filtered stream of unique messages generated by polling a remote service in an error-tolerant way.

Web Statistics