Kafka and WebSocket Integration: Building Efficient Real-Time Applications with Seamless Communication

Zuda Pradana Putra
Dec 22, 2023

In the realm of modern software development, Apache Kafka and WebSocket have emerged as leading solutions for building efficient, real-time communication among application components. In this simple project, I will use Spring Boot on the server side.

Apache Kafka as a Message Broker

Apache Kafka serves as a distributed streaming platform, providing robust infrastructure for message exchange between applications. With its publish-subscribe model, Kafka ensures swift, reliable, and persistent message delivery, aiding in the synchronization of data across diverse systems.
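To make the publish-subscribe model concrete, here is a minimal in-memory sketch in plain Java. It is not Kafka code: TopicLogSketch and its methods are hypothetical names, and the sketch models a single partition with no networking or durability. It only illustrates the idea that records are appended to a log and that each consumer group tracks its own read position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, in-memory illustration of a topic log with per-group offsets. */
final class TopicLogSketch {
    // Append-only list standing in for a single topic partition.
    private final List<String> log = new ArrayList<>();
    // Next position to read, tracked separately for each consumer group.
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Appends a record to the end of the log. */
    void publish(String record) {
        log.add(record);
    }

    /**
     * Returns the records this group has not seen yet and advances its offset.
     * A new group starts at the beginning of the log here; that is a choice
     * made for the sketch, not necessarily how a real consumer is configured.
     */
    List<String> poll(String groupId) {
        int from = offsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(groupId, log.size());
        return batch;
    }
}
```

Two groups polling the same log each receive every record, while a second poll by the same group returns only what was published since its previous poll.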

WebSocket for Seamless Communication

WebSocket is a bidirectional protocol enabling real-time message exchange between clients and servers. By maintaining open connections persistently, WebSocket creates a responsive and efficient user experience, well-suited for features like real-time notifications and immediate updates.

Integration of Kafka and WebSocket

The integration of Kafka and WebSocket opens avenues for building robust real-time applications. Kafka acts as a dependable notification layer, while WebSocket provides a seamless communication channel between the server and clients. With this approach, applications can deliver interactive and responsive user experiences.

1. Create Spring Project:

First, make sure you have created a Spring Boot project. You can do this with Spring Initializr or with a development tool such as IntelliJ or Eclipse. Make sure you add the web, Kafka, and WebSocket dependencies. The classes in this article also use Lombok annotations, so add Lombok as well.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Lombok provides @Slf4j, @Data and @Builder used in the classes below -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

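Create a configuration file for Kafka, usually application.properties or application.yml, and fill in your own Kafka settings. Because ChatMessage objects are sent as record values, the producer and consumer also need JSON (de)serializers instead of the default String ones. The trusted-packages value below is a placeholder for the package that contains ChatMessage.

# application.properties
spring.kafka.bootstrap-servers=your-kafka-bootstrap-server
spring.kafka.consumer.group-id=your-group-id
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=your.model.package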

2. WebSocket Configuration:

Create a configuration class for WebSocket. Because this project uses STOMP messaging over WebSocket, the class implements WebSocketMessageBrokerConfigurer and defines the message broker and the endpoint that clients connect to.

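import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // In-memory broker for destinations that clients subscribe to
        config.enableSimpleBroker("/topic");
        // Prefix for messages routed to @MessageMapping methods
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Endpoint that clients connect to, with SockJS fallback
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
    }
}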

This class configures STOMP messaging over WebSocket. @Configuration registers it as a Spring configuration class, and @EnableWebSocketMessageBroker turns on broker-backed WebSocket message handling. configureMessageBroker enables a simple in-memory broker for destinations that start with "/topic", which clients subscribe to, and routes messages whose destination starts with "/app" to @MessageMapping methods. registerStompEndpoints exposes the endpoint at "/ws", accepts connections from any origin, and enables SockJS so that clients without native WebSocket support can fall back to other transports.
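On the wire, the client speaks STOMP over the "/ws" endpoint. The sketch below, in plain Java with hypothetical names (StompFrames, subscribe, send), builds the text of the two frames a chat client would typically issue: a SUBSCRIBE to "/topic/public" and a SEND to "/app/chat.sendMessage" (the "/app" prefix plus the controller mapping used later in this article). It assumes header values need no escaping and ignores the extra framing that SockJS adds, so treat it as an illustration of the frame layout rather than a client implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that renders STOMP frames as text. */
final class StompFrames {
    private StompFrames() {}

    /** Layout: command line, one header per line, a blank line, the body, then a NUL byte. */
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        headers.forEach((name, value) -> sb.append(name).append(':').append(value).append('\n'));
        sb.append('\n').append(body).append('\0');
        return sb.toString();
    }

    /** Frame a client sends to start receiving messages from a broker destination. */
    static String subscribe(String id, String destination) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", id);
        headers.put("destination", destination);
        return frame("SUBSCRIBE", headers, "");
    }

    /** Frame a client sends to deliver a JSON payload to an application destination. */
    static String send(String destination, String jsonBody) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", destination);
        headers.put("content-type", "application/json");
        return frame("SEND", headers, jsonBody);
    }
}
```

For example, StompFrames.send("/app/chat.sendMessage", json) produces a frame that Spring routes to a @MessageMapping("/chat.sendMessage") method, while frames published to "/topic/public" are delivered to every client that sent the matching SUBSCRIBE.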

3. Kafka Producer and Consumer Services:

In the context of a Spring application, setting up Kafka producers and consumers is essential for seamlessly integrating Kafka messaging capabilities. Spring provides abstractions and convenient configurations through the Spring Kafka project, making it easier to create and manage Kafka producers and consumers.

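import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumerServices {
    private static final String TOPIC = "public-chats"; // Replace with the topic name you want

    private final SimpMessagingTemplate messagingTemplate;
    // Written by the Kafka listener thread and read by HTTP request threads,
    // so a thread-safe list is used instead of a plain ArrayList
    private final List<ChatMessage> chatMessages = new CopyOnWriteArrayList<>();

    public KafkaConsumerServices(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    @KafkaListener(topics = TOPIC, groupId = "my-group")
    public void handleMessage(ChatMessage message) {
        log.info("Received message from Kafka: {}", message);
        chatMessages.add(message);
        // Relay the record to every WebSocket client subscribed to /topic/public
        messagingTemplate.convertAndSend("/topic/public", message);
    }

    public List<ChatMessage> getChatMessages() {
        // Return a copy of the messages consumed from Kafka so far
        return new ArrayList<>(chatMessages);
    }
}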

KafkaConsumerServices consumes messages from the Kafka topic "public-chats". The @KafkaListener method, handleMessage, receives each ChatMessage, logs it, appends it to an in-memory list, and uses SimpMessagingTemplate to broadcast it to WebSocket subscribers of "/topic/public". The messaging template is injected through the constructor. getChatMessages returns a copy of the list, so the history it exposes covers only the messages consumed since the application started. This class is the point where Kafka and WebSocket meet: every record that arrives from Kafka is pushed to connected clients in real time.
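Because the history is written from the listener thread, read from request threads, and keeps growing for as long as the application runs, a bounded, thread-safe buffer is one option for holding it. The following is a minimal sketch in plain Java, not part of the original project: BoundedHistory and its capacity parameter are hypothetical, and the eviction policy (drop the oldest entry) is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical thread-safe buffer that keeps only the most recent items. */
final class BoundedHistory<T> {
    private final int capacity;
    private final Deque<T> items = new ArrayDeque<>();

    BoundedHistory(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Adds an item, evicting the oldest one when the buffer is full. */
    synchronized void add(T item) {
        if (items.size() == capacity) {
            items.removeFirst();
        }
        items.addLast(item);
    }

    /** Returns a copy, oldest first, so callers never see concurrent modifications. */
    synchronized List<T> snapshot() {
        return new ArrayList<>(items);
    }
}
```

In the consumer service, a BoundedHistory<ChatMessage> could take the place of the list: handleMessage would call add, and getChatMessages would return snapshot().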

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerServices {
    private static final String TOPIC = "public-chats"; // Replace with the topic name you want

    private final KafkaTemplate<String, ChatMessage> kafkaTemplate;

    public KafkaProducerServices(KafkaTemplate<String, ChatMessage> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(ChatMessage message) {
        // Publish the message to the topic without a record key
        kafkaTemplate.send(TOPIC, message);
    }
}

KafkaProducerServices publishes chat messages to the Kafka topic "public-chats". The constructor injects a KafkaTemplate<String, ChatMessage>, that is, a template typed for String keys and ChatMessage values; the topic is passed on each send call rather than configured on the template. sendMessage sends a ChatMessage to the topic without a key. Keeping the producer in its own service keeps Kafka details out of the controller.

4. Data Model:

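import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {
    private String sender;
    private String content;
    // MessageType is defined elsewhere in the project (not shown in this article)
    private MessageType type;
}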

ChatMessage is the data model for a chat message. The Lombok annotations @Data, @Builder, @AllArgsConstructor, and @NoArgsConstructor generate the getters, setters, builder, and constructors. The fields hold the sender's name, the message content, and the message type, which can distinguish different kinds of messages, for example a regular chat message from a join or leave event. The same class is used on both sides of the pipeline: instances are serialized when they are published to Kafka and again when they are sent to WebSocket clients.
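The article does not show MessageType or the serialized form of a message, so the following plain-Java sketch fills that gap under stated assumptions. The enum constants CHAT, JOIN, and LEAVE are hypothetical, ChatMessageSketch is a stand-in for the Lombok class, and toJson only approximates what a JSON mapper such as Jackson would typically produce for these three fields.

```java
/** Hypothetical message kinds; the article does not list the actual constants. */
enum MessageType { CHAT, JOIN, LEAVE }

/** Plain-Java stand-in for ChatMessage, for illustration only. Assumes non-null fields. */
final class ChatMessageSketch {
    private final String sender;
    private final String content;
    private final MessageType type;

    ChatMessageSketch(String sender, String content, MessageType type) {
        this.sender = sender;
        this.content = content;
        this.type = type;
    }

    /** Renders the approximate JSON shape exchanged with Kafka and WebSocket clients. */
    String toJson() {
        return "{\"sender\":\"" + escape(sender) + "\","
                + "\"content\":\"" + escape(content) + "\","
                + "\"type\":\"" + type.name() + "\"}";
    }

    // Minimal escaping for the sketch: backslashes and double quotes only.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

A frontend would send and receive objects of this shape, for instance a JOIN message when a user enters the room and CHAT messages afterwards.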

5. Create the Controller and Expose Kafka Data as an API:

import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class WebSocketController {
    private final KafkaProducerServices kafkaProducerServices;
    private final KafkaConsumerServices kafkaConsumerServices;

    public WebSocketController(KafkaProducerServices kafkaProducerServices, KafkaConsumerServices kafkaConsumerServices) {
        this.kafkaProducerServices = kafkaProducerServices;
        this.kafkaConsumerServices = kafkaConsumerServices;
    }

    // No @SendTo on these handlers: they return nothing, so there is no value to send.
    // KafkaConsumerServices broadcasts to /topic/public once the message has passed through Kafka.
    @MessageMapping("/chat.sendMessage")
    public void handleChatMessage(@Payload ChatMessage message) {
        // Send the message to Kafka
        kafkaProducerServices.sendMessage(message);
    }

    @MessageMapping("/chat.addUser")
    public void addUser(@Payload ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
        // Register the user who joins the chat room
        log.info("User added: {}", message.getSender());
        if (headerAccessor != null && headerAccessor.getSessionAttributes() != null) {
            headerAccessor.getSessionAttributes().put("username", message.getSender());
        } else {
            log.error("headerAccessor or session attributes is null.");
        }
        kafkaProducerServices.sendMessage(message);
    }

    @MessageMapping("/chat.removeUser")
    public void removeUser(@Payload ChatMessage message, SimpMessageHeaderAccessor headerAccessor) {
        // Handle a user who leaves (disconnects)
        log.info("User disconnected: {}", message.getSender());
        // Publish the leave event to Kafka; the listener relays it to /topic/public
        kafkaProducerServices.sendMessage(message);
        if (headerAccessor != null && headerAccessor.getSessionAttributes() != null) {
            // Remove the username attribute from the session
            headerAccessor.getSessionAttributes().remove("username");
        } else {
            log.error("headerAccessor or session attributes is null.");
        }
    }

    @GetMapping("/api/chat")
    public List<ChatMessage> getChatMessages() {
        // Return the chat messages consumed from Kafka to the frontend
        return kafkaConsumerServices.getChatMessages();
    }
}

WebSocketController ties the pieces together. handleChatMessage receives chat messages sent to "/app/chat.sendMessage" and publishes them to the Kafka topic, bridging WebSocket and Kafka. addUser and removeUser handle join and leave events: they store or remove the username in the WebSocket session attributes and publish the event to Kafka as well. In every case the message reaches WebSocket subscribers through the Kafka listener in KafkaConsumerServices, not directly from the controller. Finally, the GET /api/chat endpoint returns the messages that the consumer has collected from Kafka so far, so the frontend can display the chat history.
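The round trip described here can be pictured with a small in-memory model. The sketch below is plain Java with hypothetical names (RoundTripSketch, pollAndBroadcast): a queue stands in for the Kafka topic and a list of callbacks stands in for the clients subscribed to "/topic/public". It runs on a single thread and leaves out serialization, so it shows only the order of the steps, not how Spring or Kafka implement them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical model of the flow: client -> controller -> topic -> listener -> subscribers. */
final class RoundTripSketch {
    // Stands in for the Kafka topic.
    private final Queue<String> topic = new ArrayDeque<>();
    // Stands in for the WebSocket clients subscribed to /topic/public.
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Controller step: publish to the topic only; nothing is broadcast yet. */
    void handleChatMessage(String message) {
        topic.add(message);
    }

    /** Listener step: take each pending record and fan it out to all subscribers. */
    int pollAndBroadcast() {
        int delivered = 0;
        String message;
        while ((message = topic.poll()) != null) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
            delivered++;
        }
        return delivered;
    }
}
```

The point of the model is that a subscriber sees a message only after the listener step has run, which mirrors why the controller methods return nothing and leave the broadcast to the Kafka consumer.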

If you run into CORS problems when the frontend calls the REST API, you can add this bean to a configuration class in the Spring Boot application.

@Bean
public WebMvcConfigurer corsConfigurer() {
    return new WebMvcConfigurer() {
        @Override
        public void addCorsMappings(CorsRegistry registry) {
            // Allows any origin; narrow this to your frontend's origin outside local development
            registry.addMapping("/**").allowedOrigins("*");
        }
    };
}

simple-chat-apps

This article showed how to integrate WebSocket and Apache Kafka in a Spring Boot application to build a responsive real-time web application. Kafka acts as the message broker and WebSocket as the direct channel to clients. WebSocketController sends incoming WebSocket messages to Kafka, handles user join and disconnect events, and exposes the message history collected by the Kafka consumer. Together, these pieces combine the reliability of Kafka with the responsiveness of WebSocket.
