스프링 Spring
24. Spring Boot에서 Kafka 이벤트 스트리밍 시스템 구축
안녕하세요! 태마입니다.
Spring 기초 강좌입니다.
강좌의 경우
1. 주제 간단 정리
2. 상세 주제 정리
으로 이루어져 있습니다.
스프링 Spring
포스팅 시작하겠습니다 :)
1. 주제 간단 정리
1. Kafka란?
✔ Kafka는 분산형 이벤트 스트리밍 플랫폼으로, 대용량 데이터를 실시간으로 처리하는 데 최적화된 메시지 브로커
✔ 이벤트 중심(Event-Driven) 아키텍처에서 핵심적으로 활용됨
📌 Kafka의 주요 특징
분산 시스템 | 여러 개의 브로커(Broker)로 구성되어 확장성이 뛰어남 |
고성능 | 높은 처리량을 지원하며, 수백만 개의 메시지를 빠르게 처리 가능 |
내결함성(Fault Tolerance) | 데이터 복제(Replication)를 통해 장애가 발생해도 안정적으로 운영 가능 |
이벤트 스트리밍 | 이벤트 기반으로 데이터를 처리하고 실시간 분석 가능 |
📌 Kafka를 사용하면 "실시간 데이터 스트리밍 및 이벤트 중심 시스템을 구축 가능"
2. Kafka의 핵심 개념
✔ Kafka는 Publisher-Subscriber 모델을 기반으로 메시지를 주고받음
✔ 메시지는 브로커를 통해 여러 개의 컨슈머(Consumer)에게 전달됨
📌 Kafka 아키텍처 개념 정리
Producer(생산자) | 데이터를 생성하여 Kafka 브로커로 전송 |
Broker(브로커) | Kafka 서버, 메시지를 저장하고 관리 |
Topic(토픽) | 메시지를 저장하는 채널 (예: "orders", "payments") |
Partition(파티션) | 하나의 토픽을 여러 개로 나누어 병렬 처리 가능 |
Consumer(소비자) | Kafka에서 메시지를 읽어 처리 |
Consumer Group(소비자 그룹) | 여러 개의 컨슈머가 그룹을 이루어 메시지를 병렬로 처리 |
📌 Kafka를 활용하면 "이벤트 중심 시스템에서 대량의 데이터를 실시간으로 처리 가능"
✅ 여기까지 Kafka의 개념과 핵심 요소를 배웠습니다!
👉 "그렇다면, Spring Boot에서 Kafka를 어떻게 활용할까?"
✅ 2부에서 Spring Boot에서 Kafka를 설정하고, Producer & Consumer 구현하는 방법을 배워봅시다!
2. 상세 주제 정리
1. Spring Boot 프로젝트에서 Kafka 설정
✔ Kafka를 사용하려면 spring-kafka 의존성을 추가해야 함
📌 1️⃣ build.gradle 설정
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
}
✔ Kafka를 지원하는 라이브러리 추가
2. Kafka Producer 구현 (메시지 생성)
✔ Kafka의 Producer는 메시지를 생성하여 Kafka 브로커로 전송
📌 Kafka Producer 설정 (KafkaConfig.java)
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
✔ Kafka의 Producer를 설정하고, 메시지를 JSON 형식으로 직렬화하여 전송
📌 Kafka Producer 서비스 구현 (KafkaProducerService.java)
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Sent message: " + message);
}
}
✔ sendMessage() 메서드를 호출하면 Kafka 토픽으로 메시지를 전송
📌 Kafka 메시지 전송 컨트롤러 (KafkaController.java)
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducerService kafkaProducerService;
public KafkaController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@PostMapping("/send")
public String sendMessage(@RequestParam String message) {
kafkaProducerService.sendMessage("test-topic", message);
return "Message sent to Kafka: " + message;
}
}
✔ 클라이언트에서 /kafka/send?message=hello 요청을 보내면 Kafka로 메시지가 전송됨
📌 Kafka Producer를 적용하면 "비동기적으로 데이터를 브로커로 전송 가능"
3. Kafka Consumer 구현 (메시지 소비)
✔ Kafka의 Consumer는 브로커에서 메시지를 읽어 처리
📌 Kafka Consumer 설정 (KafkaConfig.java 수정)
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
✔ Kafka의 Consumer 설정 추가
📌 Kafka 메시지 소비 서비스 구현 (KafkaConsumerService.java)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consumeMessage(String message) {
System.out.println("Received message: " + message);
}
}
✔ Kafka에서 메시지를 읽어 로그에 출력하는 Consumer 구현
📌 Kafka Consumer를 적용하면 "실시간으로 메시지를 처리 가능"
✅ 여기까지 Spring Boot에서 Kafka 이벤트 스트리밍 시스템을 구축하는 방법을 배웠습니다!
👉 "그렇다면, Spring Boot에서 GraphQL API 개발(Spring GraphQL)은 어떻게 할까?"
✅ 다음 회차에서 **Spring Boot에서 GraphQL API 개발 (Spring GraphQL)**을 배워봅시다!
'IT Developer > Spring' 카테고리의 다른 글
Spring 기초 <28. Spring Boot와 Kubernetes를 활용한 클라우드 배포 전략> (0) | 2025.04.14 |
---|---|
Spring 기초 <27. Spring Boot에서 트랜잭션 관리 최적화 (@Transactional, REQUIRES_NEW)> (1) | 2025.04.13 |
Spring 기초 <26. JPA + QueryDSL을 활용한 복잡한 쿼리 최적화> (0) | 2025.04.12 |
Spring 기초 <25. Spring Boot에서 GraphQL API 개발 (Spring GraphQL)> (0) | 2025.04.11 |
Spring 기초 <23. Spring Boot에서 Multi-Tenancy 아키텍처 구현> (0) | 2025.04.09 |
Spring 기초 <22. Spring Security에서 RBAC(Role-Based Access Control) 적용> (1) | 2025.04.08 |
Spring 기초 <21. Spring Boot에서 웹소켓(WebSocket)과 실시간 데이터 처리> (1) | 2025.04.07 |
Spring 기초 <20. Spring WebFlux와 비동기 프로그래밍 (Reactor, Mono, Flux 개념)> (1) | 2025.04.06 |