• /
  • EnglishEspañolFrançais日本語한국어Português
  • EntrarComeçar agora

Esta tradução de máquina é fornecida para sua comodidade.

Caso haja alguma divergência entre a versão em inglês e a traduzida, a versão em inglês prevalece. Acesse esta página para mais informações.

Criar um problema

Monitore o Kafka auto-hospedado com OpenTelemetry

Monitore seu cluster Apache Kafka auto-hospedado instalando o OpenTelemetry Collector diretamente em hosts Linux.

Antes de você começar

Certifique-se de ter:

  • Uma conta New Relic com uma

  • OpenJDK instalado no host de monitoramento

  • JMX habilitado nos brokers Kafka (normalmente na porta 9999)

  • Acesso à rede do coletor para os brokers Kafka:

    • Porta do servidor bootstrap (normalmente 9092)
    • Porta JMX (normalmente 9999)

Etapa 1: Instalar o OpenTelemetry Collector

Baixe e instale o binário OpenTelemetry Collector Contrib para o sistema operacional do seu host em OpenTelemetry Collector releases.

Etapa 2: Baixe o JMX scraper

O scraper JMX coleta métricas detalhadas dos MBeans do broker Kafka:

bash
$
# Create directory in user home (no sudo needed)
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \
>
https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jar

Importante

Compatibilidade de versão: Este guia usa JMX Scraper 1.52.0. Versões mais antigas do OpenTelemetry Collector podem não incluir o hash deste scraper em sua lista de compatibilidade. Para obter os melhores resultados, use a versão mais recente do OpenTelemetry Collector, que inclui suporte para esta versão do JMX Scraper.

Etapa 3: Crie a configuração de métricas personalizadas JMX

Crie um arquivo de configuração JMX personalizado para coletar métricas Kafka adicionais não incluídas no sistema de destino padrão.

Crie o arquivo ~/opentelemetry/kafka-jmx-config.yaml com a seguinte configuração:

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

Dica

Personalize a coleta de métricas: Você pode coletar métricas Kafka adicionais adicionando regras MBean personalizadas ao arquivo kafka-jmx-config.yaml:

Passo 4: Crie a configuração do coletor

Crie a configuração principal do OpenTelemetry Collector em ~/opentelemetry/config.yaml.

receivers:
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers:
- ${env:KAFKA_BROKER_ADDRESS}
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
topic_match: ".*"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# JMX receiver for broker-specific metrics
jmx/kafka_broker-1:
jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar
endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
target_system: kafka
collection_interval: 30s
jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml
resource_attributes:
broker.id: "1"
broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
metrics/brokers-cluster-topics:
receivers: [jmx/kafka_broker-1, kafkametrics]
processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation]
exporters: [otlp/newrelic]
metrics/jmx-cluster:
receivers: [jmx/kafka_broker-1]
processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation]
exporters: [otlp/newrelic]

Observações de configuração:

  • Endpoint OTLP: Usa https://otlp.nr-data.net:4317 (região dos EUA) ou https://otlp.eu01.nr-data.net:4317 (região da UE). Consulte Configure seu endpoint OTLP para outras regiões

Importante

Para vários brokers, adicione receptores JMX adicionais com diferentes endpoints e IDs de broker para monitorar cada broker em seu cluster.

Etapa 5: Definir variáveis de ambiente

Definir as variáveis de ambiente necessárias:

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BROKER_ADDRESS="localhost:9092"
$
export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"

Substituir:

  • YOUR_LICENSE_KEY com sua chave de licença New Relic
  • my-kafka-cluster com um nome exclusivo para seu cluster Kafka
  • localhost:9092 com o endereço do seu servidor bootstrap Kafka
  • localhost:9999 com seu endpoint JMX do broker Kafka

Etapa 6: Inicie o coletor

Execute o coletor diretamente (sem necessidade de sudo):

bash
$
# Start the collector with your config
$
otelcol-contrib --config ~/opentelemetry/config.yaml

O coletor começará a enviar métricas do Kafka para o New Relic em alguns minutos.

Crie um serviço systemd para execução persistente (requer sudo para configuração única):

bash
$
# Create systemd service file
$
sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF
$
[Unit]
$
Description=OpenTelemetry Collector for Kafka
$
After=network.target
$
$
[Service]
$
Type=simple
$
User=$USER
$
WorkingDirectory=$HOME/opentelemetry
$
ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml
$
Restart=on-failure
$
Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"
$
Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"
$
Environment="KAFKA_BROKER_ADDRESS=localhost:9092"
$
Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"
$
$
[Install]
$
WantedBy=multi-user.target
$
EOF

Substitua YOUR_LICENSE_KEY e outros valores e, em seguida, habilite e inicie o serviço:

bash
$
sudo systemctl daemon-reload
$
sudo systemctl enable otelcol-contrib
$
sudo systemctl start otelcol-contrib
$
sudo systemctl status otelcol-contrib

Etapa 7: (Opcional) Instrumente aplicativos de produtor ou consumidor

Para coletar telemetria em nível de aplicativo de seus aplicativos produtores e consumidores Kafka, use o OpenTelemetry Java Agent:

  1. Baixe o agente Java:

    bash
    $
    mkdir -p ~/otel-java
    $
    curl -L -o ~/otel-java/opentelemetry-javaagent.jar \
    >
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
  2. Inicie seu aplicativo com o agente:

    bash
    $
    java \
    >
    -javaagent:~/otel-java/opentelemetry-javaagent.jar \
    >
    -Dotel.service.name="kafka-producer-1" \
    >
    -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
    >
    -Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \
    >
    -Dotel.exporter.otlp.protocol="grpc" \
    >
    -Dotel.metrics.exporter="otlp" \
    >
    -Dotel.traces.exporter="otlp" \
    >
    -Dotel.logs.exporter="otlp" \
    >
    -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
    >
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
    >
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
    >
    -Dotel.instrumentation.kafka.enabled="true" \
    >
    -jar your-kafka-application.jar

Substituir:

  • kafka-producer-1 com um nome exclusivo para seu aplicativo produtor ou consumidor
  • my-kafka-cluster com o mesmo nome de cluster usado na configuração do coletor
  • https://otlp.nr-data.net:4317 com seu endpoint New Relic OTLP (use https://otlp.eu01.nr-data.net:4317 para a região da UE). Para outros endpoints e opções de configuração, consulte Configure seu endpoint OTLP.

O Java Agent fornece instrumentação Kafka pronta para uso com zero alterações de código, capturando:

  • Latências de solicitação
  • Métricas de throughput
  • Taxas de erro
  • Rastreamento distribuído

Para configuração avançada, consulte a documentação de instrumentação do Kafka.

Etapa 6: (Opcional) Encaminhar logs do broker Kafka

Para coletar logs do broker Kafka de seus hosts e enviá-los para New Relic, configure o receptor de log de arquivo em seu OpenTelemetry Collector.

Encontre seus dados

Após alguns minutos, suas métricas Kafka devem aparecer no New Relic. Consulte Encontre seus dados para obter instruções detalhadas sobre como explorar suas métricas Kafka em diferentes visualizações na interface do usuário do New Relic.

Você também pode consultar seus dados com NRQL:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

Resolução de problemas

Próximos passos

Copyright © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.