본문 바로가기

백엔드 개발자(node.js)가 되는 과정

카프카 컨테이너로 메시지 전송하기 - nest.js, python

728x90

 

 

GitHub - issuebombom/get_youtube_audio

Contribute to issuebombom/get_youtube_audio development by creating an account on GitHub.

github.com

 

Nest.js로 클라이언트에게 유튜브 링크를 입력받으면 이를 카프카 브로커에 메시지로 전송하고, python 라이브러리인 pytube로 해당 링크의 영상 정보를 획득하기 위해 python에서 consumer를 구현하여 메시지를 구독한다.

오늘 핵심 주제는 위 과정을 docker compose를 이용하여 각각 컨테이너로 빌드하여 동작하게 하는 것을 구현하는 과정과 이 과정에서 해결한 문제들에 대해서 알아보자.

컨테이너 없이 로컬 환경에서 간단하게 구현하는 내용은 아래를 참고해 주세요.
(참조: https://issuebombom.tistory.com/120)

producer 구현과 컨테이너 빌드 (Nest.js)

지난 게시글에서는 Nest.js 프레임워크에 어울리도록 짜여진 방식으로 producer를 구현했지만 오늘은 순수 kafkajs 라이브러리를 들고서 구현하는 방식을 알아보고 서로 비교해보자.

kafkajs로 producer 구현

지난 게시글을 작성할 시기에는 nestjs 프레임워크에서 제공하는 브로커 연결 방식에 따라 코드를 구현했었다. 그리고 그 방식의 경우 서버를 실행하면 즉시 카프카와의 연결을 시도했다.

하지만 지속적으로 연결이 유지될 필요가 없는 상황에서는 어떻게 해야하지? 라는 의문이 들었고, 결론적으로 kafkajs를 직접 적용하여 필요에 따라 연결과 종료가 되도록 조금 다른 방식으로 producer를 구현해 보았다.

// links.kafka.config.ts
import { Kafka, logLevel } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'test-client-1',
  brokers: ['kafka-1:19092'],
  logLevel: logLevel.ERROR,
});
export const kafkaProducer = kafka.producer();

// links.module.ts
// ...라이브러리 생략
@Module({
  imports: [],
  controllers: [LinksController],
  providers: [
    // 프로바이더 등록
    {
      provide: 'KafkaProducer',
      useValue: kafkaProducer,
    },
    LinksKafkaService,
  ],
})
export class LinksModule {}

// links.kafka.service.ts
// ...라이브러리 생략
@Injectable()
export class LinksKafkaService {
  constructor(
    // producer 등록
    @Inject('KafkaProducer') private readonly producer: Producer
  ) {}

  async sendMessage({ topic, message }: ILinksKafkaServiceSendMessage): Promise<void> {
    await this.producer.connect();
    await this.producer.send({
      topic,
      messages: [{ value: JSON.stringify(message) }],
    });
    await this.producer.disconnect();
  }
}

위 코드는 config.ts에서 카프카 연결에 대한 설정을 세팅, 모듈의 프로바이더에 등록, 그리고 이를 서비스에 사용하기 위해 Inject로 주입해 둔 상태이다. 여기까지는 모두 연결을 위한 준비상태이며 실제 연결을 시도하지는 않는다. 이 상태에서 constructor에 등록된 producer가 실제로 사용될 때, 즉 producer.connect() 함수가 실행될 때 연결을 시도하게 된다. 그러므로 sendMessage 함수가 실행될 때에만 연결과 종료를 수행하게 된다.

커넥션 유지 vs 필요할 때만 커넥션

스트림데이터를 발행하거나 잦은 발행이 요구되는 상황에서는 커넥션을 유지하는 것이 좋다. 잦은 연결, 종료의 반복은 오버헤드에 해당하기 때문이다. 하지만 실시간 처리는 트래픽에 부담을 주는 부분이므로 비즈니스적으로 배치 처리가 허용되는 상황에서는 굳이 실시간성을 유지할 필요가 없을 것이다.

python으로 consumer 구현

예전에 pytube라는 파이썬 라이브러리를 활용하여 유튜브 링크를 입력하면 해당 영상 정보, 음원, 영상 등을 추출할 수 있는 서비스를 구현한 적이 있어서 이를 활용하고자 nestjs와 python 간 통신을 구현했다.

from utils import YoutubeAudioExtractor
from kafka import KafkaConsumer
import json
import logging
import time

KAFKA_SERVER = ["kafka:19092"]
TOPIC_NAME = "TEST-KAFKA"
GROUP_ID = "youtube-extractor"
CLIENT_ID = "get-links-info"


