본격 적인 채팅을 개발하기 앞서서
카프카 기본 설정을 해주겠다.
나중에 내가 다시 채팅을 개발할 것이라는 다짐으로
최선을 다해서 작성할 예정인데,,,
솔직히 자신은 없다..

참고로, 이는 카프카 개념공부가 아닌,
기본 설정환경을 이야기하는 것이므로,
앞으로 나올 모든 게시글에서 언급되는
그룹id, producer, consumer 와 같은 개념은
설명하지 않고 넘어가도록 하겠습니다..
Kafka 의존성 추가
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
//이건 ImmutableMap 사용하려고 추가한건데 나중에 kafka 설정에 필요한 것이니 추가하는 것이 좋다.
implementation 'com.google.guava:guava:33.1.0-jre'
Kafka Producer 설정
Kafka producerFactory를 생성하는 클래스를 만들어야 한다.
//kafka producer 설정 클래스를 작성합니다.
@EnableKafka
@Configuration
public class ProducerConfiguration {
//카프카 서버 주소. 로컬에서는 보통 localhost:9092로 설정
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// Kafka ProducerFactory를 생성하는 Bean 메서드
@Bean
public ProducerFactory<String, MessageSendDTO> messageSendProducerFactory() {
return new DefaultKafkaProducerFactory<>(messageSendProducerConfigurations());
}
@Bean
public ProducerFactory<String, ChatRoomResponseDTO> chatRoomProducerFactory() {
return new DefaultKafkaProducerFactory<>(chatRoomProducerConfigurations());
}
// Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
@Bean
public Map<String, Object> messageSendProducerConfigurations() {
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
.build();
}
@Bean
public Map<String, Object> chatRoomProducerConfigurations() {
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
.build();
}
// KafkaTemplate을 생성하는 Bean 메서드
@Bean
public KafkaTemplate<String, MessageSendDTO> messageSendKafkaTemplate() {
return new KafkaTemplate<>(messageSendProducerFactory());
}
@Bean
public KafkaTemplate<String, ChatRoomResponseDTO> chatRoomKafkaTemplate() {
return new KafkaTemplate<>(chatRoomProducerFactory());
}
}
나는 이벤트로, 채팅방과 채팅을 처리해주려고 했기 때문에,
객체값만 다른 Bean 메서드는 2개씩 존재한다.
그리고 2개의 주의사항이 있다!
이벤트로 처리하는 객체는 직렬화, 역직렬화를 해주어야 하는데,
그것을 위한 설정은
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
이렇게 해주는 것을 확인할 수 있을 것이다.
이때, 나는 value값을 객체로 넣었기 때문에,
JsonSerializer를 사용해야 한다.
(string이면 StringSerializer 사용하면 됨)
그리고 또 한가지!!
아래와 같은 패키지를 import해야한다!!!
안그러면 카프카 자체가 생성안됨
(해당 주의사항은 Consumer에서도 동일합니다)
import org.apache.kafka.common.serialization.StringSerializer; (O)
import org.springframework.kafka.support.serializer.JsonSerializer; (O)
//이런거 import 하면 큰일납니다 (카프카 자체가 생성안대여)
import com.fasterxml.jackson.databind.ser.std.StringSerializer; (x)
import com.fasterxml.jackson.databind.ser.std.JsonSerialize; (x)
Kafka Consumer 설정
이제는 kafka 설정 클래스를 작성해주어야 합니다.
// Kafka consumer 설정 클래스를 작성합니다.
@EnableKafka
@Configuration
public class ListenerConfiguration {
// 카프카 서버 주소. 보통은 localhost:9092
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// 이건 보통 latest로 설정함
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
// 그룹 id를 설정해준다. (나는 chat-group으로 해둠)
@Value("${spring.kafka.chat.group-id}")
private String chatGroupId;
// 그룹 id를 설정해준다. (나는 chat-room-group으로 해둠)
@Value("${spring.kafka.chat-room.group-id}")
private String chatRoomGroupId;
// KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
@Bean
ConcurrentKafkaListenerContainerFactory<String, MessageSendDTO> kafkaMessageSendContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MessageSendDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(messageSendconsumerFactory());
return factory;
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, ChatRoomResponseDTO> kafkaChatRoomContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ChatRoomResponseDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(chatRoomconsumerFactory());
return factory;
}
// Kafka ConsumerFactory를 생성하는 Bean 메서드
@Bean
public ConsumerFactory<String, MessageSendDTO> messageSendconsumerFactory() {
JsonDeserializer<MessageSendDTO> deserializer = new JsonDeserializer<>();
// 패키지 신뢰 오류로 인해 모든 패키지를 신뢰하도록 작성
deserializer.addTrustedPackages("*");
// Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정
// Kafka Consumer 설정값은 일반적으로 애플리케이션 실행 중에 변경되지 않아야 하는 고정된 구성값이어서 ImmutableMap을 사용해야 함
Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
.put(ConsumerConfig.GROUP_ID_CONFIG, chatGroupId)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
.build();
return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
}
@Bean
public ConsumerFactory<String, ChatRoomResponseDTO> chatRoomconsumerFactory() {
JsonDeserializer<ChatRoomResponseDTO> deserializer = new JsonDeserializer<>();
// 패키지 신뢰 오류로 인해 모든 패키지를 신뢰하도록 작성
deserializer.addTrustedPackages("*");
// Kafka Consumer 구성을 위한 설정값들을 설정 -> 변하지 않는 값이므로 ImmutableMap을 이용하여 설정
// Kafka Consumer 설정값은 일반적으로 애플리케이션 실행 중에 변경되지 않아야 하는 고정된 구성값이어서 ImmutableMap을 사용해야 함
Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
.put(ConsumerConfig.GROUP_ID_CONFIG, chatRoomGroupId)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
.build();
return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
}
}
해당 설정도 producer와 마찬가지로,
나는 채팅방과 채팅을 처리해주려고 했기 때문에,
객체값만 다른 Bean 메서드는 2개씩 존재한다.
그리고 consumerFactory를 보면, 불변 객체인 ImmutableMap 객체를 볼 수 있는데,
설정 값이 나중에 변하는 것을 막기 위해서 사용한 객체이다.
이를 사용하기 위해서는 (위의 의존성 추가에서도 확인하였겠지만) guava 의존성을 추가해주어야 한다.
이로써, 채팅을 개발하기 위한 카프카 기본 세팅은 완료했고,
다음에는 STOMP 세팅으로 넘어가겠다!
'Back-End > JavaSpring' 카테고리의 다른 글
MSA에서 채팅 개발하기 (Spring, STOMP, Kafka, MongoDB, MySQL) - MongoDB 세팅 #3 (0) | 2025.04.17 |
---|---|
MSA에서 채팅 개발하기 (Spring, STOMP, Kafka, MongoDB, MySQL) - STOMP 세팅 #2 (0) | 2025.04.13 |
QueryMethod, JPQL, QueryDSL 중에서 어떤것을 사용할까 (0) | 2025.04.10 |
Entity, 영속성 컨텍스트 알아보기 (0) | 2025.04.01 |
Bean과 Spring IoC 컨테이너 (0) | 2025.03.31 |