Kafka como fila
Docker kafka + gui
Padrão: <app type>.<app name>.<dataset name>.<stage of processing>
Onde:
<app type>: ETL, queuing, tracking, user, data push, streaming, etc;
<app name>: nome da aplicação;
<dataset name>: nome da sua coleção de dados, similar ao banco de dados;
<stage of processing>: estágio de processamento do dado, exemplo: created, filtered, partitioned, joined, edited, etc;
Exemplo: suponha que sua aplicação chame "member-get-member" e outras aplicações estão interessadas em saber sobre o evento de um "link" "criado".
topic: streaming.mgm.link.created
Veja aqui mais sobre padronização dos tópicos.
final var recordTimestamp = Instant.ofEpochMilli(record.timestamp()).atZone(ZoneId.systemDefault()).toLocalDateTime();
final var kafkaProblemDate = LocalDateTime.parse("2019-11-21T12:00:00");
if(recordTimestamp.isBefore(kafkaProblemDate)){
return ;
}
auto.commit.enable
auto.commit.interval.ms
Se voce demorar demais max.poll.interval.ms
para chamar a o proximo lote de mensagens para serem processadas .poll()
entao vai perder a chance de processar essas mensagens.
Se voce usar retentativas entao vai piorar ainda mais esse problema, nesse caso deve diminuir a quantidade de registros por chamada .poll()
ou/e aumentar o tempo do max.poll.interval.ms
que vai permitir que o consumidor fique mais tempo sem chamar o .poll()
, (retentando uma mensagem por exemplo)
max.poll.interval.ms
É o tempo que voce pode ficar sem chamar o .poll()
. Senao chamar nesse intervalo o kafka vai escolher outro consumidor
session.timeout.ms
é o tempo que a thread secundaria do consumidor pode ficar sem mandar healthcheck. Se o kafka nao receber healthcheck nesse intervalo vai escolher outro consumidor, se eu consumidor morrer de forma inesperada agora o kafka vai ficar session.timeout.ms
tempo achando que você está vivo
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
https://docs.spring.io/spring-kafka/reference/htmlsingle/#stateful-retry
max.poll.interval.ms=300000#5 min
max.poll.records=500
session.timeout.ms=10s
group.max.session.timeout.ms=5min
session.timeout.ms
tem que mudar tambem o group.max.session.timeout.ms
Quando ativo mensagens que foram recebidas do poll a auto.commit.interval.ms
de tempo vao ser comitadas automaticamente
Writes
To publish a message to a partition, the client first finds the leader of the partition from Zookeeper and sends the message to the leader. The leader writes the message to its local log. Each follower constantly pulls new messages from the leader using a single socket channel. That way, the follower receives all messages in the same order as written in the leader. The follower writes each received message to its own log and sends an acknowledgment back to the leader. Once the leader receives the acknowledgment from all replicas in ISR, the message is committed. The leader advances the HW and sends an acknowledgment to the client. For better performance, each follower sends an acknowledgment after the message is written to memory. So, for each committed message, we guarantee that the message is stored in multiple replicas in memory. However, there is no guarantee that any replica has persisted the commit message to disks though. Given that correlated failures are relatively rare, this approach gives us a good balance between response time and durability. In the future, we may consider adding options that provide even stronger guarantees. The leader also periodically broadcasts the HW to all followers. The broadcasting can be piggybacked on the return value of the fetch requests from the followers. From time to time, each replica checkpoints its HW to its disk.
Reads
For simplicity, reads are always served from the leader. Only messages up to the HW are exposed to the reader.
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Ping
Nao eh efetivo pois as mensagens expiradas continuam contando
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic page_detail --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic ElvisTest
$ ./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic ElvisTest
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
./bin/kafka-topics.sh --delete --zookeeper zookeeper.intranet:2181 --topic page_detail
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
./bin/kafka-topics.sh --zookeeper zookeeper.intranet:2181 --alter --config retention.ms=1000 --topic PEDREIRO_CRAWLER
ou
./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --add-config retention.ms=1000 --entity-name TELELISTAS_PROFESSIONAL_CRAWLER
depois remova o ttl
./bin/kafka-topics.sh --zookeeper zookeeper.intranet:2181 --alter --delete-config retention.ms --topic PEDREIRO_CRAWLER
ou
./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --delete-config retention.ms --entity-name TELELISTAS_PROFESSIONAL_CRAWLER
echo 'Nome do tópico: ' && read topic && ./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --add-config retention.ms=1000 --entity-name $topic \
&& sleep 2 && ./bin/kafka-configs.sh --entity-type topics --zookeeper zookeeper.intranet:2181 --alter --delete-config retention.ms --entity-name $topic
./bin/kafka-consumer-groups.sh --zookeeper zookeeper.intranet:2181 --delete --group telelistas
./bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group telelistas
./bin/kafka-consumer-groups.sh --bootstrap-server zookeeper.intranet:9092 --execute --reset-offsets --to-earliest --group mg-mining --topic PROFESSIONAL
--to-offset 305
--to-earliest
--to-latest
....
docker exec -it stg-kafka bash
curl https://gist.githubusercontent.com/mageddo/2af979c587bd66617c8156149cbe4fea/raw/133dd11e34b9c8deab1aa8eb439cc9ed5054c96d/kafka-utils | sh -
No kafka mesmo com varios consumidores é possivel ter uma ordem de consumo baseado na key
, a key é como a JMSGroupID
do JMS, com base na key passada é gerada uma hash que escolhe um consumidor para ser o correspondente e assim todas as mensagens com essa key passam a ir exclusivamente para este consumidor mantendo assim a ordem de consumo entre as mensagens de mesma key, ex:
Criando um topico com 2 partiçoes
$ ./bin/kafka-topics.sh --create --zookeeper zookeeper.intranet:2181 --replication-factor 1 --partitions 3 --topic Ping
Postando mensagens no tópico especificando as keys
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Ping --property "parse.key=true" --property "key.separator=:"
a:test2
a:test3
b:test4
b:test5
a:test4
a:test6
c:test7
d:test8
Consumidor de group 1 / consumir
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic Ping
test2
test3
test4
test5
test4
test6
test7
test8
Consumidor 1 de group 2
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Ping --partition 1
test2
test3
test4
test6
test7
Consumidor 2 de group 2
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Ping --partition 2
test4
test5
test8
Specify group
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer-property "group.id=a"
String topic = "Ping";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.dev:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
KafkaConsumer c = new KafkaConsumer(props);
c.subscribe(Arrays.asList(topic));
System.out.println(c.poll(5000).count());
int sum = 0;
final List<PartitionInfo> partitions = c.partitionsFor(topic);
for (PartitionInfo partition : partitions) {
final TopicPartition topicPartition = new TopicPartition(topic, partition.partition());
c.seekToEnd(Arrays.asList(topicPartition));
final long max = c.position(topicPartition);
c.seekToBeginning(Arrays.asList(topicPartition));
final long min = c.position(topicPartition);
sum += max - min;
}
System.out.println("size=" + sum);
final List<TopicPartition> partitionsList = partitions.stream()
.map(partition -> new TopicPartition(topic, partition.partition()))
.collect(Collectors.toList());
final Map<TopicPartition, Long> begin = c.beginningOffsets(partitionsList);
final Map<TopicPartition, Long> end = c.endOffsets(partitionsList);
sum = 0;
for (TopicPartition topicPartition : begin.keySet()) {
sum += end.get(topicPartition) - begin.get(topicPartition);
}
System.out.println("size2=" + sum);
.get()
da Future que vem como resultado ou o .flush()
do produto esses dois vao mandar na mesma hora e esperar o ack do broker..flush
não respeita o linger.ms
.get
do Future do produtor espera o linger.ms
antes de postarAs triggers para a postagem assincrona fazer flush são duas
batch.size
default 16384linger.ms
em tempo default 0O que acontecer primeiro chama o flush, se o buffer encher antes de passar o tempo desde a primeira mensagem postada no buffer ou se o tempo chegar e o buffer ainda nao estiver cheio.
No spring funciona da mesma forma
isolation.level
de READ_UNCOMMITED
para READ_COMMITED
./bin/kafka-console-consumer.sh --isolation-level read_committed --bootstrap-server localhost:9092 --from-beginning --topic MyTopic
Vai ser necessário subir pelo menos 3 brokers ou mudar o parametro transaction.state.log.replication.factor
para 2 no servidor do kafka que o mínimo de replicacao para poder usar transações com o kafka. Exemplo de codigo
# retry post until it works
spring.kafka.properties.retries=2147483647
# if couldn't connect to kafka try again until it works (it blocks even async post)
spring.kafka.properties.max.block.ms=9223372036854775807
spring.kafka.properties.session.timeout.ms=30000
spring.kafka.properties.max.poll.interval.ms=1200000
spring.kafka.consumer.enable-auto-commit=false
Using TransactionSynchronizationManager granting ACK and increasing performance
m=transactionalPost l=36 status=transactional-complete, time=1342
m=beforeCommit l=66 m=send, status=committed, records=50000, time=0
Using sync .get()
m=synPost l=48 status=sync-complete, time=16457
-e Exit successfully when last message received
-s value=<serdes> Deserialize non-NULL values using <serdes>.
-o <offset> Offset to start consuming from:
-p <partition> Partition
kafkacat -b localhost:9092 -t 'financial-movement-topic'
kcat -C -b bootstrap_server \
-t topic \
-r schema_registry \
-o end \
-s value=avro
kcat -C -b bootstrap_server \
-t topic \
-r schema_registry \
-p 0 -o -1 -s value=avro -e