irpas技术客

JAVA Reactor API ( Flux和Mono)的简单使用_CJ_simple_java mono

irpas 6049

1. 创建Flux及Mono 1.1 使用just从现有的已知内容和大小的数据创建Flux或Mono Flux.just(new Integer[]{1, 2, 3, 4}) //观察者监听被观察者(消费者) .subscribe(System.out::println); //使用可变参数创建Flux Flux.just(1, 2, 3, 4) .subscribe(System.out::println); //使用just创建Mono Mono.just("1s") .subscribe(System.out::println); Mono.just(new Integer[]{1, 2, 3, 4}) .subscribe(System.out::println); 1.2 使用fromIterable从可迭代对象中创建Flux //从可迭代的对象中创建Flux Flux.fromIterable(Arrays.asList(1,2,3,4)) .subscribe(System.out::println); ArrayList<Integer> list = Lists.newArrayList(1, 2, 3, 4); Flux<Integer> flux = Flux.fromIterable(list); //在创建Flux后追加元素 list.add(5); flux.subscribe(System.out::println); 1.3 使用fromStream从集合流中创建Flux Flux.fromStream(Stream.of(1,2,3,4)) .subscribe(System.out::println); 1.4 使用range中创建一个范围内迭代的Flux Flux.range(0,10) .subscribe(System.out::println); 1.5 创建定时发送Flux 1.5.1 使用interval创建间隔某一时间异步执行的Flux //interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列 Flux.interval(Duration.ofMillis(100)) // map可以对数据进行处理 .map(i->"执行内容:"+i) //限制执行10次 .take(5) .subscribe(System.out::println); //避免主线程提前结束 Thread.sleep(1100); 1.5.2 使用delayElements延时发送 Flux.fromIterable(Lists.newArrayList(1,2,3,4)) //延时发送 .delayElements(Duration.ofMillis(100L)) .subscribe(System.out::println); //避免主线程提前结束 Thread.sleep(1100); 1.6 Flux与Mono之间的相互转换 1.6.1 Flux与Mono之间的相互转换 Flux<Integer> just = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); just.subscribe(System.out::println); Mono<List<Integer>> mono = just.collectList(); mono.subscribe(System.out::println); //自定义收集器 Mono<List<Integer>> monoList = just.collect(toList()); monoList.subscribe(System.out::println); //Mono转仅有一个元素的Flux Flux<List<Integer>> flux = mono.flux(); flux.subscribe(System.out::println); //将一个元素的Flux转Mono Mono<Integer> single = Flux.just(1).single(); single.subscribe(System.out::println); 1.6.2 使用concatWith从多个Mono组合成Flux Flux<String> flux = Mono.just("1").concatWith(Mono.just("2")); flux.subscribe(System.out::println); 1.6.3 使用concatWithValues追加Flux //连接多个Flux Flux.just("连接") //连接两个Flux .concatWith(Flux.just("两个")) //将元素追加到Flux .concatWithValues("或追加") .subscribe(System.out::print); 1.7 动态方法创建 Flux 1.7.1 generate动态创建Flux

generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件,定义如下。

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。这里要注意的是 next() 方法只能最多被调用一次。使用 generate() 方法创建 Flux 的示例代码如下。

// 同步动态创建, next() 方法只能最多被调用一次 Flux.generate(sink -> { sink.next("1"); //第二次会报错: //java.lang.IllegalStateException: More than one call to onNext //sink.next("2"); //如果不调用 complete() 方法,那么就会生成一个所有元素均为“1”的无界数据流 sink.complete(); }).subscribe(System.out::println);

如果想要在序列生成过程中引入状态,那么可以使用如下所示的 generate() 方法重载。 generate() 重载方法:

public static <T, S> Flux<T> generate( Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) { return onAssembly(new FluxGenerate<>(stateSupplier, generator)); }

示例:

Flux.generate(() -> 1, (i, sink) -> { sink.next(i); if (i == 5) { sink.complete(); } return ++i; }).subscribe(System.out::println);

这里我们引入了一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。()->1 设置初始态 显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。

1.7.2 create动态创建Flux

create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件,定义如下。

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 create() 方法创建 Flux 的示例代码如下。

Flux.create(sink -> { for (int i = 0; i < 5; i++) { sink.next("Tang" + i); } sink.complete(); }).subscribe(System.out::println);

运行该程序,我们会在系统控制台上得到从“Tang0”到“Tang4”的 5 个数据。

1.8 Mono 对象创建响应式流

对于 Mono 而言,可以认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。

justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下。

Mono.justOrEmpty(Optional.of("Tang")).subscribe(System.out::println);

另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件,示例代码如下。

Mono.create(sink -> sink.success("Tang")).subscribe(System.out::println);

使用fromCallable动态创建Mono

