Transactional outbox is a resilience pattern used in event-driven microservices to ensure that events are delivered reliably to consumers and maintain data consistency. Specifically, the pattern ensures consistency between a service's database and a message broker. It guarantees that if a database transaction is committed, any associated domain events are also reliably published to the message broker.
In this article, we'll focus on a practical implementation of the transactional outbox pattern using Spring Boot, RabbitMQ and MongoDB, as well as Docker. You can read more about the theory of the pattern and some different implementations here and here.
What problem are we solving?
We know that the database and message broker are separate systems that cannot be reliably covered by the same transaction without risking the operational issues of distributed transactions. Some failure scenarios we would like to solve are:
If the database transaction is rolled back after an event is published, consuming services will assume that any state change was committed and this could lead to inconsistencies between services.
When our service has a synchronous API (e.g. HTTP REST), client requests are coupled to the message broker. If the broker is down temporarily, we have to return an error to the client because the service is unable to reliably commit to the database and send the domain event. The expectation, therefore, would be for the client to handle this error.
The transactional outbox pattern solves both problems.
Firstly, it ensures an event is only sent to the broker once the database transaction has been successfully committed. Secondly, it allows our service to re-attempt publication of the event asynchronously once the broker is back online.
This pattern does, however, introduce additional complexity. This arises from the need to manage an outbox table (or collection) and ensure reliable event dispatching from the outbox.
Requirements
We must ensure that events recorded in the outbox are eventually written to the message broker, even if there are temporary failures when connecting or writing to it.
If our service fails to publish, it will need to try again, and these retries may result in duplicate messages being sent. This delivery guarantee is known as "at-least-once delivery". Consumers must therefore expect and be able to handle duplicate messages, so that processing the same event multiple times has the same result as processing it once (also known as "idempotence"). In short, our services will be "eventually consistent".
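To illustrate what idempotent handling can look like on the consumer side, here's a minimal sketch that deduplicates on the event ID before processing. The listener, ProcessedEventRepository and ShippingService are hypothetical and shown only for illustration; they are not part of the order-service code:
@Component
@AllArgsConstructor
public class OrderCreatedEventListener {
    // Hypothetical store of event IDs that have already been handled.
    private final ProcessedEventRepository processedEventRepository;
    // Hypothetical downstream business logic.
    private final ShippingService shippingService;
    @RabbitListener(queues = "queue.shipping.order.created")
    public void onOrderCreated(OrderCreatedEvent event) {
        // If we've seen this event before, a redelivery is simply ignored.
        if (processedEventRepository.existsById(event.getEventId())) {
            return;
        }
        shippingService.prepareShipment(event.getOrderId());
        processedEventRepository.save(new ProcessedEvent(event.getEventId()));
    }
}
Note that the check-then-save above isn't fully race-proof; a unique index on the processed event ID would make the guard stronger.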
Additionally, in this implementation, we will minimise the delay between committing the domain update and sending the event to ensure good performance. We'll do this by sending the event immediately if we can, or if not, using a scheduled process to do so later.
Finally, it's often important that events are processed in the correct sequential order to maintain consistency. If message ordering is indeed important, consider a log-based message broker like Kafka or Kinesis which provides order guarantees within a partition, rather than a queue-based JMS or AMQP style broker such as RabbitMQ. RabbitMQ is used here simply for demonstration purposes.
Approach
At a high level our approach will consist of:
A Spring Boot app with a HTTP API
MongoDB database
RabbitMQ message broker
Spring Boot integration testing with Testcontainers
Docker Compose to run the full stack
We'll model a small slice of the classic retail business domain with a basic order service and test the consumer using a Spring integration test.
Note that a replica set is needed with MongoDB to enable multi-document transactions. This can be done locally with a single MongoDB node, which I'll show at the end, but this approach is, of course, only recommended for local testing.
Implementation
The microservice we'll implement is called order-service and it provides an HTTP API for creating orders.
Domain & Event Model
We'll need an Order business entity and an OrderCreatedEvent:
@Value
@Builder
@Immutable
@Document(collection = "order")
public class Order {
@Id
String id;
String customerId;
@With
Instant orderDate;
BigDecimal totalAmount;
}
OrderCreatedEvent is the class that contains the event data that will be published to consuming services.
@Data
@SuperBuilder
@Jacksonized
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class OrderCreatedEvent extends DomainEvent {
public static final String EVENT_TYPE = "order.created";
@NonNull
private String orderId;
@Override
public String getEventType() {
return EVENT_TYPE;
}
}
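OrderCreatedEvent extends a DomainEvent base class that isn't shown in the article. A minimal sketch, assuming only the fields the tests reference later (an event ID and an event timestamp), could look like this:
@Data
@SuperBuilder
@NoArgsConstructor
public abstract class DomainEvent {
    // Unique ID that consumers can use to deduplicate redelivered events.
    @Builder.Default
    private String eventId = UUID.randomUUID().toString();
    // When the event occurred; the outbox scheduler sorts and filters on this.
    @Builder.Default
    private Instant eventTimestamp = Instant.now();
    // Used to build the routing key, e.g. "order.created".
    public abstract String getEventType();
}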
OutboxDomainEvent is an entity that is used to save the DomainEvent to the outbox_domain_event collection. This is the outbox collection.
@Builder
@Value
@Immutable
@Document(collection = "outbox_domain_event")
public class OutboxDomainEvent {
@Id
String id;
DomainEvent domainEvent;
}
OutboxApplicationEvent is an internal Spring event that will help us set up correct transaction handling. The need for this will be explained later in the article.
@Value
public class OutboxApplicationEvent {
OutboxDomainEvent outboxDomainEvent;
}
Lombok is used here to keep things concise.
Order API
Here's our controller for creating orders:
@RestController
@RequestMapping("/api/orders")
@AllArgsConstructor
public class OrderController {
private final OrderService orderService;
private final OrderMapper orderMapper;
@PostMapping
public ResponseEntity<CreateOrderResponseDto> createOrder(
@Valid @RequestBody CreateOrderRequestDto requestDto) {
Order order = orderService.createOrder(requestDto);
CreateOrderResponseDto response = orderMapper.toCreateOrderResponse(order);
return new ResponseEntity<>(response, HttpStatus.CREATED);
}
}
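The request and response DTOs (and the OrderMapper) aren't shown in the article. Based on how they're used in the controller and the tests, a sketch using Java records might look like this; the validation annotations are assumptions:
// Request body for POST /api/orders.
public record CreateOrderRequestDto(
        @NotBlank String customerId,
        @NotNull @Positive BigDecimal totalAmount) {
}
// Response body echoing the persisted order.
public record CreateOrderResponseDto(
        String id,
        String customerId,
        Instant orderDate,
        BigDecimal totalAmount) {
}
The OrderMapper simply converts between these DTOs and the Order entity, for example with MapStruct or a small hand-written class.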
Service Layer
We'll need an OrderService to create an order:
@Service
@AllArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OrderMapper orderMapper;
private final DomainEventPublisher domainEventPublisher;
@Transactional
public Order createOrder(CreateOrderRequestDto request) {
Order savedOrder = orderRepository.save(
orderMapper.toOrder(request)
.withOrderDate(Instant.now())
);
domainEventPublisher.publishEvent(
OrderCreatedEvent.builder()
.orderId(savedOrder.getId())
.build()
);
return savedOrder;
}
}
Notice that we've started a transaction with the method call to createOrder using the @Transactional annotation.
We'll also need an OutboxService for publishing from the outbox, which includes removing the event from the outbox once it has been sent:
@Service
@AllArgsConstructor
public class OutboxService {
private final RabbitEventPublisher rabbitEventPublisher;
private final OutboxDomainEventRepository outboxDomainEventRepository;
public void sendFromOutbox(OutboxDomainEvent outboxDomainEvent) {
rabbitEventPublisher.publishEvent(outboxDomainEvent.getDomainEvent());
outboxDomainEventRepository.delete(outboxDomainEvent);
}
}
Publishing Events
We've abstracted domain event publication so that service classes do not need to be concerned with the implementation details. Here's the publisher:
@Service
@AllArgsConstructor
public class DomainEventPublisher {
private final ApplicationEventPublisher applicationEventPublisher;
private final OutboxDomainEventRepository outboxDomainEventRepository;
@Transactional
public void publishEvent(DomainEvent domainEvent) {
OutboxDomainEvent outboxDomainEvent = outboxDomainEventRepository.save(
    OutboxDomainEvent.builder().domainEvent(domainEvent).build());
applicationEventPublisher.publishEvent(
new OutboxApplicationEvent(outboxDomainEvent));
}
}
Notice that we've extended the database transaction over this method using the @Transactional annotation. This means persisting a new Order and saving the associated OutboxDomainEvent occur in the same transaction, which meets our requirements.
Also notice that we've leveraged Spring's ApplicationEventPublisher to publish an internal OutboxApplicationEvent. This provides a neat way of executing some logic after the transaction commits successfully. We'll use this mechanism to make our first attempt to send the event.
The listener for this is annotated with @TransactionalEventListener, which by default runs in the AFTER_COMMIT phase:
@Component
@AllArgsConstructor
public class OutboxApplicationEventListener {
private final OutboxService outboxService;
@TransactionalEventListener
public void onOutboxApplicationEvent(OutboxApplicationEvent appEvent) {
outboxService.sendFromOutbox(appEvent.getOutboxDomainEvent());
}
}
We used the OutboxService here that we created previously. Remember that the Order was already committed, so if anything fails from here until outboxDomainEventRepository.delete() is called, the event will stay in the outbox and we'll have the opportunity to try sending it again.
Let's look at the implementation of that next but, before we do, here's how to send the DomainEvent with RabbitTemplate:
@AllArgsConstructor
@Component
public class RabbitEventPublisher {
private final RabbitTemplate rabbitTemplate;
private final Exchange eventExchange;
public void publishEvent(DomainEvent domainEvent) {
String routingKey = getRoutingKey(domainEvent);
rabbitTemplate.convertAndSend(eventExchange.getName(), routingKey, domainEvent);
}
private String getRoutingKey(DomainEvent domainEvent) {
return "route." + domainEvent.getEventType() + ".#";
}
}
Our publisher publishes events to a TopicExchange called event.exchange using the wildcard routing key route.order.created.#.
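The eventExchange bean injected into the publisher isn't shown in the article. A minimal configuration sketch, assuming a topic exchange and a Jackson converter so that domain events are serialised as JSON, might look like this:
@Configuration
public class RabbitConfig {
    // The topic exchange named "event.exchange" described above.
    @Bean
    public TopicExchange eventExchange() {
        return new TopicExchange("event.exchange");
    }
    // Send domain events as JSON; this converter choice is an assumption,
    // not taken from the article.
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}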
To visualise the call sequence for creating an order: the Order and the OutboxDomainEvent are saved and the internal OutboxApplicationEvent is published, all within a single database transaction. Handling the application event happens only after that transaction commits: the listener sends the domain event to RabbitMQ and then deletes it from the outbox.
Scheduling Failed Events
We'll scan the outbox_domain_event collection every 30 seconds for documents containing events that are older than 30 seconds, so we can be fairly confident these messages failed to send on the first attempt. We'll also limit each run to 100 documents so we never fetch too many at once, and we'll sort the events in event-time order.
This is an implementation of the polling publisher pattern.
Make sure to add @EnableScheduling to the application or a configuration class.
@Slf4j
@Component
@AllArgsConstructor
public class OutboxScheduler {
private final OutboxDomainEventRepository outboxDomainEventRepository;
private final OutboxService outboxService;
@Scheduled(fixedRate = 30000)
public void processOutbox() {
Instant thirtySecondsAgo = Instant.now().minus(30, ChronoUnit.SECONDS);
Sort sort = Sort.by(Sort.Direction.ASC, "domainEvent.eventTimestamp");
PageRequest pageRequest = PageRequest.of(0, 100, sort);
List<OutboxDomainEvent> events = outboxDomainEventRepository
.findOlderThan(thirtySecondsAgo, pageRequest);
log.debug("processing {} outbox events", events.size());
events.forEach(outboxService::sendFromOutbox);
}
}
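The findOlderThan method used above is a custom repository query that isn't shown in the article. A sketch of the repository, assuming a Spring Data MongoDB @Query on the embedded event's timestamp, might look like this:
public interface OutboxDomainEventRepository extends MongoRepository<OutboxDomainEvent, String> {
    // Outbox documents whose embedded event is older than the given instant.
    @Query("{ 'domainEvent.eventTimestamp': { $lt: ?0 } }")
    List<OutboxDomainEvent> findOlderThan(Instant timestamp, Pageable pageable);
}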
It's worth bearing in mind that every instance of the order-service in a load-balanced configuration will be running this schedule, which means they may race to process the outbox messages. If that's an issue, consider using ShedLock, but it's also worth repeating that consumers should expect and be able to handle duplicate events.
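If you do want only one instance to poll the outbox at a time, here's a sketch of what ShedLock configuration might look like; the Mongo lock provider, database name and lock duration are assumptions, so check ShedLock's documentation for your setup:
@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
public class ShedLockConfig {
    // Store locks in MongoDB alongside the outbox (the database name is illustrative).
    @Bean
    public LockProvider lockProvider(MongoClient mongoClient) {
        return new MongoLockProvider(mongoClient.getDatabase("txn_outbox"));
    }
}
The processOutbox method would then be annotated with @SchedulerLock(name = "processOutbox") so that only one instance runs it per scheduled interval.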
Testing
We'll need a queue that's bound to event.exchange in our tests to simulate the consumer:
@Bean
public Queue orderCreatedQueue() {
return QueueBuilder.nonDurable("queue.shipping." +
OrderCreatedEvent.EVENT_TYPE).build();
}
@Bean
public Binding orderCreatedQueueBinding(
TopicExchange eventExchange, Queue orderCreatedQueue) {
return BindingBuilder
.bind(orderCreatedQueue)
.to(eventExchange)
.with("route." + OrderCreatedEvent.EVENT_TYPE + ".#");
}
The queue here is called queue.shipping.order.created and the route used in the binding is the wildcard route we defined earlier.
Here's our integration test case from CreateOrderIT for the happy path:
@Test
void createOrder_shouldSucceedAndSendDomainEvent() {
CreateOrderRequestDto request =
new CreateOrderRequestDto("1234", new BigDecimal("99.99"));
webTestClient.post()
.uri(baseUri() + "/api/orders")
.bodyValue(request)
.exchange()
.expectStatus().isCreated()
.expectBody()
.jsonPath("$.id").isNotEmpty()
.jsonPath("$.customerId").isEqualTo(request.customerId())
.jsonPath("$.orderDate").isNotEmpty()
.jsonPath("$.totalAmount").isEqualTo(request.totalAmount());
await().atMost(5, SECONDS).untilAsserted(() -> {
OrderCreatedEvent event = getEvent(orderCreatedQueue);
assertThat(event).isNotNull();
assertThat(event.getEventId()).isNotBlank();
assertThat(event.getOrderId()).isNotBlank();
assertThat(event.getEventTimestamp()).isNotNull();
});
List<OutboxDomainEvent> eventList =
outboxDomainEventRepository.findAll();
assertThat(eventList).hasSize(0);
}
Where getEvent() simply reads from the named Queue:
@SuppressWarnings("unchecked")
<T> T getEvent(Queue queue) {
return (T) rabbitTemplate.receiveAndConvert(queue.getName());
}
We successfully consumed the event from the queue and checked that the outbox is empty, which validates the happy path.
What about when publishing fails? We'll test two cases. In the first case, we'll use @SpyBean to intentionally fail the rabbitEventPublisher:
@SpyBean
RabbitEventPublisher rabbitEventPublisher;
@Test
void createOrder_shouldCreateOrderAndRetainDomainEventInOutboxWhenRabbitFails() {
doThrow(new RuntimeException("error"))
.when(rabbitEventPublisher)
.publishEvent(any(DomainEvent.class));
CreateOrderRequestDto request =
new CreateOrderRequestDto("1234", new BigDecimal("99.99"));
webTestClient.post()
.uri(baseUri() + "/api/orders")
.bodyValue(request)
.exchange()
.expectStatus().isCreated()
.expectBody()
.jsonPath("$.id").isNotEmpty()
.jsonPath("$.customerId").isEqualTo(request.customerId())
.jsonPath("$.orderDate").isNotEmpty()
.jsonPath("$.totalAmount").isEqualTo(request.totalAmount());
List<OutboxDomainEvent> eventList = outboxDomainEventRepository.findAll();
assertThat(eventList).hasSize(1);
}
Here we get a valid response, but the event is still in the outbox pending publication.
It's worth noting what the Spring framework does with exceptions that are thrown from a @TransactionalEventListener. From the code of TransactionSynchronizationUtils, you can see it simply logs them and does nothing else.
The test confirms our expectation that an event is left in the outbox. For the second case, we'll check that our scheduled task is able to send from the outbox:
@Test
void createOrder_shouldPublishDomainEventFromOutbox() {
String orderId = "54321";
OrderCreatedEvent domainEvent = OrderCreatedEvent.builder()
.orderId(orderId)
.eventTimestamp(Instant.now()
.minus(31, ChronoUnit.SECONDS))
.build();
outboxDomainEventRepository.save(OutboxDomainEvent.builder()
.domainEvent(domainEvent)
.build());
outboxScheduler.processOutbox();
await().atMost(5, SECONDS).untilAsserted(() -> {
OrderCreatedEvent event = getEvent(orderCreatedQueue);
assertThat(event).isNotNull();
assertThat(event.getEventId()).isNotBlank();
assertThat(event.getOrderId()).isEqualTo(orderId);
assertThat(event.getEventTimestamp()).isNotNull();
});
List<OutboxDomainEvent> eventList = outboxDomainEventRepository.findAll();
assertThat(eventList).hasSize(0);
}
Note that we're invoking the scheduler manually; the background schedule is turned off during testing so that the test is quick and reliable. This is done by making the scheduling configuration conditional on a property:
@ConditionalOnProperty(value = "app.scheduling.enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableScheduling
public class SchedulingConfig {
}
This requires the following property to be defined in application-it.yml:
app:
scheduling:
enabled: false
Here's the configuration to run the test containers:
@TestConfiguration(proxyBeanMethods = false)
class TestcontainersConfig {
@Bean
@ServiceConnection
MongoDBContainer mongoDbContainer() {
return new MongoDBContainer(DockerImageName.parse("mongo:5.0"));
}
@Bean
@ServiceConnection
RabbitMQContainer rabbitContainer() {
return new RabbitMQContainer(DockerImageName.parse("rabbitmq:3-management"));
}
}
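For completeness, here's a sketch of how the integration test class might be wired together; the annotations and the "it" profile name are assumptions based on the snippets above rather than code from the article:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Import(TestcontainersConfig.class)
@ActiveProfiles("it")
class CreateOrderIT {
    @LocalServerPort
    int port;
    @Autowired
    WebTestClient webTestClient;
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Autowired
    Queue orderCreatedQueue;
    @Autowired
    OutboxDomainEventRepository outboxDomainEventRepository;
    @Autowired
    OutboxScheduler outboxScheduler;
    String baseUri() {
        return "http://localhost:" + port;
    }
    // the test methods shown above live here
}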
Run the tests with gradle integrationTest.
Running with Docker Compose
As mentioned earlier, transactions require a replica set in MongoDB. Here's how to create a single node replica set with Docker Compose:
mongo:
image: mongo:5.0
hostname: mongo
container_name: mongo
environment:
MONGO_INITDB_DATABASE: txn_outbox
command: --replSet rs0 --bind_ip_all
ports:
- "27017:27017"
volumes:
- ./docker/mongo/initdb.d:/docker-entrypoint-initdb.d:ro
Notice that we need to mount an init script in the container as a volume for it to execute on startup. The script initialises the replica set and waits for the primary node to become available:
#!/bin/bash
set -e
mongo --eval "rs.initiate()"
function is_primary {
local result=$(mongo --eval "db.isMaster().ismaster" --quiet)
echo "$result"
}
while true; do
is_primary_result=$(is_primary)
if [ "$is_primary_result" = "true" ]; then
echo "Now the primary node in the replica set!"
break
fi
echo "Waiting to become the primary node..."
sleep 2
done
Now when we run docker-compose up, it will start a single-node replica set and we can test with transactions enabled. Depending on your driver version, you may also need the service's MongoDB connection string to reference the replica set, for example by appending ?replicaSet=rs0 to the URI.
Conclusion
In this article we saw how domain events can be published reliably to a message broker using the transactional outbox pattern and Spring Boot.
The complete accompanying code for this article is available on github.