Observables with RxJS -

I don't get it | Part#1

Understanding RxJS - What!

Posted by Anas R Firdousi on June 27th, 2016
21 mins read

I have read 100 blog posts, watched 1000 tutorials and been to tons of presentations, but I still don't get it! If you are having a hard time wrapping your head around the mantra of reactive programming, especially with RxJS, this series is for you.

If you are new to the concept of Observables, I wrote a detailed post on the basics of Observables in JavaScript. That intro is not a hard prerequisite, but it is good to read before this post.

Basic Concepts

Streams

Any continuous form of data can be a stream. A user changing the input in a text box over time can be considered a stream of data. A drag action, with the 'x' and 'y' coordinates of the mouse pointer changing over time, can be considered a stream as well. You can make streams out of anything. Even classical data structures like arrays can be considered streams if you pull one value at a time from them. For this post, we will consider a stream of likes and dislikes on a post on your favourite social media website. People can 'like' or 'dislike' a post over time. Let's denote a like with 'L' and a dislike with 'D'.

Our stream might look like this
L.....D......L......D.......D......D.......D......L......D......L.......L....L.......D.......D . . . and this may go on and on
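To make the "array as a stream" idea concrete, here is a minimal sketch that pulls one value at a time from an array using JavaScript's built-in iterator protocol. The `reactions` array and the other variable names are assumptions made up for illustration.

```javascript
// Hypothetical data: a finite slice of our like/dislike stream
const reactions = ["L", "D", "L", "D", "D"];

// Arrays are iterable, so we can ask for values one at a time
const iterator = reactions[Symbol.iterator]();

const seen = [];
let step = iterator.next();
while (!step.done) {
  seen.push(step.value);  // react to a single value
  step = iterator.next(); // pull the next one
}

console.log(seen.join("....."));
```

Note that here we are the ones pulling values out. With Observables, as we will see, it is the source that pushes values to us.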

In this post, we will not be using utilities provided by RxJS. Instead, we will try to understand the basics on which RxJS works and how it is written: what Observables actually are behind the scenes and why they are so powerful, why people think of RxJS as the Lodash for streams, and what the foundation of RxJS is.

Observer

An Observer is an interface. As the name suggests, it observes something that can be observed, called an Observable. An Observer defines what needs to happen when a value is observed in a stream. Let's suppose a user came and Liked (L) the post. What should we do with this first value in the stream? Should we convert this "L" into a ❤ sign? Should we just count the number of Ls in the stream? Should we discard all Ls? Should we just observe the first 10 occurrences of "L"? There can be 1000 more things you may want to do with a single value observed in a stream, or with a number of values observed over time. An Observer defines how you react to a value in a stream. Let's define an Observer:

    
        let myObserver = {
            next: (val) => console.log("VALUE: ",val),
            error: (err) => console.log("ERROR: ",err),
            complete: () => console.log("END")
        };
    

So an "Observer" is literally an object with next, error and complete handlers on it. You do not necessarily need to define all of these handlers.
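Since the handlers are optional, whoever calls them has to check that they exist first. A minimal sketch, assuming a hypothetical helper named `notify` (it is our own invention, not something RxJS exposes):

```javascript
// An observer that only cares about values
const received = [];
const partialObserver = {
  next: (val) => received.push(val)
};

// Hypothetical helper: call a handler only if the observer defines it.
// Returns true when the handler existed and was called.
function notify(observer, handler, value) {
  if (typeof observer[handler] === "function") {
    observer[handler](value);
    return true;
  }
  return false;
}

notify(partialObserver, "next", "L");
const completed = notify(partialObserver, "complete");

console.log(received, completed);
```

Without a guard like this, calling `complete()` on an observer that only defines `next` would throw a TypeError.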

Observable

Technically, an observable is a function which accepts an observer and calls the handlers defined on that observer. (Not at all interesting, is it?)

    
        //Define Observable
        function myObservable(observer){

            //L : Like , D: Dislike
            observer.next("L");
            observer.next("D");
            observer.next("L");
            observer.next("D");
            observer.next("D");
            observer.complete();

        }

        //Call Observable with the defined Observer
        myObservable(myObserver);

        //Result
        //VALUE: L
        //VALUE: D
        //VALUE: L
        //VALUE: D
        //VALUE: D
        //END
    
Please open "Console" window while viewing examples on jsbin.com

Live example? Click here

That, my friend, is alllll an observable is! Trust me! Maybe let's make it a bit more interesting.

This was all without using RxJS at all. RxJS ships with lots of handy tools to do all of this. As we said, we will not go into details of using RxJS in this post but just for a taste, let's see one out of the hundreds of ways in which we can create an Observable with RxJS

    
    //Creating an Observable
    let reactions$ = Rx.Observable.of('L','D','L','D','D');
    
Calling the famous 'subscribe' method on our reactions$ observable will return a subscription.
    
    //Creating an Observable
    let reactions$ = Rx.Observable.of('L','D','L','D','D');

    //Subscribing to an Observable passing an Observer interface(myObserver)
    let subscription =  reactions$.subscribe(myObserver);
    
The '$' suffix is just a naming convention for Observables; some folks find it a cool way to tell an observable apart from other variables.

Live example? Click here

The '.of()' method is an RxJS utility to create an Observable out of a finite set of values. Let's not allow RxJS to take our attention away from the details, at least not in this post.
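For a feel of what such a utility does, here is a hand-rolled sketch of an `of`-like helper built from plain functions only. It is an assumption for illustration and not how RxJS implements '.of()' internally.

```javascript
// Hypothetical helper: builds a function-style observable
// that emits each given value and then completes
function of(...values) {
  return function observable(observer) {
    values.forEach((val) => observer.next(val));
    observer.complete();
  };
}

const output = [];
const reactions = of("L", "D", "L", "D", "D");

// "Subscribing" is just calling the function with an observer
reactions({
  next: (val) => output.push("VALUE: " + val),
  error: (err) => output.push("ERROR: " + err),
  complete: () => output.push("END")
});

console.log(output.join("\n"));
```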

Forget this RxJS example for now and concentrate on the simple example we saw initially. So what does an "Observable" represent? Observables represent a set of values over time. In the example above, we only have a finite set of values. Let's make it infinite (by using setInterval) and study the behavior further.

Cancelling a Subscription/ Stop Reading a Stream

    
        //Define Observable
        function myObservable(observer){

            //L : Like , D: Dislike
            let intervalId = setInterval(()=>{
                                    observer.next("L");
                                    observer.next("D");
                                    observer.next("L");
                                    observer.next("D");
                                    observer.next("D");
                                    // No observer.complete() here, since this stream is meant to be infinite
                                },1000); // Spit these set of values on the stream every 1 sec

            //Observables are cancellable because they return a function that stops the source (here, clearInterval)
            return () => clearInterval(intervalId);
        }

        const  unsubscribe = myObservable(myObserver);

        setTimeout(()=>{
            unsubscribe();
        },3000); //Cancel the subscription i.e stop reading the stream after 3sec
    

We love working examples, right? Click here

Yay!!! We learned that we can cancel a subscription to an infinite stream whenever we want. Using setTimeout is just one way of doing it. In real-world applications, you can unsubscribe from a stream whenever a particular condition is met, or whenever you are no longer interested in that stream. This ability to cancel is one of the differences that sets an 'Observable' apart from a 'Promise'.

Imagine you are viewing a 'LIVE' cast on Facebook or Periscope. Have you seen the thumbs and hearts flowing across the screen, showing real-time emotions coming from users? That's a good example of where reactive programming can be very handy. But when you move away from that particular view, you don't want to read the stream of likes/dislikes anymore. With RxJS and Observables, you can cancel such subscriptions.
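Unsubscribing when a condition is met, rather than after a fixed delay, could look like the following sketch. To keep it synchronous and easy to follow, it assumes a hypothetical hand-driven source: `emit` pushes a value to every current observer, and `reactions` is a function-style observable that returns its own unsubscribe function. None of these names come from RxJS.

```javascript
// Hypothetical hand-driven source: we push values in by calling emit()
const observers = new Set();

function reactions(observer) {
  observers.add(observer);
  // Returning a function makes the subscription cancellable
  return () => observers.delete(observer);
}

function emit(value) {
  observers.forEach((observer) => observer.next(value));
}

// Stop listening as soon as we have seen two likes
let likes = 0;
const unsubscribe = reactions({
  next: (val) => {
    if (val === "L") likes++;
    if (likes === 2) unsubscribe();
  }
});

["L", "D", "L", "L", "D"].forEach(emit);

console.log(likes); // the third "L" arrives after we unsubscribed
```

The source keeps emitting, but once the condition is met our observer is simply no longer part of the conversation.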

Performing Operations on Observables

By now we have understood that Observables are simple functions which emit values over time, and that those values can be observed (subscribed to). Once we start observing the stream of data coming from an observable source, we can do a lot with it. There are tons of utility operations already built into RxJS. For the sake of learning, rather than using those ready-made RxJS utilities, we will try to write a few of them ourselves. Let's start with the most basic one, map.

map()

How do we write a "map" operator for an observable stream? A "map" function maps/transforms/projects an observable stream into another observable stream. Let's try to write it

    

    //Including myObserver and myObservable once again for the sake of completeness here

    let myObserver = {

      next: (val) => console.log("VALUE: " + val),
      error: (err) => console.log("ERROR: "+ err),
      complete: ()=> console.log("END")
    };
          
    function myObservable(observer){
      observer.next("L");
      observer.next("D");
      observer.next("L");
      observer.next("D");
      observer.next("D");

      observer.complete();

    }

    //Map Operator : Maps one observable to another
    function map(source,project){
       return (observer) => {
          return source({
            next:(x) => observer.next(project(x)),
            error:(err) => observer.error(err),
            complete:() => observer.complete()
          });
       };
    }

    //Projection: Maps one value to another
    function projection(x){
       return x==="L"?"❤":"!";
    }

    //Passing Source Observable, Projection and Observer Interface
    var observableMapper = map(myObservable,projection);
    observableMapper(myObserver);

    //A better way to write above 2 statements in 1 line can be
    //map(myObservable,projection)(myObserver);
    

Click here for a working example of the above code.
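Other operators can follow the same pattern as map. As a sketch, here is a hypothetical `filter` operator written in the same style (our own version, not RxJS's); it forwards only the values that pass a predicate.

```javascript
function myObservable(observer) {
  ["L", "D", "L", "D", "D"].forEach((val) => observer.next(val));
  observer.complete();
}

// Hypothetical filter operator: wraps a source observable and
// passes a value along only when predicate(value) is truthy
function filter(source, predicate) {
  return (observer) => {
    return source({
      next: (x) => {
        if (predicate(x)) observer.next(x);
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  };
}

const output = [];
const likesOnly = filter(myObservable, (x) => x === "L");

likesOnly({
  next: (val) => output.push("VALUE: " + val),
  error: (err) => output.push("ERROR: " + err),
  complete: () => output.push("END")
});

console.log(output.join("\n"));
```

Just like map, the operator takes a source and returns a new function-style observable, leaving the source untouched.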

Our map() function takes a source observable and a projection: a function that converts/maps/projects each value of the stream into something else. This is a very basic implementation of an observable mapper. What's wrong here? Are these observables safe? By safe we mean: can the observable still push a value after .complete() has been called? For example:

    

    //Including myObserver and myObservable once again for the sake of completeness here

    let myObserver = {

      next: (val) => console.log("VALUE: " + val),
      error: (err) => console.log("ERROR: "+ err),
      complete: ()=> console.log("END")
    };
          
    function myObservable(observer){
      
      observer.next("L");
      observer.next("D");
      observer.next("L");
      observer.next("D");
      observer.next("D");
      
      observer.complete();
      
      observer.next("L"); // Will this work? Unfortunately YES!
    }
    

This simulates a situation where you are no longer interested in an Observable, but since we just passed a plain observer to it, the observable stream is still readable (even after signaling completion)! How can we solve this problem? We need "Safe Observers". One of the things most Observable libraries do, including RxJS, is wrap observers in a "Safe Observer".

The concept is simple. A "Safe Observer" is a class which takes any anonymous observer and returns a "safe" implementation of it. A "safe" implementation makes sure that the observer cannot receive values (that is, .next() has no effect) after the done/complete signal. We need a kind of factory class which can generate "Safe Observers" for us.

    
    //Wrapper class to convert an anonymous observer into a safe observer

    class SafeObserver{
      constructor(observer){
        this.observer = observer;
        this.unSubscribed = false; // flag checked by every handler
      }

      next(x){
        if(!this.unSubscribed){
          this.observer.next(x);
        }
      }

      error(err){
        if(!this.unSubscribed){
          this.unSubscribe();
          this.observer.error(err);
        }
      }

      complete(){
        if(!this.unSubscribed){
          this.unSubscribe();
          this.observer.complete();
        }
      }

      unSubscribe(){
        this.unSubscribed = true;
      }

    }

    // Our anonymous Observer (same as used in previous examples)

    let myObserver = {

      next: (val) => console.log("VALUE: " + val),
      error: (err) => console.log("ERROR: "+ err),
      complete: ()=> console.log("END")
    };


    //Now it is the Observable's job to wrap an Observer and make sure you
    // can't call .next() after .complete()
    //To do this, it uses the above SafeObserver wrapper

    function myObservable(observer){
      
      observer = new SafeObserver(observer);

      observer.next("L");
      observer.next("D");
      observer.next("L");
      observer.next("D");
      observer.next("D");
      
      observer.complete();
      
      observer.next("L"); // Will this work? NOT ANY MORE ! We are using Safe Observers!
    }
    

Click here for live example.

Another thing that you would always want is some teardown logic that runs whenever we call .complete() on an observer. Let's modify two things: the unSubscribe() method in SafeObserver, and myObservable. Let's look at these two changes only:

    
    //Wrapper class to convert an anonymous observer into a safe observer

    class SafeObserver{
    
      . . .

      unSubscribe(){
        this.unSubscribed = true;
        
        if(this.teardown){ // Check if any tear down logic exists
          this.teardown();
        }
      }

    }

   .  .   .

    function myObservable(observer){
      
      observer = new SafeObserver(observer);

      let id = setTimeout(()=> {

        observer.next("L");
        observer.next("D");
        observer.next("L");
        observer.next("D");
        observer.next("D");
        observer.complete();
        observer.next("L"); // Will this work? NOT ANY MORE ! We are using Safe Observers!

      },1000);
      
      let teardown = () => clearTimeout(id);

      observer.teardown = teardown; // Extending Observer to add teardown logic

      return teardown;
     
    }
    

All we are doing here is extending the Observer with teardown logic: we assign a function that calls clearTimeout to its 'teardown' property. Here is a working example!
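To see the order in which things happen, here is a self-contained sketch of the same idea. To keep it synchronous, it swaps setTimeout for a hypothetical manual trigger (`pending`), and a `log` array stands in for the console. The SafeObserver below is a trimmed-down version that also tolerates missing handlers; treat all of it as an illustration rather than the definitive implementation.

```javascript
class SafeObserver {
  constructor(observer) {
    this.observer = observer;
    this.unSubscribed = false;
  }
  next(x) {
    if (!this.unSubscribed && this.observer.next) this.observer.next(x);
  }
  complete() {
    if (!this.unSubscribed) {
      this.unSubscribe();
      if (this.observer.complete) this.observer.complete();
    }
  }
  unSubscribe() {
    this.unSubscribed = true;
    if (this.teardown) this.teardown(); // run teardown logic, if any
  }
}

const log = [];
let pending = null; // hypothetical stand-in for a scheduled timer

function manualObservable(observer) {
  const safe = new SafeObserver(observer);
  pending = () => {
    safe.next("L");
    safe.complete();
    safe.next("D"); // ignored, we are already unsubscribed
  };
  safe.teardown = () => {
    pending = null; // the equivalent of clearTimeout(id)
    log.push("teardown");
  };
  return safe.teardown;
}

manualObservable({
  next: (val) => log.push("VALUE: " + val),
  complete: () => log.push("END")
});

pending(); // simulate the timer firing
console.log(log.join("\n"));
```

Because complete() calls unSubscribe() before notifying the wrapped observer, the teardown runs first and "END" is logged after it.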

Do you see how much work we have to do every single time we create an Observable? That's gross! At the start of this article, I said Observables are just functions, which is not a hundred percent correct. You can always write observables as functions, but then you have to do a lot of work every time, as we just saw in our examples. So what do we do? We write an Observable class. Similar to how we have a SafeObserver class, can we not have a class which creates and returns an Observable and does all the hard work? Let's do it!

    
    //Wrapper class to convert an anonymous observer into a safe observer

    class SafeObserver{
      constructor(observer){
        this.observer = observer;
        this.unSubscribed = false; // flag checked by every handler
      }

      next(x){
        if(!this.unSubscribed){
          this.observer.next(x);
        }
      }

      error(err){
        if(!this.unSubscribed){
          this.unSubscribe();
          this.observer.error(err);
        }
      }

      complete(){
        if(!this.unSubscribed){
          this.unSubscribe();
          this.observer.complete();
        }
      }

      unSubscribe(){
        this.unSubscribed = true;

        if(this.teardown){ // Check if any tear down logic exists
          this.teardown();
        }
      }

    }

    // Our anonymous Observer (same as used in previous examples)

    let myObserver = {

      next: (val) => console.log("VALUE: " + val),
      error: (err) => console.log("ERROR: "+ err),
      complete: ()=> console.log("END")
    };


    class Observable{
      constructor(_subscribe){
        this._subscribe = _subscribe;
      }

      subscribe(observer){
        let safeObserver = new SafeObserver(observer);
        safeObserver.teardown = this._subscribe(safeObserver);
        return {
          unsubscribe(){
            safeObserver.teardown();
          }
        }
      }

    }


    //Now it is Observables job to wrap an Observer and make sure you
    // can't call .next() after .complete()
    //To do this, it uses the above SafeObserver wrapper

    let myObservable = new Observable((observer) => {
      
       let id = setTimeout(()=> {

        observer.next("L");
        observer.next("D");
        observer.next("L");
        observer.next("D");
        observer.next("D");
        observer.complete();
        observer.next("L"); // Will this work? NOT ANY MORE ! We are using Safe Observers!

      },1000);
      
      return () => clearTimeout(id);

    });

    let subscription = myObservable.subscribe(myObserver);

    //To unsubscribe, you can call 
    //subscription.unsubscribe();


    

Working example here

Things to notice:

  • We now have an Observable class that returns an observable using safe observers
  • The Observable class has a subscribe method on it
  • RxJS internally returns an object (instead of a function) from this .subscribe() method. The returned object has an unsubscribe() method on it. This way it is cleaner to unsubscribe:
        
        . . .
    
        //Cleaner
        let subscription = myObservable.subscribe(myObserver);
        subscription.unsubscribe();
    
    
        //Rather than doing 
        //let unsubscription = myObservable.subscribe(myObserver);
        //unsubscription();
        
    
  • Returning an object with an unsubscribe method in it rather than returning a function directly makes it much more explicit when you want to unsubscribe it.

One last thing we would want to do in this post is move our .map() function to our Observable class. Let's do that!

    
     class Observable{
      constructor(_subscribe){
        this._subscribe = _subscribe;
      }

      subscribe(observer){
        let safeObserver = new SafeObserver(observer);
        safeObserver.teardown = this._subscribe(safeObserver);
        return safeObserver.teardown;
        
      }
   
      map(project){
         return new Observable((observer) => {
            return this.subscribe({
              next:(x) => observer.next(project(x)),
              error:(err) => observer.error(err),
              complete:() => observer.complete()
            });
         });
      }

    }
          
    let myObservable = new Observable((observer) => {

      //subscribe() already wraps the observer in a SafeObserver,
      //so this extra wrap is redundant and kept only for illustration
      observer = new SafeObserver(observer); 

      let id = setTimeout(()=>{
        
        observer.next("L");
        observer.next("D");
        observer.next("L");
        observer.next("D");
        observer.next("D");
        observer.complete();
        observer.next("D"); // This will not work due to Safe Observer implementation
        
      },1000);
    
      return () => clearTimeout(id);
    
    });

    //Using the .map() function over the observable stream.    
    let subscription = myObservable.map((x)=>x+"#").map((x)=>x+ "!").subscribe(myObserver);
  

Here is a live example

Let's talk about the map() function. There are only 2 things you should really care about and they are straightforward.

  • A map() function projects an observable stream to whatever you like and returns a new stream. It's important to notice that the underlying stream remains the same, so you can say that this .map() implementation is pure: it does not produce any side effects and does not change the original stream, but rather returns a new observable stream.
      
        map(project){
           return new Observable((observer) => {
              . . .
           });
        }
    
    
  • It calls the projection you pass to it whenever you do a next on the original stream.
        
          map(project){
            . . .
                  next:(x) => observer.next(project(x)),
            . . .   
          }
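Both points can be checked with a small sketch. It assumes a simplified Observable class without the SafeObserver wrapping (to keep things short), and a hypothetical `collect` helper that builds an observer which stores values in an array.

```javascript
// Simplified Observable (no SafeObserver) to keep the sketch short
class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }
  subscribe(observer) {
    return this._subscribe(observer);
  }
  map(project) {
    return new Observable((observer) => {
      return this.subscribe({
        next: (x) => observer.next(project(x)),
        error: (err) => observer.error(err),
        complete: () => observer.complete()
      });
    });
  }
}

const source = new Observable((observer) => {
  observer.next("L");
  observer.next("D");
  observer.complete();
});

// map() hands back a brand new Observable
const hearts = source.map((x) => (x === "L" ? "❤" : "!"));

// Hypothetical helper: an observer that stores values in an array
const collect = (target) => ({
  next: (x) => target.push(x),
  error: () => {},
  complete: () => {}
});

const fromHearts = [];
const fromSource = [];
hearts.subscribe(collect(fromHearts));
source.subscribe(collect(fromSource)); // the source is unchanged

console.log(fromHearts.join(" "), "|", fromSource.join(" "));
```

Subscribing to `source` after it has been mapped still yields the original values, which is what we mean by the original stream being left alone.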
        
    

This is what Observables look like from the inside. They look a bit more complicated in RxJS 5 because they are optimized for performance. Having said that, you don't have to worry about any of this. As we promised at the start,

In this post, we will not be using utilities provided by RxJS

Everything we have done in this post is already baked into RxJS for you. The bottom line we learned here is that Observers are the ones that do most of the work. Stay tuned to this series as we move on to RxJS in full swing. In the next post, we will improve on our Observer implementation. We will also go into detail on Hot vs Cold observables and much more.

Thanks to @BenLesh for all the amazing talks he has given on reactive programming with RxJS

Feel free to leave comments, questions, suggestions and corrections. Until next time, happy learning!