Kafka Commands

Published: 2019-08-04, Updated: 2020-05-09

Links

Libs de consumo de kafka

Padrão para nomenclatura de tópicos

Padrão: <app type>.<app name>.<dataset name>.<stage of processing>

Onde:

<app type>: ETL, queuing, tracking, user, data push, streaming, etc;
<app name>: nome da aplicação;
<dataset name>: nome da sua coleção de dados, similar ao banco de dados;
<stage of processing>: estágio de processamento do dado, exemplo: created, filtered, partitioned, joined, edited, etc;
Exemplo: suponha que sua aplicação chame "member-get-member" e outras aplicações estão interessadas em saber sobre o evento de um "link" "criado".

topic: streaming.mgm.link.created
Veja aqui mais sobre padronização dos tópicos.

Consumir a partir de uma data

final var recordTimestamp = Instant.ofEpochMilli(record.timestamp()).atZone(ZoneId.systemDefault()).toLocalDateTime();
final var kafkaProblemDate = LocalDateTime.parse("2019-11-21T12:00:00");
if(recordTimestamp.isBefore(kafkaProblemDate)){
	return ;
}

Conceitos

auto.commit.enable
auto.commit.interval.ms
max.poll.interval.ms=300000#5 min
max.poll.records=500
session.timeout.ms=10s
group.max.session.timeout.ms=5min

Quando ativo mensagens que foram recebidas do poll a auto.commit.interval.ms de tempo vao ser comitadas automaticamente

Replicação de dados

Writes

To publish a message to a partition, the client first finds the leader of the partition from Zookeeper and sends the message to the leader. The leader writes the message to its local log. Each follower constantly pulls new messages from the leader using a single socket channel. That way, the follower receives all messages in the same order as written in the leader. The follower writes each received message to its own log and sends an acknowledgment back to the leader. Once the leader receives the acknowledgment from all replicas in ISR, the message is committed. The leader advances the HW and sends an acknowledgment to the client. For better performance, each follower sends an acknowledgment after the message is written to memory. So, for each committed message, we guarantee that the message is stored in multiple replicas in memory. However, there is no guarantee that any replica has persisted the commit message to disks though. Given that correlated failures are relatively rare, this approach gives us a good balance between response time and durability. In the future, we may consider adding options that provide even stronger guarantees. The leader also periodically broadcasts the HW to all followers. The broadcasting can be piggybacked on the return value of the fetch requests from the followers. From time to time, each replica checkpoints its HW to its disk.

Reads

For simplicity, reads are always served from the leader. Only messages up to the HW are exposed to the reader.

Topic

Postar mensagens no topico

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Ping

Conta mensagens do topico

Nao eh efetivo pois as mensagens expiradas continuam contando

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic page_detail --time -1 --offsets 1  | awk -F  ":" '{sum += $3} END {print sum}'

Criar um topic

$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic ElvisTest

Particoes

Alterar as particoes de um topico

$ ./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic ElvisTest

Listar topicos

./bin/kafka-topics.sh --zookeeper localhost:2181 --list

Deletar um topico

./bin/kafka-topics.sh --delete --zookeeper zookeeper.intranet:2181  --topic page_detail

Mostrar as particoes do topico

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

Marcar mensagens para delecao (purge), limpar

./bin/kafka-topics.sh --zookeeper zookeeper.intranet:2181 --alter --config retention.ms=1000 --topic PEDREIRO_CRAWLER
ou
./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --add-config retention.ms=1000 --entity-name TELELISTAS_PROFESSIONAL_CRAWLER

depois remova o ttl

./bin/kafka-topics.sh --zookeeper zookeeper.intranet:2181 --alter --delete-config retention.ms --topic PEDREIRO_CRAWLER
ou
./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --delete-config retention.ms --entity-name TELELISTAS_PROFESSIONAL_CRAWLER

Purgar e esperar ser completar

echo 'Nome do tópico: ' && read topic && ./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --add-config retention.ms=1000 --entity-name $topic \
&& sleep 2 && ./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --delete-config retention.ms --entity-name $topic

Consumer Groups

Deletar um grupo

./bin/kafka-consumer-groups.sh --zookeeper zookeeper.intranet:2181 --delete --group telelistas

Mostrar estado dos consumidores e topicos / Pegar offsets das particoes do groupid / group status / consumer status

./bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group telelistas

Mover offset do consumidor

