Tuesday, 23 February 2016

Lesson learned with Ratpack and RxJava

We recently encountered an issue when integrating Ratpack with RxJava's Observables. This post details what the errors were, what caused them and how we fixed them.

The code 

Essentially, our handler code was as follows (NOTE: see git project for this class and also a test showing the behaviour of the broken code):

@Override
public void handle(Context context) throws Exception {

    Observable<String> contentFromDownstreamSystem = observableOnDifferentThreadService.getContent();

    // Broken: the subscription happens outside of Ratpack's knowledge,
    // so the handler returns before anything has been rendered.
    contentFromDownstreamSystem.subscribe(response -> {
        context.render("Downstream system returned: " + response);
    });
}


The code above retrieves an Observable from a service. As implied by the variable name, the Observable emits items on a different thread when it is subscribed to. The Observable I have used to illustrate this issue is very simple and can be found here. In our real-world example, our Observable represented a ResultSetFuture from Cassandra which would emit items after a (very short) period of time.

The errors


No response error


The first error we encountered was as follows:

[2016-02-17 08:39:16,812] ratpack-demo WARN  [ratpack-compute-1-2] r.s.i.NettyHandlerAdapter - No response sent for GET request to /observable-different-thread-broken (last handler: com.github.phillbarber.scenario.observablethread.ObservableOnDifferentThreadHandlerBroken)


Even though our Observable would take some time to emit its items, the above error occurred seemingly immediately after our handler had completed. This also resulted in an HTTP 500 error response being issued to the client.

Double transmission error


It gets better!  Not only did we get an error indicating no response, we then saw an error implying we had tried to send two responses as follows:

[2016-02-17 08:39:16,821] ratpack-demo WARN  [Thread-4] r.s.i.DefaultResponseTransmitter - attempt at double transmission for: /observable-different-thread-broken
ratpack.handling.internal.DoubleTransmissionException: attempt at double transmission for: /observable-different-thread-broken

The Problem


The problem here is that Ratpack is not aware that the request is dependent on the Observable emitting items. In other words, the request's Execution does not contain a reference to the Execution segment which represents the Observable's success action (the lambda passed to the subscribe method). Since it seems to Ratpack that there is no further work to do, Ratpack's NettyHandlerAdapter detects that no response has been sent, logs the "No response sent" warning and issues an error response to the client.

The final twist is that eventually our Observable's action is completed.  When it tries to write a response, it can't as the response for the request has already been committed.  This is why we get the "double transmission" error.
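
The order of the two errors can be reproduced without either library. The following is a minimal sketch using only the JDK, not Ratpack's actual implementation: FakeResponse, the log messages and the latch are all hypothetical stand-ins, and the latch simply forces the callback to arrive after the framework's check, as a slow downstream call would.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Framework-free analogy; none of these types are Ratpack or RxJava classes.
final class BrokenFlowSketch {

    /** Hypothetical response that may be committed only once. */
    static final class FakeResponse {
        private final AtomicBoolean committed = new AtomicBoolean(false);
        final List<String> log = Collections.synchronizedList(new ArrayList<>());

        void send(String body) {
            if (committed.compareAndSet(false, true)) {
                log.add("sent: " + body);
            } else {
                log.add("rejected (double transmission): " + body);
            }
        }

        boolean isCommitted() {
            return committed.get();
        }
    }

    static List<String> simulate() {
        FakeResponse response = new FakeResponse();
        CountDownLatch frameworkChecked = new CountDownLatch(1);

        // "Handler": subscribes to a source that emits on another thread, then returns.
        CompletableFuture<Void> subscription = CompletableFuture.runAsync(() -> {
            try {
                frameworkChecked.await(); // emit only after the framework's check
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            response.send("Downstream system returned: content");
        });

        // "Framework": the handler has returned and no pending work is registered,
        // so an uncommitted response is treated as an error.
        if (!response.isCommitted()) {
            response.send("500 No response sent");
        }
        frameworkChecked.countDown();

        subscription.join(); // wait for the late callback so the log is complete
        return new ArrayList<>(response.log);
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Running main prints the error response first and the rejected late write second, which is the same order as the two warnings in our logs.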


The fix - Convert your Observable to a Ratpack Promise


We need to ensure that the request's Execution has a reference to the Execution segment of our success action.   This is done by converting the Observable to a Ratpack Promise and activating it as follows (see fixed code in git here and a test here):

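@Override
public void handle(Context context) throws Exception {

    Observable<String> contentFromDownstreamSystem = observableOnDifferentThreadService.getContent();

    // Converting to a Promise registers the work with the request's Execution.
    // Note: RxRatpack.promise collects the emitted items into a List;
    // RxRatpack.promiseSingle is the variant for a single item.
    RxRatpack.promise(contentFromDownstreamSystem).then(response -> {
        context.render("Downstream system returned: " + response);
    });
}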
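
As a rough analogy for why this works, here is a JDK-only sketch in which the framework is told about the pending work before the handler returns. FakeExecution, track and finish are invented names, not Ratpack APIs, and the sketch blocks with join() purely for brevity, whereas Ratpack itself is non-blocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Framework-free analogy; none of these types are Ratpack or RxJava classes.
final class TrackedExecutionSketch {

    /** Hypothetical execution that knows about one pending piece of async work. */
    static final class FakeExecution {
        private CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);
        final AtomicReference<String> response = new AtomicReference<>();

        // Loosely analogous to promise.then(...): the work is registered with the execution.
        void track(CompletableFuture<String> source, Consumer<String> action) {
            pending = source.thenAccept(action);
        }

        // The "framework" lets registered work complete before checking for a response.
        String finish() {
            pending.join();
            String body = response.get();
            return body != null ? body : "500 No response sent";
        }
    }

    static String handle(CompletableFuture<String> downstream) {
        FakeExecution execution = new FakeExecution();
        execution.track(downstream, content ->
                execution.response.set("Downstream system returned: " + content));
        return execution.finish();
    }

    public static void main(String[] args) {
        CompletableFuture<String> downstream = CompletableFuture.supplyAsync(() -> "content");
        System.out.println(handle(downstream));
    }
}
```

Because the work is registered, the check for a missing response only happens once the callback has run, so the downstream content is what gets returned.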


If you read the Ratpack documentation, this will seem obvious and you might wonder why we made this mistake in the first place. However, we were tricked into thinking that the broken code would work because, in some subtle cases, it actually does. The broken code will work just fine under the following scenarios:

  1. The Observable returned by the service was converted from a Promise, e.g. a Promise returned by the Ratpack HTTP client. In that case the Promise will have been activated (or rather, the execution segment added to the Execution) indirectly by some other code and not explicitly by the Handler.
  2. The Observable synchronously emits items on the same thread. We are not sure of a real-life reason to do this, but it can occur in your tests when mocking and stubbing.

Our conclusion is that when using RxJava with Ratpack you should always convert to a Promise in your handler layer. You should do this even if you don't strictly need to (as described by points one and two above), so as to play it safe in case the implementation of the Observable changes in the future.
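
The second scenario is easy to see in isolation. Below is a small JDK-only sketch in which the same handler body is given a source that emits on the calling thread and one that emits on another thread. Source and renderedBeforeReturn are hypothetical names made up for the illustration, and the latch only exists to hold the asynchronous emission back until the check has been made.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Framework-free analogy; none of these types are Ratpack or RxJava classes.
final class SyncVersusAsyncSketch {

    /** Hypothetical source that hands one item to its subscriber. */
    interface Source {
        void subscribe(Consumer<String> subscriber);
    }

    // True if something had been "rendered" by the time the handler body returned.
    static boolean renderedBeforeReturn(Source source) {
        AtomicBoolean rendered = new AtomicBoolean(false);
        source.subscribe(item -> rendered.set(true)); // the "handler" body
        return rendered.get();                        // the "framework" check
    }

    static boolean[] compare() {
        // Emits on the calling thread, like a stubbed service in a test.
        Source synchronous = subscriber -> subscriber.accept("content");

        // Emits on another thread, and only after the check has been made.
        CountDownLatch checked = new CountDownLatch(1);
        Source asynchronous = subscriber -> CompletableFuture.runAsync(() -> {
            try {
                checked.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            subscriber.accept("content");
        });

        boolean syncResult = renderedBeforeReturn(synchronous);
        boolean asyncResult = renderedBeforeReturn(asynchronous);
        checked.countDown(); // release the background task so it can finish
        return new boolean[] {syncResult, asyncResult};
    }

    public static void main(String[] args) {
        boolean[] results = compare();
        System.out.println("synchronous source rendered in time:  " + results[0]);
        System.out.println("asynchronous source rendered in time: " + results[1]);
    }
}
```

A test that stubs the service with a synchronous source therefore passes even though the handler is broken for the real, asynchronous one.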

Why are we using RxJava?


When we decided to use Ratpack, we wanted to avoid depending on it throughout our entire codebase. If all of our services returned Ratpack Promises, we'd have an even bigger job on our hands if we decided to switch frameworks. It was hoped that using RxJava would decouple most of our code from Ratpack.

Even with the extra learning curve of using RxJava, this seems reasonable, as typically a web framework will only be referenced from your code in the front-end web/controller layer. It seems a bit of an anti-pattern to depend on it throughout the entire codebase.

Summary


You only understand how things work when things go wrong.  This was a great problem for us as a team to figure out since it taught us about the intricacies of how Ratpack actually works.