class Consumer:
    def __init__(self, broker, topic, client_id, group_id):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=broker,
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            client_id=client_id,
            group_id=group_id,
            auto_offset_reset="latest",  # earliest, latest
            # retry_backoff_ms=30000, # 연결 시도
            # reconnect_backoff_max_ms=30000, 재연결 시도
            # enable_auto_commit=True, # 오프셋 자동 커밋 여부
            # consumer_timeout_ms=1000 # 데이터 이터레이션을 막는 시간
        )

    def receive_message(self):
        try:
            for message in self.consumer:
                print(message.value)
                links = message.value["links"]
                youtube = YoutubeAudioExtractor(links)
                video_information = youtube.extract_url_information()
                print(video_information, flush=True)  # flush 적용 시 버퍼에 저장된 내용 출력
        except Exception as e:
            print(f"Extract Process Error: {e}")


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("consumer.conn")

"""카프카 연결 반복 시도
- 카프카 ready 상태에 돌입할 때 까지 연결 재시도
"""
while True:
    try:
        kafkaConsumer = Consumer(KAFKA_SERVER, TOPIC_NAME, CLIENT_ID, GROUP_ID)
        break
    except Exception as e:
        logger.error(f"exception occurred: {e}")
        logger.info("retrying on errors")
        time.sleep(1)
        continue
kafkaConsumer.receive_message()

위 코드를 구현하는 과정에서 몇 가지 이슈 사항이 있었다.
첫째는 컨테이너 기반 파이썬에서 print를 그냥 사용하면 터미널에 출력되지 않는다는 점,
둘째는 파이썬 카프카 라이브러리에서 카프카 커넥팅을 지속적으로 시도하지 않는다는 점이다.
위 사항에 대한 해결 과정은 해당 프로젝트 깃헙 readme에 기록해 두었다.  

컨테이너 이미지 빌드하기

# nodejs build
# main-server
FROM node:18 AS builder
WORKDIR /app
COPY package*.json /app/
RUN npm install

FROM node:18-alpine
WORKDIR /app
COPY --from=builder /app/node_modules/ /app/node_modules/
COPY . .
RUN npm run build
CMD ["node", "dist/main.js"]

# python build
# youtube extractor
FROM python:3.8-alpine
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD python app.py

위와 같이 도커 허브에 있는 node와 python 이미지를 적용하여 두 서버 이미지를 빌드했다. 라이브러리 설치를 우선 적용하여 향후 다시 빌드할 때 시간이 단축되도록 작성했다. 최종적으로 docker-compose를 통해 주키퍼, 카프카 이미지와 한꺼번에 컨테이너 생성 및 실행한다.

카프카 컨테이너 실행 및 연결

DockerHub에 올라온 가장 많이 접할 수 있었던 주키퍼와 카프카 이미지는 wurstmeisterconflientinc였다. 위 둘을 모두 사용에 큰 문제는 없었으나 이미지 크기에서 wurstmeister가 약 300MB 가량 더 적었다. 그리고 conflientinc의 카프카가 실행되면 브로커가 준비상태가 되는데까지 시간이 더 오래 걸렸다.

컨테이너 빌드를 위한 yaml 파일을 살펴보자

# wurstmeister
version: '2'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - 2181:2181

  kafka-1:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:19092,LISTENER_DOCKER_EXTERNAL://kafka-1:9092 # wurstmeister는 listeners와 advertised를 같이 안쓰면 오류를 낸다.
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
    depends_on:
      - zookeeper

# confluentinc
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka-1:
    image: confluentinc/cp-kafka:7.0.0
    ports:
      - "9092:9092"
    environment:
      # confluentinc는 listeners가 없어도 문제되지 않는다.
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

두 방식에 큰 차이는 없다. 단지 브로커나 파티션 등에 대한 디테일한 세팅을 미리 할 것인가 아닌가에 대한 차이가 있을 뿐이다.

그보다 중요한 점은 KAFKA_LISTENERSKAFKA_ADVERTISED_LISTENERS에 대한 설정이다. 개인적으로 이 부분에서 많이 해맸는데, 이는 컨테이너로 구현된 카프카를 내부와 외부 모두에서 접근이 가능하도록 구현하는 과정에서 해당 옵션에 대한 이해가 필요했기 떄문이다.

카프카 컨테이너의 내부와 외부 연결

KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:19092,LISTENER_DOCKER_EXTERNAL://kafka-1:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL

여러 카프카 강의를 보면 다운 받은 kafka 폴더 내 server.properties에서 listenersadvertised.listeners의 주석을 풀어주라고 하는데 딱 그 행동을 하는 설정에 해당한다. 이 때 브로커에 접속하기 위한 길을 열어주는 것인다. 위 yaml에서 LISTENER_DOCKER_INTERNALLISTENER_DOCKER_EXTERNAL 두 가지 케이스가 입력된 것을 볼 수 있다. INTERNAL의 경우 같은 네트워크에 속한 컨테이너 간 통신을 위한 경로에 해당하고 EXTERNAL은 컨테이너 밖 외부와의 연결을 의미한다.

도커 컨테이너는 docker-compose up으로 실행할 때 마다 각 컨테이너의 고유 IP주소가 부여되고 이는 실행할 때 마다 바뀐다. 그래서 'kafka-1:19092'와 같이 IP주소 입력 란에 컨테이너명을 입력하면 매번 IP가 바뀌더라도 현재 IP로 접속할 수 있도록 기능이 구현되어있다. 또한 nestjs와 python으로 구현된 producer와 consumer도 컨테이너로 띄우므로 카프카와 동일 네트워크를 사용하여 브로커 접근도 'kafka-1:19092'로 설정해두면 되기 때문에 편리하다.

하지만 도커 환경을 벗어난 외부에서는 'kafka-1'을 알아들을리 만무하고, 설령 해당 컨테이너의 실제 IP를 알고 있어도 네트워크 환경이 다르므로 접속이 불가하다. 그러므로 우선 해당 컨테이너의 네트워크 환경으로 접속해야 하고 yaml의 kafka-1에서 설정한 9092포트로 접속한다. 그러면 우리집IP:9092 -> 컨테이너IP:9092로 포트포워딩된다. 이제는 컨테이너 네트워크 환경이므로 컨테이너IP:9092 = localhost:9092가 된다. 이때 카프카의 advertise.listeners 항목에 localhost:9092를 설정하여 kafka-1:9092로 접속할 수 있게 매칭시켜 두었다면 결론적으로 로컬에서의 localhost:9092가 kafka-1:9092에 다다르게 된다.

listeners와 advertised.listeners

listeners는 쉽게 말해 해당 브로커에 접속하기 위한 길을 열어 둔다고 생각하면 좋다. 위 yaml의 경우 kafka-1 호스트의 19092와 9092포트에 대한 접근을 허용하고 있다. advertised.listeners는 '광고'라는 키워드를 쓰고 있다. 즉 listeners를 통해 열린 길로 들어오려면 이쪽 경로로 들어오면 됩니다. 라고 외부에 알리는 것이다. 그래서 localhost:9092로 들어오면 kafka-1:9092로 들어올 수 있게 하는 것이다. 이는 LISTENER_DOCKER_INTERNAL와 LISTENER_DOCKER_EXTERNAL로 엮여 있다. 얘네들 형태가 Key:Value임이 보일 것이다. 그래서 listeners의 LISTENER_DOCKER_EXTERNAL과 advertised.listeners의 LISTENER_DOCKER_EXTERNAL이 같은 Key값을 가지므로 엮일 수 있는 것이다.

정리하자면 아래와 같다.

  • INTERNAL: 컨테이너끼리는 같은 네트워크 환경이니까 바로 kafka-1 호스트로 접속하면 된다.
  • EXTERNAL: 컨테이너 입장에서의 외부 네트워크 환경(우리집 로컬)에서의 접속을 허용하기 위해 로컬 -> 컨테이너 환경 -> 브로커로 포트포워딩한다.

docker-compose up (결과)

version: '2'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - 2181:2181

  kafka-1:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:19092,LISTENER_DOCKER_EXTERNAL://kafka-1:9092 # wurstmeister는 listeners와 advertised를 같이 안쓰면 오류를 낸다.
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka-1:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
    depends_on:
      - zookeeper

  main-server:
    build:
      context: ./main-server
      dockerfile: Dockerfile
    volumes:
      - ./main-server/src:/app/src
    ports:
      - 3000:3000

  extractor-server:
    build:
      context: ./extractor
      dockerfile: Dockerfile
    ports:
      - 5000:5000
    depends_on:
      - kafka

 

컨테이너 실행
메시지 구독 및 처리

포스트맨으로 main-server(nestjs) 엔드포인트로 링크를 POST하면 이를 받아 producer에서 메시지를 브로커에 보낸다. 최초 전송 시 카프카에서 버전과 관련된 WARN을 띄우고 있다. 이후 extractor-server(python)에서 즉각적으로 브로커에 담긴 메시지를 가져와 해당 유튜브 영상 정보를 출력하고 있다.

 

참고
카프카 도커 컨테이너 옵션 설정
카프카 도커 컨테이너 연결

 

728x90
728x90