백엔드 개발자(node.js)가 되는 과정

카프카로 메시지 전송하기 - Nest.js와 python

soopy 2024. 1. 5. 14:09
728x90

카프카로 메시지 전송하기 - Nest.js와 python

Nest.js에서 유튜브 링크를 python 코드로 작성된 서버에 전송하면 받아서 해당 영상 정보를 출력해주는 간단한 서비스를 구현해 보았다. 이전 글에서는 axios로 http 통신을 통해 정보를 주고 받는 코드를 구현해 보았다.
(참조: https://issuebombom.tistory.com/117)

이번에는 주고 받는 것은 아니지만 카프카 메시지 브로커를 중간에 두고서 메시지를 전달하는 코드를 구현해 보았다.

Kafka란 무엇인가? (간단하게)

어디에선가 받은 이벤트를 잘 보관하고 있다가 구독자에게 이벤트를 잘 전달해주는 이벤트 브로커를 말한다.
이벤트 브로커는 토픽이라는 일종의 파일시스템(fs)의 폴더와 같은 저장 공간을 가지고 그 안에 기본적으로 하나의 파티션을 갖는다.
자료를 조사하다 보면 Kafka는 메시지 브로커의 역할도 가능하다고 기록이 되어있는데 다소 혼란스러운 부분이다. 사실 전송하는 방식에 큰 차이는 없는 듯 보였다. 하지만 아키텍처 설계 과정에서 두 서버 간 소통할 때 A 서버에서 발생한 어떠한 사건에 대한 알림을 B 서버에게 전달해준다는 개념을 가질 때는 이벤트라고 부르고, A 서버가 B 서버에게 특정한 정보(데이터)를 전달해준다는 개념을 가질 때는 메시지라 부르는 듯 하다.
카프카에 대한 더 자세한 내용은 다른 게시글을 통해 정리하도록 하겠다.

카프카 설치

카프카는 아파치 재단에서 제공하는 프로그램이다. 먼저 설치 과정을 살펴보자. 아래 링크로 접속하면 카프카 설치를 위한 tgz 압축 파일을 다운받을 수 있다.
(참조: https://kafka.apache.org/downloads)

압축 풀기 및 기본 사용법 또한 아래 링크를 참조하자.
(참조: https://kafka.apache.org/quickstart)

기본적인 카프카 사용법

공식문서의 Quick Start를 따라가며 Flow를 생각해보자.

카프카 폴더에 접속한다.

# 자신이 다운받은 버전에 따라 폴더명이 다를 것이다. 
# 핵심은 압축 풀기 후 생성된 kafka 폴더를 찾아 들어가는 것이다. 
cd kafka_2.13-3.6.1`

주키퍼와 카프카를 서로 다른 터미널에서 실행한다.

# Start the ZooKeeper 
service bin/zookeeper-server-start.sh config/zookeeper.properties

# Start the Kafka broker service
bin/kafka-server-start.sh config/server.properties

 

주키퍼를 우선적으로 실행한 뒤에 카프카를 실행해야 한다. 이 둘은 서로 다른 개별 서버에 해당한다 생각하고 각각 켜 두어야 한다는 것을 기억하자. 또한 기본적으로 localhost:9092포트를 사용하게 되며 이에 대한 변경은 config/server.properties에서 가능하다.  
참고로 주키퍼는 카프카 브로커의 메타데이터를 보관하고 브로커들의 동적 확장, 축소를 담당하는 브로커 관리자에 해당하며 브로커 클러스터의 운영을 돕는다.

새로운 터미널에서 토픽을 생성한다.

bin/kafka-topics.sh --create --topic TEST-KAFKA --bootstrap-server localhost:9092

kafka-topics.sh 파일을 실행하는데 옵션으로 토픽 생성(토픽 이름으로 'TEST-KAFKA' 지정), localhost:9092로 실행 중인 카프카를 대상으로 생성함을 의미한다.

여기까지 주피커와 카프카 실행 및 토픽 생성을 마쳤다. 이제 본격적으로 해당 브로커에게 메시지를 전달하고 받는 과정을 구현해보자.

Nest.js에서 카프카에 메시지 전달하기 (producer)

브로커를 가운데 두고서 메시지나 이벤트를 보내는 producer와 이를 구독하는 consumer의 개념이 존재한다. 여기서는 Nest.js가 카프카 producer의 역할을 하고 python 코드 consumer를 구현해 보도록 하자.

먼저 필요한 패키지를 설치해준다.

npm install @nestjs/microservices kafkajs

main.ts 설정

main.ts에서 부트스트랩할 때 KAFKA를 이용할 수 있도록 아래와 같이 설정한다.
이때 카프카 설정 options에서는 nest.js 프레임워크의 글로벌 설정임을 기억하자. 여기서 구체적인 clientId나 groupId를 설정할 수도 있다. 하지만 여러 하위 항목의 여러 컨트롤러나 서비스에서 세부적인 설정이 가능하므로 상황에 맞게 작성하자.

// main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { ValidationPipe } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:9092'],
      },
    },
  });

  await app.listen(3000);
}
bootstrap();

위 코드는 localhost:3000으로 웹 서비스를 하는 가운데 KAFKA를 이용하는 것에 대한 설정이다. 우리는 엔드포인트 요청을 통해 python consumer에 전달할 초기 메시지를 전달할 것이기 때문이다.
Nest.js 공식 도큐먼트에 나와있는 카프카 설정은 위와 다를 것이다. 아래와 같이 코드를 작성한다면 해당 서버를 순수 카프카 producer나 consumer로서만 사용하겠다는 의미이다. 그러므로 http를 통한 엔드포인트가 필요하지 않고, 순수 이벤트 기반으로만 사용될 경우 적용하는 코드임을 기억하자(참고)

// main.ts (참고)

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});

module, controller, service 설정

모듈 설정을 통해 세부적인 카프카 설정을 진행한다. 특히 clientId, groupId 설정을 왜 하는지 아는 것이 중요하다.
그룹Id의 경우 컨슈머의 입장에서 그룹화를 의미하며, 컨슈머1과 컨슈머2가 같은 그룹에 속할 수 있음을 의미한다. 이 기능이 필요한 이유 중 하나로 하나의 메시지를 하나의 컨슈머1이 메세지 구독을 혼자 처리하기에 버거워 한다면 컨슈머2를 생성해서 구독을 나눠 가질 수 있도록 하기 위함이다. 이 때 그룹 내 컨슈머는 하나의 파티션에만 접근이 가능하므로 토픽 내에서도 파티션 추가가 필요하다. 그렇게하면 해당 토픽은 메시지를 두 개의 파티션에 분산 저장하게 되어 처리량의 확장, 증대시킬 수 있다.

// links.module.ts

import { Module } from '@nestjs/common';
import { LinksController } from './links.controller';
import { LinksService } from './links.service';
import { HttpModule } from '@nestjs/axios';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { LinksKafkaProducerService } from './links.kafka-producer.service';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'YOUTUBE_SERVICE', // service의 Inject를 위해 지정하는 이름
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'links-message', // 카프카 클라이언트명을 지정하는 개념, 로깅을 위한 용도
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'test-group-1', // 컨슈머는 그룹화가 가능하여 그룹명 지정
          },
        },
      },
    ]),
  ],
  controllers: [LinksController],
  providers: [
    LinksKafkaProducerService],
})
export class LinksModule {}

여기서 컨트롤러의 역할은 유튜브 링크에 대한 엔드포인트 요청(POST)이 들어오면 이를 Kafka 브로커에 전달해주는 것이다.

// links.controller.ts

import { Body, Controller, Post } from '@nestjs/common';
import { LinksKafkaProducerService } from './links.kafka-producer.service';
import { SendLinksMessageDto } from './dto/send-links-message.dto';

@Controller('links')
export class LinksController {
  constructor(
    private readonly linksKafkaProducerService: LinksKafkaProducerService,
  ) {}

  @Post('/send-message')
  sendMessage(@Body() sendLinksMessageDto: SendLinksMessageDto) {
    this.linksKafkaProducerService.sendMessage({
      topic: 'TEST-KAFKA', // 터미널 명령으로 생성한 토픽 이름을 지정한다.
      message: sendLinksMessageDto,
    });

    return sendLinksMessageDto;
  }
}

카프카 producer는 별도의 service로 구현한다.

// links.kafka-producer.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { SendLinksMessageDto } from './dto/send-links-message.dto';

@Injectable()
export class LinksKafkaProducerService {
  constructor(
    @Inject('YOUTUBE_SERVICE') private readonly clientKafka: ClientKafka,
  ) {}

  sendMessage({ topic, message }: ILinksKafkaProducerServiceSendMessage): void {
    const jsonString = JSON.stringify(message) // 오브젝트를 전송하므로 json
    this.clientKafka.emit<SendLinksMessageDto>(topic, jsonString);
  }
}

interface ILinksKafkaProducerServiceSendMessage {
  topic: string;
  message: Record<string, any>;
}

이제 Nest.js에서의 카프카 사용을 위한 준비는 끝났다. 여기서는 producer만 구현되었음을 기억하자. 이제 받는 것은 python consumer가 할 것이다.

Python에서 카프카 메시지 구독하기 (consumer)

파이썬에서 사용하는 kafka 패키지를 설치한다.

pip install kafka-python

컨슈머 구현은 아래와 같이 간단하게 할 수 있다.
(참조: https://kafka-python.readthedocs.io/en/master/)

# app.py

from utils import YoutubeAudioExtractor
from kafka import KafkaConsumer
import json

TOPIC_NAME = "TEST-KAFKA" # 구독할 토픽 지정
GROUP_ID = "youtube-extractor"
CLIENT_ID = "get-links-info"
KAFKA_SERVER = "localhost:9092"

consumer = KafkaConsumer(
    bootstrap_servers=KAFKA_SERVER,
    group_id=GROUP_ID,
    client_id=CLIENT_ID,
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

consumer.subscribe(TOPIC_NAME)

try:
    for message in consumer:
        message = message.value
        links = message['links']

        youtube = YoutubeAudioExtractor(links)
        video_information = youtube.extract_url_information()
        print(video_information)

except Exception as e:
    print({ 'error': f'{e}' })

위 코드에서 구독할 토픽을 지정하는 부분, 그리고 value_deserializer를 통해 json 스트링을 dict 형태로 변경하도록 설정하고 있다. 그리고 YoutubeAudioExtractor 클래스는 유튜브 영상 링크를 받아 메타데이터를 전달해주는 기능을 한다.
(참조: https://issuebombom.tistory.com/117)

app.py를 실행하면 지정 토픽에 대한 구독 상태에 돌입한다.

테스트하기

왼쪽 사진은 Nest.js 서버 실행, 가운데는 파이썬 consumer의 구독 실행, 오른쪽은 kafka에서 실행한 consumer에 해당한다.
포스트맨을 통해 특정 유튜브 링크를 {"link": "https://...."} 형태의 데이터로 요청한 결과 오른쪽 kafka consumer에서 데이터를 그대로 받았음이 확인되고 있다. 또한 python에서는 받은 링크의 메타데이터를 출력하는 것을 확인할 수 있으며 producer와 consumer가 제 역할을 잘 하고 있음을 확인할 수 있다.

728x90
728x90