Apache Kafka 적용시키기
2024. 12. 31. 19:39ㆍTIL
Kafka 설치를 위한 docker-compose.yml 작성 (프로젝트 루트 디렉토리에 위치)
# Single-broker Kafka + ZooKeeper stack for local development.
version: '3.8'
services:
  zookeeper:
    # Pinned instead of `latest` so the stack stays reproducible.
    image: confluentinc/cp-zookeeper:7.6.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    # Pinned to a 7.x release: this file runs the broker in ZooKeeper mode
    # (KAFKA_ZOOKEEPER_CONNECT), which newer Confluent Platform major versions
    # no longer support, so `latest` can stop working without any local change.
    image: confluentinc/cp-kafka:7.6.1
    container_name: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertised to clients; `localhost` works for applications running on the
      # Docker host, not for other containers on the compose network.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # A single broker cannot satisfy the default replication factor of 3.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
Docker 실행 후 아래 명령어 실행
# Start the services defined in docker-compose.yml in the background (-d = detached).
docker-compose up -d
Spring Boot에서 Kafka를 사용하기 위한 의존성 추가
// Adds Spring for Apache Kafka. No version is given here, so it has to be supplied
// elsewhere — presumably by Spring Boot's dependency management (TODO confirm).
dependencies {
implementation("org.springframework.kafka:spring-kafka")
}
application.yml 에 kafka 설정 추가
# Kafka client settings under the spring.kafka.* namespace.
# NOTE: Spring Boot's auto-configured factories consume these keys. If the
# application defines its own ProducerFactory / ConsumerFactory beans, those
# beans must read these properties explicitly or the values are not applied.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: hj-chat-group
      # Start from the oldest available record when the group has no committed offset.
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Producer 설정
package hjp.hjchat.infra.kafka
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
/**
 * Producer-side Kafka wiring: a String/String [ProducerFactory] and the
 * [KafkaTemplate] built on top of it.
 *
 * The broker address is read from `spring.kafka.bootstrap-servers` (as a single,
 * optionally comma-separated string) instead of being hard-coded, so the value in
 * application.yml takes effect. When the property is absent, the previous
 * hard-coded value `localhost:9092` is used.
 */
@Configuration
class KafkaProducerConfig {

    // Field injection keeps the no-arg constructor intact. The annotation is fully
    // qualified so that no additional import line is required.
    @field:org.springframework.beans.factory.annotation.Value("\${spring.kafka.bootstrap-servers:localhost:9092}")
    private var bootstrapServers: String = "localhost:9092"

    /** Creates the producer factory with String serializers for both key and value. */
    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val producerProps = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        )
        return DefaultKafkaProducerFactory(producerProps)
    }

    /** Template used by services to publish String messages. */
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> = KafkaTemplate(producerFactory())
}
Consumer 설정
package hjp.hjchat.infra.kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
/**
 * Consumer-side Kafka wiring: a String/String [ConsumerFactory] and the listener
 * container factory built on top of it.
 *
 * Broker address, consumer group and offset-reset policy are read from the
 * `spring.kafka.*` properties instead of being hard-coded, so application.yml is
 * honoured. Each property falls back to the value that was effective before:
 * `localhost:9092`, `hjchat-consumer-group`, and Kafka's own default `latest`.
 *
 * NOTE(review): if application.yml sets `spring.kafka.consumer.group-id`, that
 * value now replaces `hjchat-consumer-group`; a new group id starts without
 * committed offsets — confirm this is the intended group before deploying.
 */
@Configuration
class KafkaConsumerConfig {

    // Field injection keeps the no-arg constructor intact. The annotations are fully
    // qualified so that no additional import line is required.
    @field:org.springframework.beans.factory.annotation.Value("\${spring.kafka.bootstrap-servers:localhost:9092}")
    private var bootstrapServers: String = "localhost:9092"

    @field:org.springframework.beans.factory.annotation.Value("\${spring.kafka.consumer.group-id:hjchat-consumer-group}")
    private var groupId: String = "hjchat-consumer-group"

    @field:org.springframework.beans.factory.annotation.Value("\${spring.kafka.consumer.auto-offset-reset:latest}")
    private var autoOffsetReset: String = "latest"

    /** Creates the consumer factory with String deserializers for both key and value. */
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val consumerProps = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG to groupId,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetReset,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
        return DefaultKafkaConsumerFactory(consumerProps)
    }

    /** Container factory backed by [consumerFactory]. */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
        ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = consumerFactory()
        }
}
기존 코드에 Kafka 적용
ChatService
기존 코드 (Kafka 적용 전)
/**
 * Stores a chat message written by [user] in the chat room referenced by [message].
 *
 * @return the entity returned by `messageRepository.save`
 * @throws IllegalArgumentException if the chat room or the member cannot be found
 */
fun processMessage(message: MessageDto, user: UserPrincipal): Message {
    val targetRoom = chatRoomRepository
        .findById(message.chatRoomId)
        .orElseThrow { IllegalArgumentException("Chat room not found") }
    val sender = oAuthRepository
        .findById(user.memberId)
        .orElseThrow { IllegalArgumentException("Member not found") }

    val newMessage = Message(content = message.content, userId = sender, chatRoom = targetRoom)
    return messageRepository.save(newMessage)
}
수정된 코드 (Kafka 적용 후)
/**
 * Stores a chat message written by [user] and publishes it to the
 * `chat-messages` Kafka topic.
 *
 * The message is persisted first and published afterwards, so a failed save can
 * no longer leave behind an event for a message that was never stored. The record
 * is keyed by the chat room id so that all messages of one room land on the same
 * partition and keep their relative order.
 *
 * @return the entity returned by `messageRepository.save`
 * @throws IllegalArgumentException if the chat room or the member cannot be found
 */
