내용 보기

작성자

관리자 (IP : 106.247.248.10)

날짜

2022-12-19 14:04

제목

[Flutter] [스크랩] 스트림 (Stream)


스트림이란?

스트림은 데이터나 이벤트가 들어오는 통로다. 즉, 파이프나 다리와 같다고 생각하면 된다. 앱을 만들다보면 데이터를 처리할 일이 많은데, 어느 타이밍에 데이터가 들어올지 정확히 알기 어렵다. 스트림은 이와 같은 비동기 작업을 할 때 주로 쓰인다. 예를 들어, 네트워크에서 데이터를 받아 UI에 보여주는 상황을 생각해보자. 언제 네트워크에서 데이터를 받을지 알 수 없다. 이런 문제를 스트림은 데이터 생성과 소비하는 곳을 따로둬서 이 문제를 해결한다.

import 'dart:async';

void main() {
  // 1초마다 데이터 1개를 최대 5개까지 만듦.
  Stream stream = Stream.periodic(Duration(seconds: 1), (int x) => x).take(5);
  // 이벤트 처리
  stream.listen(print);
}

한쪽 끝에 값을 넣고 다른 쪽 끝에 listener가 있으면 listener가 해당 값을 받는다. 스트림에는 여러 개의 listener가 있을 수 있으며, 파이프 라인에 넣으면 모든 listener가 동일한 값을 받는다. 


스트림 생성

스트림을 만드는 방법은 다양하다. 즉시 만들거나 혹은 일정 시간마다 만들거나 Future를 사용해 만들 수도 있다.

import 'dart:async';

// 동기적인 iterable 형식을 반환하는 함수
Iterable<int> getIterable(int maxNum) sync* {
  for (int i = 0; i < maxNum; i++) {
    yield i;
  }
}

// 비동기적인 future 형식을 반환하는 함수
Future getData(int value) async {
  await Future.delayed(Duration(seconds: value));
  print('Fetched Data');
  return '$value초 대기후의 데이터';
}

void main() {
  // 일반적인 데이터를 다룰 때.
  Stream.fromIterable(<int>[1, 2, 3, 4, 5])
      .listen((int x) => print('iterable: $x'));

  // 동기적인 Iterable 반환 함수를 이용한 데이터를 다룰 때.
  Stream.fromIterable(getIterable(5)).listen((int x) {
    print('getIterable: $x');
  });

  // 스트림 데이터를 생성 후 5초간 초당 데이터를 소비한다.
  Stream.periodic(Duration(seconds: 1), (x) => x).take(5).listen((int x) {
    print('periodic: $x');
  });

  // 비동기적인 future 객체를 받아 소비하는 스트림. 3초간 기다린 데이터를 받는다.
  Stream.fromFuture(getData(3)).listen((event) {
    print('from future: $event');
  });

  // 비동기적인 future 객체 List를 받아 소비하는 스트림. 5, 7초간 기다린 데이터를 받는다.
  Stream.fromFutures(>[getData(5), getData(7)]).listen((event) {
    print('from futures: $event');
  });
}


/* 결과

iterable: 1
getIterable: 0
iterable: 2
getIterable: 1
iterable: 3
getIterable: 2
iterable: 4
getIterable: 3
iterable: 5
getIterable: 4
periodic: 0
periodic: 1
periodic: 2
Fetched Data
from future: 3초 대기후의 데이터
periodic: 3
periodic: 4
Fetched Data
from futures: 5초 대기후의 데이터
Fetched Data
from futures: 7초 대기후의 데이터

*/

스트림 데이터 다루기

스트림 데이터 다루는 방법은 다양한다. 다음의 코드는 스트림 데이터의 첫 번째, 마지막 데이터 그리고 스트림의 길이를 가져오는 코드이다.

import 'dart:async';

void main() {
  Stream<int> stream = Stream.fromIterable([1, 2, 3, 4, 5]);
  // 첫 번째 데이터를 가져온다.
  stream.first.then((value) => print('stream.first: $value'));

  // Stream은 기본적으로 하나의 listen만 가능하다. 그래서 아래처럼 stream을 갱신했다.
  stream = Stream.fromIterable([1, 2, 3, 4, 5]);
  // 마지막 데이터를 가져온다.
  stream.last.then((value) => print('stream.last: $value'));

  stream = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7]);
  // 스트림의 전체 길이
  stream.length.then((value) => print('stream.length: $value'));
}

