Kafka

1. Create a topic. Go to the bin folder of the Kafka installation and run:

kafka-topics.sh --create --topic test --bootstrap-server localhost:19092 --replication-factor 1 --partitions 1

2. Describe the topic to get information about it:

kafka-topics.sh --describe --topic test --bootstrap-server localhost:19092

3. Produce messages:

kafka-console-producer.sh --topic test --bootstrap-server localhost:19092

4. Consume messages:

kafka-console-consumer.sh --topic test --bootstrap-server localhost:19092 --from-beginning


Keep in mind that the number of partitions for a topic can only be increased, never decreased.


KafkaProducer has two types of errors. Retriable errors are those that can be resolved by sending the message again. For example, a connection error can be resolved because the connection may get reestablished. A “no leader” error can be resolved when a new leader is elected for the partition. KafkaProducer can be configured to retry those errors automatically, so the application code will get retriable exceptions only when the number of retries was exhausted and the error was not resolved. Some errors will not be resolved by retrying, for example, “message size too large.” In those cases, KafkaProducer will not attempt a retry and will return the exception immediately.
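The retry behaviour described above can be sketched in plain Java without a broker. This is only a simplified model, not the producer's real implementation: the two exception classes and the sendWithRetries helper are hypothetical names made up for illustration.

```java
import java.util.function.IntConsumer;

final class RetrySketch {

    /** Hypothetical stand-in for a transient failure such as "no leader". */
    static final class RetriableSendException extends RuntimeException {
        RetriableSendException(String message) { super(message); }
    }

    /** Hypothetical stand-in for a permanent failure such as "message size too large". */
    static final class NonRetriableSendException extends RuntimeException {
        NonRetriableSendException(String message) { super(message); }
    }

    /**
     * Runs the attempt and retries only retriable failures, at most maxRetries times.
     * The attempt receives its zero-based attempt number.
     * Returns the total number of attempts made when the send finally succeeds.
     */
    static int sendWithRetries(IntConsumer attempt, int maxRetries) {
        for (int n = 0; ; n++) {
            try {
                attempt.accept(n);
                return n + 1;
            } catch (RetriableSendException e) {
                if (n >= maxRetries) {
                    throw e; // retries exhausted: only now does the caller see the error
                }
                // otherwise loop and try again
            }
            // NonRetriableSendException is not caught here, so it propagates immediately
        }
    }

    public static void main(String[] args) {
        // The first two attempts fail with a transient error, the third succeeds.
        int attempts = sendWithRetries(n -> {
            if (n < 2) {
                throw new RetriableSendException("no leader");
            }
        }, 3);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

In this model the caller only has to handle two cases: a retriable error that outlived all retries, and a non-retriable error that was never retried at all.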




*acks

The acks parameter controls how many partition replicas must receive the record before the producer can consider the write successful. This option has a significant impact on how likely messages are to be lost. There are three allowed values for the acks parameter:

• If acks=0, the producer will not wait for a reply from the broker before assuming the message was sent successfully. This means that if something went wrong and the broker did not receive the message, the producer will not know about it and the message will be lost. However, because the producer is not waiting for any response from the server, it can send messages as fast as the network will support, so this setting can be used to achieve very high throughput.

• If acks=1, the producer will receive a success response from the broker the moment the leader replica received the message. If the message can’t be written to the leader (e.g., if the leader crashed and a new leader was not elected yet), the producer will receive an error response and can retry sending the message, avoiding potential loss of data. The message can still get lost if the leader crashes and a replica without this message gets elected as the new leader (via unclean leader election). In this case, throughput depends on whether we send messages synchronously or asynchronously. If our client code waits for a reply from the server (by calling the get() method of the Future object returned when sending a message) it will obviously increase latency significantly (at least by a network round trip). If the client uses callbacks, latency will be hidden, but throughput will be limited by the number of in-flight messages (i.e., how many messages the producer will send before receiving replies from the server).

• If acks=all, the producer will receive a success response from the broker once all in-sync replicas received the message. This is the safest mode since you can make sure more than one broker has the message and that the message will survive even in the case of crash (more information on this in Chapter 5). However, the latency we discussed in the acks=1 case will be even higher, since we will be waiting for more than just one broker to receive the message.
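As a minimal sketch, the three acks values from the list above could be set on the producer Properties like this. The class and method names are made up for illustration, the bootstrap address is the one used elsewhere in these notes, and the validation is deliberately limited to the three values discussed here.

```java
import java.util.Properties;

final class AcksConfigSketch {

    /** Builds producer properties for one of the acks values discussed above. */
    static Properties producerProps(String acks) {
        // This sketch only accepts the three values from the list above.
        if (!acks.equals("0") && !acks.equals("1") && !acks.equals("all")) {
            throw new IllegalArgumentException("acks must be 0, 1 or all, got: " + acks);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:19092");
        props.setProperty("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        // acks=0: fire and forget; acks=1: leader only; acks=all: all in-sync replicas
        for (String acks : new String[] {"0", "1", "all"}) {
            System.out.println(producerProps(acks));
        }
    }
}
```

The returned Properties would still need key.serializer and value.serializer before being passed to a KafkaProducer, as in the producer demos in these notes.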






*retries

When the producer receives an error message from the server, the error could be transient (e.g., a lack of leader for a partition). In this case, the value of the retries parameter will control how many times the producer will retry sending the message before giving up and notifying the client of an issue. By default, the producer will wait 100ms between retries, but you can control this using the retry.backoff.ms parameter. We recommend testing how long it takes to recover from a crashed broker (i.e., how long until all partitions get new leaders) and setting the number of retries and delay between them such that the total amount of time spent retrying will be longer than the time it takes the Kafka cluster to recover from the crash; otherwise, the producer will give up too soon. Not all errors will be retried by the producer. Some errors are not transient and will not cause retries (e.g., “message too large” error). In general, because the producer handles retries for you, there is no point in handling retries within your own application logic. You will want to focus your efforts on handling nonretriable errors or cases where retry attempts were exhausted.
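The sizing advice above comes down to simple arithmetic: pick the number of retries so that retries × retry.backoff.ms is longer than the measured recovery time. A rough sketch follows; the helper name and the 30-second recovery time are assumptions for illustration, and the calculation ignores the time each attempt itself takes, so it errs on the side of retrying longer.

```java
import java.util.Properties;

final class RetryBudgetSketch {

    /**
     * Smallest number of retries such that retries * retryBackoffMs
     * is strictly greater than recoveryMs.
     */
    static long minRetries(long recoveryMs, long retryBackoffMs) {
        if (recoveryMs < 0 || retryBackoffMs <= 0) {
            throw new IllegalArgumentException("recoveryMs must be >= 0 and retryBackoffMs > 0");
        }
        return recoveryMs / retryBackoffMs + 1;
    }

    public static void main(String[] args) {
        long recoveryMs = 30_000; // assumed: measured time until all partitions have new leaders
        long backoffMs = 100;     // the default wait between retries mentioned above

        long retries = minRetries(recoveryMs, backoffMs);

        Properties props = new Properties();
        props.setProperty("retries", Long.toString(retries));
        props.setProperty("retry.backoff.ms", Long.toString(backoffMs));
        System.out.println(props);
    }
}
```

With a 30-second recovery time and a 100 ms backoff this yields 301 retries, which adds up to 30.1 seconds of waiting between attempts.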






*linger.ms

linger.ms controls the amount of time to wait for additional messages before sending the current batch. KafkaProducer sends a batch of messages either when the current batch is full or when the linger.ms limit is reached. By default, the producer will send messages as soon as there is a sender thread available to send them, even if there’s just one message in the batch. By setting linger.ms higher than 0, we instruct the producer to wait a few milliseconds to add additional messages to the batch before sending it to the brokers. This increases latency but also increases throughput (because we send more messages at once, there is less overhead per message).
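To make the "batch full or linger.ms reached" rule concrete, here is a toy model of it. It is not how the producer is actually implemented: the batch capacity is counted in messages rather than bytes, time is just a list of arrival timestamps, and all names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class LingerBatchingSketch {

    /**
     * Returns the size of each batch that would be sent for messages arriving at
     * the given (ascending) timestamps. A batch is sent when it reaches
     * batchCapacity messages, or when a message arrives after the batch has
     * already waited longer than lingerMs since its first message.
     */
    static List<Integer> batchSizes(long[] arrivalsMs, int batchCapacity, long lingerMs) {
        List<Integer> sizes = new ArrayList<>();
        int current = 0;   // messages in the open batch
        long deadline = 0; // time at which the open batch stops waiting
        for (long t : arrivalsMs) {
            if (current > 0 && t > deadline) {
                sizes.add(current); // linger time expired before this message arrived
                current = 0;
            }
            if (current == 0) {
                deadline = t + lingerMs; // first message opens a new batch
            }
            current++;
            if (current == batchCapacity) {
                sizes.add(current); // batch is full, send right away
                current = 0;
            }
        }
        if (current > 0) {
            sizes.add(current); // whatever is left goes out as the last batch
        }
        return sizes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1, 2, 10, 11, 30};
        System.out.println("linger.ms=0: " + batchSizes(arrivals, 3, 0));
        System.out.println("linger.ms=5: " + batchSizes(arrivals, 3, 5));
    }
}
```

For these arrival times the model sends six single-message batches with no lingering, and three batches of sizes 3, 2 and 1 when it waits up to 5 ms, which is the throughput-for-latency trade described above.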




*client.id

This can be any string, and will be used by the brokers to identify messages sent from the client. It is used in logging and metrics, and for quotas.





*max.in.flight.requests.per.connection

This controls how many messages the producer will send to the server without receiving responses. Setting this high can increase memory usage while improving throughput, but setting it too high can reduce throughput as batching becomes less efficient. Setting this to 1 will guarantee that messages will be written to the broker in the order in which they were sent, even when retries occur.




*Ordering Guarantees

Apache Kafka preserves the order of messages within a partition. This means that if messages were sent from the producer in a specific order, the broker will write them to a partition in that order and all consumers will read them in that order. For some use cases, order is very important. There is a big difference between depositing $100 in an account and later withdrawing it, and the other way around! However, some use cases are less sensitive.

Setting the retries parameter to nonzero and max.in.flight.requests.per.connection to more than one means that it is possible that the broker will fail to write the first batch of messages, succeed to write the second (which was already in-flight), and then retry the first batch and succeed, thereby reversing the order.

Usually, setting the number of retries to zero is not an option in a reliable system, so if guaranteeing order is critical, we recommend setting max.in.flight.requests.per.connection=1 to make sure that while a batch of messages is retrying, additional messages will not be sent (because this has the potential to reverse the correct order). This will severely limit the throughput of the producer, so only use this when order is important.
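The reordering scenario can be replayed with a small simulation. This is a simplified model under stated assumptions, not the producer's real send loop: batches are sent in rounds of at most maxInFlight, a batch listed in failOnFirstAttempt fails once and then succeeds on retry, and every name here is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class InFlightOrderingSketch {

    /** Returns the order in which batches end up written to the partition log. */
    static List<String> writeOrder(List<String> batches, Set<String> failOnFirstAttempt, int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        Deque<String> pending = new ArrayDeque<>(batches);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            // Put up to maxInFlight batches on the wire at once.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            List<String> toRetry = new ArrayList<>();
            for (String batch : inFlight) {
                if (failOnFirstAttempt.contains(batch) && alreadyFailed.add(batch)) {
                    toRetry.add(batch); // first attempt fails, batch will be retried
                } else {
                    log.add(batch);     // broker writes the batch
                }
            }
            // Retried batches go back to the front of the queue in their original order.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                pending.addFirst(toRetry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> batches = List.of("A", "B");
        Set<String> flaky = Set.of("A");
        System.out.println("max in flight 2: " + writeOrder(batches, flaky, 2));
        System.out.println("max in flight 1: " + writeOrder(batches, flaky, 1));
    }
}
```

With two batches in flight the log ends up as B, A because B was already on the wire when A failed. With one in flight, B is not sent until A has been retried successfully, so the order A, B is kept.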





*Thread Safety

You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object and then use Java’s ExecutorService to start multiple threads each with its own consumer. The Confluent blog has a tutorial that shows how to do just that.
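The one-consumer-per-thread pattern with ExecutorService can be sketched as follows. To keep the example runnable without a broker, StubConsumer is a hypothetical stand-in for KafkaConsumer; it only mimics the rule that a consumer must stay on the thread that created it. In real code each task would create, subscribe, poll and close its own KafkaConsumer in the same place.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConsumerPerThreadSketch {

    /** Hypothetical stand-in for KafkaConsumer that rejects use from a foreign thread. */
    static final class StubConsumer implements AutoCloseable {
        private final Thread owner = Thread.currentThread();

        String poll() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("consumer used from a second thread");
            }
            return "records for " + owner.getName();
        }

        @Override
        public void close() {
            // a real consumer would leave the group and release its connections here
        }
    }

    /** Starts workerCount tasks, each creating and using its own consumer on its own thread. */
    static Set<Thread> runWorkers(int workerCount) {
        Set<Thread> owners = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            pool.submit(() -> {
                // The consumer is created inside the task, so it never leaves this thread.
                try (StubConsumer consumer = new StubConsumer()) {
                    consumer.poll();
                    owners.add(consumer.owner);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return owners;
    }

    public static void main(String[] args) {
        System.out.println("distinct consumer threads: " + runWorkers(3).size());
    }
}
```

The key design choice is that the consumer is constructed inside the submitted task rather than passed in from the main thread, so sharing it between threads is impossible by construction.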






























Simple Producer:

package io.conductor.demos.kafkabasics.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class ProducerDemo {

    public static void main(String[] args) {
        // Connection and serialization settings for the producer
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // A record without a key, sent to the demo_java topic
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("demo_java", "hello world");

        // send() is asynchronous; flush() blocks until buffered records are sent
        producer.send(producerRecord);
        producer.flush();
        producer.close();
    }

}




package io.conductor.demos.kafkabasics.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class ProducerDemoWithCallback {

    public static void main(String[] args) {
        log.info("I am a Kafka Producer");

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("demo_java", "message: " + i);

            // The callback runs once the broker acknowledges the record or the send fails
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        log.info("Received new metadata \n" +
                                "Topic: " + metadata.topic() + "\n" +
                                "Partition: " + metadata.partition() + "\n" +
                                "Offset: " + metadata.offset() + "\n" +
                                "Timestamp: " + metadata.timestamp());
                    } else {
                        log.error("Error while producing", e);
                    }
                }
            });
        }

        producer.flush();
        producer.close();
    }

}












package io.conductor.demos.kafkabasics.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class ProducerDemoWithCallback {

    public static void main(String[] args) {
        log.info("I am a Kafka Producer");

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        // properties.put("batch.size", 400);
        // Spread records evenly across partitions instead of using the default partitioner
        properties.put("partitioner.class", RoundRobinPartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Send 10 bursts of 30 records, pausing 500 ms between bursts
        for (int a = 0; a < 10; a++) {
            for (int i = 0; i < 30; i++) {
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("demo_java", "message: " + i);
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e == null) {
                            log.info("Received new metadata \n" +
                                    "Topic: " + metadata.topic() + "\n" +
                                    "Partition: " + metadata.partition() + "\n" +
                                    "Offset: " + metadata.offset() + "\n" +
                                    "Timestamp: " + metadata.timestamp());
                        } else {
                            log.error("Error while producing", e);
                        }
                    }
                });
            }
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        producer.flush();
        producer.close();
    }

}
























package io.conductor.demos.consumerdemo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

@Slf4j
public class ConsumerDemoCooperative {

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");

        String topic = "demo_java";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", "my-java-application");
        properties.put("auto.offset.reset", "earliest");
        properties.put("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // On shutdown, wake up the blocked poll() and wait for the main thread to finish
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                log.info("Detected a shutdown");
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        consumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                log.info("Polling");
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + " Value: " + record.value());
                    log.info("Partition: " + record.partition() + " Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // Expected: thrown by poll() after consumer.wakeup() is called
            log.info("Consumer is starting to shutdown");
        } catch (Exception e) {
            log.error("Unexpected exception in the consumer", e);
        } finally {
            consumer.close();
            log.info("The consumer is now gracefully shut down");
        }
    }

}














Docker Compose file for KRaft mode with 3 Kafka servers, Kafka UI and Schema Registry:


version: '3.8'

services:
  kafka-1:
    image: bitnami/kafka:latest
    container_name: kafka-1
    environment:
      KAFKA_CFG_NODE_ID: 1
      KAFKA_KRAFT_CLUSTER_ID: 97exVv29T2icAhT2SNEEDQ
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091"
      KAFKA_CFG_LISTENERS: "CONTROLLER://:9091,DOCKER://:9092,HOST://:19092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "CONTROLLER://kafka-1:9091,DOCKER://kafka-1:9092,HOST://localhost:19092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "DOCKER"
    ports:
      - "9092:9092" # DOCKER listener
      - "19092:19092" # HOST listener
    volumes:
      - ./volumes/kafka/server-1:/bitnami/kafka
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "kafka-1:9092", "--list"]
      interval: 20s
      timeout: 10s
      retries: 5

  kafka-2:
    image: bitnami/kafka:latest
    container_name: kafka-2
    environment:
      KAFKA_CFG_NODE_ID: 2
      KAFKA_KRAFT_CLUSTER_ID: 97exVv29T2icAhT2SNEEDQ
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091"
      KAFKA_CFG_LISTENERS: "CONTROLLER://:9091,DOCKER://:9094,HOST://:19094"
      KAFKA_CFG_ADVERTISED_LISTENERS: "CONTROLLER://kafka-2:9091,DOCKER://kafka-2:9094,HOST://localhost:19094"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "DOCKER"
    ports:
      - "9094:9094" # DOCKER listener
      - "19094:19094" # HOST listener
    volumes:
      - ./volumes/kafka/server-2:/bitnami/kafka
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "kafka-2:9094", "--list"]
      interval: 20s
      timeout: 10s
      retries: 5

  kafka-3:
    image: bitnami/kafka:latest
    container_name: kafka-3
    environment:
      KAFKA_CFG_NODE_ID: 3
      KAFKA_KRAFT_CLUSTER_ID: 97exVv29T2icAhT2SNEEDQ
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091"
      KAFKA_CFG_LISTENERS: "CONTROLLER://:9091,DOCKER://:9096,HOST://:19096"
      KAFKA_CFG_ADVERTISED_LISTENERS: "CONTROLLER://kafka-3:9091,DOCKER://kafka-3:9096,HOST://localhost:19096"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "DOCKER"
    ports:
      - "9096:9096" # DOCKER listener
      - "19096:19096" # HOST listener
    volumes:
      - ./volumes/kafka/server-3:/bitnami/kafka
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "kafka-3:9096", "--list"]
      interval: 20s
      timeout: 10s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    container_name: schema-registry
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9094,PLAINTEXT://kafka-3:9096
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    ports:
      - "8081:8081"
    networks:
      - kafka-net
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081"]
      interval: 30s
      timeout: 10s
      retries: 3

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9094,kafka-3:9096
      KAFKA_CLUSTERS_0_ZOOKEEPER: ""
      KAFKA_CLUSTERS_0_KAFKA_CONNECT_ENABLED: "false"
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: "http://schema-registry:8081"
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY_ENABLED: "true"
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
      - schema-registry
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080"]
      interval: 30s
      timeout: 20s
      retries: 3

networks:
  promotion-service-network:
  kafka-net:
    name: kafka-net

volumes:
  kafka_server_1_data:
  kafka_server_2_data:
  kafka_server_3_data:







