8. Observables

Some entity broadcasts what is happening, a stream of events. Hook into this entity and attach behavior whenever an event occurs. Detach from entity when no longer care about events it is broadcasting.

Subject, or entity, is aware of any observers that are watching it. When an event is emitted subject is able to pass this event to each oberver via methods that are provided when the observer begins to subscribe to it. Observables are composable, transform and combine them into new observables. Encapsulate concept of continuous stream of events, request-cancel-request with many useful operators.

Any, she wants many, make her a bunch of functions.

HttpModule and RoutingModule make use of this Observable pattern natively.

Observables with HTTP:

import { Component } from '@angular/core';
import { Http } from '@angular/http';

import 'rxjs/add/operator/map';

@Component({
  selector: 'app-article',
  template: `
      <h2>{{ title }}</h2>
      <p>{{ author }}</p>
  `
})
export class ArticleComponent {
    title = '';
    author = '';

    constructor(private http: Http) {
      http.get('../assets/articles.json')
      .map(response => response.json())
      .subscribe(article => {
        this.title = article.title;
        this.author = article.author;
      }),
      error => console.error(error);
    }

}

Cold observable means Observable does not begin to emit until an Observer begins to subscribe to it. Hot observable emits events regardless if observer is subscribed to it. HTTP Observables demand a cold designation, to prevent data loss.

Subjects: implementing publish-subscribe model:

import { Component  } from '@angular/core';

import { Subject } from 'rxjs/Subject';

@Component({
  selector: 'app-click-observer',
  template: `
    <button (click)="clickEmitter.next($event)">Emit event!</button>
    <p *ngFor="let click of clicks; let i = index;">
      {{ i }}: {{ click }}
    </p>
  `
})
export class ClickObserverComponent {
      clickEmitter: Subject<Event> = new Subject();
      clicks: Event[] = [];

      constructor() {
        this.clickEmitter.subscribe(clickEvent => this.clicks.push(clickEvent));
      }

}

Encapsulate subject instance:

import { Component  } from '@angular/core';

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';

@Component({
  selector: 'app-click-observer',
  template: `
    <button (click)="publish($event)">Emit event!</button>
    <p *ngFor="let click of clicks; let i = index;">
      {{ i }}: {{ click }}
    </p>
  `
})
export class ClickObserverComponent {
      private clickSubject: Subject<Event> = new Subject();
      clickEmitter: Observable<Event>;
      clicks: Event[] = [];

      constructor() {
        this.clickEmitter = this.clickSubject.asObservable();
        this.clickEmitter.subscribe(clickEvent => this.clicks.push(clickEvent));
      }

      publish(event: Event): void {
        this.clickSubject.next(event);
      }

}

RxJS native implementation:

import { Component,ViewChild, AfterViewInit  } from '@angular/core';

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEvent';

@Component({
  selector: 'app-click-observer',
  template: `
    <button #myButton>Emit event!</button>
    <p *ngFor="let click of clicks; let i = index;">
      {{ i }}: {{ click }}
    </p>
  `
})
export class ClickObserverComponent implements AfterViewInit {
      @ViewChild('myButton') myButton;
      clickEmitter: Observable<Event>;
      clicks: Event[] = [];

      ngAfterViewInit() {
        this.clickEmitter = Observable.fromEvent(this.myButton.nativeElement, 'click');
        this.clickEmitter.subscribe(clickEvent => this.clicks.push(clickEvent));
      }

}

BehaviorSubjects Observable authentication service:

Service:

import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Observable } from 'rxjs/Observable';

export const enum AuthenticationState {
    LoggedIn,
    LoggedOut
}

@Injectable()
export class AuthenticationService {
    private authenticationState: AuthenticationState;
    private authenticationManager: BehaviorSubject<AuthenticationState>
    = new BehaviorSubject(AuthenticationState.LoggedOut);
    authenticationChange: Observable<AuthenticationState>;

    constructor() {
      this.authenticationChange = this.authenticationManager.asObservable();
    }

    login(): void {
      this.setAuthenticationState(AuthenticationState.LoggedIn);
    }

    logout(): void {
      this.setAuthenticationState(AuthenticationState.LoggedOut);
    }

    private emitAuthenticationState(): void {
      this.authenticationManager.next(this.authenticationState);
    }

    private setAuthenticationState(newAuthenticationState: AuthenticationState): void {
      this.authenticationState = newAuthenticationState;
      this.emitAuthenticationState();
    }

}

LogInComponent:

import { Component, OnDestroy } from '@angular/core';
import { AuthenticationState, AuthenticationService } from '../authentication.service';

import { Subscription } from 'rxjs/Subscription';

@Component({
  selector: 'app-log-in',
  template: `
        <button *ngIf="!loggedIn" (click)="logIn()">Log In</button>
        <button *ngIf="loggedIn" (click)="logOut()">Log Out</button>
  `
})
export class LogInComponent implements OnDestroy {
      private authenticationChangeSubscription: Subscription;
      loggedIn: boolean;

      constructor(private authenticationService: AuthenticationService) {
        this.authenticationChangeSubscription =
        authenticationService.authenticationChange.subscribe(newAuthenticationState =>
        this.loggedIn = (newAuthenticationState === AuthenticationState.LoggedIn));
      }

