spring integration - How to provide a response based on a publisher confirm -


i have web service ingests objects, sends notification on amqp, , returns json response requester. each request performed on single thread , trying implement publisher confirms , struggling on how should set up. have working don't way doing it.

the way doing is:

  • put headers on message
  • have publish-subscribe-channel 2 subscribers
  • subscriber 1) creates blocking queue ready , sends message on amqp
  • subscriber 2) begins pulling 5 seconds on queue until gets confirm
  • the amqp:outbound-channel-adapter sends publisher confirms service activator
  • the publisherconfirmreceiver receives confirm , puts in blocking queue causing subscriber 2's pulling complete , return result of confirm.

this technique work don't making assumption chain going receive message before waitforpublisherconfirm service activator publish subscribe channel. in case order matters regarding component receives message first.

if waitforpublisherconfirms service activator receives message first block thread 5 seconds, allow chain send message via amqp:outbound-channel-adapter.

i tried putting waitforpublisherconfirms after amqp:outbound-channel-adapter since outbound-channel-adapter doesn't "return" service activator never gets called after in chain.

i feel there should better way of doing this. goal wait publisher confirms (or timeout cannot find support in spring's publisher confirms) before sending response requester.

could me shape solution little better or let me know if ok rely on fact first subscriber publish-subscribe-channel receive message first.

sorry 1 long.

some configuration

<int:header-enricher input-channel="addheaders" output-channel="metadataingestnotifications">     <int:header name="routingkey" ref="routingkeyresolver" method="resolvereoutingkey"/>     <int:header name="notificationid" expression="payload.id" /> </int:header-enricher>  <int:chain input-channel="metadataingestnotifications" output-channel="nullchannel" >      <int:service-activator id="addpublisherconfirmqueue"         requires-reply="false"         ref="publisherconfirmservice"           method="addpublisherconfirmqueue" />      <int:object-to-json-transformer id="transformobjecttojson" />      <int-amqp:outbound-channel-adapter id="amqpoutboundchanneladapter"         amqp-template="rabbittemplate"         exchange-name="${productnotificationexchange}"         confirm-ack-channel="publisherconfirms"         confirm-nack-channel="publisherconfirms"         mapped-request-headers="*"         routing-key-expression="headers.routingkey"         confirm-correlation-expression="headers.notificationid" />  </int:chain>  <int:service-activator id="waitforpublisherconfirm"         input-channel="metadataingestnotifications"         output-channel="publisherconfirmed"         requires-reply="true"         ref="publisherconfirmservice"           method="waitforpublisherconfirm"  />   <int:service-activator id="publisherconfirmreceiver"                         ref="publisherconfirmservice"                         method="receivepublisherconfirm"                         input-channel="publisherconfirms"                         output-channel="nullchannel" /> 

class

public class publisherconfirmservice {      private final map<string, blockingqueue<boolean>> suspenders = new hashmap<>();      public message addpublisherconfirmqueue(@header("notificationid") string id, message m){         logmanager.getlogger(this.getclass()).info("adding publisher confirm queue.");         blockingqueue<boolean> bq = new linkedblockingqueue<>();         suspenders.put(id, bq);         return m;     }      public boolean waitforpublisherconfirm(@header("notificationid") string id) {         logmanager.getlogger(this.getclass()).info("waiting publisher confirms notification: " + id);         blockingqueue<boolean> bq = suspenders.get(id);         try {             boolean result = bq.poll(5, timeunit.seconds);             if(result == null){                 logmanager.getlogger(this.getclass()).error("the broker took long return publisher confirm. notificationid: " + id);                 return false;             }else if(!result){                 logmanager.getlogger(this.getclass()).error("the publisher confirm indicated message not confirmed. notificationid: " + id);                 return false;             }         } catch (interruptedexception ex) {             logmanager.getlogger(this.getclass()).error("something went wrong polling publisher confirm notificationid: " + id, ex);             return false;         }finally{             suspenders.remove(id);         }         return true;     }      public void receivepublisherconfirm(string id, @header(amqpheaders.publish_confirm) boolean confirmed){         logmanager.getlogger(this.getclass()).info("received publisher confirm notification: " + id);         if (suspenders.containskey(id)){             blockingqueue<boolean> bq = suspenders.get(id);             bq.add(confirmed);         }     }  } 

how take aggregator solution same purpose?

the <recipient-list-router> send message aggregator's input-channel , second channel <int-amqp:outbound-channel-adapter>.

the confirm-ack-channel must brings message same aggregator after transformation, e.g. proper extraction correlationkey , on.


Comments

Popular posts from this blog

c - Bitwise operation with (signed) enum value -

xslt - Unnest parent nodes by child node -

python - Healpy: From Data to Healpix map -