adesso Blog

It is often necessary for services in distributed applications such as microservice architectures to notify each other about important business events. If, for instance, the price of an item changes in a central price service, this change event is potentially relevant for a billing service. Another example would be an order event in a shop service, which is also relevant for the billing service as this information is needed in order for it to generate an accompanying invoice for the order. For that reason, the price and shop service should send a message to the billing service to notify it that a technically relevant event like this has occurred. These messages are often referred to as integration events.

Integration events can be technically implemented in a number of ways, including via message queues or using HTTP POST requests. However, it is important to guarantee certain consistency properties. To explain, an integration event should only be sent if the processing of the business logic in the sending service has been successfully completed. Otherwise, if it is not possible to save the new price due to a database failure, no corresponding integration event may be sent. If this were to happen, for example, an order could fail on technical grounds, though an invoice would still be sent. Conversely, the integration event should not be lost if the business logic was successfully carried out.

As Vaughn Vernon explained in ‘Implementing Domain Driven Design’, the famous ‘red book’ of domain-driven design, the consistent delivery of integration events can be implemented through the use of an event store. In my blog post, I describe how this can be done in Java with the aid of Spring. In addition to Spring Data JPA, Spring application events are used in particular. It should be noted, however, that application events differ from integration events, as you find out in the next section.

Before we dive any deeper, let us first learn about the various different categories of push- and pull-based communication strategies. The solution described here should be employed whenever a service sends new events under the push principle. Whether the service sends the events directly to another service via an HTTP request or whether it publishes them in a message broker such as RabbitMQ, from which they only reach the actual recipient(s) indirectly, is immaterial.

Integration events and application events

Let us return to integration events and application events. Although they are related, there is one key difference between them.

  • As described above, integration events are used for communication between different services.
  • Spring application events are a way to exchange messages within a Java application, that is, within a single service. For this purpose, Spring provides constructs such as the ApplicationEvent abstract Java class, the ApplicationEventPublisher interface and the @EventListener annotation, which will be described in greater detail later on in this blog post.

As such, application events are concrete programming language constructs, while integration events are more of an abstract concept that can be implemented in various ways (at least in the context of this blog post). The following diagram describes the difference between application events and integration events:

Spring application events are a great way to decouple modules within a Java application. Let me explain why. Even if two modules (Module1 and Module2) communicate with each other via application events, this does not lead to strong dependencies between the modules. One module does not call the other, and there are also no import or build dependencies. The only thing that both modules need to know is the application event and the Spring Framework.

Consistent integration of events on the basis of an event store

As mentioned earlier in the introduction, there are challenges with regard to consistency when sending integration events. For instance, an integration event may not be sent if the underlying execution failed. In the example above, an event may not be sent for a price change if the commit for the price change in the price service fails. However, an integration event should be sent if the execution was successful.

In principle, it would be possible to ensure that these consistency requirements are met using distributed transactions, that is, via the XA standard based on the two-phase commit protocol. However, distributed transactions have gained a bad reputation, in part because they are difficult to implement and often lead to poor performance. In many cases, this option is not available anyway, since XA transactions are not permitted under the general framework. In the project on which this blog post is based, for example, REST was chosen as the technology that would be used to implement integration events. However, REST and HTTP POST requests do not support distributed transactions. And it cannot be assumed that other technologies such as message brokers support XA transactions either. For example, RabbitMQ does not.

Another way to ensure consistency is the approach described by Vernon in ‘Implementing Domain Driven Design’, which is based on an event store. It works as follows:

  • All integration events are serialised in the service in which they occur and stored in the same database as the application data. The table or tables in which the integration events are saved constitute the event store. In the example here, it is located in the same database as the price data.
  • The integration events are persisted in the same transaction in the event store in which the functional application data is persisted.
  • The integration events are only actually sent once a transaction has been successfully committed.

This solution ensures that integration events are only sent when the business logic in the sending service has been successfully implemented. The integration events are consistently persisted in the event store, meaning that it is also extremely easy to carry out multiple delivery attempts and, if necessary, to initiate a controlled, logged troubleshooting process should repeated delivery attempts fail. This is a significant advantage over a simpler solution that does not include an event store. In the latter case, the database transaction is completed first, after which the integration events are then sent from the context data of the request or Java thread that is running. If this solution (also known as a best effort one-phase commit) fails to send the request, the integration event is simply lost.

Unlike two-phase commit, the solution described here does not roll back the business logic if the integration event is not sent. In many cases, this is not a bad thing. If, for example, a price change or an order is only reported to the billing service with a delay, this is generally not an issue. Plus, it is better than having to reject an order simply because the billing service is not available at the time.

Implementation in Spring Data JPA and application events

We now want to deploy the approach outlined above in Java, this time on the basis of Spring application events and Spring Data JPA. Here, we are working on the assumption that a traditional relational database is used and that the integration events are to be sent via HTTP POST requests.

Basis for implementation: application events in Spring

Spring’s ApplicationEventPublisher is an interface that features the publishEvent method, which can be used to publish any object. This then makes it possible to respond to the events in various ways within the Spring application.

The easiest way is to use the @EventListener annotation, which allows you to register a method as an event listener. The method signature, or more precisely its parameter type, is used to declare which events the event listener is registered for. The call of an event listener annotated in this way is performed synchronously by default. This means that the service that publishes the event is only run once all listeners have been executed.

The @TransactionalEventListener annotation gives you greater control over the process. With it, the processing of the event can be placed in the context of a transaction, while the call of a listener can be linked to a specific phase of the transaction. For example, @TransactionalEventListener(phase=TransactionPhase.AFTER_COMMIT) specifies that the listener should only be called after a successful commit. Alternatively, execution can also take place during the BEFORE_COMMIT and AFTER_ROLLBACK phases.

Spring uses these mechanisms in-house. For instance, the ApplicationContext publishes the ContextStartedEvent and ContextRefreshedEvent framework events.

Overview of the solution

Our implementation uses the Spring functionality described above as follows:

  • The functional module sends the integration event with the relevant information on the price change via an ApplicationEventPublisher. This takes place within the transaction in which the price change is persisted via JPA.
  • The module for handling integration events, referred to below as the integration event service, uses the @EventListener to listen for the event. It serialises the event and persists it via JPA in the same database in which the functional data (in this case, the prices) is persisted. This takes place within the same transaction since the @EventListener runs synchronously.
  • The event is not sent by the integration event service to the other service(s) until the price change (and the event) have been successfully committed. This is triggered by @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT).

The diagram above illustrates this:

Publication of integration events in the functional module

The event in the functional module can be sent by a service, a DDD entity or another construct, depending on the architecture, whereby the basic process is always the same. To keep it simple, a service is assumed below. Integration events are easy to publish. The ApplicationEventPublisher is provided via dependency injection, while the integration event is published by simply calling publishEvent(). This can be seen in the code below in the context of our example based on a price change:

	
	@Service
		public class PriceService {
		    @Autowired
		    ApplicationEventPublisher eventpublisher
		    @Transactional
		    public void setPrice(String articleId, BigInteger value) {
		        //Neuen Preis speichern
		        //...
		        //Event publizieren
		        publisher.publishEvent(
		          new IntegrationEvent("PriceUpdate " + articleId + "<-" + value));
		    }
	

For simplicity’s sake, the payload in this code, that is, the contents of the integration event, consists of one single string. A more complex structure would certainly be required in a live application. By the way, there are no dependencies between the functional service and the IntegrationEventService in this solution – the functional module only needs to know the publisher and the IntegrationEvent.

Persistence of events in the event store

As indicated above, the application event published in the functional service can be intercepted and persisted in the integration event service via the @EventListener. This takes place in the same transaction since processing is done synchronously. For simplicity’s sake, we are working on the assumption that the integration event has been annotated as @Entity so that it can be persisted as follows:

	
	@Service
		public class IntegrationEventService {
		    @EventListener
		    public void persistEvent(IntegrationEvent event) {
		        eventRepository.save(event);
		    }
		    //...
	

In addition to the actual payload (in our example, a single string), the JPA entity also contains additional information, most importantly an ID and a timestamp. The following code includes is a Boolean flag that stores information on whether the integration event has already been dispatched.

	
	@Entity
		public class IntegrationEvent {
		    @Id
		    UID id;
		    @Column
		    String payload;
		    @Column
		    Instant timestamp;
		    @Column
		    boolean successfullyDispatched;
		    public IntegrationEvent(String payload) {        
		this(UUID.randomUUID(), payload, Instant.now(), false);    
		}
		}
	

Basically speaking, the event store is therefore a JPA repository for IntegrationEvents.

Dispatch of integration events

The event is only forwarded to other services once the transaction has been successfully committed. The @TransactionalEventListener annotation described above is used in the integration event service for this purpose. This ensures that the event is not sent until the transaction has been successfully completed.

	
	    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
		    @Transactional(Transactional.TxType.REQUIRES_NEW)
		    public void sendEvent(IntegrationEvent event) {
		        //Event per HTTP POST verschicken
		        httpClient.send(...);
		        event.setSuccessfullyDispatched(true);
		        eventStore.save(event);        
		}    
		}
	

In the code shown above, the event is dispatched via an HTTP request, though the process would be similar if it were to be published to a RabbitMQ message broker, for example. After the event has been dispatched, the dispatch status is updated and saved.

Retry mechanism

As described above, a retry mechanism can be implemented on the basis of the event store, which is triggered in the event of a failed dispatch. This can be done using Spring’s @Scheduled annotation:

	
	    @Scheduled(cron = "0 0 3 * * *") //Retry every day at 3 am
		    public void watchdog() {
		        List<IntegrationEvent> undispatchedEvents = 
		          eventStore.findAllBySuccessfullyDispatched(false);
		        for (IntegrationEven event : undispatchedEvents) {
		            sendEvent(event);
		        }
		    }
	

This means that we are done with our implementation.

Process in Spring

Despite all the advantages that Spring has to offer, the thing that makes it unique (first and foremost the use of generated interceptors to implement aspects or annotations) does not always make it easy to understand the resulting process flow. For this reason, the following diagram has been provided to illustrate how the key modules interact in the happy path, a scenario in which both the price change and the message dispatch are successful.

  • The @Transactional annotation of setPrice() causes Spring to generate an interceptor that adds transaction processing to the underlying method. This interceptor first initiates a new database transaction via the Spring TransactionManager.
  • The actual method body of setPrice() is not executed until this has been done. This saves the new price via JPA and then publishes an event in the ApplicationEventPublisher.
  • Spring’s ApplicationEventPublisher then immediately calls the persistEvent() method registered via @EventListener. This saves the integration event within the same database transaction.
  • In addition, the ApplicationEventPublisher or Spring ensures that the event is registered with the TransactionManager so that it can subsequently activate the @TransactionalEventListener (after a successful commit).
  • Execution of setPrice() is now terminated, and the Spring interceptor triggers a commit of the database transaction in the TransactionManager,
  • which in turn executes the commit via JDBC. It does not call the sendEvent() method registered using @TransactionalEventListener until after a successful commit.
  • This packs and sends the IntegrationEvent as an HTTP message. After a successful commit, it updates the delivery status of the IntegrationEvent and saves it using JPA.

Important points to consider

A special feature of the described solution is worth discussing. To explain, there are cases where an integration event may be sent several times. This happens, among other things, if the persistence of the information on the dispatch fails after dispatch. Even if this is far from ideal, it is easy to identify duplicates on the basis of the integration event’s ID in the receiving service and ignore them.

A real-life implementation would certainly be more complex than the very simple solution described here. The integration events would have more structure than in the example above, where they consist of a single string. They may not be directly persistable and would first need to be mapped to JPA entities. One option would be to save a counter for retries instead of mapping the successful delivery via a Boolean. It might make sense to distinguish between different recipients, among other things That said, the implementation of integration events on the basis of Spring application events and an event store remains a simple and therefore recommended approach on a conceptual level for the consistent, push-based delivery of integration events in distributed applications.

Would you like to learn more about exciting topics from the world of adesso? Then check out our latest blog posts.

Picture Henrik Grosskreutz

Author Henrik Grosskreutz

Henrik Grosskreutz is a software architect at adesso and has over 20 years of experience in the field of agile software development. His focus is on Java and software architecture - most recently in the context of distributed systems, containerisation and cloud technologies.

Save this page. Remove this page.