Concurrent Web Service Calls from Windows Phone 7 using Rx

Introduction

I’m pretty sure that everybody who works with Silverlight, WPF or WP7 have come across the issue when you just want to call a couple of web services independently from each other but wait for all the results and build a complex object from those results. Well right now I’ve came across this issue once more. I’m building a WP7 application that calls the Facebook API and at the same time I have to call my own services on Windows Azure. I definitely don’t want to communicate with Facebook on the server side since that would be expensive in every possible way. (Performance, usage, price, etc.) Not the mention that I would have to impersonate my user and call the Facebook API from the server side, which would definitely be a security issue. So I have to deal with this on the phone.

I could definitely write the code to synchronize these calls, but after a while it’d get kind of impossible to handle and understand. I could of course do some kind of an abstraction, build an infrastructure around it, but this would just give me a headache and cause additional costs (my time).

Here is some good news. We have a technology that was basically born to solve these kind of issues. The technology is called Reactive Extension (Rx) and it is available for Windows Phone 7. The bad news is that it’s kind of hard to understand, many people tried and to tell you the truth a hell of lot of them just failed. The problem with this technology is that it needs a different thinking, a whole new mind, a switch in your brain to flip. So here is the deal, I’m not going to try to explain Rx. There are a lot of HOLs, sessions, tutorials, blog posts to learn about it. What I’m going to do, is to show you only what you need here to solve this issue, and make you understand only this little part. (Which is IMHO maybe the most useful part clip_image002[4]) If you want to inspect Rx a little bit more you can start with Mike Taulty’s blog and video post: Reactive Extensions “for the rest of us”

The Problem Simplified

So the model of our problem: I have a WP7 client implemented with MVVM. I need to make asynchronous calls to the server side. I need to call two entirely different web service operations. After both calls are completed I need to process the data received from the two service calls. This is it, let’s see what we can do.

Very short intro to Rx – The Mindset

Now imagine we have a couple of objects. These objects have events. Imagine that I create a collection and add these events to this collection. Now I’m waiting for these events to fire. What I need is someone to tell me if all the events in this collection have already fired. Also maybe I want to be notified about every single event that was fired at the time of firing. Still with me, right? Now lets flip that switch in your brain.

What do we have now? We have a collection of events, (or data) and information (one of them is fired, all of them are fired, a certain subset was fired, etc.) that is being pushed towards us! So instead of a collection think of it as a stream. A stream of data, a stream of events. You subscribe to a stream of events/data, and every single time something happens, or data is available, notification (information) is being pushed towards you! All you have to do is to deal with this information.

Very short intro to Rx- Observing Objects

Now this is the part where I’m not going to dive into how Rx works. But what you need to understand is that if you want to observe an object (event, data, collection, whatever) you have to make it observable!

//Two different service to call concurrently
TestService1Client client1 = new TestService1Client();
TestService2Client client2 = new TestService2Client();

//Observe this objects for specific events
var observedServiceCall1 =
    Observable.FromEvent<GetDataCompletedEventArgs>(client1, "GetDataCompleted");
var observedServiceCall2 =
    Observable.FromEvent<GetIntDataCompletedEventArgs>(client2, "GetIntDataCompleted");

Take a look at what we did! We have these two service client proxies. Both proxies have a “Completed” event. (GetDataCompleted and GetIntDataCompleted). Using the Observable.FromEvent<TEventArgs>(source, eventName) method we make observable objects from these events! Now we are ready to subscribe to them!

observedServiceCall1.Subscribe(OnNext);

OnNext deals with the data whenever the GetDataCompleted event fires. But this is not what I want. What I want is to handle these events as a group and I want to be notified only when both events have fired!

Very short intro to Rx – Observing multiple events as a single unit

So I have to tell the Rx framework that I need to observe these events together!

//Observe on background thread
//Observe both objects simultanously
var combinedCall = observedServiceCall1.ObserveOn(Scheduler.ThreadPool)
    .Zip(observedServiceCall2, (left, right) =>
{
    //save results from the two calls to a complex object
    return new CombinedCallResult
    {
        Call1Result = left.EventArgs.Result,
        Call2Result = right.EventArgs.Result
    };
});

This is exactly what I need! The key operator is the Zip operator! This is the one that allows me to combine these two observable object. Now I’ve created this aggregated observable object (combinedCall). Let me explain this code a little:

§  ObserveOn(Scheduler.ThreadPool) – I want to do the observing on the background thread. As a matter of fact I want to do everything on a background thread, so my UI can stay responsive! Scheduler.ThreadPool just gives me a thread from the ThreadPool that I can work with.

§  Zip(rightSource, selector) – Zip will help me create a new observable that will now observe observedServiceCall1 AND observedServiceCall2! Selector is basically a function that will deal with the individual results. So when I get the result I wrap it into this custom class.

§  (left,right) – left stands for the observed event of observedServiceCall1, right stand for the observed event of observedServiceCall2

Very short intro to Rx – Subscription

So now we have the combinedCall observable that helps us observe two events! Now the last thing we need is to subscribe to any info that may be pushed to us!

//Subscription: Both service calls have been completed
combinedCall.ObserveOn(Scheduler.Dispatcher).Subscribe((result) =>
{
        AggregatedData = "WebService Call 1: " + result.Call1Result +
            "\nWebService Call 2: " + result.Call2Result;
});

Now we are subscribing to the results! When data is pushed to the Observer, this code will run! We‘re just updating a property in the ViewModel. But this one is tricky! Because AggregatedData is bound to the UI! It actually raises the PropertyChanged event. And a couple of minutes ago we agreed to do the observing on a background thread. Now this would result in an invalid cross-thread exception. This is something we don’t want, so we tell Rx, that we will need to process the data on the UI Dispatcher! So now the code inside the lambda expression will be running on the UI thread.

All the code together

Now this might be a little confusing so take a moment and read through all the code needed to solve this problem.

//Two different service to call concurrently
TestService1Client client1 = new TestService1Client();
TestService2Client client2 = new TestService2Client();

//Observe this objects for specific events
var observedServiceCall1 =
    Observable.FromEvent<GetDataCompletedEventArgs>(client1, "GetDataCompleted");
var observedServiceCall2 =
    Observable.FromEvent<GetIntDataCompletedEventArgs>(client2, "GetIntDataCompleted");

//Observe on background thread
//Observe both objects simultanously
var combinedCall = observedServiceCall1.ObserveOn(Scheduler.ThreadPool)
    .Zip(observedServiceCall2, (left, right) =>
{
    //save results from the two calls to a complex object
    return new CombinedCallResult
    {
        Call1Result = left.EventArgs.Result,
        Call2Result = right.EventArgs.Result
    };
});

//Subscription: Both service calls have been completed
combinedCall.ObserveOn(Scheduler.Dispatcher).Subscribe((result) =>
{
        AggregatedData = "WebService Call 1: " + result.Call1Result +
            "\nWebService Call 2: " + result.Call2Result;
});

//Issue the calls
client1.GetDataAsync();
client2.GetIntDataAsync();

Threading in pictures

clip_image009[4]

As you can see on the picture above, we are calling the services asynchronously, and observing happens on a worker thread!

Now you just have to free your mind and start using Rx to make your life a whole lot easier! You want 3-4-5 calls running at the same time? You want to run them in a certain order? (chain of call?) You just got a very compact and easy to read and maintainable solution to do all the asynchronous code management you need.

If you want the source code for this sample, you can get it here: RxWebServicesDemo