Tuesday, 29 August 2017

Reactive Programming

Reactive programming is an asynchronous programming paradigm concerned with data streams.
In Reactive programming we define data source and data consumer and establish a connection between source and consumer than library take care of data flow from source to consumer.

Using RxJava api we can define the source and consumer as you can see in below code.

import rx.Observable;
import rx.Subscriber;

public class Test {

            public static void main(String[] args) {
                        Observable<Integer> source = Observable.range(1, 10);
                        Subscriber<Integer> consumer = new Subscriber<Integer>() {
                                    public void onNext(Integer number) {
                                                System.out.println(number);
                                    }
                                    public void onError(Throwable e) {
                                                System.out.println("error " + e.getLocalizedMessage());
                                    }
                                    public void onCompleted() {
                                                System.out.println("completed");
                                    }
                        };
                        source.subscribe(consumer);
            }
}

Output of above code:-
1
2
3
4
5
6
7
8
9
10
completed


Please refer below link for detail understanding of reactive programming.

Reactive Stream JAVA 9-

Reactive stream is asynchronous processing of data stream.
When we discuss about data processing than there is at least one data producer and one data consumer, data flow from producer to consumer, with a data pipeline in real time , and there is a processor to transform the data.
Java 9 Reactive stream works on same concept. It have flow package, contain Publisher, Subscriber and Processor refer below-
java.util.concurrent.Flow.Publisher
java.util.concurrent.Flow.Subscriber
java.util.concurrent.Flow.Processor

Refer below code snippet of simple implementation of Stream API
package features.java9;

import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TestDataFlow  {

      Publisher<Integer> publisher = new Publisher<Integer>() {

            public void subscribe(Subscriber<? super Integer> subscriber) {
                  int count = 0;
                  while(count < 10) {
                        subscriber.onNext(++count);
                  }
            }
      };
     
      static Subscriber<Integer> subscriber = new Subscriber<Integer>() {
           
            public void onSubscribe(Subscription subscription) {
            }
           
            public void onNext(Integer item) {
                  System.out.println(item);
            }
           
            public void onError(Throwable throwable) {
            }
           
            public void onComplete() {
            }
      };
     
      public static void main(String args[]) {
            new TestDataFlow().publisher.subscribe(subscriber);
      }

}

package features.java9;

import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TestPublisher implements Publisher<Long> {

      private static int publisherCount = 0;
      public void subscribe(Subscriber<? super Long> subscriber) {
            while(publisherCount < 10) {
                  subscriber.onNext(System.nanoTime());
            }
            subscriber.onComplete();
      }

      public static void main(String[] args) {
            new TestPublisher().subscribe(new Subscriber<>() {
                  public void onSubscribe(Subscription subscription) {
                  }
                  public void onNext(Long item) {
                        try {
                              Thread.sleep(1000);
                              System.out.println("randome value = " + item);
                        } catch (InterruptedException e) {
                        }
                        publisherCount++;
                  }
                  public void onError(Throwable throwable) {
                        System.out.println("error");
                  }
                  public void onComplete() {
                        System.out.println("done");
                  }
            });
      }
}



3 comments: