RxJava - 실제 안드로이드 업무에 적용할 수 있는 것들 1편
본 글은 https://github.com/kaushikgopal/RxJava-Android-Samples 의 내용 중 필요한 부분만 발췌하여 번역한 것이므로 오역이 있을 수 있습니다.
본 글은 실제 RxJava의 요소들이 안드로이드 어플리케이션 내의 어떤 부분에서 적용될 수 있을지를 다룬 내용이다. 처음 Rx을 접하면 어떠한 방식으로 어플리케이션 내에서 응용이 가능한 지 가늠하기 어렵다.(현재 그런 상태이다..) 본 글을 정리하므로서 좀 더 Rx를 잘 활용할 수 있게 되기를 기대한다.
Retrofit과 RxJava를 활용한 네트워크 통신 (zip, flatmap 사용하기)
Sqaure사의 네트워크 통신 라이브러리인 Retrofit과 RxJava을 같이 사용하는 법을 다룬다.
_disposables.add(
_githubService
.contributors(_username.getText().toString(), _repo.getText().toString())
.flatMap(Observable::fromIterable)
.flatMap(
contributor -> {
Observable<User> _userObservable =
_githubService
.user(contributor.login)
.filter(user -> !isEmpty(user.name) && !isEmpty(user.email);
return Observable.zip(_userObservable, Observable.just(contributor), Pair::new);
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(
new DisposableObserver<Pair<User, Contributor>>() {
@Override
public void onComplete() {
Timber.d("Retrofit call 2 completed ");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Pair<User, Contributor> pair) {
User user = pair.first;
Contributor contributor = pair.second;
// ArrayAdapter 생략
// Log 생략
}
));
전체적인 코드의 동작은 다음과 같다. Github API를 통해 특정 레포의 컨트리뷰터들의 리스트를 획득한 후, 해당 리스트의 각 컨티리뷰터의 요소를 통해 유저 상세정보 API를 요청한다. 두 번의 API Call을 통해 얻어진 정보들을 .zip()을 이용하여 하나의 스트림으로 합친 후, down stream으로 전달한다. 이 때 .flatmap()을 사용했기때문에 인터리빙이 가능하며 방출되는 데이터의 순서는 보장되진 않는다.
추가적으로 .subscribeWith()를 통하여 스트림을 구독하였기때문에 observer가 return되며, 해당 observer는 compositedisposable 객체에 추가되어, 구독을 종료해야할 시점에 종료될 수 있다.
Rx의 초심자로서 조금 흥미로웠던 코드는 4행의 코드이다.
.flatMap(Observable::fromIterable)
flatMap은 Observable에서 방출된 item을 다시 nested Observable로 감싸는데, 이 때 방출된 item이 List
Exponential BackOff - 지수 백오프, 기초 그리고 심화 (delay, retryWhen 사용하기)
네트워크 요청이 실패하게 되면 서버에 재요청을 보내야하는데, 특별한 전략없이 recall을 하는 것은 비효율적이다. 이번 예제에서 소개하는 이 ‘특별한 전략’은 Exponential BackOff다.
Exponential BackOff(지수 백오프)란?
Network Call에 대한 오류 응답이 연이어 나올 때마다 재시도 간 대기 시간을 점진적으로 늘려 요청하는 것
사실 내용만으로 보면 특별한 전략인지는 모르겠지만, 필자는 Exponential BackOff를 물어보는 면접질문에 대답을 하지 못했다. 그 때의 상실감이 크니, 본 예제에서 Exponential BackOff를 분석해보자.
retryWhen으로 Exponential BackOff 구현하기
retryWhen은 error가 났을 경우, retry에 대한 전략을 커스텀할 수 있게 하는 retry이다.
Flowable.error(new RuntimeException("testing")) // 강제 실패 .retryWhen(new RetryWithDelay(5, 1000)) .doOnSubscribe(subscription -> _log("log 내용 생략")) .subscribe(disposableSubscriber);
본 예제에서는 RetryWithDelay라는 custom function를 정의하고, 해당 내부에 Exponetial BackOff 전략을 구현하였다.
이하는 RetryWithDelay클래스가 io.reactivex.functions의 apply()를 오버라이딩한 코드이다. apply()는 downstream으로 방출하는 과정에서 RxJavaPlugins::onAssembly()를 통해 내부적으로 자동 실행되는 함수이다.
@Override
public Publisher<?> apply(Flowable<? extends Throwable> inputObservable) {
return inputObservable.flatMap(
new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) {
if (++_retryCount < _maxRetries) {
return Flowable.timer(_retryCount * _retryDelayMillis, TimeUnit.MILLISECONDS);
}
return Flowable.error(throwable);
}
});
}
(자바 초보인 내가 보기엔) 좀 복잡해보인다. 차근차근 분석해보자.
.apply()의 파라미터로 Throwable 아이템을 다루는 스트림을 인자로 받았다. 그 후 flatMap을 통해 각 Throwable 객체에 접근하는데, 우리는 해당 에러 바로 throw할 생각이 없다. 그렇기때문에 다시 한 번 custom function을 통해서 내부적인 처리를 해줘야한다. 그렇게 처리한 내부는 다음과 같은 분기가 있다.
1) 최대 재요청 횟수를 초과하지 않으면 _retryCount * _retryDelayMillis만큼의 delay를 발생.
2) 최대 재요청 횟수를 초과하면 downstream에 error를 방출
실제 1번은 timer만 방출하는데 retry하는 방법이 궁금할 수 있다. 해당 기능은 .retryWhen()에서 다시 upstream을 재구독(.subscribe())하는 것으로 retry한다.
repeat으로 Exponential BackOff 구현하기
본 전략은 미리 주어진 range 값을 이용해 delay하는 것이다. 점진적으로 증가하는 delay를 기준으로 3회 요청하고 그 이상 응답이 없다면 네트워크 Call을 취소한다. 본 방식은 .retryWhen() 방식보다는 간단하기에 코드 리뷰는 별첨하지 않겠다.
폴링, 기초 그리고 심화 (interval, repeatWhen 사용하기)
본 에제는 RxJava를 통해서 구현한 polling에 대해서 다룬다.
polling이란?
충돌 회피 또는 동기화 처리 등을 목적으로 다른 프로그램의 상태를 주기적으로 검사하여 일정한 조건을 만족할 때 송수신 등의 자료처리를 하는 방식
예제 내에서 simple polling은 다루지 않고, Exponential BackOff를 기반으로 하는 점진적 딜레이 폴링에 대해서만 다루겠다.
이전의 Error처리 예제에서 RetryWithDelay를 사용했다면, 이번 예제에서는 RepeatWithDelay를 구현하여 사용한다.
스트림
//pollCount=8
//pollingInterval=1000
Flowable.just(1L)
.repeatWhen(new RepeatWithDelay(pollCount, pollingInterval))
.subscribe{println(it)}
스트림에 별다른 특이사항은 없다. .repeatWhen()을 통해서 custom function인 RepeatWithDelay를 실행시킨다.
RepeatWithDelay 내부의 .apply() 구현은 다음과 같다.
@Override
public Publisher<Long> apply(Flowable<Object> inputFlowable) throws Exception {
return inputFlowable.flatMap(
new Function<Object, Publisher<Long>>() {
@Override
public Publisher<Long> apply(Object o) throws Exception {
if (_repeatCount >= _repeatLimit) {
return Flowable.empty();
}
_repeatCount++;
return Flowable.timer(_repeatCount * _pollingInterval, TimeUnit.MILLISECONDS);
}
});
}
1) 지정한 반복 횟수(_repeatLimit)까지 반복했다면 Flowable.empty()를 방출하여 스트림을 끝낸다.
2) 그렇지 않다면, delay를 주고 repeat하는데 이 때 delay는 점진적 시간을 늘려간다.
코드 중 조금 특이한 사항이 있다면, Upstream에서 Long 타입 값을 방출했으니 inputFlowable이 Flowable