[Kafka]Kafka설치(with KRaft)및 PUB/SUB시험 코드(with Python)실습 l Kafka 3.6.1 with KRaft Apache Kafka(이하”카프카”)은 실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산형 데이터 스트리밍 플랫폼이다.복수의 소스에서 데이터 스트림을 처리하고 복수의 유저에게 전달하도록 설계되고 있다.간단히 말하면 A지점에서 B지점까지 이동할 뿐만 아니라 A지점에서 Z지점을 시작으로 필요한 모든 장소에서 대규모 데이터를 동시에 이동할 수 있다.카프카는 현재 다양한 마이크로 서비스 개발 환경에서 많이 사용된다.카프카는 3.5.2버전까지는 분산 코디로서 Zookeeper를 사용하고 있었지만, 3.6.0버전부터는 분산 코디네이션에 KRaft을 도입했다.현재 이 글을 쓰는 시점의 최신 버전인 3.6.1버전에도 Jukeeper와 KRaft의 양쪽을 지원하며 사용자가 선택하고 사용할 수 있다.그러나 4.0버전부터는 주 키퍼를 지원하지 않다며 그 후는 KRaft만이 사용될 예정이라고 한다.Kraft의 장점 및 자세한 설명은 이하의 링크를 참조한다.(l KRaft:Apache Kafka Without ZooKeeper:https://developer.confluent.io/learn/kraft/
이번 포스트는 카프카 3.6.1 버전을 설치하고 메인키퍼가 아닌 KRAft를 사용해 손쉽게 파이썬 코드로 데이터를 게시(Producer)하고 구독(Consumer)하는 프로그램을 만들어 본다. 【테스트 환경】
OSUbuntu 22.04.3 LTS CPU2 코어 메모리 2 GBKafkaKafka 3.6.1
카프카를 설치·운영하기 위해서는 JDK가 설치되어 있어야 합니다. JDK는 11개 버전 이상 사용해야 하며, 이번 실습에서는 17개 버전을 설치하였습니다.
apt-get install openjdk-17-jdk
카프카 다운로드 페이지에서 원하는 버전을 다운로드 받은 후 압축을 해제한다. l Kafka Download : https://kafka.apache.org/downloads 카프카의 경우 설치형이 아닌 압축을 푼 후 서비스를 실행하여 바로 사용할 수 있다. 실습에서는 3.6.1 버전을 설치한다. 파일을 다운로드하여 /usr/local 경로로의 압축을 해제한다.wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz tar xvf kafka_2.13-3.6.1.165z – C /usr/local관리와 사용자 편의를 위한 심볼 링크를 작성합니다.cd /usr/local ln -s kafka_2.13-3.6.1/ kafka[server.properties 세팅] 카프카를 실행하기 전에 server.properties를 설정해야 합니다. 이하의 순서에 따라서 설정합니다. Properties 파일의 경로는 다음과 같습니다.cd /usr/local/kafka/config/kraft/vi 또는 에디터를 실행하여 server.properties 파일의 내용을 수정합니다.마이크로서버외부 통신을 위해서는 advertised. listeners의 localhost 주소를 카프카 서버 IP로 반드시 변경해야 한다. 필자도 이 부분을 놓쳐서 거의 이틀 동안 삽질을 했다############################# 소켓 서버의 설정 #########\n이봐요. # # # # #advertised. listeners= VMEXT://localhost:9092 <-석처리주 advertised. listeners= VMEXT://XXX.XXXX입니다.XXXX:9092 <-localhost IP 력카프의 로컬 호스트 IP 입신>log.dirs는 로그 파일의 경로입니다. 기본값은 /tmp/kraft-combined-logs로 되어 있으나 사용자의 편의성에 따라 경로를 변경할 수 있습니다. 필자는 /usr/local/kafka/kraft-combined-logs 경로에 디렉토리를 작성하고 변경하였습니다.############################# 로그 기본 정보 ###### #### # # # # #, #, #, log.dirs=/usr/local/kafka/kraft-dirs-title여기까지 server.properties 수정이 완료되면 저장을 하고 에디터에서 나옵니다. [Kafka 클러스터 ID 생성 및 포맷] 카프카 서버를 시작하기 전에 클러스터 ID를 생성하고 스토리지(위에서 지정한 로그 패스의 디렉토리)를 포맷해야 합니다. 클러스터 ID 는, 이하의 커맨드로 작성할 수 있습니다.cd/usr/local/kafka./bin/kafka-storage를 사용합니다.SH랜덤결합의cd/usr/local/kafka./bin/kafka-storage를 사용합니다.SH랜덤결합의랜덤 UUID로 클러스터 ID가 생성되면 해당 ID를 사용하여 스토리지 디렉토리를 포맷합니다.cd/usr/local/kafka./bin/kafka-storage를 사용합니다.sh format -t J9wwo7p2RHe9-4PiP4KbBw -c./config/kraft/server.properties(RHe9-4PiP4KbBW -c)cd/usr/local/kafka./bin/kafka-storage를 사용합니다.sh format -t J9wwo7p2RHe9-4PiP4KbBw -c./config/kraft/server.properties(RHe9-4PiP4KbBW -c)[카프카 서버 시작/중지] 위 단계까지 완료되면 카프카 서버를 시작합니다. 카프카 서버의 기동과 정지는, 이하의 명령을 사용할 수 있습니다.#Start Service cd /usr/local/kafka./bin/kafka-server-start.sh -stop ./config/kraft/server.properties #サービス CD を停止します. cd /usr /local /kafka./bin/kafka-server-stop.sh -stop ./config /kraft/server.properties서비스가 정상적으로 실행되고 있는지 확인하려면 아래 스크립트를 사용하여 데몬 활성화 상태와 포트를 오픈하고 있는지 확인할 수 있습니다.ps -ef + grep java + grep kraft/+。属性ps -ef + grep java + grep kraft/+。属性netstat -antp + grep 9092netstat -antp + grep 9092netstat -antp + grep 9092netstat -antp + grep 9093netstat -antp + grep 9093[Producer / Consumer Test] 카프카 서비스가 정상적으로 실행되면 간단하게 토픽을 하나 생성해 프로듀서, 컨슈머 테스트를 진행한다.토픽 생성은 이하의 스크립트를 사용한다. 실습에서는 토픽의 이름을 topic1이라고 생성하였다.#Topiccd /usr/local/kafka./bin/kafka-topics.sh –create –bootstrap-server localhost:9092 –authentication-factor 1 –partitions 1 #トピックを削除します./bin/kafka-topics.sh –delete –bootstrap-server localhost:9092현재 생성된 주제를 확인하는 명령어는 다음과 같습니다.cd /usr/local/kafka./bin/kafka-topics.sh –bootstrap-server=localhost:9092 –list카프카에 topic1 메시지를 게시합니다.cd /usr/local/kafka./bin/kafka-console-producer.sh –message-list localhost:9092 –message topic 1> Input message > 다른 메시지 입력 #EndInputCtrl+C카프카의 topic1 메시지를 구독합니다.cd /usr/local/kafka。/bin/kafka-console-commer.sh –bootstrap-@localhost:9092 –devic topic1 –from-startingcd /usr/local/kafka。/bin/kafka-console-commer.sh –bootstrap-@localhost:9092 –devic topic1 –from-starting[Python에서 Producer, Consumer 테스트] Python에서 간단한 Producer, Consumer 테스트 코드를 작성해 본다. 우선 카프카 커넥터를 설치해야 한다. 다양한 커넥터가 있지만 실습에서는 kafka-python 커넥터를 사용한다.pip3 install kafka 중개인아래 코드는 producer 코드에서 1~10까지 반복하고 topic1에 메시지를 입력합니다. (파이썬 코드에 관한 설명은 생략합니다.)from kafka import KafkaProducer from json임포트 덤프 import time 가져오거나 역추적 프로듀서=KafkaProducer(acks=0, compression_type=’gzip’, bootstrap_servers=[‘XXX]. XXX.XXX.XXXX:9092′), value_publicizer=public x:html(x)publicutf-8’)start=time.time()try:i in range(10):data={‘str’:’result’+str(i)}producer.send(1′, value=data)producer.flush()producer.close()print(‘[완료]’, time()-start)예외:traceback.print_exc()from kafkaimport KafkaProducer from json 임포트 덤프import time 임포트 트레이스백 프로듀서 = KafkaProducer (acks=0, compression_type=’gzip’, bootstrap_servers=[‘XXX].XXX.XXX.XXX.XXX:9092′), value_publicizer=public x:html(x).publicutf-8’ ) start = time.time() try: i in range(10): data = {‘str’ : ‘result’+str(i)} producer.send(1’, value=data) producer。flush() producer.close() print(‘[完了]’, ‘, time() – start)例外: traceback.print_exc()아래 코드는 Consumer 코드로 topic1에 있는 메시지를 처음부터 구독합니다.from kafka import KafkaConsumer from json import consumer=카프카 가전(‘html1’, bootstrap_servers=[‘XXX.XXX.XXX.XXX:9092′), auto_offset_commit=’commit’, enable_auto_commit=확실히 group_id=’1′, value_deserializer=https x:loads(x.htmlutf-8′), consumer_ms=1000)print(‘[Start]get consumer’)의 메시지는 다음과 같습니다.print(f’Topic:{context. 주제), 파티션:{message.partition}, 오프셋:{message.offset}, 키:{message}.key}, value:{message.value}}’print(‘[End]get consumer’)from kafka import KafkaConsumer from json import consumer = カフカコンシューマ ( ‘html1’, bootstrap_servers=[‘XXX。XXX.XXX。XXX:9092′), auto_offset_commit=’commit’, enable_auto_commit=確かに、group_id=’1’、value_deserializer=https x: loads(x.htmlutf-8′)、consumer_ms=1000) print(‘[Start] get consumer’) 메시지는 다음과 같습니다. print(f’Topic: {context. 토픽), 파티션: {message.partition}, 오프셋: {message.offset}, 키: {message}.key}, value : {message.value}}’ print(‘[End] get consumer’)그동안 카프카의 설치 및 간단하게 프로슈머 컨슈머의 사용법에 대해서 알아봤다.카프카는 대규모 메시징시스템에서 잘 사용된다.그래서 단독 서버가 아닌 클러스터로 구성되어 사용하는 경우가 많다.카프카의 특성을 잘 이해하고 주제 및 파티션 최적화 설계에서 안정적인 서비스 구성할 수 있도록 반드시 추가 학습을 진행시키는 것을 권장한다.[참고자료]KRaft:Apache Kafka Without ZooKeeper:https://developer.confluent.io/learn/kraft/Mark KRaft as Production Ready:https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready 2024-01-09/Sungwook Kang/https://sungwookkang.com#KAFKA,#아파치카프카,#카프카설치,#InstallKafka,#ApacheKafka,#Kraft,#카프카스트림,#producer,#consumer