/
Schema da Mensagem Trafegada

ESTE É UM CONTEÚDO EM DESENVOLVIMENTO E NÃO DEVE SER CONSIDERADO COMO VERSÃO FINAL!
Clique aqui para maiores informações

Schema da Mensagem Trafegada

Estrutura da Mensagem

No Apache Kafka, cada mensagem trafegada é composta por dois elementos principais: Headers e Payload.

Headers

Os headers são metadados adicionais que acompanham a mensagem, permitindo o transporte de informações auxiliares sem fazer parte do payload principal. Eles consistem em pares chave-valor.

  • Atributos dos Headers:

    • x-notification-id: Identificador único da notificação.

      • Tipo: UUID

      • Formato: String

Payload

O payload contém os dados da mensagem de notificação em formato JSON.

  • Atributos do Payload:

    • eventId: Identificador único para o evento de acordo com UUID RFC4122.

      • tipo: UUID

      • format: String

      • pattern: ^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$

      • required: true

      • minLength: 1

      • maxLength: 100

    • eventType: Tipo de evento que está sendo notificado.

      • Tipo: String

      • Formato: Para a POC deverá conter o valor ACCOUNT_BALANCE_CHANGED

      • Required: true

    • eventDatetime: Data e hora em que o evento ocorreu.

      • Tipo: String

      • Formato: ISO-8601

      • Required: true

    • consentId: identificador do consentimento que está sendo notificado da ocorrência de um evento

      • tipo: UUID

      • format: String

      • pattern: ^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$

      • required: true

      • minLength: 1

      • maxLength: 100

    • resourceId: Identificador único do recurso relacionado ao evento. No caso de contas de depósito à vista, de poupança ou de pagamento pré-paga, corresponde ao AccountId.

      • tipo: UUID

      • format: String

      • pattern: ^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$

      • required: true

      • minLength: 1

      • maxLength: 100

Uso do Timestamp Nativo do Kafka

O timestamp de uma mensagem no Kafka é gerenciado pelo próprio Apache Kafka e pode ser acessado pelo consumidor através da estrutura ConsumerRecord. O Kafka introduziu o conceito de timestamps nativos nas mensagens a partir da versão 0.10.0. Existem dois tipos principais de timestamps que podem ser associados a uma mensagem:

  • CreateTime: Esse é o timestamp definido no momento em que o produtor envia a mensagem. Ele representa a hora em que a mensagem foi criada.

  • LogAppendTime: Esse é o timestamp atribuído por Kafka no momento em que a mensagem é adicionada ao log do servidor. Esse tipo de timestamp só é usado se a configuração do tópico estiver definida para usá-lo como padrão.

Desta forma, não existe necessidade de um header ou dado no payload para trafegar esta informação.

Schema

O schema abaixo pode ser utilizado como referência para o payload do body e se desejado cadastradp no SchemaRegistry do Kafka. Por limitações do Kafka, apenas o schema referente ao body pode ser registrado, o header “x-notification-id” deve ser adicionado a todas as mensagens de notificação produzidas.

{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "EventNotification", "type": "object", "properties": { "eventId": { "type": "string", "format": "uuid", "pattern": "^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$", "minLength": 1, "maxLength": 100, "description": "Identificador único para o evento de acordo com UUID RFC4122." }, "eventType": { "type": "string", "enum": ["ACCOUNT_BALANCE_CHANGED"], "description": "Tipo de evento que está sendo notificado. Para a POC deverá conter o valor ACCOUNT_BALANCE_CHANGED" }, "eventDatetime": { "type": "string", "format": "date-time", "description": "Data e hora em que o evento ocorreu." }, "consentId": { "type": "string", "format": "uuid", "pattern": "^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$", "minLength": 1, "maxLength": 100, "description": "Identificador do consentimento que está sendo notificado da ocorrência de um evento." }, "resourceId": { "type": "string", "format": "uuid", "pattern": "^[a-zA-Z0-9][a-zA-Z0-9-]{0,99}$", "minLength": 1, "maxLength": 100, "description": "Identificador único do recurso relacionado ao evento." } }, "required": ["eventId", "eventType", "eventDatetime", "consentId", "resourceId"] }

 

