Project

Kafka 역직렬화 에러 해결하기

kinim329 2026. 5. 4. 23:33

문제 상황

asset-service에서 Kafka Consumer를 실행하던 중 다음과 같은 에러가 발생했다

Error deserializing VALUE for partition competition.ticked-0

그리고 핵심 원인은 아래 코드였다

No type information in headers and no default type provided

처음에는 Kafka 연결 문제나 Producer 설정 문제라고 생각했지만, 로그를 확인해보니 실제 원인은 Consumer가 Kafka 메시지를 객체로 변환하지 못하는 문제였다.

 

문제가 발생한 코드

asset-service에는 competition.ticked 토픽을 구독하는 Consumer가 있었다.

@Slf4j
@Component
@RequiredArgsConstructor
public class CompetitionTickedEventConsumer {

    private final RankingService rankingService;

    @KafkaListener(
            topics = "${topics.competition.ticked}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void handleCompetitionTicked(CompetitionTicked payload) {
        rankingService.updateRanking(payload.competitionId());
    }
}

이 코드는 Kafka에서 메시지를 받아 CompetitionTicked 객체로 바로 변환하려고 한다.

문제는 Spring Kafka의 JsonDeserializer가 이 메시지를 어떤 클래스 타입으로 변환해야 하는지 알 수 없었다는 점이다.

왜 이런 문제가 발생했을까?

Kafka 메시지는 내부적으로 바이트 형태로 저장된다.

Producer가 객체를 Kafka에 보낼 때는 객체를 JSON 같은 형태로 변환해서 보내고, Consumer는 다시 그 JSON을 Java 객체로 변환해야 한다.

이 과정을 각각 다음과 같이 부른다.

Producer: 직렬화
Consumer: 역직렬화
 

이번 문제는 Consumer 쪽에서 발생했다.

즉, 메시지는 Kafka에 들어왔지만 Consumer가 다음을 알지 못했다.

이 JSON을 어떤 Java 클래스에 담아야 하지?
 

그래서 다음과 같은 에러가 발생했다.

No type information in headers and no default type provided
 

의미는 다음과 같다.

Kafka 메시지 헤더에 타입 정보가 없고,
Consumer 설정에도 기본 타입이 지정되어 있지 않다.

해결방법 1

가장 간단한 해결 방법은 application.yml에 기본 역직렬화 타입을 지정하는 것이다.

CompetitionTicked의 패키지가 다음과 같다면:

io.antcamp.assetservice.domain.event.payload.CompetitionTicked
 

yml에 아래 설정을 추가할 수 있다.

spring:
  kafka:
    consumer:
      group-id: asset-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        spring.json.value.default.type: io.antcamp.assetservice.domain.event.payload.CompetitionTicked
 

이렇게 설정하면 Consumer는 Kafka 메시지를 읽을 때 기본적으로 CompetitionTicked 타입으로 변환하려고 한다.

해결방법 2

다만 asset-service가 앞으로 여러 Kafka 이벤트를 받을 수 있다면 문제가 생길 수 있다.

예를 들어 다음과 같은 토픽을 모두 구독한다고 가정한다.

competition.started
competition.finished
competition.ticked
 

각 토픽의 payload 타입이 다르다면 spring.json.value.default.type 하나로 모든 메시지를 처리하기 어렵다.

이 경우에는 Kafka 메시지를 먼저 String으로 받고, Consumer 내부에서 직접 DTO로 변환하는 방식이 더 명확할 수 있다.

yml 설정:

spring:
  kafka:
    consumer:
      group-id: asset-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 

Consumer 코드:

@Slf4j
@Component
@RequiredArgsConstructor
public class CompetitionTickedEventConsumer {

    private final RankingService rankingService;
    private final ObjectMapper objectMapper;

    @KafkaListener(
            topics = "${topics.competition.ticked}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void handleCompetitionTicked(String message) {
        try {
            CompetitionTicked payload =
                    objectMapper.readValue(message, CompetitionTicked.class);

            rankingService.updateRanking(payload.competitionId());
        } catch (Exception e) {
            log.error("competition.ticked 이벤트 변환 실패. message={}", message, e);
        }
    }
}
 

이 방식은 각 Consumer마다 어떤 DTO로 변환할지 코드에서 직접 확인할 수 있다는 장점이 있다.

내가 선택한 방법

이번 상황에서는 해결방법 2인 Kafka 메시지를 String으로 받은 뒤 ObjectMapper로 직접 변환하는 방식을 선택했다.

asset-service는 앞으로 하나의 Kafka 이벤트만 받는 것이 아니라 여러 이벤트를 받을 수 있다.

예를 들어 다음과 같은 이벤트를 받을 수 있다.

competition.started
competition.finished
competition.ticked
 

각 이벤트는 역할이 다르고, payload 구조도 다를 수 있다.

이런 상황에서 spring.json.value.default.type으로 기본 타입을 하나만 지정하면 모든 Kafka 메시지를 하나의 DTO 타입으로 변환하려고 할 수 있다.

하지만 이벤트마다 payload 타입이 다르면 이 방식은 관리하기 어렵다.

그래서 Consumer에서는 메시지를 먼저 String으로 받고, 각 Consumer 내부에서 필요한 DTO로 직접 변환하도록 했다.

 
@Slf4j
@Component
@RequiredArgsConstructor
public class CompetitionTickedEventConsumer {

    private final RankingService rankingService;
    private final ObjectMapper objectMapper;

    @KafkaListener(
            topics = "${topics.competition.ticked}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void handleCompetitionTicked(String message) {
        try {
            CompetitionTicked payload =
                    objectMapper.readValue(message, CompetitionTicked.class);

            rankingService.updateRanking(payload.competitionId());
        } catch (Exception e) {
            log.error("competition.ticked 이벤트 변환 실패. message={}", message, e);
        }
    }
}
 

이렇게 하면 각 Consumer가 어떤 이벤트를 어떤 DTO로 변환하는지 코드에서 명확하게 확인할 수 있다.

정리

이번 문제는 Kafka 연결 문제가 아니라 Consumer 역직렬화 문제였다.

Kafka 메시지는 정상적으로 들어왔지만, Consumer가 JSON 메시지를 어떤 Java 객체로 변환해야 하는지 알 수 없어서 에러가 발생했다.

핵심 에러는 다음과 같았다.

No type information in headers and no default type provided
 

해결 방법은 크게 두 가지였다.

1. application.yml에 spring.json.value.default.type을 지정한다.
2. 메시지를 String으로 받은 뒤 ObjectMapper로 직접 변환한다.
 

이번 프로젝트에서는 여러 Kafka 이벤트를 받을 가능성이 있기 때문에 두 번째 방법을 선택했다.

이벤트마다 payload 구조가 다를 수 있으므로, 각 Consumer에서 필요한 DTO로 직접 변환하는 방식이 더 명확하다고 판단했다.