C# vNext: Rx == async IEnumerable

Rx and TPL are so strikingly similar that it’s mind-boggling they are boxed as two separate frameworks with very little connecting them. IObservable is practically the plural form of Task.

While TPL has been given some serious love lately in the form of the async/await syntax in the upcoming C#5, Rx has not received any plural equivalent of such syntax. Despite being positioned as the mathematical dual of IEnumerable since its dawn, IObservable is still not (yet) paired with any of the syntactic features of IEnumerable. Yes, I’m talking about async versions of foreach and yield.

This post is basically just me daydreaming about my own hypothetical C#6, where TPL (async/await) meets IEnumerable (foreach/yield), and the outcome of this fusion is of course IObservable.

Rx and TPL Today

Before we look ahead to the future, let’s first see what’s possible TODAY. The line separating TPL and Rx is a very thin one, and it makes perfect sense for them to be merged into a single asynchrony concept. Basically, IObservable is to Task what IEnumerable is to Object. In simple words, IObservable is a stream of Tasks.

IObservable -> Task

The first implication of this realization is that IObservable should be able to hand out individual Task objects: methods like Single(), Last(), First(), ElementAt(), and so on should return Task instances. Strangely, the current IObservable API does not reflect that. Those methods are currently blocking calls that return objects directly (to mimic IEnumerable), which does not make much sense given the asynchronous nature of Rx (and the direction of .NET 4.5 in general).

public string GetMessage()
{
   var message = messagesObservable.FirstOrDefault(); // blocking wait
   return message.Body;
}

I’ve never understood why Rx is implemented in that manner instead of looking to its close cousin TPL, which is the perfect fit for Rx. The FirstOrDefault() method should really return an awaitable reference, especially in .NET 4.5.

public async Task<string> GetMessageAsync()
{
   var message = await messagesObservable.FirstOrDefaultTpl(); // non-blocking
   return message.Body; // continuation
}

The FirstOrDefaultTpl() method can be implemented pretty easily using today’s .NET Framework.

public static Task<T> FirstOrDefaultTpl<T>(this IObservable<T> observable)
{
   var tcs = new TaskCompletionSource<T>();
   observable.Take(1).DefaultIfEmpty(default(T)).Subscribe(tcs.SetResult, tcs.SetException);
   return tcs.Task;
}

(And similarly for the other operations: First(), Last(), LastOrDefault(), ElementAt(), ElementAtOrDefault()).
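
As a rough sketch of one of those siblings, here is how LastOrDefaultTpl() might look. The name just follows the FirstOrDefaultTpl() convention above and is not part of Rx. The observer class and the little ArraySource are made up for illustration, so that the snippet runs on the bare BCL interfaces without Rx installed.

```csharp
using System;
using System.Threading.Tasks;

public static class ObservableTaskExtensions
{
   // Hypothetical helper: the task completes with the last value seen,
   // or default(T) if the observable completed without producing anything.
   public static Task<T> LastOrDefaultTpl<T>(this IObservable<T> observable)
   {
      var tcs = new TaskCompletionSource<T>();
      // The subscription is never disposed here; a fuller version would clean it up.
      observable.Subscribe(new LastValueObserver<T>(tcs));
      return tcs.Task;
   }

   private sealed class LastValueObserver<T> : IObserver<T>
   {
      private readonly TaskCompletionSource<T> _tcs;
      private T _last = default(T);

      public LastValueObserver(TaskCompletionSource<T> tcs) { _tcs = tcs; }

      public void OnNext(T value) { _last = value; }                      // remember the latest value
      public void OnError(Exception error) { _tcs.TrySetException(error); }
      public void OnCompleted() { _tcs.TrySetResult(_last); }             // publish it on completion
   }
}

// Hypothetical in-memory source, only here so the sketch can be run without Rx.
public sealed class ArraySource<T> : IObservable<T>
{
   private readonly T[] _items;
   public ArraySource(params T[] items) { _items = items; }

   public IDisposable Subscribe(IObserver<T> observer)
   {
      foreach (var item in _items) observer.OnNext(item);
      observer.OnCompleted();
      return new NoopDisposable();
   }

   private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}
```

With that in place, `await new ArraySource<int>(1, 2, 3).LastOrDefaultTpl()` gives back the last element, and an empty source gives back default(T). The other operations should follow the same shape, differing only in what the observer remembers and when it completes the task.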

Task -> IObservable

On the flip side of the coin, if IObservable is a stream of Tasks, you should also be able to compose an IObservable from a number of Task instances. For instance:

public IObservable<BookPrice> GetBookPrices(ISBN isbn)
{
   return new Task<BookPrice>[] {
       LookupAmazonAsync(isbn),
       LookupBordersAsync(isbn),
       LookupBarnesNobleAsync(isbn),
       LookupBestBuyAsync(isbn)
   }.AsObservable();
}

The last line is an extension-method AsObservable() that we’ll now be implementing to convert an array of Tasks into an IObservable:

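public static IObservable<T> AsObservable<T>(this Task<T>[] tasks)
{
   return Observable.Create<T>(observer =>
   {
      var cancelSource = new CancellationTokenSource();

      // Forward each task's outcome to the observer as soon as that task finishes.
      var forwarded = tasks.Select(task => task.ContinueWith(t =>
      {
         if (t.IsFaulted)
            observer.OnError(t.Exception);
         else if (!t.IsCanceled)   // reading Result on a cancelled task would throw
            observer.OnNext(t.Result);
      })).ToArray();

      // Signal completion once every task has been forwarded.
      Task.Factory.ContinueWhenAll(forwarded, _ => observer.OnCompleted(), cancelSource.Token);

      // Unsubscribing cancels the pending completion callback.
      return cancelSource.Cancel;
   });
}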

(PS: in practice, you may want to make an overload that takes a CancellationTokenSource)

In the IEnumerable world, this is akin to converting a bounded array of objects into an IEnumerable of a known length. Making an IEnumerable that returns an unbounded stream of objects is a bit different, but luckily C# has provided the yield keyword for that since 2.0.
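
For reference, this is the kind of yield-based iterator I have in mind. It is only a minimal sketch; the Sequences class and the Naturals() method are names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Sequences
{
   // An unbounded sequence: the loop never ends, but each value is only
   // computed when the caller actually asks for the next element.
   public static IEnumerable<int> Naturals()
   {
      var n = 0;
      while (true)
      {
         yield return n;
         n++;
      }
   }
}
```

The consumer decides how much of the stream to pull, for example with `Sequences.Naturals().Take(3)`, which stops after the first three values.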

Back in the IObservable world, to make an IObservable from an unbounded series of asynchronous Tasks, we need a similar yield construct. It does not exist in the current (or any foreseeable future) version of C#, but it does exist in my hypothetical C#6.

Rx and TPL in C#6

The year is 2015, and the Mayans turned out to be a load of bollocks. I’m sitting here before my 64-core laptop; I swiftly fire up my Visual Studio 2014 and write the following method:

public async IObservable<Temperature> PollTemperatureChanges() // C#6: async IObservable method
{
   var lastTemp = default(Temperature);
   while(true)
   {
      var nextWeather = await GetWeatherAsync();  // await within an async IObservable method
      if(nextWeather.Temperature != lastTemp)
      {
         yield return nextWeather.Temperature;  // C#6: IObservable yield
         lastTemp = nextWeather.Temperature;
      }

      await Task.Delay(TimeSpan.FromSeconds(5));  // await within an async IObservable method
   }
}

In the old C#, that method would have returned an IEnumerable that produces an infinite stream of temperature readings (produced by the yield syntax). But here in C#6, I just wrote one that returns an IObservable with exactly the same behavior, except asynchronously. (After all, IObservable is the mathematical dual of IEnumerable.)

I then proceed to write the next method, still in my Visual Studio 2014:

public async Task<Foo> DoSomethingWithTemperatures()
{
   foreach await(var temp in PollTemperatureChanges()) // C#6: foreach await(IObservable)
   {
      // do something
      if(blah)
         break;      // c#6: foreach await unsubscribe

      if(otherBlah)
         return someFoo; // c#6: foreach await return
   }

   // do something afterwards...   -> foreach await continuation
   return otherFoo;
}

This is the year 2015, remember? Most methods are now declared as async (thanks to its viral nature), and sync methods have become almost obsolete. IEnumerable is on the way out, being replaced by IObservable. And every piece of syntax that was applicable to IEnumerable is now also supported for IObservable.

Now back to today’s world (or nearer future anyway): in C#5 (with its lack of IObservable support), those same two simple methods above would have been written hideously as the following:

public IObservable<Temperature> PollTemperatureChanges() // C#5: no async IObservable support
{
   return Observable.Create<Temperature>(observer =>
   {
      var isCancelled = false;
      ((Action)async delegate
      {
         try
         {
            var lastTemp = default(Temperature);
            while(true)
            {
               var nextWeather = await GetWeatherAsync();
               if (nextWeather.Temperature != lastTemp)
               {
                  observer.OnNext(nextWeather.Temperature);
                  if (isCancelled)
                     return;

                  lastTemp = nextWeather.Temperature;
               }

               await Task.Delay(TimeSpan.FromSeconds(5));
            }
         }
         catch(Exception e)
         {
            observer.OnError(e);
         }
         finally
         {
            observer.OnCompleted();
         }
      })();

      return () => isCancelled = true;
   });
}

public async Task<Foo> DoSomethingWithTemperatures()
{
   var tcs = new TaskCompletionSource<Foo>();
   PollTemperatureChanges().TakeWhile(temp =>
      {
         // do something
         if (blah)
            return false;
         if(otherBlah)
         {
            tcs.SetResult(someFoo);
            return false;
         }
         return true;
      }).Subscribe(
         onNext: _ => { },
         onError: tcs.SetException,
         onCompleted: () => {
            if(!tcs.Task.IsCompleted)
            {
               // do something afterwards...
               tcs.SetResult(otherFoo);
            }
         });

   return await tcs.Task;
}

That wasn’t pretty. Those methods would have been a total train wreck in the current C#4, and (the nearer future) C#5 helps massively in reducing the complexity. But still, due to the absence of IObservable support (at least until C#6), the C#5 code above looks far uglier and is harder to write than it needs to be.
Just like TPL before C#5, your nested lambdas will only get deeper and deeper, especially when you have multiple observables. (Anyone who works with TPL on C#4 knows how to read code diagonally from top-left to bottom-right, thanks to nested lambdas.)

Note that those were only simplified versions of the actual code. They do not, for example, propagate exceptions properly. More importantly, they do not handle Task cancellation. Currently Rx and TPL have two different patterns for handling cancellation that are unfortunately not compatible with each other, and are therefore not seamlessly connectable.

CancellationToken vs Unsubscribe

This is where TPL and Rx are incompatible:

  • In TPL, the pattern to handle cancellation is by passing a token into Task operations.
  • In contrast, IObservable subscriptions return an IDisposable to the client, which cancels the subscription when disposed. The TPL equivalent would be a Cancel() method on the Task class, but there is none. This difference makes it hard to seamlessly integrate the two APIs.
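
To make the mismatch a bit more concrete, here is a minimal sketch of hand-written glue between the two patterns. Both CancelOnDispose and DisposeOnCancel are names I made up for this example; only CancellationTokenSource, CancellationToken.Register and IDisposable come from the framework.

```csharp
using System;
using System.Threading;

// Hypothetical adapter, Rx style -> TPL style:
// disposing it (the Rx way to unsubscribe) cancels a token (the TPL way to cancel).
public sealed class CancelOnDispose : IDisposable
{
   private readonly CancellationTokenSource _source = new CancellationTokenSource();

   // Hand this token to any Task-based operation that should stop on unsubscribe.
   public CancellationToken Token { get { return _source.Token; } }

   public void Dispose()
   {
      _source.Cancel();
   }
}

public static class CancellationBridge
{
   // Hypothetical helper, TPL style -> Rx style:
   // when the token is cancelled, the subscription gets disposed.
   public static CancellationTokenRegistration DisposeOnCancel(
      this IDisposable subscription, CancellationToken token)
   {
      return token.Register(subscription.Dispose);
   }
}
```

Neither direction is hard to write, but it is manual plumbing that has to be repeated wherever a Task lives inside an IObservable or the other way around.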

Consider our first C#6 PollTemperatureChanges() method above. When the IObservable is unsubscribed from, the GetWeatherAsync() task it is awaiting needs to receive a cancellation signal. Unfortunately, since there is no Cancel() method on the Task class (nor on its awaiter), the compiler cannot infer this from the syntax. A syntactic workaround is required to get around this (albeit manually), which is to introduce a special keyword for IObservable unsubscription.

Using this new C#6 try/unsubscribe syntax, we can modify our C#6 method above to support TPL cancellation.

public async IObservable<Temperature> PollTemperatureChanges()
{
   var lastTemp = default(Temperature);
   var cancelSource = new CancellationTokenSource();
   try
   {
      while(true)
      {
         var nextWeather = await GetWeatherAsync(cancelSource.Token);
         if(nextWeather.Temperature != lastTemp)
         {
            yield return nextWeather.Temperature;
            lastTemp = nextWeather.Temperature;
         }

         await Task.Delay(TimeSpan.FromSeconds(5));
      }
   }
   unsubscribe   // our new C#6 'unsubscribe' block
   {
      cancelSource.Cancel();
   }
}

TL;DR

Just in case you’ve missed the point, all references to C#6 in this post are purely hypothetical, though it did not take a great leap of imagination. The similarities between TPL, IObservable, and IEnumerable make a very compelling case for fusing the async/await and foreach/yield keywords together, and the result is a futuristic IObservable syntax that is elegantly consistent with those of IEnumerable and TPL. Some extension methods can already be implemented today to bridge IObservable and TPL async/await, but richer native syntactic support for IObservable in a future C# is still something to be awaited (if you’ll excuse the pun).