Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | import { HttpHeaders } from '@angular/common/http';
import * as signalR from '@microsoft/signalr';
import { BehaviorSubject, from, fromEventPattern, Observable, using } from 'rxjs';
import { concatMap, filter, share, skipWhile } from 'rxjs/operators';
import { LogService } from 'src/app/log/log.service';
export abstract class SignalRBaseService<T> {
protected readonly connection: signalR.HubConnection;
// eslint-disable-next-line @typescript-eslint/naming-convention
protected readonly headers = new HttpHeaders({ 'Content-Type': 'application/json' });
protected isConnected = new BehaviorSubject<boolean>(false);
private readonly status$: Observable<T>;
private readonly isStarted = new BehaviorSubject<boolean>(false);
private readonly startAfterConnected$: Observable<void>;
public constructor(private readonly url: string, method: string, protected readonly logService: LogService) {
this.connection = new signalR.HubConnectionBuilder()
.withUrl(this.url)
.build();
// update connection status the rxjs way
from(this.connection.start()).subscribe(
() => this.isConnected.next(true),
(error) => {
console.error(error);
this.logService.sendErrorLog(`${error}`);
}
);
this.status$ = fromEventPattern<T>(
(handler) => this.connection.on(method, handler),
(handler) => this.connection.off(method, handler)
)
.pipe(
share(),
filter((x) => x !== undefined)
);
// send 'startState' only after 'isConnected' emits true
this.startAfterConnected$ = this.isConnected.pipe(
skipWhile((value) => !value),
concatMap(async () => this.connection.send('startState').catch((error) => {
console.error(error);
this.logService.sendErrorLog(`${error}`);
}))
);
}
public getStatus(): Observable<T> {
return using(
() => {
this.startAfterConnected$.subscribe(() => this.isStarted.next(true));
// eslint-disable-next-line @typescript-eslint/no-misused-promises
return { unsubscribe: async () => this.connection.send('stopState').catch((error) => {
console.error(error);
this.logService.sendErrorLog(`${error}`);
}) };
},
() => this.status$
);
}
}
|