      logIn(): void {
        this.authenticationService.login();
      }

      logOut(): void {
        this.authenticationService.logout();
      }

      ngOnDestroy() {
        this.authenticationChangeSubscription.unsubscribe();
      }

}

Publish-Subscribe Service:

Service:

import { Injectable } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';

import 'rxjs/add/operator/map';
import 'rxjs/add/operator/filter';

@Injectable()
export class PublishSubscribeService {

  private publishSubscribeSubject: Subject<any> = new Subject();
  emitter: Observable<any>;

  constructor() {
      this.emitter = this.publishSubscribeSubject.asObservable();
   }

   publish(channel: string, event: any): void {
     this.publishSubscribeSubject.next({
       channel: channel,
       event: event
     });
   }

  subscribe(channel: string, handler: ((value: any) => void)): Subscription {
      return this.emitter.filter(emission => emission.channel === channel)
                          .map(emission => emission.event)
                          .subscribe(handler);
   }

}

NodeComponent:

import { Component, Input, AfterViewInit, OnDestroy } from '@angular/core';

import { PublishSubscribeService } from '../publish-subscribe.service';
import { Subscription } from 'rxjs/Subscription';

@Component({
  selector: 'app-node',
  template: `
          <p>Heard {{ count }} of {{ subscribeChannel }}</p>
          <button (click)="send()">Send {{ publishChannel }}</button>
  `
})
export class NodeComponent implements AfterViewInit, OnDestroy {
      @Input() publishChannel: string;
      @Input() subscribeChannel: string;
      private publishSubscribeServiceSubscription: Subscription;
      count = 0;

  constructor(private publishSubscribeService: PublishSubscribeService) { }

  send() {
    this.publishSubscribeService.publish(this.publishChannel, {});
  }

  ngAfterViewInit() {
    this.publishSubscribeServiceSubscription
    = this.publishSubscribeService.subscribe(this.subscribeChannel, event => this.count++);
  }

  ngOnDestroy() {
    this.publishSubscribeServiceSubscription.unsubscribe();
  }

}

AppComponent:

import { Component } from '@angular/core';
import { Technology } from './technology';

@Component({
  selector: 'app-root',
  template: `
      <div class="row">
      <h1>{{ title }}</h1>
        <div class="col-md-6">
          <app-node subscribeChannel="polymer" publishChannel="keras"></app-node>
          <app-node subscribeChannel="keras" publishChannel="polymer"></app-node>
        </div>
        <div class="col-md-6">
            <app-princess></app-princess>
        </div>
      </div>
  `,
  styles: ['']
})
export class AppComponent {
  title = 'Angular 4 Love Affair';

}

Track ViewChildren changes with querylists and observables:

InnerComponent:

import { Component, Input } from '@angular/core';

@Component({
  selector: 'app-inner',
  template: `
          <p>{{ value }}</p>
  `
})
export class InnerComponent {
       @Input() value: number;

}

OuterComponent:

import { Component, ViewChildren, QueryList,
        AfterViewInit, ChangeDetectorRef } from '@angular/core';
import { InnerComponent } from '../inner/inner.component';

@Component({
  selector: 'app-outer',
  template: `
    <button (click)="add()">More</button>
    <button (click)="remove()">Less</button>
    <button (click)="shuffle()">Shuffle</button>
    <app-inner *ngFor="let l of list" value="{{l}}"></app-inner>
    <p>Value of last: {{ lastValue }}</p>
  `
})
export class OuterComponent implements AfterViewInit {
       @ViewChildren(InnerComponent) innerComponents: QueryList<InnerComponent>;
       list: number[] = [];
       lastValue: number;

       constructor(private changeDetectorRef: ChangeDetectorRef) { }

       add(): void {
         this.list.push(this.list.length);
       }

       remove(): void {
         this.list.pop();
       }

       shuffle(): void {
         this.list = this.list.sort(() => (4 * Math.random() > 2) ? 1 : -1 );
       }

       ngAfterViewInit() {
         this.innerComponents.changes.subscribe(innerComponents =>
                            this.lastValue = (innerComponents.last || {}).value);
         this.changeDetectorRef.detectChanges();
       }

}

Observables Autocomplete:

Service:

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Rx';

@Injectable()
export class APIService {

  constructor(private http: Http) { }

  search(query: string): Observable<string> {
    return this.http.get('../assets/response.json')
                    .map(response => response.json()['prefix'] + query)
                    .concatMap(a => Observable.of(a).delay(Math.random() * 1000));
  }

}

SearchComponent:

import { Component } from '@angular/core';
import { FormControl } from '@angular/forms';

import { APIService } from '../api.service';

@Component({
  selector: 'app-search',
  template: `
              <input [formControl]="queryInput">
              <p *ngFor="let result of results">{{ result }}</p>
  `
})
export class SearchComponent {
    results: string[] = [];
    queryInput: FormControl = new FormControl();

    constructor(private apiService: APIService) {
        this.queryInput.valueChanges
                        .debounceTime(400)
                        .distinctUntilChanged()
                        .subscribe(query => apiService.search(query) // switchMap
                        .subscribe(result => this.results.push(result)));
     }

}

results matching ""

    No results matching ""