Tuesday, June 7, 2016

Introduction to RxJava


It took me a lot of reading to get my head around RxJava, so I decided to write something to simplify it for beginners. RxJava is a library for the JVM that introduces a different way of designing applications, called reactive programming. Reactive programming involves a number of principles, but for simplicity's sake let's just say that it is about developing applications that react quickly and effectively to changes in state. RxJava can also be described as an advanced form of the Gang of Four Observer pattern. It was first developed for internal use at Netflix and was then open sourced. There are two very important notions to retain while working with RxJava: Observables and Subscribers.

Observables are the main concept of RxJava. They are entities that can be observed by any number of subscribers and that notify all of them when a change in state occurs.

A subscriber receives updates from Observables and acts on them. On subscription, a subscriber defines three things: what it will do with the data, what it will do in case of an error, and what it will do on completion.
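
To make the three callbacks concrete, here is a toy sketch in plain Java. It does not use RxJava at all: SimpleObserver, emit() and record() are names made up for this illustration, and the real library does far more (threading, unsubscription, operators). It only shows the order in which a source talks to its subscriber: zero or more onNext calls, then either onError or onCompleted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for an observer: NOT RxJava's rx.Observer, just the same three callbacks.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

class ObserverContractSketch {

    // Pushes every item to the observer, then signals completion.
    // If a runtime exception is thrown along the way, signals the error instead and stops.
    static <T> void emit(Iterable<T> items, SimpleObserver<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);
            return;
        }
        observer.onCompleted();
    }

    // Runs emit() with an observer that records, in order, what it was told.
    static List<String> record(List<Integer> items) {
        List<String> log = new ArrayList<>();
        emit(items, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("next " + item); }
            @Override public void onError(Throwable error) { log.add("error"); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList(1, 2, 3)));
    }
}
```

Calling record(Arrays.asList(1, 2, 3)) returns the log [next 1, next 2, next 3, completed]: the data callback runs once per item and the completion callback runs once at the end.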

Let's go through a first example:

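import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import rx.Observable;
import rx.Observer;

// Build the list 1..10
List<Integer> list = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());

// Create an observable that emits each element of the list
Observable<Integer> observable = Observable.from(list);

observable.subscribe(new Observer<Integer>() {
   @Override
   public void onCompleted() {
    System.out.println("Completed");
   }

   @Override
   public void onError(Throwable arg0) {
    // Do something on error
   }

   @Override
   public void onNext(Integer arg0) {
    System.out.println(arg0 + " ");
   }
});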

Result: the numbers 1 to 10 are printed one per line, followed by "Completed".

In this example, we created an observable from an iterable. The observable provided the values to the subscriber as a stream. The subscriber printed each value on reception and printed the word "Completed" on completion.

Let's explore another way of creating Observables. Suppose we have a service that monitors traffic at an intersection and sends updates to a terminal station as the state of traffic changes. We can create our observable using the create() method and define what we send to our observers inside the call() method.

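import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;

// trafficState (a String[] with three entries) and random (a java.util.Random)
// are assumed to be defined elsewhere.
Observable<String> trafficStream = Observable.create(new OnSubscribe<String>() {
   @Override
   public void call(Subscriber<? super String> sub) {
      // Simulation of a stream: keep emitting until the subscriber unsubscribes
      while (!sub.isUnsubscribed()) {
         sub.onNext(trafficState[random.nextInt(3)]);
      }
   }
});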

Notice that we used the onNext method to send the data to our subscriber.
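
A source that emits in a loop has no way to stop on its own, so it is worth thinking about how it ends. In RxJava 1.x a Subscriber exposes isUnsubscribed(), which a source can check before each emission. The sketch below imitates that idea in plain Java, without RxJava: the AtomicBoolean flag, the Consumer callback, the method names and the three state names are all assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class StoppableSourceSketch {

    // Hypothetical traffic states, invented for this sketch.
    static final String[] STATES = {"FLUID", "DENSE", "JAMMED"};

    // The "source": emits states in a fixed rotation and checks the flag
    // before every emission, so it stops once the listener has unsubscribed.
    static void emitUntilUnsubscribed(AtomicBoolean unsubscribed, Consumer<String> onNext) {
        int i = 0;
        while (!unsubscribed.get()) {
            onNext.accept(STATES[i % STATES.length]);
            i++;
        }
    }

    // The "subscriber": collects items and unsubscribes after 'wanted' of them.
    static List<String> takeFirst(int wanted) {
        List<String> received = new ArrayList<>();
        AtomicBoolean unsubscribed = new AtomicBoolean(wanted <= 0);
        emitUntilUnsubscribed(unsubscribed, state -> {
            received.add(state);
            if (received.size() >= wanted) {
                unsubscribed.set(true);
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(4));
    }
}
```

Here takeFirst(4) returns [FLUID, DENSE, JAMMED, FLUID] and then the loop exits, because the flag is read again before a fifth emission.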

Once we subscribe to our observable:

trafficStream.subscribe(new Observer<String>() {
   @Override
   public void onCompleted() {
    // Nothing to do: this stream is not expected to complete
   }

   @Override
   public void onError(Throwable arg0) {
    // Error handling would go here
   }

   @Override
   public void onNext(String arg0) {
    System.out.println("Traffic state : " + arg0);
   }
});
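
We get a continuous stream of lines, each made of "Traffic state : " followed by one of the traffic states.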

Composing Observables

There are situations where developers need to compose several observables into one, so that they can subscribe to all of them at once. RxJava provides several ways of composing observables.

For composing observables of the same type, RxJava provides the merge() method:

// observable1 and observable2 are assumed to be two Observable<Integer> instances
Observable.merge(observable1, observable2)
   .subscribe(new Observer<Integer>() {
   @Override
   public void onCompleted() {
    // Merged completion action
   }

   @Override
   public void onError(Throwable arg0) {
    // Merged error handling
   }

   @Override
   public void onNext(Integer arg0) {
    // Called for each item coming from either observable
    System.out.println(arg0);
   }
   });

For composing observables of different types, RxJava provides the zip() method, which combines the items of both observables pairwise using a function:

// observable is assumed to be an Observable<Integer> and observable2 an Observable<Double>
Observable.zip(observable, observable2, new Func2<Integer, Double, Map<Integer, Double>>() {
    @Override
    public Map<Integer, Double> call(Integer arg0, Double arg1) {
     // Combine one item from each observable into a single-entry map
     Map<Integer, Double> map = new HashMap<Integer, Double>();
     map.put(arg0, arg1);
     return map;
    }
    }).subscribe(new Observer<Map<Integer, Double>>() {

    @Override
    public void onCompleted() {
     // Completion action
    }

    @Override
    public void onError(Throwable arg0) {
     // Error handling
    }

    @Override
    public void onNext(Map<Integer, Double> arg0) {
     // Print each combined key/value pair
     arg0.forEach((k, v) -> System.out.println(" " + k + " " + v));
    }
    });
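
The pairing that zip() performs can be pictured with ordinary lists. The sketch below is plain Java and does not use RxJava: ZipSketch and its zip() helper are hypothetical names for this illustration, and lists stand in for observables. It shows the two points that matter: the first items are combined together, then the second items, and so on, and the result stops as soon as the shorter input runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {

    // Combines the i-th item of 'left' with the i-th item of 'right'
    // and stops as soon as either side has no more items.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            result.add(combiner.apply(left.get(i), right.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> pairs = zip(Arrays.asList(1, 2, 3), Arrays.asList(0.5, 1.5), (i, d) -> i + "=" + d);
        System.out.println(pairs); // prints [1=0.5, 2=1.5]
    }
}
```

The third integer has no partner, so it is dropped. A zipped observable behaves in a similar way: it emits only as many combined items as the source that emits the fewest.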

Interesting videos that explain core concepts of reactive programming:
https://www.youtube.com/watch?v=dwP1TNXE6fc
https://www.youtube.com/watch?v=7D9QfMj_KwI
