Programming

Java 8 스트림과 RxJava 옵저버 블의 차이점

procodes 2020. 6. 27. 14:20
반응형

Java 8 스트림과 RxJava 옵저버 블의 차이점


Java 8 스트림은 RxJava와 비슷합니까?

Java 8 스트림 정의 :

java.util.stream패키지의 클래스 는 요소 스트림에서 기능 스타일 작업을 지원하는 Stream API를 제공합니다.


TL; DR : 모든 시퀀스 / 스트림 프로세싱 라이브러리는 파이프 라인 구축을 위해 매우 유사한 API를 제공합니다. 멀티 스레딩 및 파이프 라인 구성을 처리하기위한 API의 차이점이 있습니다.

RxJava는 Stream과 상당히 다릅니다. 모든 JDK 것들 중, rx.Observable에 가장 가까운 아마도 java.util.stream.Collector (즉, 별도의 모나드 층을 다루는 사이의 변환을 처리해야하는 비용으로 제공 스트림 + CompletableFuture 콤보 Stream<CompletableFuture<T>>CompletableFuture<Stream<T>>).

Observable과 Stream 사이에는 중요한 차이점이 있습니다.

  • 스트림은 풀 기반이며, Observable은 푸시 기반입니다. 이것은 너무 추상적으로 들릴지 모르지만 매우 구체적인 결과를 초래합니다.
  • 스트림은 한 번만 사용할 수 있으며 Observable은 여러 번 구독 할 수 있습니다
  • Stream#parallel()시퀀스를 파티션으로 분할 Observable#subscribeOn()하고 Observable#observeOn()그렇지 않습니다. Stream#parallel()Observable로 동작 을 에뮬레이트하는 것은 까다 롭고 한 번 .parallel()방법이 있었지만이 방법은 너무 혼란 .parallel()스러워서 지원이 github RxJavaParallel의 별도 저장소로 이동되었습니다. 자세한 내용은 다른 답변에 있습니다.
  • Stream#parallel()선택적 스케줄러를 허용하는 대부분의 RxJava 메소드와 달리 사용할 스레드 풀을 지정할 수 없습니다. 이후 모든 JVM을에서 스트림 인스턴스가 동일한 사용 추가, 수영장 포크 - 조인 .parallel()실수로 프로그램의 다른 모듈의 동작에 영향을 미칠 수
  • 스트림은 같은 시간 관련 작업 부족 Observable#interval(), Observable#window()및 많은 다른 사람을; 스트림은 풀 기반이며 업스트림은 다음 요소 다운 스트림 언제 방출해야하는지 제어 없기 때문입니다.
  • 스트림은 RxJava와 비교하여 제한된 조작 세트를 제공합니다. 예를 들어 스트림에는 컷오프 작업이 없습니다 ( takeWhile(), takeUntil()). 해결 방법 Stream#anyMatch()은 제한적입니다. 터미널 작업이므로 스트림 당 두 번 이상 사용할 수 없습니다.
  • JDK 8부터는 Stream # zip 작업이 없으므로 때로는 유용합니다.
  • 스트림은 스스로 구성하기 어렵고, Observable은 여러 가지 방법으로 구성 할 수 있습니다 . 편집 : 의견에서 언급했듯이 스트림을 구성하는 방법이 있습니다. 그러나 비단 말 단락이 없기 때문에 파일에서 쉽게 라인 스트림을 생성 할 수 없습니다 (JDK는 즉시 파일 # 라인 및 BufferedReader # 라인을 제공하지만 스트림을 구성하여 다른 유사한 시나리오를 관리 할 수 ​​있음) Iterator에서).
  • 관찰 가능한 자원 관리 시설 ( Observable#using()); IO 스트림 또는 뮤텍스를 래핑하여 사용자가 리소스를 해제하는 것을 잊지 않도록해야합니다. 구독이 종료되면 자동으로 처리됩니다. 스트림에는 onClose(Runnable)메서드가 있지만 리소스를 사용하거나 리소스를 통해 직접 호출해야합니다. 예 : Files # lines () try-with-resources 블록으로 묶어야 한다는 것을 명심 해야합니다 .
  • Observables는 끝까지 동기화됩니다 (실제로 Streams가 동일한 지 여부는 실제로 확인하지 않았습니다). 이렇게하면 기본 작업이 스레드 안전인지 여부를 생각할 필요가 없습니다 (버그가 없으면 대답은 항상 '예'입니다).하지만 코드의 필요 여부에 관계없이 동시성 관련 오버 헤드가 있습니다.

반올림 : RxJava는 Streams와 크게 다릅니다. Real RxJava 대안은 Akka의 관련 부분과 같은 ReactiveStreams의 다른 구현입니다 .

업데이트 . 에 대해 기본이 아닌 포크 조인 풀을 사용하는 트릭이 있습니다. Java 8 병렬 스트림의 사용자 정의 스레드 풀을Stream#parallel 참조하십시오.

업데이트 . 위의 모든 내용은 RxJava 1.x 경험을 기반으로합니다. 지금 RxJava 2.x에서이 여기 ,이 답변이 오래된 될 수있다.


Java 8 Stream과 RxJava는 매우 비슷합니다. 그들은 비슷한 연산자 (필터, 맵, flatMap ...)를 가지고 있지만 동일한 사용법을 위해 만들어지지 않았습니다.

RxJava를 사용하여 비동기 작업을 수행 할 수 있습니다.

Java 8 스트림을 사용하면 컬렉션의 항목을 순회합니다.

RxJava (컬렉션의 트래버스 항목)에서 거의 동일한 작업을 수행 할 수 있지만 RxJava는 동시 작업에 중점을 두므로 동기화, 래치를 사용합니다. 따라서 RxJava를 사용하는 동일한 작업이 느릴 수 있습니다. Java 8 스트림으로.

RxJava는와 비교할 수 CompletableFuture있지만 둘 이상의 값을 계산할 수 있습니다.


기술적으로나 개념적으로는 약간의 차이가 있습니다. 예를 들어 Java 8 스트림은 단일 사용, 풀 기반 동기 값 시퀀스이며 RxJava Observables는 재확인 가능하고 적응 적으로 푸시 풀 기반이며 잠재적으로 비동기 값 시퀀스입니다. RxJava는 Java 6 이상을 목표로하며 Android에서도 작동합니다.


Java 8 스트림은 풀 기반입니다. 각 항목을 사용하는 Java 8 스트림을 반복합니다. 그리고 그것은 끝없는 흐름이 될 수 있습니다.

RXJava Observable는 기본적으로 푸시 기반입니다. Observable에 가입하면 다음 항목이 도착 onNext하거나 ( ), 스트림이 완료 onCompleted될 때 ( ), 오류가 발생했을 때 ( ) 알림을받습니다 onError. 함께하기 때문에 Observable당신이받는 onNext, onCompleted, onError이벤트, 당신은 다른 결합과 같은 몇 가지 강력한 기능을 할 수있는 Observable새에들 ( zip, merge, concat). 당신이 할 수있는 다른 일은 캐싱, 조절, ... 그리고 그것은 다른 언어 (RxJava, C #의 RX, RxJS 등)에서 거의 동일한 API를 사용합니다.

기본적으로 RxJava는 단일 스레드입니다. 스케줄러를 사용하지 않으면 모든 것이 동일한 스레드에서 발생합니다.


기존 답변은 포괄적이고 정확하지만 초보자에게는 명확한 예가 부족합니다. "푸시 / 풀 기반"및 "재 관측 가능"과 같은 용어 뒤에 구체적인 내용을 설명하겠습니다. 참고 : 나는 Observable(하늘을위한 스트림) 라는 용어를 싫어 하므로 J8 대 RX 스트림을 간단히 언급 할 것입니다.

정수 목록을 고려하십시오.

digits = [1,2,3,4,5]

J8 스트림은 콜렉션을 수정하는 유틸리티입니다. 예를 들어 짝수는 다음과 같이 추출 할 수 있습니다.

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

이것은 기본적으로 Python의 map, filter, reduce 이며 Java에 매우 훌륭하고 오래 연체되었습니다. 그러나 사전에 숫자가 수집되지 않은 경우-앱이 실행되는 동안 숫자가 스트리밍되는 경우-짝수를 실시간으로 필터링 할 수 있습니까?

Imagine a separate thread process is outputting integers at random times while the app is running (--- denotes time)

digits = 12345---6------7--8--9-10--------11--12

In RX, evencan react to each new digit and apply the filter in real-time

even = -2-4-----6---------8----10------------12

There's no need to store input and output lists. If you want an output list, no problem that's streamable too. In fact, everything is a stream.

evens_stored = even.collect()  

This is why terms like "stateless" and "functional" are more associated with RX


RxJava is also closely related to the reactive streams initiative and considers it self as a simple implementation of the reactive streams API (e.g. compared to the Akka streams implementation). The main difference is, that the reactive streams are designed to be able to handle back pressure, but if you have a look at the reactive streams page, you will get the idea. They describe their goals pretty well and the streams are also closely related to the reactive manifesto.

The Java 8 streams are pretty much the implementation of an unbounded collection, pretty similar to the Scala Stream or the Clojure lazy seq.


Java 8 Streams enable processing of really large collections efficiently, while leveraging multicore architectures. In contrast, RxJava is single-threaded by default (without Schedulers). So RxJava won't take advantage of multi-core machines unless you code that logic yourself.

참고URL : https://stackoverflow.com/questions/30216979/difference-between-java-8-streams-and-rxjava-observables

반응형