When integrating independent services to build larger systems, it’s often important for services to keep track of the status of the other services they depend on. This is especially true of a microservices approach, where services should expect that any of their dependencies can be absent at any point in time. Services that cope with unavailable dependencies make for a less flaky, more resilient, hands-off system.
Services can dynamically change their behaviour as the states of their dependencies change. If a dependency is offline, a service can decide to re-route work, buffer it until processing can resume, or explicitly reject requests. Reacting to changes in dependencies’ statuses as they happen, rather than only discovering a failure when a request to the dependency fails, makes this much more fluid.
This post will look at a simple way of using Reactive Extensions to keep track of dependency status.
A simple simulation of a heartbeating dependency
Modelling a dependency’s status as a stream of updates with Reactive Extensions is quite easy. A basic approach is to use a stream of heartbeats: receiving a heartbeat means the dependency is healthy, and missing heartbeats indicate that it’s not. Heartbeats can be gathered through ping-pong HTTP request/response pairs, or broadcast over a message bus, for example.
For the purposes of this post, I’ll use a fake heartbeat stream. In a real system, this would be a stream that fires whenever a heartbeat arrives, in the form of some kind of message or request.
A stream of heartbeats arriving once per second
We’ll start with a heartbeat stream that updates every second, using Interval:
Observable.Interval(TimeSpan.FromSeconds(1))
A simulated gap in heartbeats
We’ll also need some kind of delay in heartbeat delivery so that we can see what happens when the service goes missing. Given that we expect heartbeats every second, I’ll say that three seconds with no heartbeats indicates something is wrong:
var heartbeatThreshold = TimeSpan.FromSeconds(3);
// Emits no values; the empty sequence's completion is delayed by 3 + 2 = 5 seconds
var pause = Observable.Empty<long>()
    .Delay(heartbeatThreshold.Add(TimeSpan.FromSeconds(2)));
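One way to check that the pause behaves as described, without actually waiting five seconds, is to run it in virtual time. The sketch below assumes the System.Reactive package and swaps in its HistoricalScheduler, which the post’s code doesn’t use; PauseDemo and Measure are made-up names for illustration. Delay time-shifts the completion notification as well as any values, so the empty sequence should stay silent and then complete five virtual seconds after subscription.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PauseDemo
{
    // Returns how many values the pause produced and when (in virtual seconds) it completed.
    public static (int Values, double CompletedAfterSeconds) Measure()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced by hand
        var start = scheduler.Now;
        var heartbeatThreshold = TimeSpan.FromSeconds(3);

        // The post's pause, with the scheduler passed explicitly so it runs in virtual time.
        var pause = Observable.Empty<long>()
            .Delay(heartbeatThreshold.Add(TimeSpan.FromSeconds(2)), scheduler);

        var values = 0;
        var completedAfter = double.NaN;
        using (pause.Subscribe(
            _ => values++,
            () => completedAfter = (scheduler.Now - start).TotalSeconds))
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(10));
        }
        return (values, completedAfter);
    }
}
```

Because nothing is ever emitted, Concat simply sees the pause as five seconds of silence before the next round of heartbeats.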
Creating a heartbeat stream with simulated pauses
Now we’ve got a heartbeat stream and a pause that acts like missing heartbeats. The pause lasts for five seconds, longer than the three-second threshold, so it should trigger the missed-heartbeat logic. The heartbeats and pause can be tied together to create an unending stream of heartbeats followed by a pause, followed by more heartbeats, and so on. The code below takes five heartbeats, appends a pause using Concat, and repeats this whole stream:
var heartbeatThreshold = TimeSpan.FromSeconds(3);
var pause = Observable.Empty<long>()
    .Delay(heartbeatThreshold.Add(TimeSpan.FromSeconds(2)));
var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(5)        // five heartbeats, one per second...
    .Concat(pause)  // ...then five seconds of silence...
    .Repeat();      // ...and round again, forever
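Here’s a sketch of the same simulation with the scheduler passed in, so it can be driven by a HistoricalScheduler instead of the real clock. The scheduler parameter and the names SimulatedHeartbeats, Create and ArrivalTimes are hypothetical additions for illustration, assuming the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SimulatedHeartbeats
{
    // The post's simulated source, built on a caller-supplied scheduler.
    public static IObservable<long> Create(IScheduler scheduler)
    {
        var heartbeatThreshold = TimeSpan.FromSeconds(3);
        var pause = Observable.Empty<long>()
            .Delay(heartbeatThreshold.Add(TimeSpan.FromSeconds(2)), scheduler);

        return Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
            .Take(5)        // five heartbeats...
            .Concat(pause)  // ...then five seconds of silence...
            .Repeat();      // ...forever
    }

    // Records the virtual-time second at which each heartbeat arrives.
    public static List<double> ArrivalTimes(TimeSpan runFor)
    {
        var scheduler = new HistoricalScheduler();
        var start = scheduler.Now;
        var times = new List<double>();
        using (Create(scheduler).Subscribe(_ => times.Add((scheduler.Now - start).TotalSeconds)))
        {
            scheduler.AdvanceBy(runFor);
        }
        return times;
    }
}
```

Running ArrivalTimes for 22.5 virtual seconds should record heartbeats at seconds 1 to 5, 11 to 15, then 21 and 22: each burst of five is followed by the five-second pause, and Repeat restarts the Interval from scratch.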
That’s a basic simulation of a dependency’s heartbeats, including it periodically going missing, then coming back to life. The code to turn that into a stream of health statuses is actually shorter than the code to simulate the heartbeats.
Detecting dependencies’ status
What does healthy look like?
The first requirement is a way of indicating whether the dependency is healthy or unhealthy. In C#, this can be done with a simple enum:
enum Status { Healthy, Unhealthy }
When the service receives a heartbeat from a dependency, it knows that dependency is healthy. So the heartbeat stream can be turned into a Status stream using Select (map in most other Rx implementations):
var heartbeats = source.Select(_ => Status.Healthy)
    .Publish().RefCount();
Publish().RefCount() is used to share a single subscription to the underlying source, as we’ll be subscribing to this stream twice: once for Healthy updates and once for Unhealthy updates. RxJS has a nicer version of this combination, simply called share.
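To see what Publish().RefCount() buys here, this sketch counts how many times an upstream source is subscribed to when two consumers subscribe to the Status stream. Observable.Defer with a counter stands in for a real heartbeat source, and SharingDemo and UpstreamSubscriptions are hypothetical names, assuming the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;

public static class SharingDemo
{
    public enum Status { Healthy, Unhealthy }

    // Counts upstream subscriptions when two consumers subscribe to the Status stream.
    public static int UpstreamSubscriptions(bool share)
    {
        var subscriptions = 0;

        // Stand-in heartbeat source: Defer runs this factory once per subscription,
        // so the counter records how often the source is subscribed to.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Never<long>();
        });

        var statuses = source.Select(_ => Status.Healthy);
        if (share)
        {
            statuses = statuses.Publish().RefCount();
        }

        // Two consumers, like the Healthy and missed-heartbeat branches in the post.
        using (statuses.Subscribe(_ => { }))
        using (statuses.Subscribe(_ => { }))
        {
            return subscriptions;
        }
    }
}
```

Without sharing, each consumer triggers its own upstream subscription (two here); with Publish().RefCount() they share one. For the simulated source, the unshared version would mean two independent Interval timers, and for a real source possibly two sets of pings.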
Detecting missed heartbeats
Now to work out when the dependency is not healthy. Recall that the heartbeatThreshold was three seconds; after this, the dependency should be flagged Unhealthy. What we need is an operator that only pushes a notification once the heartbeat stream hasn’t produced an update for three seconds.
This can be done using Throttle (known as debounce in RxJS), then mapping each notification to an Unhealthy status:
var missedHeartbeats = heartbeats.Throttle(heartbeatThreshold)
    .Select(_ => Status.Unhealthy);
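The timing of Throttle is easiest to see in virtual time. This sketch rebuilds the simulated source on a HistoricalScheduler (an assumption, not the post’s setup) and records when each Unhealthy notification fires; MissedHeartbeatDemo and UnhealthyTimes are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class MissedHeartbeatDemo
{
    public enum Status { Healthy, Unhealthy }

    public static List<double> UnhealthyTimes(TimeSpan runFor)
    {
        var scheduler = new HistoricalScheduler();
        var start = scheduler.Now;
        var heartbeatThreshold = TimeSpan.FromSeconds(3);

        // The post's simulated source, with an explicit virtual-time scheduler.
        var pause = Observable.Empty<long>()
            .Delay(heartbeatThreshold.Add(TimeSpan.FromSeconds(2)), scheduler);
        var source = Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
            .Take(5).Concat(pause).Repeat();

        var heartbeats = source.Select(_ => Status.Healthy).Publish().RefCount();

        // Throttle restarts its three-second timer on every heartbeat, so it only
        // emits once the stream has been quiet for the whole threshold.
        var missedHeartbeats = heartbeats.Throttle(heartbeatThreshold, scheduler)
            .Select(_ => Status.Unhealthy);

        var times = new List<double>();
        using (missedHeartbeats.Subscribe(_ => times.Add((scheduler.Now - start).TotalSeconds)))
        {
            scheduler.AdvanceBy(runFor);
        }
        return times;
    }
}
```

Over 30.5 virtual seconds I’d expect Unhealthy at seconds 8, 18 and 28: three seconds after the last heartbeat of each burst, and only once per gap, since Throttle has nothing new to emit until heartbeats resume.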
Combining into a dependency status stream
Now there’s a stream of Healthy updates and a stream of Unhealthy updates. Merging these together produces a stream of the current status of the dependency:
var status = heartbeats.Merge(missedHeartbeats)
    .StartWith(Status.Unhealthy);
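Putting the pieces together, this sketch runs the whole status pipeline against a HistoricalScheduler (an assumption for testing, not part of the post) and stamps each status with the virtual second it was emitted at. DependencyStatusDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class DependencyStatusDemo
{
    public enum Status { Healthy, Unhealthy }

    public static List<(double Seconds, Status Value)> Run(TimeSpan runFor)
    {
        var scheduler = new HistoricalScheduler();
        var start = scheduler.Now;
        var heartbeatThreshold = TimeSpan.FromSeconds(3);

        // Simulated heartbeats: five one-second beats, a five-second gap, repeated.
        var pause = Observable.Empty<long>()
            .Delay(heartbeatThreshold.Add(TimeSpan.FromSeconds(2)), scheduler);
        var source = Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
            .Take(5).Concat(pause).Repeat();

        // The pipeline from the post, with explicit schedulers where time is involved.
        var heartbeats = source.Select(_ => Status.Healthy).Publish().RefCount();
        var missedHeartbeats = heartbeats.Throttle(heartbeatThreshold, scheduler)
            .Select(_ => Status.Unhealthy);
        var status = heartbeats.Merge(missedHeartbeats)
            .StartWith(Status.Unhealthy);

        var seen = new List<(double Seconds, Status Value)>();
        using (status.Subscribe(s => seen.Add(((scheduler.Now - start).TotalSeconds, s))))
        {
            scheduler.AdvanceBy(runFor);
        }
        return seen;
    }
}
```

For the first 9.5 virtual seconds this should give Unhealthy at 0 (from StartWith), Healthy at each of seconds 1 to 5, then Unhealthy at 8. Note that Healthy is repeated on every heartbeat; the stream reports the current status rather than only changes.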
Optionally, the stream can be started with an initial status, as StartWith does here. This means that as a service is starting up, it treats its dependencies as Unhealthy until it hears a heartbeat from them. Alternatively, there could be an extra Unknown/Undefined status. With that, services can start but delay full activation until all dependencies are accounted for, so a service doesn’t accept requests until it can handle them.
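Here’s a sketch of the Unknown-status idea, assuming each dependency’s status is its own stream. CombineLatest gathers the latest status from every dependency, and a service could hold off full activation until this reports true. StartupGate, AllHealthy and the three-valued Status enum are hypothetical; only the Rx operators are real.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class StartupGate
{
    // Assumption: Unknown means "not heard from yet".
    public enum Status { Unknown, Healthy, Unhealthy }

    // Emits true when every dependency is Healthy, false otherwise, and only on changes.
    public static IObservable<bool> AllHealthy(IEnumerable<IObservable<Status>> dependencies)
    {
        return dependencies
            .Select(d => d.StartWith(Status.Unknown))   // each dependency starts as Unknown
            .CombineLatest()                              // latest status of every dependency
            .Select(latest => latest.All(s => s == Status.Healthy))
            .DistinctUntilChanged();                      // only report changes in readiness
    }
}
```

With two dependencies, this should report false straight away (both Unknown), true once both have reported Healthy, and false again as soon as either turns Unhealthy.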
Adding logging to status changes
Status streams are a great place to add logging, which can be done by adding a Do:
var status = heartbeats.Merge(missedHeartbeats)
    .StartWith(Status.Unhealthy)
    // Log each status as it passes through ($ makes {s} an interpolated value)
    .Do(s => Trace.TraceInformation($"Service status is {s}."));
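Because the heartbeat stream emits Healthy on every heartbeat, logging each status with Do writes a line every second while the dependency is up. If only genuine changes should be logged, one option is DistinctUntilChanged before the Do, as in this sketch. StatusLogging, WithChangeLogging and the log delegate are hypothetical; in the post’s setting, log would be Trace.TraceInformation.

```csharp
using System;
using System.Reactive.Linq;

public static class StatusLogging
{
    public enum Status { Healthy, Unhealthy }

    // Logs only real status changes. `log` is a hypothetical sink standing in for
    // Trace.TraceInformation. Note that downstream consumers also only see changes.
    public static IObservable<Status> WithChangeLogging(IObservable<Status> status, Action<string> log)
    {
        return status
            .DistinctUntilChanged()   // drop the repeated Healthy from each heartbeat
            .Do(s => log($"Service status is {s}."));
    }
}
```

Something like WithChangeLogging(status, Trace.TraceInformation) should then log only transitions. As with any Do, nothing is logged until something subscribes to the returned stream.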
Other uses
This approach is fairly generic. Here it’s been applied to detecting a service’s health through heartbeats, but it could be applied to any source of updating information that can change state unpredictably, where that state change matters to the consumer.
Latency
For example, in an application where latency to users’ machines is important (e.g. games), it would be possible to make a stream of measured latency values. Latency can vary dramatically, especially if the user is on a mobile network. It would be simple to adapt the above approach to classify certain levels of latency as “healthy” and “unhealthy”, or to identify sudden changes in latency. That would allow the app to react, perhaps degrading the user experience smoothly to match current capabilities.
Adjusting a stream’s bitrate could be seen as a very similar problem, but based on bandwidth instead of latency.
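A minimal sketch of the latency idea, assuming latency samples already arrive as an IObservable<TimeSpan> (how they’re measured is out of scope here). LatencyHealth, Classify and the fixed limit are hypothetical; a real app would need to choose its own thresholds.

```csharp
using System;
using System.Reactive.Linq;

public static class LatencyHealth
{
    public enum Status { Healthy, Unhealthy }

    // Classifies each latency sample against a fixed limit and only emits when the
    // classification changes, so the app reacts to transitions rather than every sample.
    public static IObservable<Status> Classify(IObservable<TimeSpan> latencySamples, TimeSpan limit)
    {
        return latencySamples
            .Select(latency => latency <= limit ? Status.Healthy : Status.Unhealthy)
            .DistinctUntilChanged();
    }
}
```

With a 200 ms limit, samples of 50, 80, 300, 350 and 90 ms should produce Healthy, Unhealthy, Healthy: three notifications for five samples.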
Financial market data staleness
Another application is in identifying and reacting to market data staleness. This is a common problem in financial applications, and Rx makes for a neat and simple solution. I’ll have a look at this in a future post.