Exemplo:

Considerando um evento de mudança de saldo em uma conta de resourceId “25cac914-d8ae-6789-b215-650a6215820d”, compartilhada através do consentimento "urn:bancoex:C1DD33123", a instituição transmissora deverá gerar um eventos e comunicá-lo através de tópico específico referente ao SoftwareStatment “6f1710a4-9f68-462f-8232-b59273bf4d16” do detentor deste consentimento.

 

Tópico utilizado: “6f1710a4-9f68-462f-8232-b59273bf4d16”

Header:   

"x-notification-id": "945ee234-efb5-46ca-a604-67e60d9d810c"

 

Payload:   

{ "consentId": "urn:bancoex:C1DD33123", "resourceId": "25cac914-d8ae-6789-b215-650a6215820d", "eventId": "a278632b-680c-4f94-8d54-08204fb1b658", "eventType": "ACCOUNT_BALANCE_CHANGED", "eventDateTime": "2025-06-09T10:30:40Z" }

 

Exemplos em Java: Produtor e Consumidor

Exemplo de Produtor em Java

A criação do ProducerRecord inclui a configuração de headers e payload conforme discutido.

import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.StringSerializer; ... public class KafkaProducerWithJackson { private Producer<String, String> producer; ... public void insertNotification(OfbEvent event) { try { // Criar o ObjectMapper do Jackson ObjectMapper objectMapper = new ObjectMapper(); // Gerar UUID e timestamps para os headers String notificationId = UUID.randomUUID().toString(); // Criar o mapa de dados para o body (valores hardcoded para demo) Map<String, String> eventBody = new HashMap<>(); eventBody.put("consentId", "urn:bancoex:C1DD33123"); eventBody.put("resourceId", "25cac914-d8ae-6789-b215-650a6215820d"); eventBody.put("eventId", "a278632b-680c-4f94-8d54-08204fb1b658"); eventBody.put("eventType", "ACCOUNT_BALANCE_CHANGED"); eventBody.put("eventDateTime", "2025-06-09T10:30:40Z"); String jsonBody = objectMapper.writeValueAsString(eventBody); // Criar o registro do produtor com headers ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", null, null, null, jsonBody); Header notificationIdHeader = new RecordHeader("x-notification-id", notificationId.getBytes()); record.headers.add(notificationIdHeader); // Enviar a mensagem producer.send(record); } catch (Exception e) { e.printStackTrace(); } } }

Exemplo de Consumidor em Java

No lado do consumidor, a leitura do payload e dos headers, junto ao uso do timestamp nativo, é diretamente acessível.

import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class SimpleKafkaConsumer { private KafkaConsumer<String, String> consumer; private ExecutorService executorService; private final static String TOPIC = "your_topic_name"; private final static String BOOTSTRAP_SERVERS = "localhost:9092"; private final static String GROUP_ID = "your_group_id"; public SimpleKafkaConsumer() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Para pegar desde o início consumer = new KafkaConsumer<>(properties); executorService = Executors.newSingleThreadExecutor(); } public void subscribeTopic(String topic) { consumer.subscribe(Collections.singletonList(topic)); } public void startConsuming() { executorService.submit(this::processMessages); } private void processMessages() { try { while (!Thread.currentThread().isInterrupted()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Recebido:\nTopic: %s\nPartition: %d\nOffset: %d\nKey: %s\nValue: %s\n\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); // Processamento adicional das mensagens } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } public void stopConsuming() { // Ordenadamente encerrar o serviço do executor executorService.shutdown(); try { // Aguardar término das tarefas em andamento if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } } public static void main(String[] args) { SimpleKafkaConsumer consumer = new SimpleKafkaConsumer(); consumer.subscribeTopic(TOPIC); consumer.startConsuming(); // Simular uma operação que você gostaria de pausar ou parar a aplicação Runtime.getRuntime().addShutdownHook(new Thread(consumer::stopConsuming)); } }

 

 

Related content

ESTE É UM CONTEÚDO EM DESENVOLVIMENTO E NÃO DEVE SER CONSIDERADO COMO VERSÃO FINAL!
Clique aqui para maiores informações