Throttle Helper

2014-12-09 6:41 AM

I've been writing a lot of revision code recently which ends up spamming our service bus with events. Since I need to dial back the volume of messages that get sent per time chunk, I've gone through a few revisions of blocking - from Task.Delay(250).Wait(); to more obfuscated methods.

But I think this is my favorite so far - it's lightweight and pretty self-explanatory. The goal is to have a loop that will execute no more frequently than the given number of millis. So, if the target is 250ms and the body of the loop takes 300ms to execute, there's no waiting to be done. If the body takes 50ms, then we should wait for 200ms.

With some not-quite-clever use of Tasks we can accomplish this pretty easily:

public class Throttle
{
    private Task _control;
    private readonly int _minimumMillis;

    public Throttle(int millis)
    {
        _minimumMillis = millis;
    }

    public void Block()
    {
        // Only block if it's not the first time
        if(_control != null)
        {
            _control.Wait();
        }
        _control = Task.Delay(_minimumMillis);
    }
}

The only slightly tricky thing is in the _control = Task.Delay(_minimumMillis); call. We start a Task whose sole function is to wait for _minimumMillis. This waiting begins asynchronously while control is returned to the caller. Then, when Block() gets called again, that Task gets attached to after having waited for some amount of time already. This allows a smarter throttling so you can pump out as near to your desired throughput as possible.

This allows us to write a throttled loop, a la:

var throttle = new Throttle(1000);
for(var i = 0; i < 100; ++i)
{
    // Won't execute more frequently than once per second
    Console.WriteLine(i);
    throttle.Block();
}

But what if we want it to be just a little more magical? We can extension-method it right into the IEnumerable that drives a foreach loop!

public static IEnumerable<T> Throttled<T>(this IEnumerable<T> source, int millis)
{
    var throttle = new Throttle(millis);
    foreach(var item in source)
    {
        throttle.Block();
        yield return item;
    }
}

And, voila!

// Could be any IEnumerable
var source = Enumerable.Range(0, 10);
foreach(var i in source.Throttled(250))
{
    // Won't execute more frequently than 4/sec
    Console.WriteLine(i);
}