/* 결과

stream.first: 1
stream.last: 5
stream.length: 7

*/
스트림은 기본적으로 싱글 서브스크립션(Single Subscription)이다. 싱글 서브스크립션은 단 한곳에서만 listen 할 수 있다. 여러곳에서 listen 을 하려면 Broadcast 로 변경해줘야 한다.

스트림 변경 (Stream Transformer)

map을 사용하면 스트림을 어느정도는 변경 가능하다. 다음의 코드를 보자.

import 'dart:async';

void main() {
  Stream streamMap = Stream.periodic(Duration(milliseconds: 200), (x) => x)
      .take(3)
      .map((x) => x + 10);

  streamMap.listen(print);
}

/* 결과

10
11
12

*/

간단한 것은 map을 이용하면 되겠지만, 복잡한 것은 처리하기 힘들다. 이럴 때는 StreamTransformer를 사용하자. StreamTransformer를 사용하면 스트림을 변경하여 사용할 수 있다. 다음의 예제를 보자.

import 'dart:async';

void main() {
  StreamTransformer streamTransformer = new StreamTransformer.fromHandlers(
    handleData: (data, sink) {
      // Stream Sink는 스트림 이벤트를 받아들인다.
      sink.add('first: $data');
      sink.add('second: $data');
    },
  );

  Stream stream = new Stream.fromIterable(<dynamic>['hello', 1, 2, 3, 4, 5]);
  stream
      .transform(streamTransformer)
      .listen((event) => print('listen: $event'));
}

/* 결과

listen: first: hello
listen: second: hello
listen: first: 1
listen: second: 1
listen: first: 2
listen: second: 2
listen: first: 3
listen: second: 3
listen: first: 4
listen: second: 4
listen: first: 5
listen: second: 5

*/

map을 사용했을 때보다 활용도가 높다.


사용자 정의 스트림 : async*, yield

스트림을 직접 만들어서 사용할 수 있다. 다음의 코드를 보자.

import 'dart:async';

// async* 키워드는 yield 키워드를 사용하겠다는 뜻이다.
Stream<int> createStream(List<int> numbers) async* {
  for (int element in numbers) {
    yield element;
  }
}

void main() {
  // 스트림 생성
  Stream<int> stream = createStream(<int>[1, 2, 3, 4, 5]);
  // 스티림 소비
  stream.listen(print);
}

/* 결과

1
2
3
4
5

*/
  • async* : async*는 generator를 만든다는 뜻이다. generator는 게이르게(lazily) 데이터 연산을 할 때 사용한다. 게으르다라는 의미는 미리 연산을 다 하는 것이 아니라, 요청이 있을 때까지 연산을 미루다가 필요할 때 처리하는 것을 뜻한다.
  • yield : return 키워드랑 유사하다. return은 한번 반환하면 함수가 종료되지만, yield는 열린 채로 존재하여 필요할 때 다른 연산을 할 수 있다.

Stream의 내부 동작 원리

스트림은 3가지로 구성되어 있다.

  1. Stream
  2. StreamSubscription : 스트림과 이벤트의 연결고리이다. 이벤트에 변경이 생기면 처리한다.
  3. Event (Data)



스트림의 내부 동작

import 'dart:async';

void main() {
  Stream<int> stream = new Stream.fromIterable(<int>[1, 2, 3, 4, 5]);
  
  StreamSubscription streamSubscription = stream.listen(print);

  // 연결 해제
  streamSubscription.cancel();
}

위의 그림에서 볼 수 있듯이 스트림은 이벤트 소스와 연결되어 있다.

  • listen (구독) 하기 전 : 강한 연결, 실선 화살표
  • listen (구독) 한 후 : 약한 연결, 점선 화살표, StreamSubscription과 EventSource가 연결된다.

위의 예제들을 보면 listen() 에서 이벤트 처리를 한 것 같았지만, 실제로는 StreamSubscription에서 이벤트 콜백을 다룬다. 새로운 이벤트가 생기거나 에러가 생기면, StreamSubscription에서 이를 처리한다. 뿐만 아니라 StreamSubscription은 이벤트 소스와의 연결도 끊을 수 있다.

StreamSubscriptoin은 이벤트 처리를 콜백을 정하여 실행한다. 

