Tuesday, 23 February 2016

Lesson learned with Ratpack and RxJava

We recently encountered an issue with integrating Ratpack with RxJava's Observables.  This post will detail what the error was, how it was broken and how it was fixed.

The code 

Essentially, our handler code was as follows (NOTE: see git project for this class and also a test showing the behaviour of the broken code):

@Override
public void handle(Context context) throws Exception {

    Observable<String> contentFromDownstreamSystem = observableOnDifferentThreadService.getContent();

    contentFromDownstreamSystem.subscribe(response -> {
        context.render("Downstream system returned: " + response);
            }
    );
}


The code above retrieves an Observable from a service.  As implied by the variable name, the Observable emits items on a different thread when it is subscribed to.  The Obsevable I have used to illustrate this issue is very simple and can be found here.  In our real world example, our Observable represented a ResultSetFuture from Cassandra which would emit items after a (very short) period of time.  

The errors


No response error


The first error we encountered was as follows:

[2016-02-17 08:39:16,812] ratpack-demo WARN  [ratpack-compute-1-2] r.s.i.NettyHandlerAdapter - No response sent for GET request to /observable-different-thread-broken (last handler: com.github.phillbarber.scenario.observablethread.ObservableOnDifferentThreadHandlerBroken)


Despite the fact our Observable was emitting items that would take some time to be emitted, the above error occurred seemingly immediately after our handler had completed.  This also resulted in a http 500 error response issued to the client.

Double transmission error


It gets better!  Not only did we get an error indicating no response, we then saw an error implying we had tried to send two responses as follows:

[2016-02-17 08:39:16,821] ratpack-demo WARN  [Thread-4] r.s.i.DefaultResponseTransmitter - attempt at double transmission for: /observable-different-thread-broken
ratpack.handling.internal.DoubleTransmissionException: attempt at double transmission for: /observable-different-thread-broken

The Problem


The problem here is that Ratpack is not aware that the request is dependent on the Observable emitting items.  In other words, the request's Execution does not contain a reference to the Execution segment which represents the Observable's success action (the lambda passed to the subscribe method).  Since it seems to ratpack that there s no further work to do, ratpack's NettyHandlerAdapter detects that no response has been sent and issues the "No response sent for request" error and issues an error response to the client.

The final twist is that eventually our Observable's action is completed.  When it tries to write a response, it can't as the response for the request has already been committed.  This is why we get the "double transmission" error.


The fix - Convert your Observable to a Ratpack Promise


We need to ensure that the request's Execution has a reference to the Execution segment of our success action.   This is done by converting the Observable to a Ratpack Promise and activating it as follows (see fixed code in git here and a test here):

@Override
public void handle(Context context) throws Exception {

    Observable<String> contentFromDownstreamSystem = observableOnDifferentThreadService.getContent();

    RxRatpack.promise(contentFromDownstreamSystem).then(response -> {
                context.render("Downstream system returned: " + response);
            }
    );
}


If you read the ratpack documentation, this will seems obvious and you might wonder why we made this mistake in the first place.  However we were tricked into thinking that the broken code would work due to some very subtle ways in which the broken code can actually work.  The broken code will work just fine under the following scenarios:

  1. If the Observable returned by the service was converted from a Promise e.g. a Promise returned by the Ratpack httpclient.  That way the Promise will have been activated (or rather the execution segment added to the execution) indirectly by some other code and not explicitly by the Handler.
  2. The Observable synchronously emits items on the same thread.  Not sure of a real life example as to why you'd do this but it can occur during your testing when mocking and stubbing.
Our conclusion is, that when using RxJava with ratpack you should always convert to a promise in your handler layer.  You should do this even if you don't need to (as descibred by points one and two above) so as to play it safe incase the implementation of the Observable changes in the future.  

Why are we using RxJava


When we decided to use Ratpack, we wanted to avoid depending on it throughout our entire codebase.  If all of our services dealt returned Ratpack Promises, we'd have an even bigger job on our hands if we decided to switch frameworks.  It was hoped that using RxJava would decouple most of our code from Ratpack.  

Even with the extra learning curve of using RxJava, this seems reasonable as typically a web framework will only be referenced from your code in the front end, web/controller layer. It seems a bit of an anti pattern to do depend on it throughout the entire code base.

Summary


You only understand how things work when things go wrong.  This was a great problem for us as a team to figure out since it taught us about the intricacies of how Ratpack actually works.

  

3 comments:

  1. Hi Phil,
    I'm have a similar but a slightly different problem that I'm stuck with. Just wondering if you have any insights to a possible workaround/solution.

    Basically I have a scenario that needs handling a possible long running request and I need to respond either a 200 or a 202 depending on a certain time threshold is reached. In case of threshold reached, i still need to continue the transaction but respond 202 immediately to client. I tried doing this converting the ratpack promise to rx.Observable's and using timeout / timers but it leads to a bit of a convoluted mess and going nowhere.

    wonder if you have any suggestions? I also posted this at https://github.com/ratpack/ratpack/issues/612

    ReplyDelete
  2. Hi, I have written some code here that does what I think you're after...

    https://github.com/phillbarber/ratpack-demo/blob/timeout_with_different_responses/application/src/main/java/com/github/phillbarber/service/DownstreamHttpService.java

    Also see the tests (returnsA200IfOnTime, returnsA202IfLate) here... https://github.com/phillbarber/ratpack-demo/blob/timeout_with_different_responses/application/src/test/java/com/github/phillbarber/service/http/DownstreamHttpServiceTest.java

    Not sure if that's what you're after or not, but either way I had fun playing with Observables again!

    ReplyDelete
  3. Hi Phil,
    Thank you for your response.
    I actually tried that, but it seems Rx stops the upstream process in case if the timeout triggers. In my case I want the upstream to continue even on a timeout.
    In the end I managed to get this working by `merging` two separate Observables one produces an event on a time interval the other process the transaction. So whichever triggers first I respond back to client while I still handle the second event differently.
    Luke Daley also suggested a different approach to my question in https://github.com/ratpack/ratpack/issues/612

    ReplyDelete