Project Reactor - Error Handling Asynchronous Events with a ConnectableFlux


I have a similar requirement to the author of the post "handling errors in a FluxSink that continuously emits events".

I have implemented concatMap to individually handle errors in the event handlers. However, I have a requirement for 2 operations to occur asynchronously, in parallel, before continuing to the next operation. Therefore, in the code below, I need to modify handler2 and handler3 so that they occur in parallel before proceeding to handler4, instead of executing synchronously.

    Flux<ObjectFromQueue> flux = Flux.create(fluxSink -> {
        messageListener.setFluxSink(fluxSink);
        // messageListener has a 'process' method that is invoked when there is a new message on the queue.
    }, FluxSink.OverflowStrategy.BUFFER);

    ConnectableFlux<ObjectFromQueue> connectableFlux = flux.publish();
    connectableFlux
        .concatMap(v -> Mono.just(handler1.handle(v))
                .doOnError(errorHandler::handle)
                .onErrorResume(e -> Mono.empty()))
        .concatMap(v -> Mono.just(handler2.handle(v))
                .doOnError(errorHandler::handle)
                .onErrorResume(e -> Mono.empty()))
        .concatMap(v -> Mono.just(handler3.handle(v))
                .doOnError(errorHandler::handle)
                .onErrorResume(e -> Mono.empty()))
        .concatMap(v -> Mono.just(handler4.handle(v))
                .doOnError(errorHandler::handle)
                .onErrorResume(e -> Mono.empty()))
        .subscribe();
    connectableFlux.connect();

What is the best way to configure the ConnectableFlux to handle parallel, async events, while still ensuring that an error does not terminate the entire Flux?

Within the concatMap function, I tried to create a separate Mono for handler2 and handler3 and zip them to make the operations occur in parallel, but it is not handling errors the way I want it to. Does anyone know of a better approach to making the operations occur in parallel while still preventing errors from terminating the Flux?
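To make the zip-based attempt concrete, here is a minimal sketch of one possible shape for it. It is not a confirmed solution. The class name `ParallelHandlersSketch`, the `guarded` helper, and the String-based `handler2`/`handler3` methods are hypothetical stand-ins for the real handlers. It assumes each handler is a blocking call, so each one is wrapped in `Mono.fromCallable` (which defers the call until subscription, unlike `Mono.just`) and moved onto its own thread with `subscribeOn`. Each branch swallows its own error and completes empty. Because `Mono.zip` completes empty when either source is empty, a failing element is skipped and the outer Flux keeps running.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ParallelHandlersSketch {

    // Hypothetical stand-in for handler2: fails for the input "bad".
    static String handler2(String v) {
        if (v.equals("bad")) {
            throw new IllegalStateException("handler2 failed for " + v);
        }
        return v + "-h2";
    }

    // Hypothetical stand-in for handler3: always succeeds.
    static String handler3(String v) {
        return v + "-h3";
    }

    // Defers the handler call, runs it on a boundedElastic thread,
    // reports any error, and replaces the error with an empty Mono.
    static Mono<String> guarded(Function<String, String> handler, String v) {
        return Mono.fromCallable(() -> handler.apply(v))
                .subscribeOn(Schedulers.boundedElastic())
                .doOnError(e -> System.err.println("handled: " + e.getMessage()))
                .onErrorResume(e -> Mono.empty());
    }

    // Runs both handlers concurrently for each element; an element whose
    // handler fails produces no output, and the sequence continues.
    static List<String> run(List<String> inputs) {
        return Flux.fromIterable(inputs)
                .concatMap(v -> Mono.zip(
                                guarded(ParallelHandlersSketch::handler2, v),
                                guarded(ParallelHandlersSketch::handler3, v))
                        .map(t -> t.getT1() + "|" + t.getT2()))
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "bad", "c")));
    }
}
```

With the inputs "a", "bad", "c", the element "bad" is dropped after its error is reported, and the other two elements pass through in order. In the pipeline from the question, the zipped step would take the place of the separate handler2 and handler3 concatMap stages, with handler4 following in its own concatMap.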

