ESTE É UM CONTEÚDO EM DESENVOLVIMENTO E NÃO DEVE SER CONSIDERADO COMO VERSÃO FINAL!
Clique aqui para maiores informações
Schema da Mensagem Trafegada
Estrutura da Mensagem
No Apache Kafka, cada mensagem trafegada é composta por dois elementos principais: Headers e Payload.
Headers
Os headers são metadados adicionais que acompanham a mensagem, permitindo o transporte de informações auxiliares sem fazer parte do payload principal. Eles consistem em pares chave-valor.
Atributos dos Headers:
x-notification-id: Identificador único da notificação.
Tipo: UUID
Formato: String
Payload
O payload contém os dados da mensagem de notificação em formato JSON.
Atributos do Payload:
eventId: Identificador único para o evento de acordo com UUID RFC4122.
tipo: UUID
format: String
pattern: ^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$
required: true
minLength: 1
maxLength: 100
eventType: Tipo de evento que está sendo notificado.
Tipo: String
Formato: Para a POC deverá conter o valor
ACCOUNT_BALANCE_CHANGED
Required: true
eventDatetime: Data e hora em que o evento ocorreu.
Tipo: String
Formato: ISO-8601
Required: true
consentId: identificador do consentimento que está sendo notificado da ocorrência de um evento
tipo: UUID
format: String
pattern: ^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$
required: true
minLength: 1
maxLength: 100
resourceId: Identificador único do recurso relacionado ao evento. No caso de contas de depósito à vista, de poupança ou de pagamento pré-paga, corresponde ao AccountId.
tipo: UUID
format: String
pattern: ^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$
required: true
minLength: 1
maxLength: 100
Uso do Timestamp Nativo do Kafka
O timestamp de uma mensagem no Kafka é gerenciado pelo próprio Apache Kafka e pode ser acessado pelo consumidor através da estrutura ConsumerRecord. O Kafka introduziu o conceito de timestamps nativos nas mensagens a partir da versão 0.10.0. Existem dois tipos principais de timestamps que podem ser associados a uma mensagem:
CreateTime: Esse é o timestamp definido no momento em que o produtor envia a mensagem. Ele representa a hora em que a mensagem foi criada.
LogAppendTime: Esse é o timestamp atribuído por Kafka no momento em que a mensagem é adicionada ao log do servidor. Esse tipo de timestamp só é usado se a configuração do tópico estiver definida para usá-lo como padrão.
Desta forma, não existe necessidade de um header ou dado no payload para trafegar esta informação.
Schema
O schema abaixo pode ser utilizado como referência para o payload do body e se desejado cadastradp no SchemaRegistry do Kafka. Por limitações do Kafka, apenas o schema referente ao body pode ser registrado, o header “x-notification-id” deve ser adicionado a todas as mensagens de notificação produzidas.
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "EventNotification",
"type": "object",
"properties": {
"eventId": {
"type": "string",
"format": "uuid",
"pattern": "^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$",
"minLength": 1,
"maxLength": 100,
"description": "Identificador único para o evento de acordo com UUID RFC4122."
},
"eventType": {
"type": "string",
"enum": ["ACCOUNT_BALANCE_CHANGED"],
"description": "Tipo de evento que está sendo notificado. Para a POC deverá conter o valor ACCOUNT_BALANCE_CHANGED"
},
"eventDatetime": {
"type": "string",
"format": "date-time",
"description": "Data e hora em que o evento ocorreu."
},
"consentId": {
"type": "string",
"format": "uuid",
"pattern": "^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$",
"minLength": 1,
"maxLength": 100,
"description": "Identificador do consentimento que está sendo notificado da ocorrência de um evento."
},
"resourceId": {
"type": "string",
"format": "uuid",
"pattern": "^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$",
"minLength": 1,
"maxLength": 100,
"description": "Identificador único do recurso relacionado ao evento."
}
},
"required": ["eventId", "eventType", "eventDatetime", "consentId", "resourceId"]
}
Exemplo:
Considerando um evento de mudança de saldo em uma conta de resourceId “25cac914-d8ae-6789-b215-650a6215820d”, compartilhada através do consentimento "urn:bancoex:C1DD33123", a instituição transmissora deverá gerar um eventos e comunicá-lo através de tópico específico referente ao SoftwareStatment “6f1710a4-9f68-462f-8232-b59273bf4d16” do detentor deste consentimento.
Tópico utilizado: “6f1710a4-9f68-462f-8232-b59273bf4d16”
Header:
"x-notification-id": "945ee234-efb5-46ca-a604-67e60d9d810c"
Payload:
{
"consentId": "urn:bancoex:C1DD33123",
"resourceId": "25cac914-d8ae-6789-b215-650a6215820d",
"eventId": "a278632b-680c-4f94-8d54-08204fb1b658",
"eventType": "ACCOUNT_BALANCE_CHANGED",
"eventDateTime": "2025-06-09T10:30:40Z"
}
Exemplos em Java: Produtor e Consumidor
Exemplo de Produtor em Java
A criação do ProducerRecord inclui a configuração de headers e payload conforme discutido.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
...
public class KafkaProducerWithJackson {
private Producer<String, String> producer;
...
public void insertNotification(OfbEvent event) {
try {
// Criar o ObjectMapper do Jackson
ObjectMapper objectMapper = new ObjectMapper();
// Gerar UUID e timestamps para os headers
String notificationId = UUID.randomUUID().toString();
// Criar o mapa de dados para o body (valores hardcoded para demo)
Map<String, String> eventBody = new HashMap<>();
eventBody.put("consentId", "urn:bancoex:C1DD33123");
eventBody.put("resourceId", "25cac914-d8ae-6789-b215-650a6215820d");
eventBody.put("eventId", "a278632b-680c-4f94-8d54-08204fb1b658");
eventBody.put("eventType", "ACCOUNT_BALANCE_CHANGED");
eventBody.put("eventDateTime", "2025-06-09T10:30:40Z");
String jsonBody = objectMapper.writeValueAsString(eventBody);
// Criar o registro do produtor com headers
ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", null, null, null, jsonBody);
Header notificationIdHeader = new RecordHeader("x-notification-id", notificationId.getBytes());
record.headers.add(notificationIdHeader);
// Enviar a mensagem
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Exemplo de Consumidor em Java
No lado do consumidor, a leitura do payload e dos headers, junto ao uso do timestamp nativo, é diretamente acessível.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SimpleKafkaConsumer {
private KafkaConsumer<String, String> consumer;
private ExecutorService executorService;
private final static String TOPIC = "your_topic_name";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String GROUP_ID = "your_group_id";
public SimpleKafkaConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Para pegar desde o início
consumer = new KafkaConsumer<>(properties);
executorService = Executors.newSingleThreadExecutor();
}
public void subscribeTopic(String topic) {
consumer.subscribe(Collections.singletonList(topic));
}
public void startConsuming() {
executorService.submit(this::processMessages);
}
private void processMessages() {
try {
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Recebido:\nTopic: %s\nPartition: %d\nOffset: %d\nKey: %s\nValue: %s\n\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
// Processamento adicional das mensagens
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public void stopConsuming() {
// Ordenadamente encerrar o serviço do executor
executorService.shutdown();
try {
// Aguardar término das tarefas em andamento
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
consumer.subscribeTopic(TOPIC);
consumer.startConsuming();
// Simular uma operação que você gostaria de pausar ou parar a aplicação
Runtime.getRuntime().addShutdownHook(new Thread(consumer::stopConsuming));
}
}
Related content
ESTE É UM CONTEÚDO EM DESENVOLVIMENTO E NÃO DEVE SER CONSIDERADO COMO VERSÃO FINAL!
Clique aqui para maiores informações