Reactive programming is an asynchronous programming paradigm concerned with data streams.
In reactive programming, we define a data source and a data consumer and establish a connection between them; the library then takes care of the data flow from the source to the consumer.
Using the RxJava API (the rx.* packages of RxJava 1.x), we can define the source and the consumer as shown in the code below.

    import rx.Observable;
    import rx.Subscriber;

    public class Test {
        public static void main(String[] args) {
            // Source: emits the integers 1 through 10.
            Observable<Integer> source = Observable.range(1, 10);

            // Consumer: reacts to each item, to an error, and to completion.
            Subscriber<Integer> consumer = new Subscriber<Integer>() {
                @Override
                public void onNext(Integer number) {
                    System.out.println(number);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("error " + e.getLocalizedMessage());
                }

                @Override
                public void onCompleted() {
                    System.out.println("completed");
                }
            };

            // Connect the source to the consumer; items start flowing on subscribe.
            source.subscribe(consumer);
        }
    }

Output of the above code:
1
2
3
4
5
6
7
8
9
10
completed
Please refer to the link below for a detailed understanding of reactive programming.
Reactive Streams in Java 9
Reactive streams provide asynchronous processing of a data stream.
Any data processing involves at least one data producer and one data consumer: data flows from the producer to the consumer through a pipeline in real time, and a processor can sit in between to transform the data.
Java 9 reactive streams work on the same concept. The java.util.concurrent.Flow class contains the Publisher, Subscriber, and Processor interfaces (along with Subscription, which links a subscriber to its publisher):
java.util.concurrent.Flow.Publisher
java.util.concurrent.Flow.Subscriber
java.util.concurrent.Flow.Processor
The code snippet below is a simple implementation using the Flow API:

    package features.java9;

    import java.util.concurrent.Flow.Publisher;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;

    public class TestDataFlow {

        // Simplified publisher: pushes 1..10 synchronously as soon as a subscriber attaches.
        // It skips onSubscribe() and request(n), which a full Flow publisher would honor.
        Publisher<Integer> publisher = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                int count = 0;
                while (count < 10) {
                    subscriber.onNext(++count);
                }
            }
        };

        // Subscriber that prints every item it receives and ignores the other signals.
        static Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {
            }

            @Override
            public void onNext(Integer item) {
                System.out.println(item);
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onComplete() {
            }
        };

        public static void main(String[] args) {
            new TestDataFlow().publisher.subscribe(subscriber);
        }
    }

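The publisher above pushes items without calling onSubscribe() or waiting for request(n). As a minimal sketch of the request-driven style, the example below swaps in the JDK's SubmissionPublisher for the hand-written publisher and lets the subscriber ask for one item at a time. The class name RequestDrivenFlow and the helper collectOneToTen are made up for illustration, and the 5-second wait is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class RequestDrivenFlow {

    // Hypothetical helper: publishes 1..10 and returns what the subscriber received.
    static List<Integer> collectOneToTen() {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Subscriber<Integer> subscriber = new Subscriber<>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1); // ask for the first item
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                subscription.request(1); // ask for the next item once this one is handled
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        };

        // SubmissionPublisher buffers submitted items and delivers them asynchronously.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 10; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the buffered items have been delivered

        try {
            done.await(5, TimeUnit.SECONDS); // wait for the asynchronous delivery to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        collectOneToTen().forEach(System.out::println);
    }
}
```

Requesting a single item per call keeps the subscriber in control of the pace; a larger value passed to request(n) would trade that control for fewer round trips.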

    package features.java9;

    import java.util.concurrent.Flow.Publisher;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;

    public class TestPublisher implements Publisher<Long> {

        // Shared counter: incremented by the subscriber, checked by the publisher's loop.
        private static int publisherCount = 0;

        @Override
        public void subscribe(Subscriber<? super Long> subscriber) {
            // Emit the current nano time until ten items have been consumed, then complete.
            while (publisherCount < 10) {
                subscriber.onNext(System.nanoTime());
            }
            subscriber.onComplete();
        }

        public static void main(String[] args) {
            new TestPublisher().subscribe(new Subscriber<>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                }

                @Override
                public void onNext(Long item) {
                    try {
                        Thread.sleep(1000); // simulate a slow consumer
                        System.out.println("random value = " + item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve the interrupt status
                    }
                    publisherCount++;
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("error");
                }

                @Override
                public void onComplete() {
                    System.out.println("done");
                }
            });
        }
    }
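
The Processor interface is listed earlier but does not appear in the snippets. As a rough sketch of the producer, processor, consumer pipeline, the example below places a processor between a SubmissionPublisher source and a collecting subscriber. The names ProcessorPipeline, MapProcessor, and squareOneToFive are hypothetical, and extending SubmissionPublisher is only one possible way to implement a processor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ProcessorPipeline {

    // Hypothetical processor: subscribes upstream, applies a function, republishes downstream.
    static class MapProcessor<T, R> extends SubmissionPublisher<R> implements Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Subscription upstream;

        MapProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            upstream = subscription;
            upstream.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // transform, then hand to downstream subscribers
            upstream.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // forward the failure downstream
        }

        @Override
        public void onComplete() {
            close(); // forward completion downstream
        }
    }

    // Hypothetical helper: sends 1..5 through a squaring processor and collects the results.
    static List<Integer> squareOneToFive() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Subscriber<Integer> consumer = new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // effectively unbounded demand
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable throwable) {
                done.countDown();
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        };

        MapProcessor<Integer, Integer> squarer = new MapProcessor<>(n -> n * n);
        squarer.subscribe(consumer); // processor -> consumer

        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(squarer); // source -> processor
            for (int i = 1; i <= 5; i++) {
                source.submit(i);
            }
        } // closing the source completes the processor, which then completes the consumer

        try {
            done.await(5, TimeUnit.SECONDS); // arbitrary timeout for the asynchronous pipeline
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        squareOneToFive().forEach(System.out::println);
    }
}
```

Because the processor is both a Subscriber and a Publisher, more stages could be chained the same way by subscribing one processor to another.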