Project Reactor - Error Handling Asynchronous Events with a ConnectableFlux


I have a similar requirement to the author of this post: Handling errors in a FluxSink that continuously emits events.

I have implemented concatMap to handle errors individually in each of the event handlers. However, I have a requirement that 2 operations occur asynchronously, in parallel, before continuing to the next operation. Therefore, in the code below, I need to modify handler2 and handler3 so that they run in parallel before proceeding to handler4, instead of executing synchronously one after the other.

Flux<ObjectFromQueue> flux = Flux.create(fluxSink -> {
    messageListener.setFluxSink(fluxSink);
    // messageListener has a 'process' method that is invoked when there is a new message on the queue.
}, FluxSink.OverflowStrategy.BUFFER);

ConnectableFlux<ObjectFromQueue> connectableFlux = flux.publish();
connectableFlux
    .concatMap(v -> Mono.just(handler1.handle(v))
            .doOnError(errorHandler::handle)
            .onErrorResume(e -> Mono.empty()))
    .concatMap(v -> Mono.just(handler2.handle(v))
            .doOnError(errorHandler::handle)
            .onErrorResume(e -> Mono.empty()))
    .concatMap(v -> Mono.just(handler3.handle(v))
            .doOnError(errorHandler::handle)
            .onErrorResume(e -> Mono.empty()))
    .concatMap(v -> Mono.just(handler4.handle(v))
            .doOnError(errorHandler::handle)
            .onErrorResume(e -> Mono.empty()))
    .subscribe();
connectableFlux.connect();

What is the best way to configure the ConnectableFlux to handle parallel, async events, while still ensuring that an error does not terminate the entire Flux?

Within the concatMap function, I tried to create a separate Mono for handler2 and handler3 and zip them so that the operations occur in parallel, but that is not handling errors the way I want it to. Does anyone know of a better approach for making the operations occur in parallel while still preventing errors from terminating the Flux?
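For reference, here is a minimal sketch of the zip approach described above, with the error handling moved onto each branch before the zip. It is one possible arrangement under stated assumptions, not a confirmed solution. The class ParallelHandlersSketch, the helper guarded, the errors list, and the String-returning handler2/handler3 stand-ins are all hypothetical; the real handlers and errorHandler are not shown in the question. It uses Mono.fromCallable rather than Mono.just, because Mono.just(handler.handle(v)) evaluates the handler eagerly, before the Mono exists, so doOnError and onErrorResume on that Mono never see an exception thrown by the handler.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ParallelHandlersSketch {

    // Hypothetical stand-in for errorHandler::handle: records error messages.
    static final List<String> errors = new CopyOnWriteArrayList<>();

    // Hypothetical stand-ins for handler2 and handler3; handler2 fails for the value 2.
    static String handler2(Integer v) {
        if (v == 2) {
            throw new IllegalStateException("handler2 failed for " + v);
        }
        return "a" + v;
    }

    static String handler3(Integer v) {
        return "b" + v;
    }

    // Runs the work lazily on a worker thread and turns any error into an empty Mono,
    // so a failure is reported but never propagates to the outer Flux.
    static <T> Mono<T> guarded(Callable<T> work) {
        return Mono.fromCallable(work)
                .subscribeOn(Schedulers.boundedElastic())
                .doOnError(e -> errors.add(e.getMessage()))
                .onErrorResume(e -> Mono.empty());
    }

    static List<String> run(Flux<Integer> source) {
        return source
                .concatMap(v -> {
                    // Each branch is subscribed on its own worker, so the two can overlap.
                    Mono<String> second = guarded(() -> handler2(v));
                    Mono<String> third = guarded(() -> handler3(v));
                    // zip completes empty if either branch is empty, which skips this element.
                    return Mono.zip(second, third);
                })
                // Stand-in for handler4: only runs once both branches have produced a value.
                .concatMap(t -> guarded(() -> t.getT1() + "+" + t.getT2()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run(Flux.just(1, 2, 3)));
        System.out.println(errors);
    }
}
```

With the inputs 1, 2, 3, the element 2 is dropped after its error is recorded, and the other two elements still reach the handler4 stand-in. If a failed branch should not discard the whole element, each branch could instead resume with a default value or an Optional wrapper, so that zip still emits.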

