본문 바로가기

Android/RX

4. Reactive Streams

Reactive Streams?

 

https://github.com/reactive-streams/reactive-streams-jvm/

 

reactive-streams/reactive-streams-jvm

Reactive Streams Specification for the JVM. Contribute to reactive-streams/reactive-streams-jvm development by creating an account on GitHub.

github.com

 

 

1. 리액티브 프로그래밍 라이브러리의 표준 사양

2. 리액티브 프로그래밍에 대한 인터페이스만 제공

3. Reactive StreamsPublisher, Subscriber, Subscription, Processor 라는 4개의 인터페이스를 제공 

  - Publisher : 데이터를 생성하고 통지

  - Subscriber : 통지된 데이터를 전달받아서 처리

  - Subscription : 전달 받을 데이터의 개수를 요청하고 구독을 해지

  - Processor : PublisherSubscriber의 기능이 모두 있음.

 

 

 

 

PublisherSubscriber간의 프로세스 흐름

 

 

Cold Publisher & Hot Publisher

 

1. Cold Publisher(차가운 생산자)

 - 생산자는 소비자가 구독 할때마다 데이터를 처음부터 새로 통지

 - 데이터를 통지하는 새로운 타임 라인이 생성

 - 소비자는 구독 시점과 상관없이 통지된 데이터를 처음부터 전달 받을 수 있다.

 

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ToDoSample {

    public static void main(String[] args) {

        // ColdPublisher
        Flowable<Integer> flowable = Flowable.just(1,3,5,7);

        flowable.subscribe(data-> System.out.println("구독자1: " + data));
        flowable.subscribe(data-> System.out.println("구독자2: " + data));

        // 결과 -> 둘다 처음부터 시작됨됨
 	}
}

 

 

 

2. Hot Publisher(뜨거운 생산자)

 - 생산자는 소비자 수와 상관없이 데이터를 한번만 통지

 - , 데이터를 통지하는 타임 라인은 하나이다

 - 소비자는 발행된 데이터를 처음부터 전달 받는게 아니라 구독한 시점에 통지된 데이터들만 전달 받을 수 있다

 

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ToDoSample {

    public static void main(String[] args) {

        // HotPublisher
        PublishProcessor<Integer> processor = PublishProcessor.create();
        processor.subscribe(data -> System.out.println("구독자1: " + data));
        processor.onNext(1);
        processor.onNext(3);


        processor.subscribe(data -> System.out.println("구독자2: " + data));
        processor.onNext(5);
        processor.onNext(7);

        processor.onComplete();

        // 구독자1: 1,3,5,7
        // 구독자2: 5,7

    }
}

 

'Android > RX' 카테고리의 다른 글

4. 공통 소스 (참고용)  (0) 2021.01.14
4. Simple, Maybe, Completable  (0) 2021.01.14
4. Observable, Flowable  (0) 2021.01.14
4. 리액티브 프로그래밍이란 ?  (0) 2021.01.11