project reactor - Error Handling Asynchronous Events with a ConnectableFlux -
i have similar requirement author of post: handling errors in fluxsink continuously emits events.
i have implemented concatmap
individually handle errors event handlers. however, have requirement have 2 operations occur asynchronously in parallel, before continuing next operation. therefore in code below, need modify handler2 , handler3 occur in parallel before proceeding handler4, instead of synchronously executing.
flux<objectfromqueue> flux = flux.create(fluxsink -> { messagelistener.setfluxsink(fluxsink); //messagelistener has 'process' method invoked there new message on queue. }, fluxsink.overflowstrategy.buffer); connectableflux<objectfromqueue> connectableflux = flux.publish(); connectableflux .concatmap(v -> mono.just(handler1.handle(v)) .doonerror(errorhandler::handle) .onerrorresume(mono.empty()) .concatmap(v -> mono.just(handler2.handle(v)) .doonerror(errorhandler::handle) .onerrorresume(mono.empty()) .concatmap(v -> mono.just(handler3.handle(v)) .doonerror(errorhandler::handle) .onerrorresume(mono.empty()) .concatmap(v -> mono.just(handler4.handle(v)) .doonerror(errorhandler::handle) .onerrorresume(mono.empty()) .subscribe() connectableflux.connect();
what best way configure connectableflux
handle parallel, async events, while still ensuring error not terminate entire flux?
within concatmap
function, tried create separate mono
handler2
, handler3
, zip
them make operations occur in parallel, not handling errors way want it. know of better approach making operations occur in parallel while still preventing errors terminating flux?
Comments
Post a Comment