Ad-Hockery

ad-hockery: /ad·hok'@r·ee/, n.
Gratuitous assumptions... which lead to the appearance of semi-intelligent behavior but are in fact entirely arbitrary. Jargon File

Testing the RxJava poller

Yesterday I posted about an implementation of a simple remote service polling mechanism using RxJava. One of the things I particularly liked when applying this pattern at work was how straightforward it was to unit test.

Unit testing asynchronous code can be pretty painful. Typically you need a mechanism such as Java’s CountDownLatch or Spock’s BlockingVariables or PollingConditions to synchronize threads before making assertions. Letting processing run asynchronously, especially when testing scheduled activity, can also make tests very slow.
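To make the pain concrete, here is a minimal plain-Java sketch of the latch style of test. The class and method names are hypothetical and exist only for this illustration. The thing to notice is that the calling thread has to block in real time until the scheduled callback fires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of latch-based synchronization; not code from the poller.
class LatchBasedWait {

    // Schedules a callback on a real timer, then blocks until it has fired
    // or the timeout expires. Returns the received value, or null on timeout.
    static String awaitScheduledMessage(long delayMillis, long timeoutMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        try {
            timer.schedule(() -> {
                received.set("Hi");
                latch.countDown(); // signal the waiting thread
            }, delayMillis, TimeUnit.MILLISECONDS);

            // The caller waits for at least delayMillis of wall-clock time.
            boolean fired = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return fired ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String message = awaitScheduledMessage(200, 2000);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("received " + message + " after about " + elapsedMillis + " ms");
    }
}
```

With a 30 second polling interval, a test written this way would need to wait 30 real seconds per poll, which is why a fake timer is so attractive.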

Ideally, the asynchronous nature of the code is abstracted away in tests and the timer can be faked out. This is exactly the approach that RxJava takes.

Recall that we used Schedulers.io() to ensure that blocking I/O was handled appropriately. If we make that scheduler a property of the poller object we can replace it in tests with Schedulers.test().

If we wrap the polling logic discussed in the previous post up in a start method of a MessagePoller class we can have a constructor that looks like this:

public MessagePoller(MessageService messageService,
                     Action1<Message> subscriber,
                     Scheduler scheduler) {
  // assign messageService, subscriber and scheduler to fields
}

For the sake of brevity I’m actually omitting a couple of parameters here – we’d also want the polling interval and the message recipient to be parameters. For the examples in this post I’ll hardcode those.

The test scheduler provides a method advanceTimeBy(long, TimeUnit) that fakes a delay without any time actually having to elapse. For our purposes this is ideal as we can use advanceTimeBy(30, SECONDS) to trigger a poll on the remote service.
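If the idea of faking a delay seems like magic, the following toy virtual clock in plain Java shows roughly how it can work. To be clear, this is not RxJava’s TestScheduler or its implementation; VirtualClock and its methods are names invented for this sketch, and the only thing borrowed is the advanceTimeBy name. Periodic tasks sit in a queue ordered by due time, and advancing the clock simply runs everything that has become due, on the calling thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Toy virtual clock for illustration only; an assumption-laden sketch,
// not how RxJava's TestScheduler is actually written.
class VirtualClock {

    private static final class Task {
        final long dueMillis;
        final long periodMillis;
        final long sequence; // tie-breaker so equal due times run in scheduling order
        final Runnable action;

        Task(long dueMillis, long periodMillis, long sequence, Runnable action) {
            this.dueMillis = dueMillis;
            this.periodMillis = periodMillis;
            this.sequence = sequence;
            this.action = action;
        }
    }

    private final PriorityQueue<Task> queue = new PriorityQueue<>(
            Comparator.comparingLong((Task t) -> t.dueMillis)
                      .thenComparingLong(t -> t.sequence));
    private long nowMillis = 0;
    private long nextSequence = 0;

    long nowMillis() {
        return nowMillis;
    }

    // Registers an action to run every `period`, first firing one period from now.
    void schedulePeriodically(Runnable action, long period, TimeUnit unit) {
        long periodMillis = unit.toMillis(period);
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("period must be at least one millisecond");
        }
        queue.add(new Task(nowMillis + periodMillis, periodMillis, nextSequence++, action));
    }

    // Moves virtual time forward, synchronously running every task that falls due.
    void advanceTimeBy(long amount, TimeUnit unit) {
        long target = nowMillis + unit.toMillis(amount);
        while (!queue.isEmpty() && queue.peek().dueMillis <= target) {
            Task task = queue.poll();
            nowMillis = task.dueMillis; // the task observes the time it was due
            task.action.run();
            queue.add(new Task(task.dueMillis + task.periodMillis,
                               task.periodMillis, nextSequence++, task.action));
        }
        nowMillis = target;
    }

    public static void main(String[] args) {
        VirtualClock clock = new VirtualClock();
        List<Long> pollTimes = new ArrayList<>();
        clock.schedulePeriodically(() -> pollTimes.add(clock.nowMillis()), 30, TimeUnit.SECONDS);

        clock.advanceTimeBy(60, TimeUnit.SECONDS); // returns immediately
        System.out.println(pollTimes); // prints [30000, 60000]
    }
}
```

Because everything runs on the caller’s thread, by the time advanceTimeBy returns all the due work has already happened, so assertions can follow directly with no latches or sleeps.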

It’s very easy to write a test that ensures the subscriber is sent a message returned by the remote service.

def messageService = Stub(MessageService)
def subscriber = Mock(Action1)
@Shared scheduler = Schedulers.test()

@Subject messagePoller = new MessagePoller(messageService,
                                           subscriber,
                                           scheduler)

def "subscriber can receive a single message"() {
  given: "the remote service will return a single message"
  messageService.recentMessages(_) >> [message]

  and: "polling has started"
  messagePoller.start()

  when: "the next poll is triggered"
  scheduler.advanceTimeBy(30, SECONDS)

  then: "the subscriber receives the message"
  1 * subscriber.call(message)

  where:
  message = new Message("1", "Hi", "Cam", "Rob")
}

We stub the response from the remote service, start polling, fake the interval elapsing and then assert that our subscriber mock receives the message. There’s no thread synchronization and the test runs almost instantaneously.

We can just as easily test that messages are streamed consistently from multiple calls to the remote service.

def "subscriber can receive multiple messages from multiple polls"() {
  given:
  messageService.recentMessages(_) >>
      [message1, message2] >>
      [message3]

  and:
  messagePoller.start()

  when:
  scheduler.advanceTimeBy(60, SECONDS)

  then:
  with(subscriber) {
    1 * call(message1)
    1 * call(message2)
    1 * call(message3)
  }

  where:
  message1 = new Message("1", "Hi", "Cam", "Rob")
  message2 = new Message("2", "Bye", "Cam", "Rob")
  message3 = new Message("3", "Hi", "Clay", "Rob")
}

Here we trigger two polls by advancing the time further and ensure that the subscriber receives all the messages.

The test sets up two sequential return values for the messageService stub using Spock’s chaining syntax. The first time messageService.recentMessages gets called it will respond with [message1, message2] and the second time it will respond with [message3].

Using the same technique we can also test that exceptions do not stop the polling mechanism.

def "polling continues after an error from the message service"() {
  given:
  messageService.recentMessages(_) >>
      [message1] >>
      { throw MockHttpException.newInternalError(null) } >>
      [message2]

  and:
  messagePoller.start()

  when:
  scheduler.advanceTimeBy(90, SECONDS)

  then:
  2 * subscriber.call(_)

  where:
  message1 = new Message("1", "Hi", "Tomas", "Rob")
  message2 = new Message("2", "Bye", "Tomas", "Rob")
}

Here we poll the remote service three times. Using Spock’s stub chaining we’ve set up 3 responses, the second of which will throw an exception. We then assert that both messages arrive because the exception was not allowed to kill the stream.
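The property being tested here, that one failed poll does not prevent later polls from delivering, can be illustrated without RxJava at all. The sketch below is a plain-Java analogue rather than the poller’s actual error handling: ResilientPolling, pollSafely and runPolls are hypothetical names, and the choice to turn a failure into an empty batch is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java analogue for illustration; not the RxJava code from the poller.
class ResilientPolling {

    // Runs a single poll. A failure is contained here and becomes an empty
    // batch, so it never propagates into the loop that drives the polls.
    static List<String> pollSafely(Supplier<List<String>> poll) {
        try {
            return poll.get();
        } catch (RuntimeException e) {
            return Collections.emptyList();
        }
    }

    // Runs each poll in order and collects every message that was delivered.
    static List<String> runPolls(List<Supplier<List<String>>> polls) {
        List<String> delivered = new ArrayList<>();
        for (Supplier<List<String>> poll : polls) {
            delivered.addAll(pollSafely(poll));
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Supplier<List<String>>> polls = new ArrayList<>();
        polls.add(() -> Arrays.asList("Hi"));
        polls.add(() -> { throw new IllegalStateException("HTTP 500"); });
        polls.add(() -> Arrays.asList("Bye"));

        System.out.println(runPolls(polls)); // prints [Hi, Bye]
    }
}
```

The important design point is where the error is caught: per poll, not around the whole sequence. Catching it around the whole sequence would end polling at the first failure, which is exactly what the test above guards against.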

It’s similarly straightforward to test other aspects of the polling mechanism: message filtering by recipient, de-duplication and so on.

For those that are interested in playing with this I have published a small GitHub project containing this example and its tests.
