본문 바로가기

Android/RX

4. Observable, Flowable

Observable과 Flowable 비교

Flowable

Observable

Reactive Streams 인터페이스를 구현함

Reactive Streams 인터페이스를 구현하지 않음

Subscriber에서 데이터를 처리한다.

Observer에서 데이터를 처리한다.

데이터 개수를 제어하는 배압 기능이 있음

데이터 개수를 제어하는 배압 기능이 없음

Subscription으로 전달 받는 데이터 개수를 제어할 수 있다.

배압 기능이 없기때문에 데이터 개수를 제어할 수 없다.

Subscription으로 구독을 해지한다.
Disposable로 구독을 해지한다.

 

 

참고 공통 소스

dennis.tistory.com/30

 

4. 공통 소스 (참고용)

import java.time.LocalTime; import java.time.format.DateTimeFormatter; public class TimeUtil { public static long start; public static long end; final static DateTimeFormatter formatter = DateTimeFo..

dennis.tistory.com

Observable 구현

import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class Todo {
    public static void main(String[] args) throws InterruptedException {
    
       // Observable 사용법
        Observable<String> observable =
                Observable.create(emitter -> {
                    String[] datas = {"Hello", "RxJava!"};
                    for(String data : datas){
                        if(emitter.isDisposed())
                            return;

                        emitter.onNext(data);
                    }
                    emitter.onComplete();
                });

        observable.observeOn(Schedulers.computation())
                .subscribe(
                        data -> Logger.log(LogType.ON_NEXT, data),
                        error -> Logger.log(LogType.ON_ERROR, error),
                        () -> Logger.log(LogType.ON_COMPLETE),
                        disposable -> {/**아무것도 하지 않는다.*/}
        );

         Thread.sleep(500L);
    }
}

 

 

Flowable 구현

import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class Todo {
    public static void main(String[] args) throws InterruptedException {
        Flowable<String> flowable =
                Flowable.create(emitter -> {
                    String[] datas = {"Hello", "RxJava!"};
                    for(String data : datas) {
                        // 구독이 해지되면 처리 중단
                        if (emitter.isCancelled())
                            return;

                        // 데이터 발행
                        emitter.onNext(data);
                    }

                    // 데이터 발행 완료를 알린다
                    emitter.onComplete();
                }, BackpressureStrategy.BUFFER);

        flowable.observeOn(Schedulers.computation())
                .subscribe(data -> Logger.log(LogType.ON_NEXT, data),
                           error -> Logger.log(LogType.ON_ERROR, error),
                           () -> Logger.log(LogType.ON_COMPLETE),
                           subscription -> subscription.request(Long.MAX_VALUE));

        Thread.sleep(500L);
    }
}

 

 

 

배압(Back Pressure)이란?

 

Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도 보다 빠를 때 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능을 말한다.

 

 

 

 

배압 전략(BackpressureStrategy)

: Rxjava에서는 BackpressureStrategy 를 통해 Flowable이 통지 대기 중인 데이터를 어떻게 다룰지에

대한 배압 전략을 제공

 

1. MISSING 전략

 - 배압을 적용하지 않는다.

 - 나중에 onBackpressureXXX( ) 로 배압 적용을 할 수 있다.

 

 

2. ERROR 전략

 - 통지된 데이터가  버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지한다.

 - , 소비자가 생산자의 통지 속도를 따라 잡지 못할 때 발생한다.

 

 

3. BUFFER 전략 : DROP_LATEST

  - 버퍼가 가득 찬 시점에 버퍼내에서 가장 최근에 버퍼로 들어온 데이터를 DROP한다.

  - DROP 된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.

 

 

4. BUFFER 전략 : DROP_OLDEST

  - 버퍼가 가득 찬 시점에 버퍼내에서 가장 오래전에(먼저) 버퍼로 들어온 데이터를 DROP한다.

  - DROP 된 빈 자리에는 버퍼 밖에서 대기하던 데이터를 채운다.

 

 

5. DROP 전략

  - 버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 버리고(DROP),

    버퍼가 비워지는 시점에 DROP 되지 않은 데이터부터 다시 버퍼에 담는다.

 

 

6. LATEST 전략

  -  버퍼에 데이터가 모두 채워진 상태가 되면 버퍼가 비워질 때까지 통지된 데이터는 버퍼 밖에서

     대기하며 버퍼가 비워지는 시점에 가장 나중에(최근에) 통지된 데이터부터 버퍼에 담는다.

 

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ToDoSample {

    public static void main(String[] args) {

	    // 에러 발생 
        Flowable.interval(1L, TimeUnit.MILLISECONDS) // 생성자
                .doOnNext(System.out::println)
                .observeOn(Schedulers.computation()) // 구독자 쓰레드
                .subscribe(  // 구독자
                  data -> {
                      Thread.sleep(1000);
                      System.out.println(data);

                     // 에러 발생 (생성자 속도를 구독자가 못따라가서 오류 )
                  }, System.out::println,
                  () -> {}
                );



        Flowable.interval(300L, TimeUnit.MILLISECONDS)
                .doOnNext(System.out::println)
                .onBackpressureBuffer(  2,  // 버퍼가 수용할 수 있는 개수
                                        () -> System.out.println("overflow"),
                                        BackpressureOverflowStrategy.DROP_LATEST) // 1. DROP_LATEST/2. DROP_OLDEST

//                .onBackpressureDrop(System.out::println)  // -> 3. drop
//                .onBackpressureLatest() // -> 4. Latest
                .observeOn(Schedulers.computation(), false, 1) // 소비자쪽에 요청하는 사이즈 개수
                .subscribe(
                        data -> {
                            Thread.sleep(1000);
                            System.out.println(data);
                        },System.out::println
                );
                
        ////////////////////////////////////// 결과 /////////////////////////        
        // DROP_LATEST
        // -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
        //0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
        //0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
        //0.9초 -> 4 들어왔을 때 1초가 안지났으므로 오버플로 발생 3지우고 4저장 (2,4)
        //1초 -> 1번 처리가 완료 2처리 중 버퍼는 4
        //1.2초 -> 5 들어왔을 때 4,5 저장
        //.....


        // 2. DROP_OLDEST
        // -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
        //0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
        //0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
        //0.9초 -> 4 들어왔을 때 1초가 안지났으므로 오버플로 발생 2지우고 4저장 (3,4)
        //1초 -> 1번 처리가 완료 3처리 중 버퍼는 4
        //1.2초 -> 5 들어왔을 때 4,5 저장
        //.....

        // 3. Drop
        // -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
        //0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
        //0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
        //0.9초 -> 4 들어왔을 때 1초가 안지났으므로 오버플로 발생 4버림
        //1초 -> 1번 처리가 완료 2처리 중 버퍼는 3
        //1.2초 -> 5 들어왔을 때 3,5 저장
        //.....

        // Latest
        // -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
        //0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
        //0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
        //0.9초 -> 4 들어왔을 때 4 대기
        //1초 -> 1번 처리가 완료 2처리 중 버퍼는 3 + 4 추가
        //1.2초 -> 5 들어왔을 때 5 대기
        //1.5초 -> 6 들어왔을 때 6 대기
        //1.8초 -> 7 들어왔을 때 7 대기
        //2.0초 -> 2번 처리가 완료 3번 처리 4 + 7 추가 ( 대기중에서 뒷자리 부터 들어오는 형식)
	    ////////////////////////////////////// 결과 /////////////////////////
    }
}

'Android > RX' 카테고리의 다른 글

4. 공통 소스 (참고용)  (0) 2021.01.14
4. Simple, Maybe, Completable  (0) 2021.01.14
4. Reactive Streams  (0) 2021.01.14
4. 리액티브 프로그래밍이란 ?  (0) 2021.01.11