Apache Kafka

 

















docker-compose.yaml file for kafka:

version: '3.3'
services:
zookeeper:
container_name: zookeeper-cntr
image: confluentinc/cp-zookeeper:7.2.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
restart: always
networks:
- kafka-nw
healthcheck:
test: echo stat | nc zookeeper-cntr 2181
interval: 10s
timeout: 10s
retries: 3

kafka:
container_name: kafka-cntr
image: confluentinc/cp-kafka:7.2.0
depends_on:
- zookeeper
ports:
- 29092:29092
restart: always
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-cntr:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-cntr:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_HOST_NAME: localhost
networks:
- kafka-nw
healthcheck:
test: nc -vz kafka-cntr 9092 || exit -1
# start_period: 15s
interval: 5s
timeout: 10s
retries: 10

init-kafka:
image: confluentinc/cp-kafka:7.2.0
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
networks:
- kafka-nw
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka-cntr:9092 --list

echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka-cntr:9092 --create --if-not-exists --topic codespotify-topic --replication-factor 1 --partitions 1

echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka-cntr:9092 --list
"

kafka-manager:
image: hlebalbau/kafka-manager:stable
depends_on:
- kafka
restart: always
ports:
- "9000:9000"
environment:
ZK_HOSTS: "zookeeper-cntr:2181"
networks:
- kafka-nw

networks:
kafka-nw:
driver: bridge



--------------------------------------------------------------------------------------------------------------------------------

#version: '3.3'
#services:
# zookeeper:
# container_name: zookeeper-cntr
# image: confluentinc/cp-zookeeper:7.2.0
# environment:
# ZOOKEEPER_CLIENT_PORT: 2181
# ZOOKEEPER_TICK_TIME: 2000
# restart: always
# networks:
# - kafka-nw
# healthcheck:
# test: echo stat | nc zookeeper-cntr 2181
# interval: 10s
# timeout: 10s
# retries: 3
#
# kafka:
# container_name: kafka-cntr
# image: confluentinc/cp-kafka:7.2.0
# depends_on:
# - zookeeper
# ports:
# - 29092:29092
# restart: always
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_ZOOKEEPER_CONNECT: zookeeper-cntr:2181
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-cntr:9092,PLAINTEXT_HOST://localhost:29092
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
# KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_ADVERTISED_HOST_NAME: localhost
# networks:
# - kafka-nw
# healthcheck:
# test: nc -vz kafka-cntr 9092 || exit -1
# # start_period: 15s
# interval: 5s
# timeout: 10s
# retries: 10
#
# init-kafka:
# image: confluentinc/cp-kafka:7.2.0
# depends_on:
# - kafka
# entrypoint: [ '/bin/sh', '-c' ]
# networks:
# - kafka-nw
# command: |
# "
# # blocks until kafka is reachable
# kafka-topics --bootstrap-server kafka-cntr:9092 --list
#
# echo -e 'Creating kafka topics'
# kafka-topics --bootstrap-server kafka-cntr:9092 --create --if-not-exists --topic codespotify-topic --replication-factor 1 --partitions 1
#
# echo -e 'Successfully created the following topics:'
# kafka-topics --bootstrap-server kafka-cntr:9092 --list
# "
#
# kafka-manager:
# image: hlebalbau/kafka-manager:stable
# depends_on:
# - kafka
# restart: always
# ports:
# - "9000:9000"
# environment:
# ZK_HOSTS: "zookeeper-cntr:2181"
# networks:
# - kafka-nw
#
#networks:
# kafka-nw:
# driver: bridge

version: '3.7'

services:
zookeeper:
image: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: bitnami/kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISER_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop
depends_on:
- kafka
environment:
KAFKA_BROKERCONNECT: "kafka:9092"
ports:
- "9000:9000"


cd bitnami

cd kafka

cd bin

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic quickstart

kafka-console-producer.sh --topic quickstart --bootstrap-server localhost:9092



----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Start zookeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties


Start kafka:

./kafka-server-start.sh ../config/server.properties



1. for creating topic on cmd: (you have to be in bin folder)

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-first-topic --partitions 3 --replication-factor 1



2. for knowing the list of topics:

./kafka-topics.sh --bootstrap-server localhost:9092 --list



3. for elaborate information about topic:

./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-third-topic



4. to delete topic:

./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-first-topic






5. read messages:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning ---partition 0



6. send message:

./kafka-console-producer.sh --broker-list localhost:9092 --topic my-first-topic