Mono.fromCallable(() -> { Thread.sleep(1000); return "1"; }).subscribe(System.out::println); 2.异常处理 2.1 创建包含异常的Flux和Mono //直接创建一个包含异常的Flux Flux.error(new Exception()); //直接创建一个包含异常的Mono Mono.error(new Exception()); 2.2 异常处理 Mono.just("1") //连接一个包含异常的Mono .concatWith(Mono.error(new Exception("Exception"))) //异常监听 .doOnError(error -> System.out.println("错误: " + error)) //在发生异常时将其入参传递给订阅者 .onErrorReturn("ErrorReturn") .subscribe(System.out::println); 2.2.1 调用subscribe可以指定需要处理的消息类型

consumer:正常消费,errorConsumer:异常处理,completeConsumer:消费完成

public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) Flux.just(1,2,3,4) .concatWith(Mono.error(new Exception("Exception"))) .subscribe(System.out::println,System.err::println,()-> System.out.println("完成")); 2.2.2 通过onErrorResume当发生异常时重新产生新的流 Flux.just(1,2,3,4) .concatWith(Mono.error(new Exception("Exception"))) .onErrorResume(e -> { System.out.println(e); return Flux.just(11,12,13); }) .subscribe(System.out::println); 2.2.3 retry重试 Flux.just(1,2,3,4) .concatWith(Mono.error(new Exception("Exception"))) .retry(1) .subscribe(System.out::println); 3. 常用方法 3.1 merge 、mergeSequential、mergeComparing 3.1.1 merge

merge按照所有流中元素的实际产生序列来合并

Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5), Flux.interval(Duration.ofMillis(10)).take(3)) .log() .subscribe(); Thread.sleep(1000); 3.1.2 mergeSequential

mergeSequential按照所有流被订阅的顺序,以流为单位进行合并。 例如: FluxA 和FluxB 只有在A消费完后才会去消费B

Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5), Flux.interval(Duration.ofMillis(10)).take(3)) .log() .subscribe(); Thread.sleep(1000); 3.1.3 mergeComparing

消费两个流中较小的那个

prefetch:比较个数 comparator:比较器 sources:数据流

public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) Flux.mergeComparing(Flux.just(1,2,9,4,76,6), Flux.just(2,75,4,3,5,6)) .log() .subscribe(); Thread.sleep(1000); 3.2 buffer、bufferTimeout、bufferWhile、bufferUntil

把当前流中的元素收集到集合中,并把集合对象作为流中的新元素

3.2.1 buffer: 收集为集合

当maxSize > skip 时 重叠 如3,2 [1,2,3],[3,4,5],[5,6,7],[7,8,9],[9,10] 当maxSize < skip 时 重叠 如3,4 [1,2,3],[5,6,7],[9,10] 当maxSize = skip 时 准确分割 等价于只传maxSize 如3,3 [1,2,3],[4,5,6],[7,8,9],[10]

Flux.range(1, 10) // .buffer(3,2) // .buffer(3,4) // .buffer(3,3) .buffer(3) .subscribe(System.out::println); 3.2.2 bufferTimeout: 可以根据maxSize 或 skip 满足其一就可以切割集合 Flux.interval(Duration.ofMillis(100L)) .bufferTimeout(9,Duration.ofMillis(1000L)) .subscribe(System.out::println); Thread.sleep(10000); 3,2.3 bufferWhile: 则只有当Predicate返回true时才会收集。一旦为false,会立即开始下一次收集。 Flux.range(1, 10) .bufferWhile(i -> i % 2 == 0) .subscribe(System.out::println); 3.2.4 bufferUntil: 会一直收集直到Predicate返回true。 Flux.range(1, 10) .bufferUntil(i -> i % 2 == 0) .subscribe(System.out::println);

cutBefore true : Predicate返回true放到下一个集合中

Flux.range(1, 10) .bufferUntil(i -> i % 2 == 0,true) .subscribe(System.out::println); 3.3 Filter 对流中包含的元素进行过滤

对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素。

Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println); 3.4 zipWith 把当前流中的元素与另一个流中的元素按照一对一的方式进行合并

把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。多的元素被舍弃,可以通过BiFunction函数对合并的元素进行处理

Flux.just(1, 2) .zipWith(Flux.just(3, 4)) .subscribe(System.out::println); //通过BiFunction函数对合并的元素进行处理 Flux.just(1, 2,3) .zipWith(Flux.just(4, 5), (s1, s2) -> s1 + "-" + s2) subscribe(System.out::println); 3.5 take 用来从当前流中提取元素 //提取指定数量的元素 Flux.range(1, 1000) .take(10) .subscribe(System.out::println); //按时间间隔提取元素 Flux.interval(Duration.ofMillis(10)) .take(Duration.ofMillis(100)) .subscribe(System.out::println); Thread.sleep(1000); //提取最后N个元素 Flux.range(1, 1000) .takeLast(10) .subscribe(System.out::println); //当Predicate返回true时才进行提取 Flux.range(1, 1000) .takeWhile(i -> i < 10) .subscribe(System.out::println); //提取元素直到Predicate返回true Flux.range(1, 1000) .takeUntil(i -> i == 10) .subscribe(System.out::println); 3.6 reduce和reduceWith 归约操作,可以进行累加累乘等操作 Flux.range(1, 100) .reduce((x, y) -> x + y) .subscribe(System.out::println); //设定默认值 Flux.range(1, 100) .reduce(100,(x, y) -> x + y) .subscribe(System.out::println); //可以设置Supplier初始值 Flux.range(1, 100) .reduceWith(() -> 100, (x ,y) -> x + y) .subscribe(System.out::println) 3.7 flatMap和flatMapSequential 把流中的每个元素转换成一个流,再把所有流中的元素进行合并。 //flatMap按实际生产顺序进行合并 Flux.just(5, 10) .flatMap(x -> Flux.interval( Duration.ofMillis(x * 10), Duration.ofMillis(100)).take(x) ) .subscribe(System.out::println); Thread.sleep(1000); //flatMapSequential按订阅顺序进行合并 Flux.just(5, 10) .flatMapSequential(x -> Flux.interval( Duration.ofMillis(x * 10), Duration.ofMillis(100)).take(x) ) .subscribe(System.out::println); Thread.sleep(1000); 3.8 concatMap 把流中的每个元素转换成一个流,再把所有流进行合并

concatMap会根据原始流中的元素顺序依次把转换之后的流进行合并,并且concatMap堆转换之后的流的订阅是动态进行的,而flatMapSequential在合并之前就已经订阅了所有的流。

Flux.just(5, 10) .concatMap(x -> Flux.interval( Duration.ofMillis(x * 10), Duration.ofMillis(100)).take(x) ) .subscribe(System.out::println); Thread.sleep(1000); 3.9 combineLatest 把所有流中的最新产生的元素合并成一个新的元素

把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

Flux.combineLatest(Arrays::toString, Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5), Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5)) .subscribe(System.out::println); Thread.sleep(1000); 3.10 使用skip跳过元素 //跳过指定条数 Flux.just(1,2,3,4,5,6,7) .skip(2) .subscribe(System.out::println); //跳过指定时间间隔 Flux.interval(Duration.ofMillis(100)) .skip(Duration.ofMillis(300)) .log() .subscribe(); Thread.sleep(1000); 3.11 使用distinct去重 Flux.just(1,1,2,2,5,6,7) .distinct() .subscribe(System.out::println); 3.12 从Flux获取首元素和尾元素 Flux<Integer> just = Flux.just(1, 2, 3, 4, 5, 6, 7); Mono<Integer> last = just.last(); Mono<Integer> first = just.next(); 3.13 从Flux阻塞式取一个元素 Flux<String> flux = Flux.create(skin -> { for (int i = 0; i < 2; ++i) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } skin.next("这是第" + i + "个元素"); } skin.complete(); }); //flux订阅者所有操作都是无副作用的,即不会改变原flux对象数据 //阻塞式订阅,只要有一个元素进入Flux String first = flux.blockFirst(); //输出: 这是第0个元素 System.out.println(first); //还是输出: 这是第0个元素 System.out.println(flux.blockFirst()); //输出: 这是第1个元素 System.out.println(flux.blockLast()); //还是输出: 这是第1个元素 System.out.println(flux.blockLast()); 3.14 监听:doOnError、doOnComplete、doFinally、doOnSubscribe Flux.just(1,2,3,4,5,6,7,8,9) .concatWith(Flux.error(new Exception())) //错误时执行 .doOnError(e -> System.out.println("报错:" + e)) //完成时执行 .doOnComplete(()-> System.out.println("数据接收完成")) //最后执行 .doFinally(t-> System.out.println("最后执行信息:" + t)) .subscribe(System.out::println); //消费者参与前执行的最后一件事,入参为消费者对象(一般用于修改、添加、删除源数据流) Flux.just(1,2,3,4,5,6,7,8,9) .log() .doOnSubscribe(i ->{ System.out.println("先请求2个"); i.request(2); System.out.println("再请求3个"); i.request(3); i.cancel(); System.out.println("取消监听"); }) .subscribe(System.out::println); 4. 背压------主动控制订阅量 4.1 原始的Subscriber::onNext Flux.interval(Duration.ofMillis(10L)) .subscribe(new Subscriber<Long>() { Subscription subscription; AtomicInteger count = new AtomicInteger(); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; //首先请求5个 subscription.request(5); count.set(5); } @Override public void onNext(Long aLong) { System.out.print(" value:" + aLong); try { Thread.sleep(100L); } catch (InterruptedException e) { e.printStackTrace(); } if (count.decrementAndGet() <= 0){ System.out.println(" 消费完成,重新请求5个"); subscription.request(5); count.set(5); } } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { System.out.println("全部消费完成"); } }); Thread.sleep(5000L); 4.2 BaseSubscriber Flux.range(1,50) .log() .subscribe(new BaseSubscriber<Integer>() { private int count = 0; private final int limit = 5; @Override protected void hookOnSubscribe(Subscription subscription) { request(limit); } @Override protected void hookOnNext(Integer value) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } if (++count == limit){ request(count); count = 0; } } }); 4.3 limitRate Flux.interval(Duration.ofMillis(10L)) .take(10) .log() .limitRate(4) .subscribe(); Thread.sleep(1000L);

当已经处理75%的数据量时会重新请求下一批数据


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #Mono #1 #创建Flux及Mono11 #Integer1 #2