Rewriting RxJS Subject classes (Behavior, Replay, Async)

    This is my experiment in rewriting part of the RxJS library in plain JavaScript, with the same behavior as the original.

    I made a simple version of BehaviorSubject before (

    Today I will implement several Subject types using OOP: BehaviorSubject, ReplaySubject and AsyncSubject. I may also try another version using FP.

    For the test cases, please check the original documentation.

    Full implementation is at

    A. Overall

    These are all the classes I created:

  1. A base class MySubject that has these methods:

    class MySubject {
      subscribe(obj) { /* store the subscriber's callbacks and return an unsubscribe function */ }
      next(nextValue) { /* push the new value onto the value list */ }
      error(e) { /* mark the subject as stopped and trigger the subscribers' error callbacks */ }
      complete() { /* same as error, but triggers the complete callbacks */ }
    }
  2. A second base class, MySyncSubject, which extends MySubject:

    class MySyncSubject extends MySubject {
      next(nextValue) { /* override next to trigger each subscriber's next callback */ }
    }
  3. MyReplaySubject extends MySyncSubject:

    class MyReplaySubject extends MySyncSubject {
      subscribe(fn) { /* override subscribe to replay stored values based on bufferSize and timeFilter */ }
    }
  4. MyBehaviorSubject extends MyReplaySubject:

        This is BehaviorSubject rewritten as a special case of MyReplaySubject, with bufferSize = 1 and no timeFilter.

  5. MyAsyncSubject extends MySubject:

    class MyAsyncSubject extends MySubject {
      complete() { /* override complete to broadcast the last value to all subscribers when the subject completes */ }
    }

    B. Details

  1. MySubject
    class MySubject {
      constructor() {
        this.isStopped = false;
        this.subscribers = [];
        this.value = [];
      }
      subscribe(obj) { // store the subscriber's callbacks and return an unsubscribe function
        this.subscribers.push(obj); // assumed: the observer object is stored as-is
        return () => {
          this.subscribers = this.subscribers.filter((s) => s !== obj);
        };
      }
      next(nextValue) { // push the new value onto the value list
        if (!this.isStopped) {
          this.value.push({ nextValue: nextValue, timestamp: Date.now() }); // assumed: Date.now()
        }
      }
      error(e) { // mark the subject as stopped and trigger the subscribers' error callbacks
        if (!this.isStopped) {
          this.isStopped = true;
          this.subscribers.forEach((obj) => obj.error(e));
        }
      }
      complete() { // same as error, but triggers the complete callbacks
        if (!this.isStopped) {
          this.isStopped = true;
          this.subscribers.forEach((obj) => obj.complete());
        }
      }
    }
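
To make the unsubscribe and stop logic of the base class concrete, here is a minimal self-contained sketch. It assumes that `subscribe` stores the observer object and returns a function that removes it, and that timestamps come from `Date.now()`; the `log` array and the `makeObserver` helper are hypothetical names used only for this illustration.

```javascript
// Sketch of the base class; the subscribe body and Date.now() are assumptions.
class MySubject {
  constructor() {
    this.isStopped = false;
    this.subscribers = [];
    this.value = [];
  }
  subscribe(obj) {
    this.subscribers.push(obj);
    // The returned function removes this observer again.
    return () => {
      this.subscribers = this.subscribers.filter((s) => s !== obj);
    };
  }
  next(nextValue) {
    if (!this.isStopped) {
      this.value.push({ nextValue, timestamp: Date.now() });
    }
  }
  error(e) {
    if (!this.isStopped) {
      this.isStopped = true;
      this.subscribers.forEach((obj) => obj.error(e));
    }
  }
  complete() {
    if (!this.isStopped) {
      this.isStopped = true;
      this.subscribers.forEach((obj) => obj.complete());
    }
  }
}

const log = [];
// Hypothetical helper: builds an observer that records every callback it gets.
const makeObserver = (name) => ({
  next: (v) => log.push(`${name} next ${v}`),
  error: (e) => log.push(`${name} error ${e}`),
  complete: () => log.push(`${name} complete`),
});

const subject = new MySubject();
const unsubscribeA = subject.subscribe(makeObserver("A"));
subject.subscribe(makeObserver("B"));

unsubscribeA();     // A is removed before the subject stops
subject.complete(); // only B is notified
subject.complete(); // ignored: the subject is already stopped
subject.next(1);    // ignored as well, nothing is stored

console.log(log);                  // [ 'B complete' ]
console.log(subject.value.length); // 0
```

Note that the base `next` only records values; notifying subscribers is left to the subclasses.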
  2. MySyncSubject
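    class MySyncSubject extends MySubject {
      next(nextValue) { // override next to also trigger each subscriber's next callback
        super.next(nextValue); // assumed: the base class still records the value
        if (!this.isStopped) {
          this.subscribers.forEach((obj) => obj.next(nextValue));
        }
      }
    }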
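
As a rough illustration of the synchronous push, the sketch below subscribes one observer and emits a few values. The base class is trimmed to what the example needs, and both the `subscribe` body and the `super.next` call are assumptions about the parts not shown here; `seen` is a hypothetical name.

```javascript
// Trimmed base class: only what this example needs (subscribe body is assumed).
class MySubject {
  constructor() {
    this.isStopped = false;
    this.subscribers = [];
    this.value = [];
  }
  subscribe(obj) {
    this.subscribers.push(obj);
    return () => {
      this.subscribers = this.subscribers.filter((s) => s !== obj);
    };
  }
  next(nextValue) {
    if (!this.isStopped) this.value.push({ nextValue, timestamp: Date.now() });
  }
  complete() {
    if (!this.isStopped) {
      this.isStopped = true;
      this.subscribers.forEach((obj) => obj.complete());
    }
  }
}

class MySyncSubject extends MySubject {
  next(nextValue) {
    super.next(nextValue); // assumed: keep the value list up to date
    if (!this.isStopped) {
      this.subscribers.forEach((obj) => obj.next(nextValue));
    }
  }
}

const seen = [];
const subject = new MySyncSubject();
subject.subscribe({ next: (v) => seen.push(v), complete: () => seen.push("done") });

subject.next(1);
subject.next(2);
subject.complete();
subject.next(3); // dropped: the subject has stopped

console.log(seen); // [ 1, 2, 'done' ]
```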
  3. MyReplaySubject
    class MyReplaySubject extends MySyncSubject {
      constructor(bufferSize, timeLimit = -1) {
        super();
        this.bufferSize = bufferSize;
        this.timeFilter = (i) => {
          return timeLimit > 0 ? Date.now() - i.timestamp < timeLimit : true;
        };
      }
      subscribe(fn) { // override subscribe to replay stored values based on bufferSize and timeFilter
        this.value
          .slice(Math.max(this.value.length - this.bufferSize, 0))
          .filter(this.timeFilter) // assumed: the time window is applied here
          .forEach((i) => fn.next(i.nextValue));
        return super.subscribe(fn);
      }
    }
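
The replay rule (keep the last `bufferSize` values, then drop those older than `timeLimit` milliseconds) can be sketched as follows. This is a compact re-creation under assumptions: the filter step, the `super` calls and `Date.now()` are my guesses at the elided parts, and the second example ages a stored value by hand instead of waiting for real time to pass.

```javascript
class MySubject {
  constructor() { this.isStopped = false; this.subscribers = []; this.value = []; }
  subscribe(obj) {
    this.subscribers.push(obj);
    return () => { this.subscribers = this.subscribers.filter((s) => s !== obj); };
  }
  next(nextValue) {
    if (!this.isStopped) this.value.push({ nextValue, timestamp: Date.now() });
  }
}
class MySyncSubject extends MySubject {
  next(nextValue) {
    super.next(nextValue);
    if (!this.isStopped) this.subscribers.forEach((obj) => obj.next(nextValue));
  }
}
class MyReplaySubject extends MySyncSubject {
  constructor(bufferSize, timeLimit = -1) {
    super();
    this.bufferSize = bufferSize;
    // No time limit (<= 0) means every buffered value is replayed.
    this.timeFilter = (i) => (timeLimit > 0 ? Date.now() - i.timestamp < timeLimit : true);
  }
  subscribe(fn) {
    this.value
      .slice(Math.max(this.value.length - this.bufferSize, 0)) // last bufferSize items
      .filter(this.timeFilter)                                 // still inside the window
      .forEach((i) => fn.next(i.nextValue));
    return super.subscribe(fn);
  }
}

// 1) bufferSize only: a late subscriber gets the last two values, then live ones.
const bySize = [];
const sized = new MyReplaySubject(2);
[1, 2, 3].forEach((v) => sized.next(v));
sized.subscribe({ next: (v) => bySize.push(v) });
sized.next(4);
console.log(bySize); // [ 2, 3, 4 ]

// 2) bufferSize plus a 100 ms window.
const byTime = [];
const timed = new MyReplaySubject(3, 100);
timed.next("old");
timed.next("new");
timed.value[0].timestamp -= 1000; // pretend "old" was emitted a second ago
timed.subscribe({ next: (v) => byTime.push(v) });
console.log(byTime); // [ 'new' ]
```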
  4. MyBehaviorSubject
    class MyBehaviorSubject extends MyReplaySubject {
      constructor(defaultValue) { // bufferSize = 1 and no timeFilter
        super(1);
        this.value = [{ nextValue: defaultValue, timestamp: Date.now() }];
        this.bufferSize = 1;
        this.timeFilter = () => true;
      }
    }
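
To show why a buffer of one value plus a seeded default gives BehaviorSubject-like behavior, here is a small sketch. The parent classes are compact re-creations under the same assumptions as before (assumed `subscribe` body, `super` calls and `Date.now()`); the arrays `a` and `b` are hypothetical names for two subscribers' received values.

```javascript
class MySubject {
  constructor() { this.isStopped = false; this.subscribers = []; this.value = []; }
  subscribe(obj) {
    this.subscribers.push(obj);
    return () => { this.subscribers = this.subscribers.filter((s) => s !== obj); };
  }
  next(nextValue) {
    if (!this.isStopped) this.value.push({ nextValue, timestamp: Date.now() });
  }
}
class MySyncSubject extends MySubject {
  next(nextValue) {
    super.next(nextValue);
    if (!this.isStopped) this.subscribers.forEach((obj) => obj.next(nextValue));
  }
}
class MyReplaySubject extends MySyncSubject {
  constructor(bufferSize, timeLimit = -1) {
    super();
    this.bufferSize = bufferSize;
    this.timeFilter = (i) => (timeLimit > 0 ? Date.now() - i.timestamp < timeLimit : true);
  }
  subscribe(fn) {
    this.value
      .slice(Math.max(this.value.length - this.bufferSize, 0))
      .filter(this.timeFilter)
      .forEach((i) => fn.next(i.nextValue));
    return super.subscribe(fn);
  }
}
class MyBehaviorSubject extends MyReplaySubject {
  constructor(defaultValue) {
    super(1); // replay exactly one value
    // Seed the value list so the first subscriber already has something to replay.
    this.value = [{ nextValue: defaultValue, timestamp: Date.now() }];
    this.timeFilter = () => true;
  }
}

const a = [];
const b = [];
const subject = new MyBehaviorSubject(0);
subject.subscribe({ next: (v) => a.push(v) }); // receives the default value 0 at once
subject.next(1);
subject.subscribe({ next: (v) => b.push(v) }); // receives only the latest value, 1
subject.next(2);

console.log(a); // [ 0, 1, 2 ]
console.log(b); // [ 1, 2 ]
```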
  5. MyAsyncSubject
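    class MyAsyncSubject extends MySubject {
      complete() { // override complete to broadcast the last value to all subscribers when the subject completes
        const last = this.value[this.value.length - 1];
        if (!this.isStopped && last) this.subscribers.forEach((obj) => obj.next(last.nextValue));
        super.complete();
      }
    }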
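
The effect of overriding `complete` can be sketched as below: nothing reaches the subscribers until the subject completes, and then only the last value is delivered. The base class is a trimmed re-creation with an assumed `subscribe` body, and the guard for an empty value list is my addition rather than something confirmed by the original; `events` and `snapshotBeforeComplete` are hypothetical names.

```javascript
// Trimmed base class (subscribe body and Date.now() are assumptions).
class MySubject {
  constructor() { this.isStopped = false; this.subscribers = []; this.value = []; }
  subscribe(obj) {
    this.subscribers.push(obj);
    return () => { this.subscribers = this.subscribers.filter((s) => s !== obj); };
  }
  next(nextValue) {
    if (!this.isStopped) this.value.push({ nextValue, timestamp: Date.now() });
  }
  complete() {
    if (!this.isStopped) {
      this.isStopped = true;
      this.subscribers.forEach((obj) => obj.complete());
    }
  }
}

class MyAsyncSubject extends MySubject {
  complete() {
    const last = this.value[this.value.length - 1];
    // Emit the last stored value (if any) before signalling completion.
    if (!this.isStopped && last) {
      this.subscribers.forEach((obj) => obj.next(last.nextValue));
    }
    super.complete();
  }
}

const events = [];
const subject = new MyAsyncSubject();
subject.subscribe({
  next: (v) => events.push(`next ${v}`),
  complete: () => events.push("complete"),
});

subject.next(1);
subject.next(2);
subject.next(3);
const snapshotBeforeComplete = [...events]; // still empty: values are only stored

subject.complete();
subject.complete(); // ignored: already stopped

console.log(snapshotBeforeComplete); // []
console.log(events);                 // [ 'next 3', 'complete' ]
```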

Source: Viblo
