RxJS Operators

Here is the link, on the basis of that I have created one JSFiddle, which is mentioned as below

JSFiddle ->


//fork – Use this operator when you need to run the Observables in parallel.//Don’t let me know until all the Observables are complete, then give me all the values at once. ( Array )const getPostOne2$ = Rx.Observable.timer(1000).mapTo({id: 1});const getPostTwo2$ = Rx.Observable.timer(2000).mapTo({id: 2});
Rx.Observable.forkJoin(getPostOne2$, getPostTwo2$).subscribe(res => console.log(res))
//concat -> Subscribe to Observables in order but only when the previous completes, let me know, then move to the next one.//The stream completes when all input streams complete and will throw an error if some of the input streams throw an error. It will never complete if some of the input streams don’t complete which also means that some streams will never be subscribed.
//Use concat if you want to combine them and ensure that the value from cache is delivered first.
////here id:1, will subscribe first.
const getPostOne$ = Rx.Observable.timer(3000).mapTo({id: 1});const getPostTwo$ = Rx.Observable.timer(1000).mapTo({id: 2});
Rx.Observable.concat(getPostOne$, getPostTwo$).subscribe(res => console.log(res));
//merge -> Subscribe to Observables whatever complete first..//The stream completes when all input streams complete and will throw an error if any of the streams throws an error. It will never complete if some of the input streams don’t complete.
//here id:2, will subscribe first.
const getPostOne1$ = Rx.Observable.timer(3000).mapTo({id: 1});const getPostTwo1$ = Rx.Observable.timer(1000).mapTo({id: 2});
Rx.Observable.merge(getPostOne1$, getPostTwo1$).subscribe(res => console.log(res));

//race -This operator can be useful if you have multiple resources that can provide values, for example, servers around the world, but due to network conditions the latency is not predictable and varies significantly. Using this operator you can send the same request out to multiple data sources and consume the result of the first that responds.
const getPostOne3$ = Rx.Observable.timer(1000).mapTo({id: 1});const getPostTwo3$ = Rx.Observable.timer(1000).mapTo({id: 2});
Rx.Observable.race(getPostOne3$, getPostTwo3$).subscribe(res => console.log(res));
//cold observablevar cold = Rx.Observable.create( (observer) => { observer.next(Math.random())});
cold.subscribe( (a) => console.log(a))cold.subscribe( (a) => console.log(a))

//hot observable -> way1
var random = Math.random();
var cold1 = Rx.Observable.create( (observer) => { observer.next(random)});
cold1.subscribe( (a) => console.log(a))cold1.subscribe( (a) => console.log(a))

//hot observable -> way2
var cold2 = Rx.Observable.create( (observer) => { observer.next(random)});
var hot2 = cold2.publish()

hot2.subscribe( (a) => console.log(a))hot2.subscribe( (a) => console.log(a))

//completion of the observable//this is a simple case where observable is a type which is able to complete after given time..
const timer = Rx.Observable.timer(1000);
timer.finally(() => console.log(‘all done’)) .subscribe();        //completetion for the observable which are not the type of complete ..interval
//here finally will not be able to work… as mentioned in the below code//const interval = Rx.Observable.interval(500).finally(() => console.log(‘all done’)).subscribe( () => console.log(‘interval’));

//how to unsubscribe it..
const subscribtion = Rx.Observable.interval(500).subscribe( () => console.log(‘interval’));
setTimeout(() => { subscribtion.unsubscribe()},1000)

//of operator –> for aonymous value
Rx.Observable.of(1,2,[“Ram”,”shyam”]).subscribe((result) => console.log(result));
//map operator –> convert the stream before subscribe
const jsonString = ‘{“type” : “Dog”, “breed” : “Pug”}’;const apiCall = Rx.Observable.of(jsonString);
apiCall.map((res) => JSON.parse(res)) .subscribe((obj) => {                              console.log(obj.type);                               console.log(obj.breed) });

//do -> this will allow to execute the code without affecting the underlying observable
const names = Rx.Observable.of(‘ram’, ‘shyam’);
names.do((name) => console.log(name)).map((name) => name.toUpperCase()).do((name) => console.log(name)).subscribe()

