Kafka
1. Create a topic. From the bin folder of the Kafka installation, run:
./kafka-topics.sh --create --topic test --bootstrap-server localhost:19092 \
  --replication-factor 1 --partitions 1
2. Describe the topic to get information about it:
./kafka-topics.sh --describe --topic test --bootstrap-server localhost:19092
From Chapter 3, "Kafka Producers: Writing Messages to Kafka":
Some producer errors are retriable: they can be resolved by sending the message
again. For example, a connection error can be resolved because the connection may
get reestablished, and a “no leader” error can be resolved when a new leader is
elected for the partition. KafkaProducer can be configured to retry these errors
automatically, so the application code will get retriable exceptions only when the
number of retries was exhausted and the error was not resolved. Other errors will
not be resolved by retrying, for example “message size too large.” In those cases,
KafkaProducer will not attempt a retry and will return the exception immediately.
The acks parameter controls how many partition replicas must receive the record
before the producer can consider the write successful. This option has a significant
impact on how likely messages are to be lost. There are three allowed values for the
acks parameter:
• If acks=0, the producer will not wait for a reply from the broker before assuming
the message was sent successfully. This means that if something went wrong and
the broker did not receive the message, the producer will not know about it and
the message will be lost. However, because the producer is not waiting for any
response from the server, it can send messages as fast as the network will support,
so this setting can be used to achieve very high throughput.
• If acks=1, the producer will receive a success response from the broker the
moment the leader replica received the message. If the message can’t be written
to the leader (e.g., if the leader crashed and a new leader was not elected yet), the
producer will receive an error response and can retry sending the message,
avoiding potential loss of data. The message can still get lost if the leader crashes
and a replica without this message gets elected as the new leader (via unclean
leader election). In this case, throughput depends on whether we send messages
synchronously or asynchronously. If our client code waits for a reply from the
server (by calling the get() method of the Future object returned when sending
a message), it will increase latency significantly (by at least one network
round trip). If the client uses callbacks, latency will be hidden, but throughput will
be limited by the number of in-flight messages (i.e., how many messages the
producer will send before receiving replies from the server).
• If acks=all, the producer will receive a success response from the broker once all
in-sync replicas received the message. This is the safest mode since you can make
sure more than one broker has the message and that the message will survive
even in the case of a crash (more information on this in Chapter 5). However, the
latency we discussed in the acks=1 case will be even higher, since we will be
waiting for more than just one broker to receive the message.
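A minimal sketch of choosing the acks value in code, assuming the string-keyed Properties style the demo programs in these notes use (plain java.util.Properties, so it compiles without the Kafka client jar). The Durability enum and the producerProperties helper are hypothetical names for illustration; in a real producer you would pass the returned Properties to new KafkaProducer<>(...).

```java
import java.util.Properties;

// Hypothetical mapping from the durability you need to the producer "acks" value.
enum Durability {
    NO_ACK("0"),        // fire and forget: fastest, but a lost message goes unnoticed
    LEADER_ACK("1"),    // leader has the record; lost if an out-of-sync replica becomes leader
    ALL_IN_SYNC("all"); // every in-sync replica has the record before success is reported

    final String acks;

    Durability(String acks) {
        this.acks = acks;
    }
}

class AcksConfig {
    // Builds producer settings in the same string-keyed style as the demos.
    static Properties producerProperties(String bootstrapServers, Durability durability) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", durability.acks);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("127.0.0.1:19092", Durability.ALL_IN_SYNC);
        System.out.println(props.getProperty("acks")); // all
    }
}
```

The enum keeps the latency/durability trade-off visible at the call site instead of burying a bare "0", "1" or "all" string in configuration code.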
When the producer receives an error message from the server, the error could be
transient (e.g., a lack of leader for a partition). In this case, the value of the retries
parameter will control how many times the producer will retry sending the message
before giving up and notifying the client of an issue. By default, the producer will wait
100 ms between retries, but you can control this using the retry.backoff.ms
parameter. We recommend testing how long it takes to recover from a crashed broker
(i.e., how long until all partitions get new leaders) and setting the number of retries
and the delay between them such that the total amount of time spent retrying will be
longer than the time it takes the Kafka cluster to recover from the crash; otherwise,
the producer will give up too soon. Not all errors will be retried by the producer. Some errors
are not transient and will not cause retries (e.g., “message too large” error). In general,
because the producer handles retries for you, there is no point in handling retries
within your own application logic. You will want to focus your efforts on handling
nonretriable errors or cases where retry attempts were exhausted.
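The sizing advice above is simple arithmetic. A sketch, assuming a fixed retry.backoff.ms and ignoring the time each request itself takes (so it underestimates the real retry window); minRetries and retryProperties are hypothetical helpers, and the 5-second recovery time in main is an illustrative number, not a measurement. On Kafka 2.1 and later clients, retries defaults to Integer.MAX_VALUE and delivery.timeout.ms caps the total time spent on a record, so that setting is usually the more direct knob there.

```java
import java.util.Properties;

// Sizing rule from the text: retries * retry.backoff.ms should exceed the time the
// cluster needs to recover (for example, to elect new partition leaders).
// Assumes a fixed backoff and ignores per-request time.
class RetryBudget {
    /** Smallest retry count whose total backoff is strictly longer than recoveryMs. */
    static int minRetries(long recoveryMs, long backoffMs) {
        if (backoffMs <= 0) {
            throw new IllegalArgumentException("backoffMs must be positive");
        }
        return Math.toIntExact(recoveryMs / backoffMs + 1);
    }

    static Properties retryProperties(long recoveryMs, long backoffMs) {
        Properties props = new Properties();
        props.put("retries", Integer.toString(minRetries(recoveryMs, backoffMs)));
        props.put("retry.backoff.ms", Long.toString(backoffMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative: a 5 s leader failover with a 100 ms backoff
        System.out.println(minRetries(5_000, 100)); // 51
    }
}
```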
linger.ms controls the amount of time to wait for additional messages before
sending the current batch. KafkaProducer sends a batch of messages either when the
current batch is full or when the linger.ms limit is reached. By default, the producer will
send messages as soon as there is a sender thread available to send them, even if
there’s just one message in the batch. By setting linger.ms higher than 0, we instruct
the producer to wait a few milliseconds to add additional messages to the batch
before sending it to the brokers. This increases latency but also increases throughput
(because we send more messages at once, there is less overhead per message).
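To make the "full or linger.ms elapsed" rule concrete, here is a toy model, not KafkaProducer's internals: batch size is counted in messages (the real batch.size setting is in bytes), and the clock is passed in explicitly so the behavior is deterministic. LingerBatcher is a hypothetical class. Newer client versions may ship a nonzero linger.ms default, so check the configuration reference for the version you use.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "send a batch when it is full or when linger.ms has elapsed since
// its first message". Counts messages, not bytes; time is supplied by the caller.
class LingerBatcher {
    private final int maxMessages;
    private final long lingerMs;
    private final List<String> current = new ArrayList<>();
    private long firstAppendMs;

    LingerBatcher(int maxMessages, long lingerMs) {
        this.maxMessages = maxMessages;
        this.lingerMs = lingerMs;
    }

    /** Adds a message; returns the batch if this message filled it, otherwise null. */
    List<String> append(String message, long nowMs) {
        if (current.isEmpty()) {
            firstAppendMs = nowMs; // linger timer starts with the first message
        }
        current.add(message);
        return current.size() >= maxMessages ? drain() : null;
    }

    /** Called by the sender loop; returns the batch once linger.ms has passed, otherwise null. */
    List<String> poll(long nowMs) {
        if (!current.isEmpty() && nowMs - firstAppendMs >= lingerMs) {
            return drain();
        }
        return null;
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(current);
        current.clear();
        return batch;
    }

    public static void main(String[] args) {
        LingerBatcher batcher = new LingerBatcher(3, 5);
        batcher.append("a", 0);
        batcher.append("b", 3);
        System.out.println(batcher.poll(4)); // null: still lingering
        System.out.println(batcher.poll(5)); // [a, b]
    }
}
```

With lingerMs set to 0, poll() releases every message as soon as it arrives, which matches the one-message batches described above; a larger value trades a few milliseconds of latency for fuller batches.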
This can be any string, and will be used by the brokers to identify messages sent from
the client. It is used in logging and metrics, and for quotas.
This controls how many messages the producer will send to the server without
receiving responses. Setting this high can increase memory usage while improving
throughput, but setting it too high can reduce throughput as batching becomes less
efficient. Setting this to 1 will guarantee that messages will be written to the broker in
the order in which they were sent, even when retries occur.
Apache Kafka preserves the order of messages within a partition.
This means that if messages were sent from the producer in a
specific order, the broker will write them to a partition in that order
and all consumers will read them in that order. For some use cases,
order is very important. There is a big difference between
depositing $100 in an account and later withdrawing it, and the other way
around! However, some use cases are less sensitive.
Setting the retries parameter to nonzero and the
max.in.flight.requests.per.connection parameter to more than one means
that it is possible that the broker will fail to write the first batch of
messages, succeed in writing the second (which was already in
flight), and then retry the first batch and succeed, thereby reversing
the order.
Usually, setting the number of retries to zero is not an option in a
reliable system, so if guaranteeing order is critical, we recommend
setting max.in.flight.requests.per.connection=1 to make sure that
while a batch of messages is retrying, additional messages will not
be sent (because this has the potential to reverse the correct order).
This will severely limit the throughput of the producer, so only use
this when order is important.
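The reordering scenario can be reproduced with a small simulation. This is a toy model of a non-idempotent producer, not Kafka code: each round, up to maxInFlight batches are outstanding, and a batch listed in failOnce fails on its first attempt and is retried in the next round. ReorderSimulation and writtenOrder are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of how retries plus several in-flight batches can reverse order.
class ReorderSimulation {
    static List<String> writtenOrder(List<String> batches, Set<String> failOnce, int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        List<String> partitionLog = new ArrayList<>();
        Deque<String> retries = new ArrayDeque<>();
        Set<String> alreadyFailed = new HashSet<>();
        int next = 0;
        while (next < batches.size() || !retries.isEmpty()) {
            // Fill the in-flight window: pending retries first, then new batches.
            List<String> inFlight = new ArrayList<>();
            while (!retries.isEmpty() && inFlight.size() < maxInFlight) {
                inFlight.add(retries.poll());
            }
            while (next < batches.size() && inFlight.size() < maxInFlight) {
                inFlight.add(batches.get(next++));
            }
            // The broker handles the outstanding batches in send order.
            for (String batch : inFlight) {
                if (failOnce.contains(batch) && alreadyFailed.add(batch)) {
                    retries.add(batch);      // transient failure: retried next round
                } else {
                    partitionLog.add(batch); // written to the partition
                }
            }
        }
        return partitionLog;
    }

    public static void main(String[] args) {
        List<String> batches = List.of("batch-1", "batch-2");
        Set<String> failOnce = Set.of("batch-1");
        System.out.println(writtenOrder(batches, failOnce, 5)); // [batch-2, batch-1]
        System.out.println(writtenOrder(batches, failOnce, 1)); // [batch-1, batch-2]
    }
}
```

With a window of 5, batch-2 is written before the retried batch-1; with a window of 1, nothing else is sent while batch-1 is retrying, so the order is kept at the cost of one round trip per batch.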
You can’t have multiple consumers that belong to the same group
in one thread, and you can’t have multiple threads safely use the
same consumer. One consumer per thread is the rule. To run
multiple consumers in the same group in one application, you will
need to run each in its own thread. It is useful to wrap the
consumer logic in its own object and then use Java’s ExecutorService
to start multiple threads each with its own consumer. The
Confluent blog has a tutorial that shows how to do just that.
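A minimal sketch of the one-consumer-per-thread pattern with an ExecutorService. StubConsumer is a hypothetical stand-in for KafkaConsumer so the example runs without a broker; in real code the factory would build new KafkaConsumer<>(properties) with the shared group.id, and each worker would be long-running (subscribe, loop on poll(), exit on wakeup() as in the ConsumerDemoCooperative program in these notes), so you would submit the workers rather than wait on invokeAll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Each worker creates its own consumer on its own thread and never shares it.
class ConsumerPerThread {
    static final class StubConsumer {
        // The thread that constructed this instance, i.e. its only user.
        final String ownerThread = Thread.currentThread().getName();
    }

    /** Runs n workers on a fixed pool; returns the consumer each worker created. */
    static List<StubConsumer> runWorkers(int n, Supplier<StubConsumer> consumerFactory) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            List<Callable<StubConsumer>> workers = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                workers.add(() -> {
                    // Created inside the task, so it is confined to this thread.
                    StubConsumer consumer = consumerFactory.get();
                    // A real worker would subscribe() and loop on poll() until wakeup().
                    return consumer;
                });
            }
            List<StubConsumer> consumers = new ArrayList<>();
            for (Future<StubConsumer> future : pool.invokeAll(workers)) {
                consumers.add(future.get());
            }
            return consumers;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a worker failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (StubConsumer consumer : runWorkers(3, StubConsumer::new)) {
            System.out.println("consumer owned by " + consumer.ownerThread);
        }
    }
}
```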
package io.conductor.demos.kafkabasics.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class ProducerDemo {
    public static void main(String[] args) {
        // Connection and serialization settings
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // A record without a key: the partitioner chooses the partition
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>("demo_java", "hello world");

        // send() is asynchronous; flush() blocks until buffered records are sent
        producer.send(producerRecord);
        producer.flush();
        producer.close();
    }
}
package io.conductor.demos.kafkabasics.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class ProducerDemoWithCallback {
    public static void main(String[] args) {
        log.info("I am a Kafka Producer");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("demo_java", "message: " + i);
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        log.info("Received new metadata \n" +
                                "Topic: " + metadata.topic() + "\n" +
                                "Partition: " + metadata.partition() + "\n" +
                                "Offset: " + metadata.offset() + "\n" +
                                "Timestamp: " + metadata.timestamp());
                    } else {
                        log.error("Error while producing", e);
                    }
                }
            });
        }
        producer.flush();
        producer.close();
    }
}
package io.conductor.demos.kafkabasics.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

@Slf4j
public class ProducerDemoWithCallback {
    public static void main(String[] args) {
        log.info("I am a Kafka Producer");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        // Uncomment to force very small batches (batch.size is in bytes)
        // properties.put("batch.size", 400);
        // Rotate through partitions record by record instead of filling one
        // partition per batch (the default behavior for records without a key)
        properties.put("partitioner.class", RoundRobinPartitioner.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 10 rounds of 30 records, with a 500 ms pause between rounds
        for (int a = 0; a < 10; a++) {
            for (int i = 0; i < 30; i++) {
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("demo_java", "message: " + i);
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e == null) {
                            log.info("Received new metadata \n" +
                                    "Topic: " + metadata.topic() + "\n" +
                                    "Partition: " + metadata.partition() + "\n" +
                                    "Offset: " + metadata.offset() + "\n" +
                                    "Timestamp: " + metadata.timestamp());
                        } else {
                            log.error("Error while producing", e);
                        }
                    }
                });
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing new rounds
                Thread.currentThread().interrupt();
                break;
            }
        }
        producer.flush();
        producer.close();
    }
}
package io.conductor.demos.consumerdemo.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

@Slf4j
public class ConsumerDemoCooperative {
    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");
        String topic = "demo_java";
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:19092");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", "my-java-application");
        // Read from the beginning when the group has no committed offset yet
        properties.put("auto.offset.reset", "earliest");
        // Cooperative (incremental) rebalancing: only partitions that move are revoked
        properties.put("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // On shutdown (e.g. Ctrl+C), wake the consumer so poll() throws WakeupException,
        // then wait for the main thread to finish closing it.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                log.info("Detected a shutdown");
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                log.info("Polling");
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + " Value: " + record.value());
                    log.info("Partition: " + record.partition() + " Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // Expected: triggered by consumer.wakeup() in the shutdown hook
            log.info("Consumer is starting to shutdown");
        } catch (Exception e) {
            log.error("Unexpected exception in the consumer", e);
        } finally {
            // Commits offsets (if auto-commit is on) and leaves the group cleanly
            consumer.close();
            log.info("The consumer is now gracefully shut down");
        }
    }
}
version: '3.8'
services:
  kafka-1:
    image: bitnami/kafka:latest
    container_name: kafka-1
    environment:
      KAFKA_CFG_NODE_ID: 1
      KAFKA_KRAFT_CLUSTER_ID: 97exVv29T2icAhT2SNEEDQ
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091"
      KAFKA_CFG_LISTENERS: "CONTROLLER://:9091,DOCKER://:9092,HOST://:19092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "CONTROLLER://kafka-1:9091,DOCKER://kafka-1:9092,HOST://localhost:19092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "DOCKER"
    ports:
      - "9092:9092" # DOCKER listener
      - "19092:19092" # HOST listener
    volumes:
      - ./volumes/kafka/server-1:/bitnami/kafka
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "kafka-1:9092", "--list"]
      interval: 20s
      timeout: 10s
      retries: 5
  kafka-2:
    image: bitnami/kafka:latest
    container_name: kafka-2
    environment:
      KAFKA_CFG_NODE_ID: 2
      KAFKA_KRAFT_CLUSTER_ID: 97exVv29T2icAhT2SNEEDQ
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091"
      KAFKA_CFG_LISTENERS: "CONTROLLER://:9091,DOCKER://:9094,HOST://:19094"
      KAFKA_CFG_ADVERTISED_LISTENERS: "CONTROLLER://kafka-2:9091,DOCKER://kafka-2:9094,HOST://localhost:19094"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "DOCKER"
    ports:
      - "9094:9094" # DOCKER listener
      - "19094:19094" # HOST listener
    volumes:
      - ./volumes/kafka/server-2:/bitnami/kafka
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "kafka-2:9094", "--list"]
      interval: 20s
      timeout: 10s
      retries: 5
  kafka-3:
    image: bitnami/kafka:latest
    container_name: kafka-3
    environment:
      KAFKA_CFG_NODE_ID: 3
      KAFKA_KRAFT_CLUSTER_ID: 97exVv29T2icAhT2SNEEDQ
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9091,2@kafka-2:9091,3@kafka-3:9091"
      KAFKA_CFG_LISTENERS: "CONTROLLER://:9091,DOCKER://:9096,HOST://:19096"
      KAFKA_CFG_ADVERTISED_LISTENERS: "CONTROLLER://kafka-3:9091,DOCKER://kafka-3:9096,HOST://localhost:19096"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "DOCKER"
    ports:
      - "9096:9096" # DOCKER listener
      - "19096:19096" # HOST listener
    volumes:
      - ./volumes/kafka/server-3:/bitnami/kafka
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "kafka-3:9096", "--list"]
      interval: 20s
      timeout: 10s
      retries: 5
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    container_name: schema-registry
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9094,PLAINTEXT://kafka-3:9096
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    ports:
      - "8081:8081"
    networks:
      - kafka-net
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081"]
      interval: 30s
      timeout: 10s
      retries: 3
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:9092,kafka-2:9094,kafka-3:9096
      KAFKA_CLUSTERS_0_ZOOKEEPER: ""
      KAFKA_CLUSTERS_0_KAFKA_CONNECT_ENABLED: "false"
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: "http://schema-registry:8081"
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY_ENABLED: "true"
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
      - schema-registry
    networks:
      - kafka-net
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080"]
      interval: 30s
      timeout: 20s
      retries: 3
networks:
  promotion-service-network:
  kafka-net:
    name: kafka-net
volumes:
  kafka_server_1_data:
  kafka_server_2_data:
  kafka_server_3_data: