Managing variable rate streams with Reactive Extensions

In a previous post, I looked at how Rx can be used to neatly track the status of service dependencies. In this post, I’ll look at how the same technique can be used to manage streams that update with a varying frequency, and a specific application – tracking market data staleness in financial applications.

Variable rate streams

Let’s say you’re looking at selling your house. You want to get an idea what it might be worth, so you look at other houses selling in your area. If there’s one house selling every day, you can watch the sales come in for a little while and see what range of prices they’re in. This gives you a fairly good idea what your house is worth, and even whether the prices are increasing, decreasing or staying flat.

What if all the sales stop coming in, and nothing sells for a month? It could be caused by any number of things: the economy, interest rates, or just a holiday season. Suddenly you’ve got less confidence in setting the sale price for your house. There hasn’t been a sale for a while, so there’s a greater chance that the next sale price will be significantly different from the last one.

If we imagine the house sales as a stream of data points, that stream has gone from updating at a high frequency to going quiet for a while. While you’re waiting for the next update to come in, the previous update becomes less and less valuable – the data is getting stale.

Financial market data

Systems in financial markets often have streams of constantly updating data about the public market, such as stock prices or foreign exchange rates. These can change very frequently, and the speed at which a system reacts to new information can be a competitive advantage.

Having up-to-date prices is critical for offering quotes to customers. There’s a race condition involved. Imagine that a stock’s price is $100, and the system gives the customer a quote to buy at $100.50, so the system will make $0.50 profit (per share). The customer considers the quote, and by the time they accept it the market price has changed to $101. Now the system is selling a share that costs $101 for $100.50, and making a loss of $0.50 per share.

How should the system handle this? It’s got a few options, all of them bad:

  • Reject the customer’s order, and hope it works next time. If it happens a lot, users get frustrated, and the system looks bad.
  • Accept the order and lose money on the trade. For obvious reasons, this is not a popular option with financial organisations. It can even open the system up to deliberate abuse by sophisticated customers.
  • Charge a bigger profit margin so that this scenario is less likely to happen. Instead of charging $0.50 per share, charge $1.20. In the scenario above, the system would still get to make $0.20, which is better than losing $0.50. But now the system is uncompetitive and customers go elsewhere.

It boils down to lose money or lose customers.
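
To make the arithmetic in that scenario concrete, here is a minimal sketch. QuoteOutcome and ProfitPerShare are hypothetical names used only for illustration, and the figures are the ones from the example above.

```csharp
public static class QuoteOutcome
{
    // Profit per share when the system sells at the quoted price
    // but has to buy the share at the market price at acceptance time.
    // A negative result is a loss.
    public static decimal ProfitPerShare(decimal quotedPrice, decimal marketPriceAtAcceptance)
    {
        return quotedPrice - marketPriceAtAcceptance;
    }
}
```

With a quote of $100.50 and a market price of $101 at acceptance, ProfitPerShare gives a loss of $0.50. With the wider $1.20 margin (a quote of $101.20), it gives a profit of $0.20.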

Another solution is to ensure that the system is aware of and reacts to the latest market information as quickly as possible. That narrows the window of possibility for offering a quote based on a market price that turns out to be out of date.

But what should the system do when the market data feed goes stale?

A stale market data stream means we’ve got less confidence in the quotes the system is offering. The stream could come alive any second and jump to quite a different price, leaving the system having offered a quote it has to lose money on. Handling stale streams becomes important.

The advantage of a quick reaction to staleness

If the system recognises data becoming stale it can react dynamically. It could increase margins temporarily, or even refuse customer orders until data is flowing again.

One of the advantages of this is being able to specify a mechanism by which the system can gracefully cope with a problem that’s unpredictable, but likely to happen. Policies can be developed for reasonable responses to stale data, and a potential support incident or money losing scenario can be smoothly managed.

Code time – an example stock price ticker

I’ve knocked up a very basic simulation of a moving stock price of a fictional, very large tech company producing consumable items. I’ve called them Banana, Inc. Let’s say their stock price is typically around the $100 mark, but it’s very volatile because lots of Banana fans like to get on forums and discuss rumours about upcoming products.

To simulate a moving stock price, we need a couple of random events to happen:

  • Firstly, we want the stock price to move randomly, but in a believable manner – we don’t want it going from $100 to $2 to $500. I’m sure some Brownian motion or similar would be great here, but I’ll do something simpler.
  • Secondly, we want the price updates themselves to arrive at random times, sometimes many in a burst and sometimes none for a while.

I’ll say each price update will be somewhere within +/- 3% of the last price:

// Random source shared by the price and delay generators
private readonly Random _randomGenerator = new Random();

private decimal GetNextPrice(decimal currentPrice)
{
    // generate a percentage between -3% and +3%
    var percentage = (decimal)_randomGenerator.NextDouble() * 6 - 3;
    var newPrice = currentPrice * (1 + percentage / 100);
    return Decimal.Round(newPrice, 2);
}

To control the frequency of updates I’ll just pick a random delay of up to (but not including) 5 seconds from the last update:

private TimeSpan GetNextPriceDelay()
{
    return TimeSpan.FromSeconds(_randomGenerator.Next(100) / 20.0);
}

Now we tie these together into a stream of random price updates at random intervals using Observable.Generate:


    var startingPrice = 100m;
    var prices = Observable.Generate(startingPrice,
                                     _ => true,
                                     GetNextPrice,
                                     p => p,
                                     _ => GetNextPriceDelay());

Generate takes several arguments:

  1. The first value in the stream, starting at $100
  2. Whether to continue the stream – we have a never ending stream
  3. A function to calculate the next value based on the previous one
  4. A translator from the result of GetNextPrice to another object if we want – for example, we could create a PriceUpdate object with other information. But we’ll just keep the raw value.
  5. How long to wait before publishing each update
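
As a sketch of the fourth argument doing something more than passing the value through, the result selector could wrap each raw price in a richer object. PriceUpdate, PriceFeed.Create and the Symbol property are hypothetical names introduced here for illustration. The sketch assumes the System.Reactive package and C# 9 or later (for the record type).

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical richer update type; the post itself keeps the raw decimal.
public sealed record PriceUpdate(string Symbol, decimal Price);

public static class PriceFeed
{
    public static IObservable<PriceUpdate> Create(
        string symbol,
        decimal startingPrice,
        Func<decimal, decimal> nextPrice,
        Func<TimeSpan> nextDelay)
    {
        return Observable.Generate(
            startingPrice,                    // 1. initial state
            _ => true,                        // 2. keep going forever
            nextPrice,                        // 3. next state from the previous state
            p => new PriceUpdate(symbol, p),  // 4. project the state into the published value
            _ => nextDelay());                // 5. delay before publishing each value
    }
}
```

Passing GetNextPrice and GetNextPriceDelay as the two functions would give the same random stream as before, with each value carrying its symbol.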

With the magic of LINQPad, I can just .Dump() the stream to see the values. I’ve used .Timestamp() to make the random delays between each update clear:

void Main()
{
    var startingPrice = 100m;
    var prices = Observable.Generate(startingPrice,
                                     _ => true,
                                     GetNextPrice,
                                     p => p,
                                     _ => GetNextPriceDelay());
    prices.Timestamp().Dump();
}

And here’s the price stream running in LINQPad:

[Screenshot: the prices stream running in LINQPad]

Detecting staleness

So given the stream of data, the next step is to work out how to identify when the stream goes stale. I’ll use a staleness threshold of two seconds – if there’s not a new update on the stream after two seconds, the current price is labelled Stale.

This approach is very similar to the one used to detect when services stop heartbeating. It uses Throttle (aka debounce) to detect cases where the data stream hasn’t produced an update for two seconds. I’ve extended it slightly so that the stream has both the current stock price and a flag to indicate when it’s stale:

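    // Timestamp each price, then share one subscription between both branches.
    // Generate is cold, so without Publish/RefCount each branch would get
    // its own independent random sequence.
    var timestamped = prices.Timestamp().Publish().RefCount();

    var stale = timestamped.Throttle(TimeSpan.FromSeconds(2))
                           .Select(p => new {
                                             Price = p.Value,
                                             Stale = true,
                                             p.Timestamp
                                            });
    var unstale = timestamped.Select(p => new {
                                               Price = p.Value,
                                               Stale = false,
                                               p.Timestamp
                                              });

    var pricesWithStaleness = stale.Merge(unstale);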

Here’s the output from LINQPad. Note the price becoming stale when there are pauses in the stream, then becoming unstale when the next update arrives:

[Screenshot: prices with staleness flags in LINQPad]

As with the service heartbeating example, the code to detect the staleness is a lot smaller than the code to simulate a source stream. Just a few lines of code have pulled together a fundamental concept used in many financial systems.

A simple application of staleness logic

I mentioned that a system could react to stale data by increasing margins on the prices offered to customers until the stream is no longer stale. When the system’s data is not up to date, there’s a higher chance that the actual price will have moved by the time the system executes an order. Widening margins while the data is stale provides safety during those periods, while still letting the system offer competitive margins in the majority of cases where the data is up to date.

In this example code, the size of the margin depends on whether the price is stale. During normal operation, the margin is 1%, but when the source data is stale, the margin becomes 5%. Using that margin, we can generate a quote that has a buy price and a sell price, similar to what you’d see on foreign exchange kiosk boards in the airport, or what you’d get looking at a stock through an online broker.

var margined = pricesWithStaleness.Select(p => new
{
    PriceDetails = p,
    Margin = p.Stale ? 0.05m : 0.01m,
})
.Select(p => new
{
    CorePrice = p.PriceDetails.Price,
    SellPrice = p.PriceDetails.Price * (1 - p.Margin),
    BuyPrice = p.PriceDetails.Price * (1 + p.Margin),
    p.PriceDetails.Stale,
    p.Margin
});

The final result

Here’s the output from LINQPad:

[Screenshot: margined prices with staleness in LINQPad]

Note that when a price becomes stale, the CorePrice stays the same, but the SellPrice and BuyPrice move further apart – the “spread” between the buy and sell price has become wider. When the price feed gets an update, the spread drops back down again.
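
The widening of the spread can be checked by hand with a small sketch of the same margin rule. Quote and QuoteCalculator are hypothetical names, not part of the code above, and the 1% and 5% margins are the ones used in the post. The record type assumes C# 9 or later.

```csharp
// Hypothetical quote type mirroring the anonymous type in the margin code.
public sealed record Quote(decimal CorePrice, decimal SellPrice, decimal BuyPrice, decimal Margin)
{
    // Distance between the buy and sell prices.
    public decimal Spread => BuyPrice - SellPrice;
}

public static class QuoteCalculator
{
    public static Quote FromPrice(decimal corePrice, bool stale)
    {
        // 5% margin while the price is stale, 1% otherwise
        var margin = stale ? 0.05m : 0.01m;
        return new Quote(
            corePrice,
            corePrice * (1 - margin),
            corePrice * (1 + margin),
            margin);
    }
}
```

For a core price of $100, the fresh quote is sell $99 and buy $101, a spread of $2. The stale quote is sell $95 and buy $105, a spread of $10.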

Wrapping up

With quite a small amount of code, we’re able to get something that will react to market data becoming stale, modify the system behaviour, and then revert to normal when the market data starts flowing again.

Achieving this without using Reactive Extensions would involve a class full of state. It would have to keep track of the latest data, when that data arrived, the current healthy/stale state, and a staleness timer that gets restarted every time new data arrives. It also needs some code to manage the healthy/stale state as the data and timer events occur. It adds up to a lot more code and state to manage. The Rx based code has all these elements as well, but much of it is bundled up in higher level constructs – in this case Throttle, Select and Merge.
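
For comparison, here is a rough sketch of what such a hand-rolled class might look like. ManualStalenessTracker and its members are hypothetical, and to keep the sketch deterministic it assumes the caller passes in the current time and calls OnTick periodically, rather than owning a real timer that is restarted on each update.

```csharp
using System;

public sealed class ManualStalenessTracker
{
    private readonly TimeSpan _threshold;
    private decimal? _latestPrice;        // latest data
    private DateTimeOffset _lastUpdate;   // when that data arrived

    public ManualStalenessTracker(TimeSpan threshold)
    {
        _threshold = threshold;
    }

    // Current healthy/stale state.
    public bool IsStale { get; private set; }

    // Raised with (price, stale) whenever the price or the stale flag changes.
    public event Action<decimal, bool> Changed = delegate { };

    // A new price arrived: record it and return to the healthy state.
    public void OnPrice(decimal price, DateTimeOffset now)
    {
        _latestPrice = price;
        _lastUpdate = now;
        IsStale = false;
        Changed(price, false);
    }

    // Called periodically: flips to stale once, after the threshold has elapsed.
    public void OnTick(DateTimeOffset now)
    {
        if (_latestPrice == null || IsStale) return;
        if (now - _lastUpdate >= _threshold)
        {
            IsStale = true;
            Changed(_latestPrice.Value, true);
        }
    }
}
```

Even this simplified version carries four pieces of mutable state and two entry points that have to agree with each other, which is the bookkeeping that Throttle and Merge hide.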

The final kicker is that you’d have to use real time to test it, but with Reactive Extensions, you can use virtual time. So your tests can run scenarios over seconds, minutes, hours or days, but execute in a flash.
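
As a minimal sketch of what a virtual-time test could look like, the staleness logic can be driven by a TestScheduler from the Microsoft.Reactive.Testing package. VirtualTimeStalenessDemo is a hypothetical name, and a Subject stands in for the random price stream so the inputs are under the test’s control. The string labels are just for readability.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class VirtualTimeStalenessDemo
{
    public static List<string> Run()
    {
        var scheduler = new TestScheduler();   // virtual clock, starts at zero
        var prices = new Subject<decimal>();   // prices are pushed in by hand
        var seen = new List<string>();

        var unstale = prices.Select(p => $"{p} fresh");
        // Throttle runs its two-second timer on the virtual clock
        var stale = prices.Throttle(TimeSpan.FromSeconds(2), scheduler)
                          .Select(p => $"{p} stale");

        using var subscription = unstale.Merge(stale).Subscribe(s => seen.Add(s));

        prices.OnNext(100m);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);  // under the threshold
        prices.OnNext(101m);
        scheduler.AdvanceBy(TimeSpan.FromHours(1).Ticks);    // an hour of silence

        return seen;
    }
}
```

Run returns "100 fresh", "101 fresh", "101 stale": the first price never goes stale because the second arrives within two seconds, and the hour of silence produces a single stale notification. The simulated hour takes no real time to execute.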

4 thoughts on “Managing variable rate streams with Reactive Extensions”

    • I don’t have access to Linqpad at the moment, as my pc is out of action. However, if you start a new Linqpad query of type C# Program, then paste in the Main, GetNextPrice and GetNextPriceDelay functions from the post, it should run as is.

      Then to get the extended functionality going, just paste the staleness and margin code blocks into the end of the Main method. Then you can add pricesWithStaleness.Dump() and margined.Dump() to output those two streams as well.

      Linqpad can output multiple streams simultaneously, so you’ll get the prices, the staleness information and the margined prices all updating at the same time. And it can make a useful output of almost any object you .Dump().

      Hope this helps!


  1. This is a great example of using Rx.NET. Thank you so much.
    But Throttle only emits one item once the time passed into Throttle has elapsed. So if the time between two up-to-date prices is long, only one stale price is emitted.


    • Yes, that is the intention 🙂 If you have prices of 1, 2, [a long pause], 3 – do you need the stream to repeat the stale state every second for an hour? The stream should produce new values when there’s a change – something to take action on. Sometimes it is useful to have a stream re-publishing the same state, for example for heartbeats. But in this case there’s not much use for repeated data.