./bin/kafka-consumer-groups.sh --bootstrap-server zookeeper.intranet:9092 --execute --reset-offsets --to-earliest --group mg-mining --topic PROFESSIONAL
--to-offset 305
--to-earliest
--to-latest
....

Mover offset de n consumidores para o fim

docker exec -it stg-kafka bash
curl https://gist.githubusercontent.com/mageddo/2af979c587bd66617c8156149cbe4fea/raw/133dd11e34b9c8deab1aa8eb439cc9ed5054c96d/kafka-utils | sh -

Serializando ids entre os consumidores concorrentes

No kafka mesmo com varios consumidores é possivel ter uma ordem de consumo baseado na key, a key é como a JMSGroupID do JMS, com base na key passada é gerada uma hash que escolhe um consumidor para ser o correspondente e assim todas as mensagens com essa key passam a ir exclusivamente para este consumidor mantendo assim a ordem de consumo entre as mensagens de mesma key, ex:

Criando um topico com 2 partiçoes

$ ./bin/kafka-topics.sh --create --zookeeper zookeeper.intranet:2181 --replication-factor 1 --partitions 3 --topic Ping

Postando mensagens no tópico especificando as keys

$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Ping   --property "parse.key=true" --property "key.separator=:"
a:test2
a:test3
b:test4
b:test5
a:test4
a:test6
c:test7
d:test8

Consumidor de group 1 / consumir

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic Ping
test2
test3
test4
test5
test4
test6
test7
test8

Consumidor 1 de group 2

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Ping --partition 1
test2
test3
test4
test6
test7

Consumidor 2 de group 2

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Ping --partition 2
test4
test5
test8

Specify group

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer-property "group.id=a"

Contar mensagens do topico

String topic = "Ping";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.dev:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

KafkaConsumer c = new KafkaConsumer(props);
c.subscribe(Arrays.asList(topic));
System.out.println(c.poll(5000).count());
int sum = 0;
final List<PartitionInfo> partitions = c.partitionsFor(topic);
for (PartitionInfo partition : partitions) {
	final TopicPartition topicPartition = new TopicPartition(topic, partition.partition());
	c.seekToEnd(Arrays.asList(topicPartition));
	final long max = c.position(topicPartition);
	c.seekToBeginning(Arrays.asList(topicPartition));
	final long min = c.position(topicPartition);
	sum += max - min;
}
System.out.println("size=" + sum);

final List<TopicPartition> partitionsList = partitions.stream()
	.map(partition -> new TopicPartition(topic, partition.partition()))
	.collect(Collectors.toList());
final Map<TopicPartition, Long> begin = c.beginningOffsets(partitionsList);
final Map<TopicPartition, Long> end = c.endOffsets(partitionsList);

sum = 0;
for (TopicPartition topicPartition : begin.keySet()) {
	sum += end.get(topicPartition) - begin.get(topicPartition);
}
System.out.println("size2=" + sum);

Como funciona a postagem do produtor

Como funciona a postagem assincrona

As triggers para a postagem assincrona fazer flush são duas

O que acontecer primeiro chama o flush, se o buffer encher antes de passar o tempo desde a primeira mensagem postada no buffer ou se o tempo chegar e o buffer ainda nao estiver cheio.

No spring funciona da mesma forma

Ligando o produtor transacional
./bin/kafka-console-consumer.sh --isolation-level read_committed --bootstrap-server localhost:9092 --from-beginning --topic MyTopic

Vai ser necessário subir pelo menos 3 brokers ou mudar o parametro transaction.state.log.replication.factor para 2 no servidor do kafka que o mínimo de replicacao para poder usar transações com o kafka. Exemplo de codigo

Kafka No Data Loss

# retry post until it works
spring.kafka.properties.retries=2147483647

# if couldn't connect to kafka try again until it works (it blocks even async post)
spring.kafka.properties.max.block.ms=9223372036854775807
spring.kafka.properties.session.timeout.ms=30000
spring.kafka.properties.max.poll.interval.ms=1200000
spring.kafka.consumer.enable-auto-commit=false

Grating message ack and transactional

Using TransactionSynchronizationManager granting ACK and increasing performance

m=transactionalPost l=36 status=transactional-complete, time=1342
m=beforeCommit l=66 m=send, status=committed, records=50000, time=0

Using sync .get()

m=synPost l=48 status=sync-complete, time=16457

IO Redirection in Bash Ferramentas para testar concorrencia e performance

Comments