The Reactive Extensions provide a great way for composing asynchronous actions, particularly on Windows Phone where IO operations are asynchronous-only.

The Silverlight Unit Testing framework (including my CI-supporting port) supports asynchronous unit tests, but if you've attempted to use them with Rx you've likely run into a few problems.

Avoiding asynchronous tests

In reality, the simplest way to do an asynchronous test is to avoid making it asynchronous. There are two ways of synchronously testing asynchronous APIs that are based on Rx:

If you are writing an isolated unit test and using a "System.Reactive" build of the framework, you can use the Rx Testing Framework to simulate asynchrony without having to write an actual asynchronous test. If you are using the Windows Phone "baked" build (Microsoft.Phone.Reactive), however, the testing framework is not available to you.

The other way is to block until the first value is returned by using First. This will not work with a number of built in asynchronous actions (like HTTP requests) as they internally utilise the UI thread to return the value, so blocking the UI thread results in a hang.

Controlling the stack

On the surface, creating asynchronous tests seems fairly simple and involves three steps:

  1. Have your test class inherit Microsoft.Testing.Silverlight.WorkItemTest
  2. Mark your test method with [Asynchronous]
  3. Call base.TestComplete() when you are done

This works well until your tests fail, either from a failed assertion or from an unhandled Rx call to OnError. When that happens, the error will bubble to the surface and the application will terminate. The problem lies in the call stack, simplified below:

Microsoft.VisualStudio.TestTools.UnitTesting.Assert()
Your.Code.TestClass.TestMethod()
System.Windows.dll!System.Net.Browser.ClientHttpWebRequest.InvokeGetResponseCallback()
mscorlib.dll!System.Threading.ThreadPool.WorkItem.doWork()
mscorlib.dll!System.Threading.Timer.ring()

When the exception is thrown by the assertion, it reaches the top of the stack and the application is terminated. The code that throws the error needs to be called by the testing framework in order for it to register the failure.

In order to re-introduce the testing framework into the stack, the WorkItemTest base class provides the EnqueueCallback method:

void EnqueueCallback(Action testCallbackDelegate)

While we could call this method directly for each call to the observer, the Rx framework already provides a mechanism for controlling execution: schedulers. The IScheduler interface provides the means to schedule work, and the ObserveOn method marshals observer calls to a scheduler.

Here's an adapter implementation of IScheduler that marshals calls to a WorkItemTest class. It also cancels any pending actions when the test completes to avoid executing any asynchronous test case after the test has completed, since doing so can cause an exception to be thrown if the following test is sycnrhonous

public class WorkItemTestScheduler : IScheduler, IDisposable
{
    private readonly WorkItemTest test;
    private readonly CompositeDisposable scheduledActions = new CompositeDisposable();
    private bool disposed = false;
    public WorkItemTestScheduler(WorkItemTest test)
    {
        this.test = test;
        IDisposable completionSubscription =
            Observable.FromEvent<TestMethodCompletedEventArgs>(
                h => test.UnitTestHarness.TestMethodCompleted += h,
                h => test.UnitTestHarness.TestMethodCompleted -= h
            )
            .Take(1)
            .Subscribe(_ => Dispose());
        scheduledActions.Add(completionSubscription);
    }
    public DateTimeOffset Now
    {
        get { return DateTimeOffset.Now; }
    }
    
    public IDisposable Schedule(Action action, TimeSpan dueTime)
    {
        if (disposed)
        {
            return Disposable.Empty;
        }
        if (dueTime != TimeSpan.Zero)
        {
            throw new NotSupportedException("Only immediate schedules are supported");
        }
        var disposable = new BooleanDisposable();
        scheduledActions.Add(disposable);
        test.EnqueueCallback(() =>
        {
            if (!disposable.IsDisposed)
            {
                action();
            }
        });
        return Disposable.Create(() => scheduledActions.Remove(disposable));
    }
    
    public IDisposable Schedule(Action action)
    {
        return Schedule(action, TimeSpan.Zero);
    }
    
    public void Dispose()
    {
        scheduledActions.Dispose();
        disposed = true;
    }
}

To make things easier, we'll introduce an extension method that creates the scheduler for us:

public static class ObservableExtensions
{
    public static IObservable<T> ObserveOnTest<T>(this IObservable<T> source, WorkItemTest test)
    {
        return source
            .ObserveOn(new WorkItemTestScheduler(test));
    }
}

Writing an example test

Here is an example test that attempts to load content from a URL. If the assertion fails or if the server is down, the test framework will log the error and continue execution:

[TestClass]
public class SampleTestClass : WorkItemTest
{
    [Test]
    [Asynchronous]
    public void LoadUrlContent()
    {
        var request = WebRequest.CreateHttp("http://blog.richardszalay.com");
        Observable.FromAsyncPattern<WebResponse>(
            request.BeginGetResponse, request.EndGetResponse
            )()
            .ObserveOnTest(this)
            .Subscribe(response =>
            {
                Assert.IsTrue(response.ContentLength > 1024);
                TestComplete();
            });
    }
}