7. list of groups:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list



8. information about group:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group1







==============================================================================================================================================================================================================






*** We cannot decrease number of topics, but we can increase it.



-- Kafka Broker is a program. It can be a physical computer or a virtual machine that runs kafka processes. We can start multiple brokers in a cluster. 







** There is no Kafka broker always acts as a leader. 







1. Create storage directory for Kafka cluster: inside kafka folder: 


./bin/kafka-storage.sh random-uuid


This command creates unique identifier to configure kafka cluster.



2. Format log directory: 


./bin/kafka-storage.sh format -t K8pcZzGQSeSUdMAhiDbeyg -c config/kraft/server.properties



3. Run Kafka server:


./bin/kafka-server-start.sh config/kraft/server.properties




4. To stop Kafka servers gracefully:


./bin/kafka-server-stop.sh




5. Creating Kafka Topic: first of all we have to enter to the bin folder:


./kafka-topics.sh --create --topic topic2 --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092,localhost:9094,localhost:9096




6. To show list of kafka topics:

./kafka-topics.sh --list --bootstrap-server localhost:9092


7. To show detailed information about topics:

./kafka-topics.sh --describe --bootstrap-server localhost:9092



8. Delete Kafka topic:


./kafka-topics.sh --delete --topic topic1 --bootstrap-server localhost:9092



9. Producing Kafka message without a key:


./kafka-console-producer.sh --bootstrap-server localhost:9092,localhost:9094 --topic my-topic




10. Producing Kafka message with key:

./kafka-console-producer.sh --bootstrap-server localhost:9092,localhost:9094 --topic my-topic --property "parse.key=true" --property "key.separator=:"



11. Consuming messages from Kafka topic from the beginning:

./kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092


veya 


./kafka-console-consumer.sh --topic product-created-events-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true


12. Consuming messages from Kafka topic from the beginning with key and value as well:

./kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property print.value=true











12. Add or update existing configuration:

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name topic2 --add-config min.insync.replicas=2


13. Consuming messages from Dead Letter Topic:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic product-created-events-topic.DLT --from-beginning --property print.key=true --property print.value=true


***14. Operating inside docker container:

1) docker ps

2) docker exec -it <container-id, or name> /bin/bash

3) cd opt/bitnami/kafka

4)kafka-console-consumer.sh --bootstrap-server host.docker.internal:9092 --topic product-created-events-topic.DLT --from-beginning --property print.key=true --property print.value=true

all other command are the same

*** here we must pay attention host.docker.internal, because we are inside container, if we write as above all brokers will see each others, otherwise we can write localhost:9092 this will work only for the container that we are inside


=================================================================================================================================================================================================================================


yaml file for one kafka broker with kraft:

version: '3.8'  # Updated version


services:

  kafka1:

    image: bitnami/kafka:latest  # Use Bitnami's Kafka image which supports KRaft mode

    hostname: kafka1

    container_name: kafka1

    ports:

      - "9092:9092"

    environment:

      - KAFKA_CFG_NODE_ID=1

      - KAFKA_CFG_PROCESS_ROLES=broker,controller

      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093

      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093

      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT

      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT

      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true

      - KAFKA_CFG_LOG_DIRS=/opt/bitnami/kafka/data

      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1

      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1

      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1

      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

      - ALLOW_PLAINTEXT_LISTENER=yes  # Allows plaintext communication

    volumes:

      - ./data/kafka:/opt/bitnami/kafka/data

    networks:

      - kafka-network


networks:

  kafka-network:

    driver: bridge





yaml file for multiple kafka broker with kraft with kafka-ui:

version: "3"

