Event-Driven Architecture is a software design pattern for building applications. Event-driven systems are designed to capture and process events exchanged between decoupled services. In an event-driven architecture, two systems can communicate with each other asynchronously.
The source code is available on GitHub.
What is Spring Cloud Stream?
Spring Cloud Stream is a framework that comes under the Spring Cloud umbrella. It gives us the building blocks for an event-driven microservice architecture.
The framework offers a versatile programming model with support for persistent pub/sub semantics, consumer groups, and stateful partitions, all built on well-known and established Spring idioms and best practices.
Spring Cloud Stream has a Binder abstraction that is used to connect to physical destinations in external middleware. It supports a number of binder implementations, including but not limited to RabbitMQ, Apache Kafka, and Amazon Kinesis. In this lecture, we will use the Kafka and Kafka Streams binders.
What is Apache Kafka?
Apache Kafka is a distributed event streaming platform that is open source and used by hundreds of businesses for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
It is a publish-subscribe messaging system that functions as an interface between two parties: the sender and the recipient. More information on Kafka can be found in Apache Kafka: The Secret Sauce to Real-Time Data Streaming.
Overview of the Spring Cloud Stream and Kafka Stream projects
In this lecture, we will build a project named BookMyMovie. This project is in charge of booking movie shows, and we will construct it using an event-driven architecture with Spring Cloud Stream and Kafka Streams.
To begin, we will construct a REST API using Spring Cloud Stream and Kafka to receive the payload (a movie ticket booking request), which is then sent as JSON to the Kafka topic movie-ticket-request-topic.
We will then create a stream processor with Kafka Streams that watches for events on the movie-ticket-request-topic Kafka topic, performs real-time processing, and sends the result to the movie-ticket-booked-topic Kafka topic.
After that, we'll look at how to consume messages from the movie-ticket-booked-topic Kafka topic.
BookMyMovie Service
The application design is as follows: a user sends a request to the backend REST API; the application validates the movie ticket request, transforms the data, and publishes it to the Kafka topic movie-ticket-request-topic.
Dependencies
In your project, include the following dependencies. The build.gradle file is shown below.
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.6'
    id 'io.spring.dependency-management' version '1.1.4'
}

group = 'in.learnjavaskills'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "2022.0.4")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

tasks.named('bootBuildImage') {
    builder = 'paketobuildpacks/builder-jammy-base:latest'
}

tasks.named('test') {
    useJUnitPlatform()
}
Configuration of the Kafka Binder Properties
The spring.cloud.stream.kafka.binder.brokers property is the list of brokers to which the Kafka binder connects. Default: localhost
spring.cloud.stream.kafka.binder.brokers = 3.80.128.59:9092
Because I'm hosting my Kafka server on an AWS EC2 instance as a single broker node, I must supply my EC2 instance's public IP address together with the right port, which in my case is 3.80.128.59:9092.
As you can see here, I used the IP address of the Kafka broker node; however, this is not best practice. You should always use host names rather than IP addresses.
If you are running Kafka on your local system, you do not need to include this setting in your application; Spring Cloud Stream looks for the Kafka broker on localhost:9092 by default.
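Since host names are preferable to raw IP addresses, the same property can point at a DNS name instead; the host name below is purely illustrative:

```properties
# prefer a resolvable host name over a raw IP address (example host)
spring.cloud.stream.kafka.binder.brokers = kafka-broker.example.com:9092
```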
Controller
To accept user input, we expose a single REST endpoint that receives the request payload and forwards it to the Kafka topic movie-ticket-request-topic.
package in.learnjavaskills.bookmymovieservice.controller;

import in.learnjavaskills.bookmymovieservice.dto.MovieTicketBookingRequest;
import in.learnjavaskills.bookmymovieservice.service.InitiateMovieTicketBookingService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/movies/")
public class BookMyMovieController
{
    private final InitiateMovieTicketBookingService initiateMovieTicketBookingService;

    public BookMyMovieController(InitiateMovieTicketBookingService initiateMovieTicketBookingService) {
        this.initiateMovieTicketBookingService = initiateMovieTicketBookingService;
    }

    @PostMapping("book-ticket")
    ResponseEntity<String> bookMovieTicket(@RequestBody MovieTicketBookingRequest movieTicketBookingRequest) {
        return initiateMovieTicketBookingService.publishMovieTicketBookingMessage(movieTicketBookingRequest);
    }
}
The MovieTicketBookingRequest class, as shown in the book-ticket API, is used to accept the user's request payload. Let's create the MovieTicketBookingRequest class quickly.
package in.learnjavaskills.bookmymovieservice.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import java.time.LocalDateTime;
import java.util.Objects;

public class MovieTicketBookingRequest
{
    private String movieName;
    private byte screenNumber;

    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime movieShowTime;

    private String seatNumber;
    private String user;

    // getter, setter, toString, equals and hashCode methods
}
InitiateMovieTicketBookingService class
It's time to build some business logic in the InitiateMovieTicketBookingService class and then send a message to the movie-ticket-request-topic Kafka topic.
package in.learnjavaskills.bookmymovieservice.service;

import in.learnjavaskills.bookmymovieservice.dto.MovieTicketBookingMessage;
import in.learnjavaskills.bookmymovieservice.dto.MovieTicketBookingRequest;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
import java.util.Objects;

@Service
public class InitiateMovieTicketBookingService
{
    private final StreamBridge streamBridge;
    private final String topicName = "movie-ticket-request-topic";

    public InitiateMovieTicketBookingService(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /**
     * Publish the movie ticket booking message to the Kafka topic.
     * @param movieTicketBookingRequest the incoming booking request
     * @return response describing whether the message was sent
     */
    public ResponseEntity<String> publishMovieTicketBookingMessage(MovieTicketBookingRequest movieTicketBookingRequest) {
        if (Objects.isNull(movieTicketBookingRequest))
            return ResponseEntity.badRequest().body("Movie Ticket Booking Request must be non null");

        MovieTicketBookingMessage movieTicketBookingMessage = new MovieTicketBookingMessage(movieTicketBookingRequest);
        Message<MovieTicketBookingMessage> message = generateMessage(movieTicketBookingMessage);
        boolean isMessageSent = streamBridge.send(topicName, message, MimeTypeUtils.APPLICATION_JSON);
        if (isMessageSent)
            return ResponseEntity.ok("Movie booking ticket initiated successfully");
        return ResponseEntity.status(HttpStatus.EXPECTATION_FAILED)
                .body("Unable to send message to kafka topic");
    }

    /**
     * Wrap the payload in a Message to send to the Kafka topic.
     * @param movieTicketBookingMessage the payload to wrap
     * @return the wrapped Message, or null for a null payload
     */
    private Message<MovieTicketBookingMessage> generateMessage(MovieTicketBookingMessage movieTicketBookingMessage) {
        if (Objects.isNull(movieTicketBookingMessage))
            return null;
        return MessageBuilder.withPayload(movieTicketBookingMessage)
                .build();
    }
}
Let's build a MovieTicketBookingMessage record that is in charge of carrying the Kafka message data.
package in.learnjavaskills.bookmymovieservice.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.LocalDateTime;

public record MovieTicketBookingMessage(String movieName,
        byte screenNumber,
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        LocalDateTime movieShowTime,
        String seatNumber,
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        LocalDateTime messageGeneratedOn,
        String users)
{
    public MovieTicketBookingMessage(MovieTicketBookingRequest movieTicketBookingRequest) {
        this(movieTicketBookingRequest.getMovieName(),
            movieTicketBookingRequest.getScreenNumber(),
            movieTicketBookingRequest.getMovieShowTime(),
            movieTicketBookingRequest.getSeatNumber(),
            LocalDateTime.now(),
            movieTicketBookingRequest.getUser());
    }
}
How do I send a message to a Kafka topic in Spring Cloud Stream?
To deliver the message to the Kafka topic, Spring Cloud Stream provides the StreamBridge class. The StreamBridge class is part of the org.springframework.cloud.stream.function package.
The StreamBridge class allows users to transfer data to an output binding.
While the user rarely needs to send data manually in a typical spring-cloud-stream application, there are instances when the source of the data is outside of the spring-cloud-stream context, and we need to bridge such foreign sources with spring-cloud-stream.
Because our data comes from an API, we must utilize StreamBridge in this scenario.
We can send the message to the Kafka topic using the send method of the StreamBridge class. The StreamBridge class contains an extensive number of overloaded send methods. In the example above, we use the following method:
public boolean send(String bindingName, Object data, MimeType outputContentType)
The bindingName is simply the name of the Kafka topic, the data argument carries our Kafka message, and the outputContentType argument describes what sort of message we are delivering to the Kafka topic.
Message interface in the spring cloud stream
The Kafka message payload can be wrapped in the Message interface. The Message interface is found in the org.springframework.messaging package and represents a message with headers and a body.
The generateMessage() method wraps the payload, i.e., MovieTicketBookingMessage, inside the Message interface.
If you're wondering whether you can send the message to the Kafka topic with a key, that is also possible with the Message interface: we simply include a header carrying the key.
MessageBuilder.withPayload(movieTicketBookingMessage)
    .setHeader(KafkaHeaders.KEY, "YOUR_KEY_HERE".getBytes())
    .build();
Booking Process Service
Create a streaming application with Spring Cloud Stream and Kafka Stream.
The booking-process-service is a streaming service that reads Kafka messages from the movie-ticket-request-topic and writes them to the movie-ticket-booked-topic.
The Kafka Streams binder supports the three primary abstractions of Kafka Streams: KStream, KTable, and GlobalKTable.
Functional Style
Beginning with Spring Cloud Stream 3.0.0, the Kafka Streams binder allows applications to be designed and developed using the functional programming style available since Java 8. This means that an application may be expressed concisely as a lambda expression of type java.util.function.Function or java.util.function.Consumer.
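Stripped of the Spring machinery, the functional shape is just Java's standard interfaces. Here is a minimal plain-Java sketch (the names are illustrative, not part of the project); a Function bean models a processor with one inbound stream and one outbound stream:

```java
import java.util.function.Function;

public class FunctionalStyleDemo
{
    // The "processing" here is a trivial transformation of the payload.
    public static String process(String payload)
    {
        Function<String, String> processor = p -> p.toUpperCase();
        return processor.apply(payload);
    }

    public static void main(String[] args)
    {
        System.out.println(process("movie-ticket"));
    }
}
```

Spring Cloud Stream discovers such a Function bean and wires its input and output to Kafka topics through binding names derived from the bean name.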
Process Movie Ticket Booking Function
Let's use the functional interface style to develop a function (processMovieTicketBooking) for Kafka streaming. This function is in charge of reading messages from the movie-ticket-request-topic and writing them to the movie-ticket-booked-topic.
package in.learnjavaskills.bookingprocessservice.function;

import in.learnjavaskills.bookingprocessservice.dto.MovieTicketBookedSuccessfully;
import in.learnjavaskills.bookingprocessservice.dto.MovieTicketBookingMessage;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.util.Base64;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

@Configuration
public class MovieTicketBookingProcess
{
    @Bean("processMovieTicketBooking")
    public Function<KStream<String, MovieTicketBookingMessage>, KStream<String, MovieTicketBookedSuccessfully>> processMovieTicketBooking() {
        return movieBookingRequestMessageKStream -> movieBookingRequestMessageKStream
            .map((messageKey, movieBookingRequestMessage) -> {
                System.out.println("New message arrived with key : " + messageKey + " and values : " + movieBookingRequestMessage.toString());
                Optional<MovieTicketBookedSuccessfully> movieTicketBookedSuccessfully = bookMovieTicket(movieBookingRequestMessage);
                return new KeyValue<String, Optional<MovieTicketBookedSuccessfully>>(messageKey, movieTicketBookedSuccessfully);
            })
            .filter((messageKey, movieTicketBookedSuccessfullyOptional) -> {
                if (movieTicketBookedSuccessfullyOptional.isPresent()) {
                    MovieTicketBookedSuccessfully movieTicketBookedSuccessfully = movieTicketBookedSuccessfullyOptional.get();
                    System.out.println("Booking movie ticket for movie id : " + movieTicketBookedSuccessfully.movieId());
                    return true;
                }
                return false;
            })
            .map((messageKey, movieTicketBookedSuccessfully) -> new KeyValue<>(messageKey, movieTicketBookedSuccessfully.get()));
    }

    // Validate the request and build the booked-successfully message.
    private Optional<MovieTicketBookedSuccessfully> bookMovieTicket(MovieTicketBookingMessage movieTicketBookingMessage) {
        if (Objects.isNull(movieTicketBookingMessage))
            return Optional.empty();

        Optional<String> moviePassCode = generateMoviePassCode(movieTicketBookingMessage);
        if (!moviePassCode.isPresent())
            return Optional.empty();

        MovieTicketBookedSuccessfully movieTicketBookedSuccessfully = new MovieTicketBookedSuccessfully(
            movieTicketBookingMessage.movieName() + UUID.randomUUID().toString(),
            movieTicketBookingMessage.users(),
            moviePassCode.get(),
            LocalDateTime.now());
        return Optional.of(movieTicketBookedSuccessfully);
    }

    // Derive a movie pass code by hashing the booking details with SHA-256.
    private Optional<String> generateMoviePassCode(MovieTicketBookingMessage movieTicketBookingMessage) {
        if (Objects.isNull(movieTicketBookingMessage))
            return Optional.empty();

        String code = movieTicketBookingMessage.movieName() + " " + movieTicketBookingMessage.users()
            + " " + movieTicketBookingMessage.seatNumber();
        try {
            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
            byte[] digest = messageDigest.digest(code.getBytes(StandardCharsets.UTF_8));
            return Optional.of(Base64.getEncoder().encodeToString(digest));
        }
        catch (NoSuchAlgorithmException e) {
            return Optional.empty();
        }
    }
}
For the sake of simplicity, I've omitted the database communication here. You may also use a database to validate or store movie ticket details.
The processMovieTicketBooking function binds a single inbound KStream<String, MovieTicketBookingMessage> to a single outbound KStream<String, MovieTicketBookedSuccessfully>.
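The pass-code logic in generateMoviePassCode is plain Java and can be checked in isolation: a SHA-256 digest is always 32 bytes, so its Base64 encoding is always 44 characters, and the same booking details always yield the same code. A standalone sketch (the input values are illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Optional;

public class PassCodeDemo
{
    // Mirrors generateMoviePassCode: hash the booking details, Base64-encode the digest.
    public static Optional<String> passCode(String movieName, String user, String seatNumber)
    {
        String code = movieName + " " + user + " " + seatNumber;
        try {
            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
            byte[] digest = messageDigest.digest(code.getBytes(StandardCharsets.UTF_8));
            return Optional.of(Base64.getEncoder().encodeToString(digest));
        }
        catch (NoSuchAlgorithmException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(passCode("Inception", "alice", "A1").orElse(""));
    }
}
```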
package in.learnjavaskills.bookingprocessservice.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import java.time.LocalDateTime;

public record MovieTicketBookingMessage(String movieName,
        byte screenNumber,
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        @JsonDeserialize(using = LocalDateTimeDeserializer.class)
        LocalDateTime movieShowTime,
        String seatNumber,
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        @JsonDeserialize(using = LocalDateTimeDeserializer.class)
        LocalDateTime messageGeneratedOn,
        String users)
{}

package in.learnjavaskills.bookingprocessservice.dto;

import com.fasterxml.jackson.annotation.JsonFormat;
import java.time.LocalDateTime;

public record MovieTicketBookedSuccessfully(String movieId,
        String user,
        String moviePassCode,
        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
        LocalDateTime messageGeneratedOn)
{}
What exactly is KStream?
KStream<K,V> is an interface in the org.apache.kafka.streams.kstream package.
KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record in the real world is an individual entity or event. For example, if user A purchases two goods, item1 and item2, there may be two records, <K:item1> and <K:item2>, in the stream.
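Running a real KStream requires a live broker, but the record-stream idea can be mimicked with plain Java collections. Each entry below is an independent event, and two events share the key userA, just like the two-purchase example above (this is an analogy only, not the Kafka Streams API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RecordStreamDemo
{
    // User A buys two items: two independent key-value records share the same key.
    public static List<String> values(String key)
    {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("userA", "item1"),
                Map.entry("userA", "item2"));
        return records.stream()
                .filter(record -> record.getKey().equals(key))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        System.out.println(values("userA"));
    }
}
```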
What exactly is KeyValue?
The KeyValue<K, V> class is part of the org.apache.kafka.streams package.
For each Kafka Streams record, a key-value pair is defined. If the record is directly from a Kafka topic, its key/value is the message key/value.
Kafka Broker Configuration in the Booking Process Service
First, in the application.properties file, we must configure our Kafka broker node. I have named my service booking-process-service-application.
# application name
spring.application.name = booking-process-service-application
# broker server node
spring.cloud.stream.kafka.binder.brokers = 54.146.205.212:9092
Then set spring.cloud.function.definition to processMovieTicketBooking so that the binder knows which function bean to bind.
# kafka binder name - method name of your function
spring.cloud.function.definition = processMovieTicketBooking
Enable the Kafka consumer group to consume messages as a group. This property sets the consumer group name; in our example, booking-process-service-application.
# kafka consumer group name
spring.cloud.stream.bindings.processMovieTicketBooking-in-0.group = ${spring.application.name}
The following are the Kafka topic binding and serialization configuration properties:
# kafka topic name
spring.cloud.stream.bindings.processMovieTicketBooking-in-0.destination = movie-ticket-request-topic
spring.cloud.stream.bindings.processMovieTicketBooking-out-0.destination = movie-ticket-booked-topic
# serialization / deserialization configuration
spring.cloud.stream.kafka.bindings.processMovieTicketBooking-in-0.consumer.configuration.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.processMovieTicketBooking-in-0.consumer.configuration.value.deserializer = org.springframework.kafka.support.serializer.JsonDeserializer
spring.cloud.stream.kafka.bindings.processMovieTicketBooking-out-0.producer.configuration.key.serializer = org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.processMovieTicketBooking-out-0.producer.configuration.value.serializer = org.springframework.kafka.support.serializer.JsonSerializer
Dependencies
dependencies {
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    // required to serialize and deserialize Java 8 date/time types
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
}
Update Booked Status Service
Finally, using Spring Cloud Stream and Kafka Streams, we will develop another service to consume messages from the Kafka topic.
In this service, we will use the Java 8 Consumer interface to consume messages from the Kafka topic in a functional style, similar to the previous example. Because it receives just one argument as input and does not return any response, the Consumer interface is the natural fit for consuming messages.
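That shape is easy to see in plain Java (the names below are illustrative): a Consumer accepts one message and returns nothing, which is exactly what a terminal message sink needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerStyleDemo
{
    // Collects "persisted" messages so the side effect is observable.
    public static final List<String> savedMessages = new ArrayList<>();

    // A Consumer bean models a sink: one inbound stream, no outbound stream.
    public static final Consumer<String> updateBookedStatus =
            message -> savedMessages.add("saved: " + message);

    public static void main(String[] args)
    {
        updateBookedStatus.accept("movie-pass-123");
        System.out.println(savedMessages);
    }
}
```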
Create a bean called consumeMovieTickedBookingMessage that we will reference in the properties file to consume messages from Kafka.
package in.learnjavaskills.updatebookedstatusservice.function;

import in.learnjavaskills.updatebookedstatusservice.dto.MovieTicketBookedSuccessfully;
import in.learnjavaskills.updatebookedstatusservice.repository.MovieTicketRepository;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;

@Configuration
public class MovieTicketBookingConsumer
{
    private final MovieTicketRepository movieTicketRepository;

    @Autowired
    public MovieTicketBookingConsumer(MovieTicketRepository movieTicketRepository) {
        this.movieTicketRepository = movieTicketRepository;
    }

    @Bean("consumeMovieTickedBookingMessage")
    public Consumer<KStream<String, MovieTicketBookedSuccessfully>> consumeMovieTickedBookingMessage() {
        return movieTicketBookingMessageKStream ->
            movieTicketBookingMessageKStream.foreach((messageKey, message) -> {
                System.out.println("Message Key : " + messageKey + " Message : " + message.toString());
                // Save the successfully booked movie ticket into the database.
                movieTicketRepository.save(message);
            });
    }
}
Kafka Broker Configuration in the Update Booked Status Service
# application name
spring.application.name = update-booked-status-service-application
# broker server node
spring.cloud.stream.kafka.binder.brokers = 54.146.205.212:9092
# kafka binder name - method name of your function
spring.cloud.function.definition = consumeMovieTickedBookingMessage
# kafka consumer group name
spring.cloud.stream.bindings.consumeMovieTickedBookingMessage-in-0.group = ${spring.application.name}
# kafka topic name
spring.cloud.stream.bindings.consumeMovieTickedBookingMessage-in-0.destination = movie-ticket-booked-topic
# deserialization configuration
spring.cloud.stream.kafka.bindings.consumeMovieTickedBookingMessage-in-0.consumer.configuration.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.consumeMovieTickedBookingMessage-in-0.consumer.configuration.value.deserializer = org.springframework.kafka.support.serializer.JsonDeserializer
# add the trusted packages for JSON deserialization, otherwise a trusted-package exception is thrown
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages = *
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.use.type.headers = false
Dependencies
dependencies {
    implementation 'org.apache.kafka:kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    // required to serialize and deserialize Java 8 date/time types
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
}
The source code is available on GitHub.
Conclusion
Spring Cloud Stream 3.0 encourages us to consume and produce messages on Kafka topics using the functional approach. We covered how to use Spring Cloud Stream with Kafka Streams in this lecture.
As you can see in this example, I used the IP address of the Kafka broker node; however, this is not best practice. You should always use host names rather than IP addresses.
Keep learning and keep growing.