listen 콜백 함수는 스트림 유형 중 하나인 StreamSubscription (구독 신청)을 반환한다. 이것은 스트림 구독을 관리하는 데 사용할 수 있다. 구독의 가장 일반적인 사용법은 더 이상 데이터를 수신할 필요가 없을 때 청취를 취소하는 것이다. 그리고 스트림 구독은 앱이 종료되기 전까지 항상 열려있으므로 메모리 누수가 없는지 확인해야 한다. 아래의 코드는 등록 된 콜백에 대한 구독 객체를 생성해서 관리하기 위해 StreamSubscription으로 받아주고 있다.

StreamSubscription<int> streamSubscription = stream.listen((event) => print('from stream: $event'));
StreamSubscription : 스트림의 리스터는 구독에 대한 참조를 저장할 수 있으며, 이를 통해 수신한 데이터 흐름을 힐시 중지, 재개 또는 취소할 수 있다.

스트림 취소

스트림 구독을 취소할 필요가 없으면 그냥 놔두면 되지만 취소해야 할 때는 dispose 함수에 아래의 코드를 적용해주면 된다.

void dispose() {
  streamSubscription.cancel();
  super.dispose();
}

이벤트 처리에 대한 코드를 살펴보자.

# onData, onError, onDone을 streamSubscription에 정의

import 'dart:async';

void main() {
  Stream<int> stream =
      new Stream.periodic(new Duration(milliseconds: 200), (x) => x);

  StreamSubscription streamSubscription = stream.listen(null);

  // onData : 데이터를 하나씩 처리하는 이벤트
  streamSubscription.onData((data) {
    print('data: $data');
    if (data == 5) {
      streamSubscription.cancel();
    }
  });

  // onError : 에러가 발생했을 때 처리하는 이벤트
  streamSubscription.onError((err) => print('에러: $err'));
  // onDone : 스트림의 끝에 도달했을 때(더 이상 데이터가 없을 때) 처리하는 이벤트
  streamSubscription.onDone(() => print('스트림 작업 완료'));
}

/* 결과

data: 0
data: 1
data: 2
data: 3
data: 4
data: 5

*/
  • onData : 데이터를 하나씩 처리하는 이벤트
  • onError : 에러가 발생했을 때 처리하는 이벤트
  • onDone : 스트림의 끝에 도달했을 때 (더 이상 데이터가 없을 때) 처리하는 이벤트

# listen 내부에 정의

import 'dart:async';

void main() {
  Stream<int> stream =
      new Stream.periodic(new Duration(milliseconds: 200), (x) => x).take(5);

  StreamSubscription streamSubscription = stream.listen(
    (data) => print('data: $data'),
    onDone: () {
      print('스트림 완료');
    },
    onError: (err) {
      print('에러 : $err');
    },
  );
}

/* 결과

data: 0
data: 1
data: 2
data: 3
data: 4
스트림 완료

*/

사용법은 약간 다르지만 동작은 동일하다. 편한 것으로 사용하면 되겠다.


Stream : Broadcast

기본적으로 만들어지는 스트림은 한 곳에서만 listen 할 수 있다. 그래서 다음의 코드를 실행해보면 "Bad state: Stream has already been listened to." 라는 에러가 발생한다.

import 'dart:async';

void main() {
  Stream<int> stream =
      new Stream.periodic(new Duration(milliseconds: 200), (x) => x).take(5);

  stream.listen((event) => print(event));
  stream.listen((event) => print(event));
}

/* 결과 - 에러 발생

Bad state: Stream has already been listened to.
#0      _StreamController._subscribe (dart:async/stream_controller.dart:710:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:860:19)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:493:9)
...

*/

Flutter에서 Stream을 다룰 때, 가장 흔히 볼 수 있는 것은 "Stream has already been..." 에러 메시지다. 또 다른 예를 보도록하자.

import 'dart:async';

void main() {
  StreamController streamController = new StreamController();

  Stream stream = streamController.stream;

  stream.listen((event) => print('1st stream: $event'));
  stream.listen((event) => print('2nd stream: $event'));

  streamController.sink.add('Hello');
  streamController.sink.add('World');

  streamController.close();
}

/* 결과 - 에러 발생

Bad state: Stream has already been listened to.
#0      _StreamController._subscribe (dart:async/stream_controller.dart:710:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:860:19)
...

*/

