Apache Kafka 적용시키기

2024. 12. 31. 19:39TIL

 

Kafka 설치를 위한 docker-compse.yml 작성 (루트 디렉토리에 위치)

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092

 

Docker 실행후 해당 라인 실행

docker-compose up -d

 

 

Spring Boot에서 Kafka를 사용하기 위한 의존성 추가

dependencies {

    implementation("org.springframework.kafka:spring-kafka")

}

 

 

application.yml kafka 설정 추가 

 

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: hj-chat-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

 

Producer 설정

package hjp.hjchat.infra.kafka

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerConfig {

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val configProps = HashMap<String, Any>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }
}

 

Consumer 설정

package hjp.hjchat.infra.kafka

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Configuration
class KafkaConsumerConfig {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val configProps = HashMap<String, Any>()
        configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        configProps[ConsumerConfig.GROUP_ID_CONFIG] = "hjchat-consumer-group"
        configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        return DefaultKafkaConsumerFactory(configProps)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}

 

기존 코드에 Kafka 적용

 

ChatService

기존 코드 (Kafka 적용 전)

fun processMessage(message: MessageDto, user: UserPrincipal): Message {
    val chatRoom = chatRoomRepository.findById(message.chatRoomId)
        .orElseThrow { IllegalArgumentException("Chat room not found") }

    val member = oAuthRepository.findById(user.memberId)
        .orElseThrow { IllegalArgumentException("Member not found") }

    val savedMessage = messageRepository.save(
        Message(
            content = message.content,
            userId = member,
            chatRoom = chatRoom,
        )
    )
    return savedMessage
}

 

수정된 코드 (Kafka 적용 후)

fun processMessage(message: MessageDto, user: UserPrincipal): Message {
    val chatRoom = chatRoomRepository.findById(message.chatRoomId)
        .orElseThrow { IllegalArgumentException("Chat room not found") }

    val member = oAuthRepository.findById(user.memberId)
        .orElseThrow { IllegalArgumentException("Member not found") }

    // Kafka 메시지 전송
    val kafkaMessage = mapOf(
        "chatRoomId" to message.chatRoomId.toString(),
        "senderName" to member.userName,
        "content" to message.content
    )
    kafkaTemplate.send("chat-messages", kafkaMessage.toString())
    
    return messageRepository.save(
        Message(
            content = message.content,
            userId = member,
            chatRoom = chatRoom,
        )
    )
}

 

FriendService

기존 코드 (Kafka 적용 전)

@Transactional
fun sendFriendRequest(userId: Long, friendId: Long): FriendShipDto {

    val user = oAuthRepository.findById(userId)
        .orElseThrow { IllegalArgumentException("사용자를 찾을 수 없습니다.") }
    val friend = oAuthRepository.findById(friendId)
        .orElseThrow { IllegalArgumentException("친구를 찾을 수 없습니다.") }

    friendRequestRepository.save(
        FriendRequest(
            sender = user,
            receiver = friend,
            status = RequestStatus.PENDING
        )
    )

    val friendship = friendshipRepository.save(
        Friendship(
            user = user,
            friend = friend,
            status = FriendshipStatus.PENDING
        )
    )

    return FriendShipDto(
        userId = friendship.user.id,
        friendId = friendship.friend.id,
        status = friendship.status.toString(),
        senderName = friendship.friend.userName
    )
}

 

수정된 코드 (Kafka 적용 후)

@Transactional
    fun sendFriendRequest(userId: Long, friendId: Long): FriendShipDto {

        val user = oAuthRepository.findById(userId)
            .orElseThrow { IllegalArgumentException("사용자를 찾을 수 없습니다.") }
        val friend = oAuthRepository.findById(friendId)
            .orElseThrow { IllegalArgumentException("친구를 찾을 수 없습니다.") }

        friendRequestRepository.save(
            FriendRequest(
                sender = user,
                receiver = friend,
                status = RequestStatus.PENDING
            )
        )

        val friendship = friendshipRepository.save(
            Friendship(
                user = user,
                friend = friend,
                status = FriendshipStatus.PENDING
            )
        )
        
        val event = mapOf(
            "type" to "REQUEST",
            "senderId" to userId,
            "receiverId" to friendId
        )
        kafkaTemplate.send("friend-events", event.toString())

        return FriendShipDto(
            userId = friendship.user.id,
            friendId = friendship.friend.id,
            status = friendship.status.toString(),
            senderName = friendship.friend.userName
        )
    }

'TIL' 카테고리의 다른 글

AWS S3 버킷 생성하기  (0) 2025.01.02
친구 추가 요청,거절 및 리팩토링  (2) 2025.01.01
Apache Kafka  (0) 2024.12.30
친구 목록 조회 기능 구현  (1) 2024.12.27
사용자 친구 추가 기능 구현  (1) 2024.12.26