//mergeMap//Only when the inner Observable emits, let me know by merging the value to the outer Observable.
const post$ = Rx.Observable.of({id: 1});const getPostInfo$ = Rx.Observable.timer(3000).mapTo({title: “Post title”});
const posts$ = post$.mergeMap(post => { return getPostInfo$                              .map( second => post.id + ‘ ‘ + second.title)                              }).subscribe(res => console.log(res));                                                            const posts_1$ = post$.switchMap(post => { return getPostInfo$                              .map( second => post.id + ‘ ‘ + second.title)                              }).subscribe(res => console.log(res));
//switchMap //it is useful in the app development when you need a userId before a query on the db based on the userIdlet btn = document.querySelector(“button”);let clicks  = Rx.Observable.fromEvent(btn, ‘click’);
clicks.switchMap(click => { return Rx.Observable.interval(533500)}).subscribe(i => console.log(i))

//takeUntil//it is useful to terminate an observable even without unsubsciber
const interval = Rx.Observable.interval(500)const notifier = Rx.Observable.timer(2000)
interval.takeUntil(notifier) .finally( () => console.log(‘completed’))        .subscribe( i => console.log(i));

//takeWhile –> opposite to takeUntil.. run till the condition set in while
const obs = Rx.Observable.of(‘h’, ‘hi’, ‘him’, ‘hima’);
obs.takeWhile( (r) => r != ‘hima’ ) .finally(() => console.log(‘just print’))    .subscribe( i => console.log(i))    //zip//zip –> combineding way -> //useful if we have two observables having same length and connec
const o1 = Rx.Observable.of(‘hi’, ‘pa’);const o2 = Rx.Observable.of(‘manshu’, ‘pareek’);
const result = Rx.Observable.zip(o1, o2);
result.subscribe( arr => console.log(arr));
//forkJoin -> wait for all the observables to complete and then return the combined values//here as we are delaying second observable, so that it will wait for the two seconds even for the first obsrvable.//it returns the last value from all of the observables..//so here output will be ‘pa’ and ‘pareek’//it can be useful, for the bunch of related api’s and want to resolve all before sending some data to the ui..
const o11 = Rx.Observable.of(‘hi’, ‘pa’);const o12 = Rx.Observable.of(‘manshu’, ‘pareek’).delay(2000);
const result1 = Rx.Observable.forkJoin(o11, o12);
result1.subscribe( arr => console.log(arr));
const obs = Rx.Observable.create( observer => { observer.next(‘good’)  observer.next(‘great’)  observer.next(‘grand’)    throw ‘catch me’    observer.next(‘after throw’)})
obs.catch( err => console.log(err)).subscribe(val => console.log(val));

//retry -> retry even after a catch
const obsNew = Rx.Observable.create( observer => { observer.next(‘good’)  observer.next(‘great’)  observer.next(‘grand’)    throw ‘catch me’    observer.next(‘after throw’)})
obsNew.catch( err => console.log(err)).retry(2).subscribe(val => console.log(val));

//a subject can be used just as observable to create an obserable, like belowconst obs_1 = Rx.Observable.of(‘Observable’);const sub_1 = Rx.Subject.of(‘Subject’);
obs_1.subscribe((res) => console.log(res));sub_1.subscribe((res1) => console.log(res1));
//subject -> main benefit -> being able to broadcast the new values to subscriber without having to rely on some source data.
const subject1 = new Rx.Subject()
const subA = subject1.subscribe(val => console.log(‘subA val1’))const subB = subject1.subscribe(val => console.log(‘subB val1’))
setTimeout( () => { subject1.next(‘World’)}, 1000)

var urls = [  ‘https://httpbin.org/get?1’,  ‘https://httpbin.org/get?2’,  ‘https://httpbin.org/get?3’,  ‘https://httpbin.org/get?4’,  ‘https://httpbin.org/get?5’%5D;
Rx.Observable.from(this.urls)  .concatMap(url => http.get(url))  .subscribe(response => console.log(response.url));
////filter//first//last//debounce//throttleTime –> helpful to check that is user is stopped doing typing ..etc..


(Debounce) You want to make sure that a user has finished performing an action before carrying out a task e.g wait for a user to finish typing into an input field before sending an ajax request to query the db.
//(Throttle) You want to control the rate at which an event is being fired so as to reduce the amount of time the corresponding task would run e.g you have an autcomplete field that queries your db and you want to trottle rate at which the input event is fired,(to avoid too many request to be sent in a short time).