services:

  kafka1:

    image: bitnami/kafka

    container_name: kafka1

    ports:

      - 9092:9092  # Expose external listener port

    environment:

      - KAFKA_ENABLE_KRAFT=yes

      - KAFKA_CFG_PROCESS_ROLES=broker,controller

      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

      - KAFKA_CFG_LISTENERS=INTERNAL://:9093,EXTERNAL://:9092,CONTROLLER://:29093

      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT

      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka1:9093,EXTERNAL://localhost:9092

      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

      - KAFKA_BROKER_ID=1

      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:29093,2@kafka2:29093,3@kafka3:29093

      - ALLOW_PLAINTEXT_LISTENER=yes

      - KAFKA_CFG_NODE_ID=1

      - KAFKA_KRAFT_CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk  # Replace with your cluster ID

    volumes:

      - ./kafka1:/bitnami/kafka

  kafka2:

    image: bitnami/kafka

    container_name: kafka2

    ports:

      - 9093:9093  # Expose external listener port

    environment:

      - KAFKA_ENABLE_KRAFT=yes

      - KAFKA_CFG_PROCESS_ROLES=broker,controller

      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

      - KAFKA_CFG_LISTENERS=INTERNAL://:9094,EXTERNAL://:9093,CONTROLLER://:29093

      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT

      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka2:9094,EXTERNAL://localhost:9093

      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

      - KAFKA_BROKER_ID=2

      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:29093,2@kafka2:29093,3@kafka3:29093

      - ALLOW_PLAINTEXT_LISTENER=yes

      - KAFKA_CFG_NODE_ID=2

      - KAFKA_KRAFT_CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk  # Use the same cluster ID as kafka1

    volumes:

      - ./kafka2:/bitnami/kafka

  kafka3:

    image: bitnami/kafka

    container_name: kafka3

    ports:

      - 9094:9094  # Expose external listener port

    environment:

      - KAFKA_ENABLE_KRAFT=yes

      - KAFKA_CFG_PROCESS_ROLES=broker,controller

      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER

      - KAFKA_CFG_LISTENERS=INTERNAL://:9095,EXTERNAL://:9094,CONTROLLER://:29093

      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT

      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka3:9095,EXTERNAL://localhost:9094

      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL

      - KAFKA_BROKER_ID=3

      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:29093,2@kafka2:29093,3@kafka3:29093

      - ALLOW_PLAINTEXT_LISTENER=yes

      - KAFKA_CFG_NODE_ID=3

      - KAFKA_KRAFT_CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk  # Use the same cluster ID as kafka1

    volumes:

      - ./kafka3:/bitnami/kafka

  kafka-ui:

    image: provectuslabs/kafka-ui:latest

    container_name: kafka-ui

    ports:

      - 8080:8080

    environment:

      KAFKA_CLUSTERS_0_NAME: local

      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9093,kafka2:9094,kafka3:9095

      KAFKA_CLUSTERS_0_ZOOKEEPER: ""  # Empty since you're using KRaft, not Zookeeper

      KAFKA_CLUSTERS_0_KAFKA_CONNECT_ENABLED: "false"

      KAFKA_CLUSTERS_0_SCHEMAREGISTRY_ENABLED: "false"

    depends_on:

      - kafka1

      - kafka2

      - kafka3



-----------------------------

version: "3.8"
services:
  kafka-1:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=97exVv29T2icAhT2SNEEDQ
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9090,CONTROLLER://:9091,EXTERNAL://:9092,EXTERNAL_CLIENT://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9090,EXTERNAL://kafka-1:9092,EXTERNAL_CLIENT://localhost:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_CLIENT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    volumes:
      - ./volumes/kafka/server-1:/bitnami/kafka

  kafka-2:
    image: bitnami/kafka:latest
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=97exVv29T2icAhT2SNEEDQ
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9090,CONTROLLER://:9091,EXTERNAL://:9094,EXTERNAL_CLIENT://:29094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-2:9090,EXTERNAL://kafka-2:9094,EXTERNAL_CLIENT://localhost:29094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_CLIENT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    volumes:
      - ./volumes/kafka/server-2:/bitnami/kafka

  kafka-3:
    image: bitnami/kafka:latest
    ports:
      - "9096:9096"
      - "29096:29096"
    environment:
      - KAFKA_CFG_NODE_ID=3
      - KAFKA_KRAFT_CLUSTER_ID=97exVv29T2icAhT2SNEEDQ
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9090,CONTROLLER://:9091,EXTERNAL://:9096,EXTERNAL_CLIENT://:29096
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-3:9090,EXTERNAL://kafka-3:9096,EXTERNAL_CLIENT://localhost:29096
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL_CLIENT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    volumes:
      - ./volumes/kafka/server-3:/bitnami/kafka

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:9092,kafka-2:9094,kafka-3:9096
      - KAFKA_CLUSTERS_0_ZOOKEEPER=""
      - KAFKA_CLUSTERS_0_KAFKA_CONNECT_ENABLED=false
      - KAFKA_CLUSTERS_0_SCHEMAREGISTRY_ENABLED=false
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3






































Комментарии

Популярные сообщения из этого блога

Lesson1: JDK, JVM, JRE

SE_21_Lesson_11: Inheritance, Polymorphism

SE_21_Lesson_9: Initialization Blocks, Wrapper types, String class