Observable In Angular
In this article, we will learn what an Observable is in Angular and how to use observables in Angular applications. When we talk about Angular Observables, we hear a lot of terms such as reactive programming, data streams, Observable, Observer, and RxJS. It is important to understand this terminology before we start using observables.
Rx stands for Reactive Extensions. Reactive programming is programming with asynchronous data streams, so the first thing to understand is what a data stream is.
What is a data stream?
A stream is data that arrives over a period of time. A stream can carry anything: variables, user inputs, attributes, caches, data structures, and even failures.
Consider a series of mouse click events, each with its own x and y position. Assume the user has clicked at the following positions, in this order: (12,15), (10,12), (15,20), and (17,15).
The following diagram depicts how the values change over time. As you can see, the stream emits values asynchronously as they occur.
Values are not the only thing a stream emits. The stream may complete, for example when the user closes the window or the app. Alternatively, an error may occur, which closes the stream. At any point in time, a stream may emit one of the following three things:
- Value: the next value in the stream.
- Complete: the stream has ended.
- Error: the stream has ended because of an error.
The figure below depicts all three options in a stream.
As mentioned earlier, a data stream can be anything. For example:
- Mouse click or mouse hover events, with x and y positions.
- Keyboard events such as keyup, keydown, and keypress.
- Form events, such as value changes.
- Data received after an HTTP request.
- User notifications.
- Measurements from any sensor.
Important points about streams:
- They can emit zero, one, or more values.
- They can also emit errors.
- They must emit the complete signal when they finish (finite streams).
- They can be infinite, meaning they never complete.
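The three kinds of notification can be modelled in a few lines of plain TypeScript. This is only an illustrative sketch, not how RxJS is implemented: the `StreamEvent` type, the `replay` function, and the recorded `clicks` array are hypothetical names made up for this example, using the click positions mentioned earlier.

```typescript
// The three kinds of notification a stream can emit.
type StreamEvent<T> =
  | { kind: 'value'; value: T }
  | { kind: 'complete' }
  | { kind: 'error'; error: string };

type Point = { x: number; y: number };

// Hypothetical recording of the click stream described earlier,
// ending with a complete notification.
const clicks: StreamEvent<Point>[] = [
  { kind: 'value', value: { x: 12, y: 15 } },
  { kind: 'value', value: { x: 10, y: 12 } },
  { kind: 'value', value: { x: 15, y: 20 } },
  { kind: 'value', value: { x: 17, y: 15 } },
  { kind: 'complete' },
];

// Walks the events in order and stops at the first complete or error,
// because nothing may follow a terminal notification.
function replay<T>(events: StreamEvent<T>[]): string[] {
  const log: string[] = [];
  for (const event of events) {
    if (event.kind === 'value') {
      log.push(`value ${JSON.stringify(event.value)}`);
    } else if (event.kind === 'complete') {
      log.push('complete');
      break;
    } else {
      log.push(`error ${event.error}`);
      break;
    }
  }
  return log;
}

const clickLog = replay(clicks);
console.log(clickLog);
```

A stream that fails would end with an `error` event instead of `complete`, and any events recorded after it would be ignored by `replay`.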
Let's look at what Reactive Programming is now that we know what a data stream is.
Reactive Programming
Reactive programming is all about creating streams, emitting values, errors, or complete signals, and manipulating, transferring, or otherwise doing something useful with those data streams.
This is where RxJS comes into the picture.
"The introduction to Reactive Programming you've been missing" is a great way to get started with the subject. Also see "Rx: A Beginner's Guide".
What is RxJS
RxJS (Reactive Extensions for JavaScript) is a JavaScript library that lets us work with asynchronous data streams.
The Angular framework relies heavily on the RxJS library to implement reactive programming. The following are some examples of where Angular uses it:
- Reacting to an HTTP request in Angular.
- Value changes and status changes in Angular Forms.
- The Router and Forms modules use observables to listen for and respond to user input events.
- Custom events can be defined to send observable output data from a child component to a parent component.
- The HTTP module uses observables to handle AJAX requests and responses.
RxJS has two main players:
- Observable
- Observers (Subscribers)
What is an Observable in Angular
An Observable is a function that converts an ordinary stream of data into an observable stream of data. You can think of an Observable as a wrapper around an ordinary stream of data.
An observable stream, or simply an Observable, emits the values from the stream over time, often asynchronously. It emits a complete signal when the stream completes, or an error signal if the stream fails.
Observables are declarative. You define an observable function just like any other variable. The observable starts emitting values only when someone subscribes to it.
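The point that nothing happens until someone subscribes can be checked with a small sketch. It assumes only that the `rxjs` package is installed; the names `greeting$` and `lazyLog` are made up for this illustration.

```typescript
import { Observable } from 'rxjs';

const lazyLog: string[] = [];

// Defining the observable does not run the function passed to it.
const greeting$ = new Observable<string>((observer) => {
  lazyLog.push('producer started');
  observer.next('hello');
  observer.complete();
});

lazyLog.push('observable defined');

// The producer function runs only now, when subscribe() is called.
greeting$.subscribe((value) => lazyLog.push(`received ${value}`));

console.log(lazyLog);
```

In the log, "observable defined" appears before "producer started", even though the producer function was written first.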
Who are observers (subscribers)
An Observable on its own is useful only if someone consumes the values it emits. We call these consumers "observers" or "subscribers."
Callbacks are used by the observers to communicate with the Observable.
To receive values from the observable, the observer must subscribe to it. While subscribing, it can optionally pass three callbacks: next(), error(), and complete().
As soon as the observer or consumer subscribes to it, the observable begins emitting the value.
When a value arrives in the stream, the observable calls the next() callback. The value is passed to the next callback as an argument. The error() callback is called if an error occurs. When the stream is finished, it calls the complete() function.
- Observers/subscribers subscribe to Observables.
- At the time of subscribing, the observer registers three callbacks with the observable: next(), error(), and complete().
- Each of the three callbacks is optional.
- The observer receives data from the observable via the next() callback.
- It also receives the Observable's error and completion events via the error() and complete() callbacks.
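As a rough sketch of the points above, the callbacks can also be handed to `subscribe()` as a single observer object, and any of them can be left out. The names `letters$` and `received` are hypothetical; only the `rxjs` package is assumed.

```typescript
import { Observable } from 'rxjs';

const received: string[] = [];

const letters$ = new Observable<string>((observer) => {
  observer.next('a');
  observer.next('b');
  observer.complete();
});

// Full observer: all three callbacks supplied as one object.
letters$.subscribe({
  next: (value) => received.push(`next ${value}`),
  error: (err) => received.push(`error ${err}`),
  complete: () => received.push('complete'),
});

// Partial observer: only next is supplied, so completion goes unreported.
letters$.subscribe({
  next: (value) => received.push(`partial ${value}`),
});

console.log(received);
```

Each call to `subscribe()` runs the producer function again, which is why the second observer also receives both letters.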
Angular Observable
Now that we've covered the fundamentals of the RxJS Observable, let's look at an example of how it works.
Create a new Angular project. Remove the contents of app.component.html. Open the app.component.ts file.
Import the required libraries
When you create an Angular project, the RxJS library is installed automatically, so there is no need to install it separately.
Import Observable from the rxjs library.
import { Observable } from 'rxjs';
Observable Creation
obs = new Observable((observer) => {
  console.log("Observable starts")
  observer.next("1")
  observer.next("2")
  observer.next("3")
  observer.next("4")
  observer.next("5")
})
RxJS also offers other ways to create an observable, including:
- create
- defer
- empty
- from
- fromEvent
- interval
- of
- range
- throwError
- timer
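A few of the creation functions listed above can be tried without any Angular code. The following is a minimal sketch assuming only the `rxjs` package; the array names are made up for the example.

```typescript
import { of, from, range } from 'rxjs';

const fromOf: number[] = [];
const fromFrom: string[] = [];
const fromRange: number[] = [];

// of: emits its arguments one after another, then completes.
of(1, 2, 3).subscribe((v) => fromOf.push(v));

// from: converts an array (or a promise or other iterable) into an observable.
from(['a', 'b']).subscribe((v) => fromFrom.push(v));

// range(start, count): emits count consecutive integers beginning at start.
range(5, 3).subscribe((v) => fromRange.push(v));

console.log(fromOf, fromFrom, fromRange);
```

All three emit synchronously here, so the arrays are already filled by the time `console.log` runs.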
Subscribing to the observable
ngOnInit() {
  this.obs.subscribe(
    val => { console.log(val) },            //next callback
    error => { console.log("error") },      //error callback
    () => { console.log("Completed") }      //complete callback
  )
}
The whole code for app.component.ts is displayed below:
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  title = 'Angular Observable using RxJs - Getting Started';

  obs = new Observable((observer) => {
    console.log("Observable starts")
    observer.next("1")
    observer.next("2")
    observer.next("3")
    observer.next("4")
    observer.next("5")
  })

  data = [];

  ngOnInit() {
    this.obs.subscribe(
      val => { console.log(val) },
      error => { console.log("error") },
      () => { console.log("Completed") }
    )
  }
}
Adding interval
obs = new Observable((observer) => {
  console.log("Observable starts")
  setTimeout(() => { observer.next("1") }, 1000);
  setTimeout(() => { observer.next("2") }, 2000);
  setTimeout(() => { observer.next("3") }, 3000);
  setTimeout(() => { observer.next("4") }, 4000);
  setTimeout(() => { observer.next("5") }, 5000);
})
Error event
obs = new Observable((observer) => {
  console.log("Observable starts")
  setTimeout(() => { observer.next("1") }, 1000);
  setTimeout(() => { observer.next("2") }, 2000);
  setTimeout(() => { observer.next("3") }, 3000);
  setTimeout(() => { observer.error("error emitted") }, 3500);  //sending error event. observable stops here
  setTimeout(() => { observer.next("4") }, 4000);               //this value is never delivered to the subscriber
  setTimeout(() => { observer.next("5") }, 5000);               //neither is this one
})
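The same behaviour can be seen without timers. The sketch below is a simplified, synchronous variant of the example above (the names `failing$` and `errorEvents` are made up); it assumes only the `rxjs` package.

```typescript
import { Observable } from 'rxjs';

const errorEvents: string[] = [];

const failing$ = new Observable<string>((observer) => {
  observer.next('1');
  observer.error('error emitted');
  observer.next('2');   // ignored: the subscription is already closed
  observer.complete();  // ignored as well
});

failing$.subscribe({
  next: (value) => errorEvents.push(value),
  error: (err) => errorEvents.push(`error: ${err}`),
  complete: () => errorEvents.push('completed'),
});

console.log(errorEvents);
```

Only the first value and the error reach the observer. The complete callback is never invoked, because an error and a completion are mutually exclusive ways for a stream to end.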
Complete Event
obs = new Observable((observer) => {
  console.log("Observable starts")
  setTimeout(() => { observer.next("1") }, 1000);
  setTimeout(() => { observer.next("2") }, 2000);
  setTimeout(() => { observer.next("3") }, 3000);
  setTimeout(() => { observer.complete() }, 3500);  //sending complete event. observable stops here
  setTimeout(() => { observer.next("4") }, 4000);   //this value is never delivered to the subscriber
  setTimeout(() => { observer.next("5") }, 5000);   //neither is this one
})
Observable Operators
import { Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';

obs = new Observable<number>((observer) => {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.next(4)
  observer.next(5)
  observer.complete()
}).pipe(
  filter(data => data > 2),   //filter operator: keeps 3, 4, 5
  map(val => val * 2),        //map operator: emits 6, 8, 10
)
The table below lists some of the most regularly used operators.
AREA | OPERATORS |
---|---|
Combination | combineLatest, concat, merge, startWith, withLatestFrom, zip |
Filtering | debounceTime, distinctUntilChanged, filter, take, takeUntil, takeWhile, takeLast, first, last, single, skip, skipUntil, skipWhile, skipLast |
Transformation | bufferTime, concatMap, map, mergeMap, scan, switchMap, exhaustMap, reduce |
Utility | tap, delay, delayWhen |
Error Handling | throwError, catchError, retry, retryWhen |
Multicasting | share |
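Operators from the table can be chained inside a single pipe() call, each one receiving the output of the one before it. The following is a small sketch with made-up input data, assuming the `rxjs` package with operators imported from `rxjs/operators`.

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged, filter, map, take } from 'rxjs/operators';

const results: number[] = [];

from([1, 1, 2, 3, 3, 4, 5, 6])
  .pipe(
    distinctUntilChanged(),      // drop consecutive duplicates: 1, 2, 3, 4, 5, 6
    filter((n) => n % 2 === 0),  // keep even numbers: 2, 4, 6
    map((n) => n * 10),          // transform each value: 20, 40, 60
    take(2),                     // complete after two values: 20, 40
  )
  .subscribe((n) => results.push(n));

console.log(results);
```

The order of the operators matters: moving `take(2)` to the top of the chain would let only the first two source values through before any filtering happens.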
Unsubscribing from an Observable
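Declare a variable of type Subscription (imported from rxjs) to hold the subscription.
obs: Subscription;
Assign the subscription returned by subscribe() to the obs variable.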
this.obs = this.src.subscribe(value => {
  console.log("Received " + this.id);
});
In the ngOnDestroy method, call the unsubscribe() method.
ngOnDestroy() {
  this.obs.unsubscribe();
}
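The effect of unsubscribe() can be observed outside a component as well. In this sketch a `Subject` stands in for the `src` observable so that values can be pushed by hand; that substitution, and the name `seen`, are assumptions made for the illustration.

```typescript
import { Subject, Subscription } from 'rxjs';

const seen: number[] = [];

// Subject stands in for the article's src observable so that values
// can be emitted manually.
const src = new Subject<number>();

const subscription: Subscription = src.subscribe((value) => seen.push(value));

src.next(1);
src.next(2);

subscription.unsubscribe();  // what ngOnDestroy does in the component

src.next(3);  // nobody is listening any more, so this value is dropped

console.log(seen, subscription.closed);
```

After `unsubscribe()` the subscription reports `closed` as true and no further values reach the callback, which is what prevents a destroyed component from continuing to react to the stream.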