Observable과 Flowable 비교
Flowable |
Observable |
Reactive Streams 인터페이스를 구현함 |
Reactive Streams 인터페이스를 구현하지 않음 |
Subscriber에서 데이터를 처리한다. |
Observer에서 데이터를 처리한다. |
데이터 개수를 제어하는 배압 기능이 있음 |
데이터 개수를 제어하는 배압 기능이 없음 |
Subscription으로 전달 받는 데이터 개수를 제어할 수 있다. |
배압 기능이 없기때문에 데이터 개수를 제어할 수 없다. |
Subscription으로 구독을 해지한다. |
Disposable로 구독을 해지한다. |
참고 공통 소스
4. 공통 소스 (참고용)
import java.time.LocalTime; import java.time.format.DateTimeFormatter; public class TimeUtil { public static long start; public static long end; final static DateTimeFormatter formatter = DateTimeFo..
dennis.tistory.com
Observable 구현
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class Todo {
public static void main(String[] args) throws InterruptedException {
// Observable 사용법
Observable<String> observable =
Observable.create(emitter -> {
String[] datas = {"Hello", "RxJava!"};
for(String data : datas){
if(emitter.isDisposed())
return;
emitter.onNext(data);
}
emitter.onComplete();
});
observable.observeOn(Schedulers.computation())
.subscribe(
data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE),
disposable -> {/**아무것도 하지 않는다.*/}
);
Thread.sleep(500L);
}
}
Flowable 구현
import com.itvillage.utils.LogType;
import com.itvillage.utils.Logger;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
public class Todo {
public static void main(String[] args) throws InterruptedException {
Flowable<String> flowable =
Flowable.create(emitter -> {
String[] datas = {"Hello", "RxJava!"};
for(String data : datas) {
// 구독이 해지되면 처리 중단
if (emitter.isCancelled())
return;
// 데이터 발행
emitter.onNext(data);
}
// 데이터 발행 완료를 알린다
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
flowable.observeOn(Schedulers.computation())
.subscribe(data -> Logger.log(LogType.ON_NEXT, data),
error -> Logger.log(LogType.ON_ERROR, error),
() -> Logger.log(LogType.ON_COMPLETE),
subscription -> subscription.request(Long.MAX_VALUE));
Thread.sleep(500L);
}
}
배압(Back Pressure)이란?
Flowable에서 데이터를 통지하는 속도가 Subscriber에서 통지된 데이터를 전달받아 처리하는 속도 보다 빠를 때 밸런스를 맞추기 위해 데이터 통지량을 제어하는 기능을 말한다.
배압 전략(BackpressureStrategy)
: Rxjava에서는 BackpressureStrategy 를 통해 Flowable이 통지 대기 중인 데이터를 어떻게 다룰지에
대한 배압 전략을 제공
1. MISSING 전략
- 배압을 적용하지 않는다.
- 나중에 onBackpressureXXX( ) 로 배압 적용을 할 수 있다.
2. ERROR 전략
- 통지된 데이터가 버퍼의 크기를 초과하면 MissingBackpressureException 에러를 통지한다.
- 즉, 소비자가 생산자의 통지 속도를 따라 잡지 못할 때 발생한다.
3. BUFFER 전략 : DROP_LATEST
- 버퍼가 가득 찬 시점에 버퍼내에서 가장 최근에 버퍼로 들어온 데이터를 DROP한다.
- DROP 된 빈 자리에 버퍼 밖에서 대기하던 데이터를 채운다.
4. BUFFER 전략 : DROP_OLDEST
- 버퍼가 가득 찬 시점에 버퍼내에서 가장 오래전에(먼저) 버퍼로 들어온 데이터를 DROP한다.
- DROP 된 빈 자리에는 버퍼 밖에서 대기하던 데이터를 채운다.
5. DROP 전략
- 버퍼에 데이터가 모두 채워진 상태가 되면 이후에 생성되는 데이터를 버리고(DROP),
버퍼가 비워지는 시점에 DROP 되지 않은 데이터부터 다시 버퍼에 담는다.
6. LATEST 전략
- 버퍼에 데이터가 모두 채워진 상태가 되면 버퍼가 비워질 때까지 통지된 데이터는 버퍼 밖에서
대기하며 버퍼가 비워지는 시점에 가장 나중에(최근에) 통지된 데이터부터 버퍼에 담는다.
import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ToDoSample {
public static void main(String[] args) {
// 에러 발생
Flowable.interval(1L, TimeUnit.MILLISECONDS) // 생성자
.doOnNext(System.out::println)
.observeOn(Schedulers.computation()) // 구독자 쓰레드
.subscribe( // 구독자
data -> {
Thread.sleep(1000);
System.out.println(data);
// 에러 발생 (생성자 속도를 구독자가 못따라가서 오류 )
}, System.out::println,
() -> {}
);
Flowable.interval(300L, TimeUnit.MILLISECONDS)
.doOnNext(System.out::println)
.onBackpressureBuffer( 2, // 버퍼가 수용할 수 있는 개수
() -> System.out.println("overflow"),
BackpressureOverflowStrategy.DROP_LATEST) // 1. DROP_LATEST/2. DROP_OLDEST
// .onBackpressureDrop(System.out::println) // -> 3. drop
// .onBackpressureLatest() // -> 4. Latest
.observeOn(Schedulers.computation(), false, 1) // 소비자쪽에 요청하는 사이즈 개수
.subscribe(
data -> {
Thread.sleep(1000);
System.out.println(data);
},System.out::println
);
////////////////////////////////////// 결과 /////////////////////////
// DROP_LATEST
// -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
//0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
//0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
//0.9초 -> 4 들어왔을 때 1초가 안지났으므로 오버플로 발생 3지우고 4저장 (2,4)
//1초 -> 1번 처리가 완료 2처리 중 버퍼는 4
//1.2초 -> 5 들어왔을 때 4,5 저장
//.....
// 2. DROP_OLDEST
// -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
//0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
//0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
//0.9초 -> 4 들어왔을 때 1초가 안지났으므로 오버플로 발생 2지우고 4저장 (3,4)
//1초 -> 1번 처리가 완료 3처리 중 버퍼는 4
//1.2초 -> 5 들어왔을 때 4,5 저장
//.....
// 3. Drop
// -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
//0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
//0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
//0.9초 -> 4 들어왔을 때 1초가 안지났으므로 오버플로 발생 4버림
//1초 -> 1번 처리가 완료 2처리 중 버퍼는 3
//1.2초 -> 5 들어왔을 때 3,5 저장
//.....
// Latest
// -> 1 들어왔을 때 소비자쪽에서는 1을 처리 (1초 동안)
//0.3초 -> 2 들어왔을 때 소비자쪽 버퍼 2 저장
//0.6초 -> 3 들어왔을 때 소비자쪽 버퍼 2,3 저장
//0.9초 -> 4 들어왔을 때 4 대기
//1초 -> 1번 처리가 완료 2처리 중 버퍼는 3 + 4 추가
//1.2초 -> 5 들어왔을 때 5 대기
//1.5초 -> 6 들어왔을 때 6 대기
//1.8초 -> 7 들어왔을 때 7 대기
//2.0초 -> 2번 처리가 완료 3번 처리 4 + 7 추가 ( 대기중에서 뒷자리 부터 들어오는 형식)
////////////////////////////////////// 결과 /////////////////////////
}
}
'Android > RX' 카테고리의 다른 글
4. 공통 소스 (참고용) (0) | 2021.01.14 |
---|---|
4. Simple, Maybe, Completable (0) | 2021.01.14 |
4. Reactive Streams (0) | 2021.01.14 |
4. 리액티브 프로그래밍이란 ? (0) | 2021.01.11 |