위의 코드가 실행되면 마찬가지로 "Bad state..." 스트림 에러가 발생한다. 그 이유는 위에서 설명했듯이 기본적으로 만들어지는 스트림은 한 곳에서만 listen할 수 있기 때문이다. 그래서 두 가지 유형의 스트림이 존재한다.

  • 단일 구독 스트림 (Single) : 파일이나 웹 요청을 읽는 것과 같은 유형에 사용된다. 먼저 구독한 구독자가 올바른 순서로 올바른 정보를 모두 얻도록하려면 스트림이 존재하는 수명 주기 동안 한 번만 구독할 수 있는 제한이 필요하다. 이럴때 사용한다.
  • 방송 스트림 (Broadcast) : 이 유형의 스트림은 여러 곳에서 listen 할 수 있다.
StreamController streamController = new StreamController.broadcast();

위의 코드처럼 broadcast로 선언해주면 에러가 발생하지 않는다. 그리고 만약, Stream을 직접 구현하고 사용중에 있다면, 구독 중 Stream에 변경 사항 발생 시 스트림을 정리(cancel)하고 다시 연결해야 한다. 그렇지 않고 여러 번 구독하면 메모리 누수가 발생한다.

broadcast를 이용한 스트림 코드는 다음과 같다.

import 'dart:async';

void main() {
  StreamController<int> streamController = new StreamController.broadcast();
  Stream<int> stream = streamController.stream;

  stream.listen((event) => print('broadcast1: $event'));
  stream.listen((event) => print('broadcast2: $event'));

  streamController.sink.add(5);
  streamController.sink.add(55);

  streamController.close();
}

/* 결과

broadcast1: 5
broadcast2: 5
broadcast1: 55
broadcast2: 55

*/

broadcast를 선언하여 여러 곳에서 listen을 사용할 수 있다. 이처럼 같은 데이터를 다른 뷰에서 처리할 때 효과적으로 사용할 수 있다.

그리고 StreamController의 속성인 sink를 통해 add 메소드를 호출하고 있다. sink는 다음과 같이 정의된 속성이다.

StreamSink get sink;

즉, 이벤트 데이터를 StreamSink에 추가하는 의미이다.


StreamController

Stream을 매번 열었다가(listen) 닫는 것(cancel)은 비효율적이다. 만약 스트림이 여러개라면 정말 귀찮은 작업일 것이다. 더군다나 실수로 cancel을 하지 않는다면 메모리 누수로 인하여 불안정한 앱을 만들 수 있다.

이럴 때, StreamController를 사용하면 된다. StreamController는 여러 개의 스트림을 관리하기 위해 만들어졌기 때문이다.

StreamController<int> controller = new StreamController<int>();

값을 넣을 수 있는 스트림을 만드려면 StreamController 클래스를 이용하면 된다. 그러면 컨트롤러가 관리하는 스트림을 조작하는데 사용할 수 있는 컨트롤러가 구성된다. 컨트롤러 스트림은 stream 속성을 통해 접근할 수 있다.

Stream stream = controller.stream;

스트림 종료

스트림을 생성했다면 필히 앱이 종료될 때 스트림도 종료해주어야 한다.

streamController.close();

스트림 사용

스트림을 생성했다면 다음으로 할 일은 스트림으로부터 값을 얻는 것이다. 이것은 일반적으로 스트림 구독 또는 청취라고 한다. 스트림을 구독하면 구독 후 생성 된 값(스트림에 입력된 값)을 가져온다.

stream.listen((event) {
  print('From Stream: $event');
});

스트림에 값 추가

다음의 코드를 실행하면 위 섹션에 listen(구독) 하고 있는 콜백 함수가 실행된다.

controller.add(55);

/* 결과

From Stream: 55

*/

다음의 코드를 살펴보자.

import 'dart:async';

void main() {
  final StreamController<int> streamController =
      new StreamController.broadcast();

  final Stream<int> stream = streamController.stream;

  // 데이터 소비
  final StreamSubscription streamSubscription1 =
      stream.listen((data) => print('subscript1: $data'));
  final StreamSubscription streamSubscription2 =
      stream.listen((data) => print('subscript2: $data'));

  // 데이터 생성
  streamController.sink.add(5);
  streamController.sink.add(50);
  streamController.sink.add(500);

  // 컨트롤러 닫음
  streamController.close();

  // 위에서 컨트롤러가 닫혔기때문에 출력되지 않고 에러가 발생한다.
  // Bad state: Cannot add event after closing
  streamController.add(555);
}

StreamController를 사용하면 여러 스트림을 한꺼번에 관리할 수 있다. 그래서 앱을 제작할 때 큰 도움이 된다.


출처1

https://nomad-programmer.tistory.com/268

출처2