RxJS Study Notes (unfinished)

Key concepts:

Asynchronous programming
Streams
Observer pattern
Iterator pattern
Functional programming

Learning resources:

https://www.youtube.com/watch?v=tGWBy6Vqq9w&ab_channel=LaithAcademy

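An analogy:

A person (the observable) carries ingredients (the data).
They can hand the ingredients directly to someone who processes them, a shop (the observer).
The shop's next step is one of next, error, or complete.
The latter two both stop the shop from taking any further orders.

The ingredients can also be processed along the way first (pipe).
Each processing unit along the way is an operator.

The person and the shop are connected with subscribe.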

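import { Observable } from "rxjs";
import { filter, map } from "rxjs/operators";

// Assumption: `users` is defined elsewhere as a response object shaped like
// { data: [{ status: "active", age: 30 }, ...] }.
const observable = new Observable((subscriber) => {
  subscriber.next(users);
}).pipe(
  // pluck("data") is deprecated in RxJS 7; map does the same job
  map((response) => response.data),
  // only continue when there are at least 10 users
  filter((users) => users.length >= 10),
  // keep only the active users
  map((users) => users.filter((user) => user.status === "active")),
  // average age of the active users
  map((users) => users.reduce((sum, user) => sum + user.age, 0) / users.length),
  // a throw inside an operator reaches the observer as an error notification
  map((average) => {
    if (average < 18) throw new Error(`Average age is too small (${average})`);
    return average;
  }),
  map((average) => `The average age is ${average}`)
);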

const observer = {
  next: (x) => console.log("Observer got a next value: " + x),
  error: (err) => console.error("Observer got an error: " + err),
  complete: () => console.log("Observer got a complete notification"),
};


observable.subscribe(observer);
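
The rule from the analogy, that error and complete both stop further deliveries, can be sketched without RxJS. The following is a toy model only: `createObservable` and the ingredient names are made up for illustration, and this is not how RxJS is implemented internally.

```javascript
// Toy observable: NOT RxJS's implementation, only the contract from the analogy.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false; // becomes true after error or complete
      const subscriber = {
        next(value) {
          if (!closed) observer.next(value);
        },
        error(err) {
          if (closed) return;
          closed = true;
          observer.error(err);
        },
        complete() {
          if (closed) return;
          closed = true;
          observer.complete();
        },
      };
      producer(subscriber);
    },
  };
}

const log = [];

createObservable((subscriber) => {
  subscriber.next("flour");
  subscriber.next("eggs");
  subscriber.complete();
  subscriber.next("sugar"); // ignored: the shop has stopped taking orders
}).subscribe({
  next: (x) => log.push("next: " + x),
  error: (err) => log.push("error: " + err),
  complete: () => log.push("complete"),
});

console.log(log); // [ 'next: flour', 'next: eggs', 'complete' ]
```

The value sent after `complete()` never reaches the observer, which matches the "stops taking orders" part of the analogy.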

Asked ChatGPT for an example:
An example of handling real-time data updates with and without RxJS.

Scenario: We have a dashboard that displays real-time stock prices for multiple symbols. The prices are received from a WebSocket API, and we want to update the UI with the latest prices as they arrive.

Without RxJS:


import React, { useEffect, useState } from 'react';
import { connectToWebSocket, disconnectFromWebSocket } from './websocketAPI';

const StockDashboard = () => {
  const [stockData, setStockData] = useState({});

  useEffect(() => {
    const socket = connectToWebSocket();

    socket.on('message', event => {
      const updatedData = JSON.parse(event.data);
      setStockData(prevData => ({ ...prevData, ...updatedData }));
    });

    return () => {
      disconnectFromWebSocket(socket);
    };
  }, []);

  return (
    <div>
      {Object.entries(stockData).map(([symbol, price]) => (
        <div key={symbol}>
          <span>{symbol}: </span>
          <span>{price}</span>
        </div>
      ))}
    </div>
  );
};

export default StockDashboard;

In this non-RxJS example, we manually handle the WebSocket connection and update the UI state whenever a message is received. We use the useState hook to manage the stockData state, which is an object containing the latest stock prices for different symbols. When a new message arrives, we parse the data and merge it with the previous state.

With RxJS:


import React, { useEffect, useState } from 'react';
import { fromEvent } from 'rxjs';
import { map, scan } from 'rxjs/operators';
import { connectToWebSocket, disconnectFromWebSocket } from './websocketAPI';

const StockDashboard = () => {
  const [stockData, setStockData] = useState({});

  useEffect(() => {
    const socket = connectToWebSocket();
    const messageStream = fromEvent(socket, 'message').pipe(
      map(event => JSON.parse(event.data)),
      scan((prevData, updatedData) => ({ ...prevData, ...updatedData }), {})
    );

    const subscription = messageStream.subscribe(data => {
      setStockData(data);
    });

    return () => {
      subscription.unsubscribe();
      disconnectFromWebSocket(socket);
    };
  }, []);

  return (
    <div>
      {Object.entries(stockData).map(([symbol, price]) => (
        <div key={symbol}>
          <span>{symbol}: </span>
          <span>{price}</span>
        </div>
      ))}
    </div>
  );
};

export default StockDashboard;

In this RxJS example, the fromEvent creation function turns the WebSocket 'message' event into an observable. The map operator parses each event's data into an object, and the scan operator accumulates the updates over time by merging each one into the previous state.

The messageStream observable represents the stream of stock data updates. We subscribe to it and update the UI state using the setStockData function whenever new data arrives.
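
To see what the accumulation step contributes, here is a plain-JavaScript sketch of the running merge that scan performs, with no RxJS and no WebSocket involved. The helper `runningMerge` and the sample symbols and prices are hypothetical and exist only for this illustration; the reducer is the same one passed to scan in the example.

```javascript
// Hypothetical parsed messages, shaped like the WebSocket payloads in the example.
const messages = [{ AAPL: 150 }, { MSFT: 300 }, { AAPL: 151 }];

// Mimics scan with a seed: apply the reducer to each item in order
// and keep every intermediate state, one per incoming item.
function runningMerge(items, reducer, seed) {
  const states = [];
  let state = seed;
  for (const item of items) {
    state = reducer(state, item);
    states.push(state);
  }
  return states;
}

const states = runningMerge(
  messages,
  (prevData, updatedData) => ({ ...prevData, ...updatedData }),
  {}
);

console.log(states.length); // 3
console.log(states[states.length - 1]); // { AAPL: 151, MSFT: 300 }
```

Each incoming message produces one new merged state, so the UI always receives the full set of latest prices rather than a single update.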

The RxJS example provides a more declarative and functional approach. It allows us to easily transform and manipulate the data stream using operators like map and scan. The merging logic lives inside the stream (in scan) instead of in the event handler; the component still calls setStockData, but only to copy the stream's latest accumulated value into React state.

Using RxJS, we can also leverage additional features like filtering, debouncing, throttling, or combining multiple data streams, depending on the requirements of the application.