fun processMessage(message: MessageDto, user: UserPrincipal): Message {
    val chatRoom = chatRoomRepository.findById(message.chatRoomId)
        .orElseThrow { IllegalArgumentException("Chat room not found") }
    val member = oAuthRepository.findById(user.memberId)
        .orElseThrow { IllegalArgumentException("Member not found") }

    // Persist before publishing: consumers must never see a message that is not stored.
    val savedMessage = messageRepository.save(
        Message(
            content = message.content,
            userId = member,
            chatRoom = chatRoom,
        )
    )

    val roomKey = message.chatRoomId.toString()
    val kafkaMessage = mapOf(
        "chatRoomId" to roomKey,
        "senderName" to member.userName,
        "content" to message.content,
    )
    // NOTE(review): Map.toString() produces "{k=v, ...}", which is not JSON and is
    // ambiguous when the content contains ',' or '='. The payload format is kept
    // unchanged here because consumers are not visible; consider a JSON payload.
    kafkaTemplate.send("chat-messages", roomKey, kafkaMessage.toString())

    return savedMessage
}
FriendService
기존 코드 (Kafka 적용 전)
/**
 * Records a pending friend request from [userId] to [friendId] and the matching
 * pending friendship, then returns a DTO describing that friendship.
 *
 * @throws IllegalArgumentException if either member cannot be found
 */
@Transactional
fun sendFriendRequest(userId: Long, friendId: Long): FriendShipDto {
    val requester = oAuthRepository
        .findById(userId)
        .orElseThrow { IllegalArgumentException("사용자를 찾을 수 없습니다.") }
    val target = oAuthRepository
        .findById(friendId)
        .orElseThrow { IllegalArgumentException("친구를 찾을 수 없습니다.") }

    val pendingRequest = FriendRequest(sender = requester, receiver = target, status = RequestStatus.PENDING)
    friendRequestRepository.save(pendingRequest)

    val pendingFriendship = Friendship(user = requester, friend = target, status = FriendshipStatus.PENDING)
    val stored = friendshipRepository.save(pendingFriendship)

    // NOTE(review): senderName is taken from the receiving side (`friend`), not from
    // the requesting user — confirm this is intended.
    return stored.let {
        FriendShipDto(
            userId = it.user.id,
            friendId = it.friend.id,
            status = it.status.toString(),
            senderName = it.friend.userName,
        )
    }
}
수정된 코드 (Kafka 적용 후)
/**
 * Records a pending friend request from [userId] to [friendId] together with the
 * matching pending friendship, and publishes a `REQUEST` event to the
 * `friend-events` Kafka topic.
 *
 * The event is published only after the surrounding transaction has committed,
 * so a rollback can no longer leave behind an event for a request that does not
 * exist in the database.
 *
 * @throws IllegalArgumentException if either member cannot be found
 */
@Transactional
fun sendFriendRequest(userId: Long, friendId: Long): FriendShipDto {
    val user = oAuthRepository.findById(userId)
        .orElseThrow { IllegalArgumentException("사용자를 찾을 수 없습니다.") }
    val friend = oAuthRepository.findById(friendId)
        .orElseThrow { IllegalArgumentException("친구를 찾을 수 없습니다.") }

    friendRequestRepository.save(
        FriendRequest(
            sender = user,
            receiver = friend,
            status = RequestStatus.PENDING,
        )
    )
    val friendship = friendshipRepository.save(
        Friendship(
            user = user,
            friend = friend,
            status = FriendshipStatus.PENDING,
        )
    )

    val event = mapOf(
        "type" to "REQUEST",
        "senderId" to userId,
        "receiverId" to friendId,
    )
    // NOTE(review): Map.toString() is not JSON; the payload format is kept as-is
    // because the consumers are not visible here.
    publishAfterCommit("friend-events", event.toString())

    // NOTE(review): senderName is taken from the receiving side (`friend`), not from
    // the requesting user — confirm this is intended.
    return FriendShipDto(
        userId = friendship.user.id,
        friendId = friendship.friend.id,
        status = friendship.status.toString(),
        senderName = friendship.friend.userName,
    )
}

/**
 * Sends [payload] to [topic] once the current transaction has committed.
 * When no transaction synchronization is active (for example when the method is
 * invoked without the transactional proxy), the record is sent immediately.
 */
private fun publishAfterCommit(topic: String, payload: String) {
    // Fully qualified names are used so that no additional import line is required.
    if (!org.springframework.transaction.support.TransactionSynchronizationManager.isSynchronizationActive()) {
        kafkaTemplate.send(topic, payload)
        return
    }
    org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization(
        object : org.springframework.transaction.support.TransactionSynchronization {
            override fun afterCommit() {
                kafkaTemplate.send(topic, payload)
            }
        }
    )
}
'TIL' 카테고리의 다른 글
AWS S3 버킷 생성하기 (0) | 2025.01.02 |
---|---|
친구 추가 요청,거절 및 리팩토링 (2) | 2025.01.01 |
Apache Kafka (0) | 2024.12.30 |
친구 목록 조회 기능 구현 (1) | 2024.12.27 |
사용자 친구 추가 기능 구현 (1) | 2024.12.26 |