티스토리 뷰

728x90

 

 

요약
이 글은 Spring Boot 3.x + Spring for Apache Kafka 3.x 조합으로 로컬 카프카 클러스터(도커)와 애플리케이션을 연결해 프로듀서 → 브로커 → 컨슈머 흐름을 완성하는 실전 가이드다. Gradle 의존성, application.yml 설정, 메시지 발행 서비스와 @KafkaListener 컨슈머, DLQ·메트릭·통합 테스트(Embedded Kafka)까지 단계별로 구현하며, 각 설정이 성능·안정성에 미치는 영향을 함께 짚는다. 모든 예제는 최신 Spring Kafka 3.7.0 기준이며, Spring Boot 3.4.x-3.5.x와 호환된다.

1 프로젝트 초기 설정

1.1 Gradle build.gradle

plugins { id 'org.springframework.boot' version '3.4.1'; id 'io.spring.dependency-management' version '1.1.5' }
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'           // Kafka starter
    testImplementation 'org.springframework.kafka:spring-kafka-test'  // EmbeddedKafka
}

spring-kafka 3.7.x 는 kafka-clients 3.7.x, Spring Boot 3.4.x와 호환되므로 버전을 반드시 맞춘다.

1.2 Docker-Compose로 1-노드 클러스터

version: '3'
services:
  zookeeper: { image: confluentinc/cp-zookeeper:7.6.0, environment: ZOOKEEPER_CLIENT_PORT: 2181 }
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
    ports: ["9092:9092"]

10 초면 브로커가 기동되며 로컬 포트 9092로 접속할 수 있다. 

2 application.yml 핵심 파라미터

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: all
      compression-type: lz4
      linger-ms: 5
    consumer:
      group-id: blog-demo
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        max.poll.interval.ms: 300000
    listener:
      ack-mode: MANUAL_IMMEDIATE

acks=all 로 내구성을 확보하고 linger-ms·compression-type 으로 네트워크 효율을 높인다. 
컨슈머는 자동 커밋을 끄고 수동 커밋으로 정확-한-번 처리를 준비한다.

3 프로듀서 코드 — 서비스 계층에서 전송

@RequiredArgsConstructor
@Service
public class OrderProducer {
    private final KafkaTemplate<String, String> template;

    public CompletableFuture<SendResult<String,String>> send(String orderJson){
        return template.send("orders", orderJson);
    }
}

KafkaTemplate 은 프로듀서를 감싸며 토픽·파티션·키를 편리하게 지정할 수 있는 래퍼다.

4 컨슈머 코드 — 수동 커밋 & 오류 DLQ

@Component
@Slf4j
public class OrderListener {

    @KafkaListener(topics = "orders", containerFactory = "manualFactory")
    public void onMessage(ConsumerRecord<String,String> rec,
                          Acknowledgment ack){
        try {
            process(rec.value());   // 비즈니스 로직
            ack.acknowledge();      // 처리 후 커밋
        } catch (Exception ex){
            throw new ListenerExecutionFailedException("DLQ route", ex);
        }
    }
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> manualFactory(
        ConsumerFactory<String,String> cf, KafkaTemplate<String,String> tpl){

    var recoverer = new DeadLetterPublishingRecoverer(tpl,
        (r,e)-> new TopicPartition(r.topic()+".DLQ", r.partition())); // DLQ 라우팅
    var handler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));

    var factory = new ConcurrentKafkaListenerContainerFactory<String,String>();
    factory.setConsumerFactory(cf);
    factory.setCommonErrorHandler(handler);          // 오류 → DLQ
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

DefaultErrorHandler + DeadLetterPublishingRecoverer 로 3회 재시도 후 DLQ 토픽으로 격리한다. 
컨테이너 팩토리는 리스너 스레드 풀·에러 핸들러·ack 모드를 한 곳에서 관리한다. 

5 통합 테스트 — @EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "orders")
class OrderFlowTest {

    @Autowired KafkaTemplate<String,String> template;
    @Autowired OrderRepository repo;

    @Test
    void endToEnd(){
        template.send("orders","test");
        Awaitility.await().untilAsserted(() ->
            assertTrue(repo.existsById("test")));
    }
}

spring-kafka-test 모듈이 내장 브로커를 띄워 CI 환경에서도 외부 의존성 없이 검증할 수 있다. 

6 운영 관찰성 & 튜닝

  • Micrometer Metrics: KafkaClientMetrics 를 활성화하면 TPS·레코드 lag·에러율을 Prometheus/Grafana 에 노출한다. 
  • Heartbeat⭢세션: 파티션 처리 시간이 길다면 max.poll.interval.ms 를 조정해 하트비트 손실을 방지한다. 
  • 스키마 진화: Avro/Protobuf를 사용한다면 spring.kafka.properties.schema.registry.url 과 subject-compatibility 로 안전하게 관리한다.

7 정리

Spring Kafka는 스프링 친화적 Bean 구성카프카 저수준 기능(멀티 스레드 컨테이너·DLQ·트랜잭션·Micrometer)을 모두 제공한다. 본 예제처럼 KafkaTemplate 과 @KafkaListener 만으로도 완전한 발행/구독 파이프라인을 구축할 수 있으며, DefaultErrorHandler · EmbeddedKafka · Micrometer 로 운영 품질까지 확보할 수 있다. 다음 단계로는 트랜잭션 프로듀서/컨슈머, Exactly-Once processing.guarantee, Kafka Streams 통합을 학습해 실시간 분석과 데이터 싱크까지 확장해 보자.

 

 

 

728x90
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/06   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함