Below I will be giving a few code snippets to check the features in Reactivex API, you can download its jar & use it in Maven/Gradle
I will not say these APIs are good or bad but these APIs are to be used for specific cases, not anywhere to show your code modern. I see many people discourage Imperative code style & favor Declarative or Functional coding style. But if you choose any style on the basis of these comments then surely you are hurting your project. In the starting Declarative style looks good but if it doesn't fit in your project/expectations properly then you will be facing more problems soon.
So use your judgement to decide any one style or mix of both but don't decide on the basis of others' comments.
Like below code, you see I am using ReactiveX API & I see that same task done using imperative style can be done quickly.
import java.util.ArrayList;
import java.util.List;
import io.reactivex.Observable;
import io.reactivex.functions.Action;
public class ReactiveObservable {
private static Integer sum = 0;
private static Integer oSum = 0;
private static Integer eSum = 0;
public static void main(String[] args) {
getList();
Observable<Integer> todoObservable = Observable.create(emitter -> {
try {
List<Integer> ints = getTodos();
for (Integer todo : ints) {
// if(todo == 4)
// throw new Exception("4 seen....");
emitter.onNext(todo);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
long start = System.currentTimeMillis();
todoObservable.subscribe(a->{sum(a);if(a%2 == 0) eSum(a); else oSum(a);}, e -> System.out.println(e.getMessage()), isDone());
System.out.println("All : " + sum);
System.out.println("Odd : " + oSum);
System.out.println("Even : " + eSum);
System.out.println("Time taken for All : " + (System.currentTimeMillis()-start));
System.out.println("------------Second Subscriber-------------");
sum = 0;
start = System.currentTimeMillis();
todoObservable.filter(a -> a%2 == 0)
.subscribe(a->sum(a), e -> System.out.println(e.getMessage()));
System.out.println("Even : " + sum);
System.out.println("Time taken for Even : " + (System.currentTimeMillis()-start));
Observable<Integer> observable = Observable.fromIterable(getTodos());
System.out.println("------------Subscriber to observable-------------");
sum = 0;
start = System.currentTimeMillis();
observable.filter(a->a%2 != 0)
.subscribe(a->sum(a));
System.out.println("Odd : " + sum);
System.out.println("Time taken for Odd : " + (System.currentTimeMillis()-start));
}
private static List<Integer> getTodos() {
List<Integer> list = new ArrayList<>();
for(int i = 0; i < 100000; i++) {
list.add(i);
}
return list;
}
private static Action isDone() {
return new Action() {
@Override
public void run() throws Exception {
System.out.println("Completed.....");
}
};
}
private static List<Integer> getList() {
List<Integer> list = new ArrayList<>();
int all = 0;
int odd = 0;
int even = 0;
long start = System.currentTimeMillis();
for(int i = 0; i < 100000; i++) {
all+=i;
if(i%2 == 0)
even+=i;
else
odd+=i;
list.add(i);
}
System.out.println("All : " + all);
System.out.println("Odd : " + odd);
System.out.println("Even : " + even);
System.out.println("Time taken in for loop : " + (System.currentTimeMillis()-start));
return list;
}
private static int sum(int a) {
return sum+=a;
}
private static int eSum(int a) {
return eSum+=a;
}
private static int oSum(int a) {
return oSum+=a;
}
}
OUTPUT
All : 704982704
Odd : -1794967296
Even : -1795017296
Time taken in for loop : 7ms
Completed.....
All : 704982704
Odd : -1794967296
Even : -1795017296
Time taken for All : 25ms
------------Second Subscriber-------------
Even : -1795017296
Time taken for Even : 12ms
------------Subscriber to observable-------------
Odd : -1794967296
Time taken for Odd : 12ms
===============================================================================================
I will not say these APIs are good or bad but these APIs are to be used for specific cases, not anywhere to show your code modern. I see many people discourage Imperative code style & favor Declarative or Functional coding style. But if you choose any style on the basis of these comments then surely you are hurting your project. In the starting Declarative style looks good but if it doesn't fit in your project/expectations properly then you will be facing more problems soon.
So use your judgement to decide any one style or mix of both but don't decide on the basis of others' comments.
Like below code, you see I am using ReactiveX API & I see that same task done using imperative style can be done quickly.
import java.util.ArrayList;
import java.util.List;
import io.reactivex.Observable;
import io.reactivex.functions.Action;
public class ReactiveObservable {
private static Integer sum = 0;
private static Integer oSum = 0;
private static Integer eSum = 0;
public static void main(String[] args) {
getList();
Observable<Integer> todoObservable = Observable.create(emitter -> {
try {
List<Integer> ints = getTodos();
for (Integer todo : ints) {
// if(todo == 4)
// throw new Exception("4 seen....");
emitter.onNext(todo);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
});
long start = System.currentTimeMillis();
todoObservable.subscribe(a->{sum(a);if(a%2 == 0) eSum(a); else oSum(a);}, e -> System.out.println(e.getMessage()), isDone());
System.out.println("All : " + sum);
System.out.println("Odd : " + oSum);
System.out.println("Even : " + eSum);
System.out.println("Time taken for All : " + (System.currentTimeMillis()-start));
System.out.println("------------Second Subscriber-------------");
sum = 0;
start = System.currentTimeMillis();
todoObservable.filter(a -> a%2 == 0)
.subscribe(a->sum(a), e -> System.out.println(e.getMessage()));
System.out.println("Even : " + sum);
System.out.println("Time taken for Even : " + (System.currentTimeMillis()-start));
Observable<Integer> observable = Observable.fromIterable(getTodos());
System.out.println("------------Subscriber to observable-------------");
sum = 0;
start = System.currentTimeMillis();
observable.filter(a->a%2 != 0)
.subscribe(a->sum(a));
System.out.println("Odd : " + sum);
System.out.println("Time taken for Odd : " + (System.currentTimeMillis()-start));
}
private static List<Integer> getTodos() {
List<Integer> list = new ArrayList<>();
for(int i = 0; i < 100000; i++) {
list.add(i);
}
return list;
}
private static Action isDone() {
return new Action() {
@Override
public void run() throws Exception {
System.out.println("Completed.....");
}
};
}
private static List<Integer> getList() {
List<Integer> list = new ArrayList<>();
int all = 0;
int odd = 0;
int even = 0;
long start = System.currentTimeMillis();
for(int i = 0; i < 100000; i++) {
all+=i;
if(i%2 == 0)
even+=i;
else
odd+=i;
list.add(i);
}
System.out.println("All : " + all);
System.out.println("Odd : " + odd);
System.out.println("Even : " + even);
System.out.println("Time taken in for loop : " + (System.currentTimeMillis()-start));
return list;
}
private static int sum(int a) {
return sum+=a;
}
private static int eSum(int a) {
return eSum+=a;
}
private static int oSum(int a) {
return oSum+=a;
}
}
OUTPUT
All : 704982704
Odd : -1794967296
Even : -1795017296
Time taken in for loop : 7ms
Completed.....
All : 704982704
Odd : -1794967296
Even : -1795017296
Time taken for All : 25ms
------------Second Subscriber-------------
Even : -1795017296
Time taken for Even : 12ms
------------Subscriber to observable-------------
Odd : -1794967296
Time taken for Odd : 12ms
===============================================================================================
Note share() used during the creation of Flowable instance.
What it does?
Check the above code by removing share(). You will see even second Subscriber is starting after 5 sec but still it can see the data from the start of the stream. So it is like2 persons watching the same Youtube video from the start i.e. both users are having there own stream from the source.
While share() makes stream for all the subscribers common. So as subscriber2 starts after 5 seconds, it will see the same data as seen by subscriber1.
What it does?
Check the above code by removing share(). You will see even second Subscriber is starting after 5 sec but still it can see the data from the start of the stream. So it is like2 persons watching the same Youtube video from the start i.e. both users are having there own stream from the source.
While share() makes stream for all the subscribers common. So as subscriber2 starts after 5 seconds, it will see the same data as seen by subscriber1.