Monitore seu cluster Apache Kafka auto-hospedado instalando o OpenTelemetry Collector diretamente em hosts Linux.
Antes de você começar
Certifique-se de ter:
Uma conta New Relic com uma
OpenJDK instalado no host de monitoramento
JMX habilitado nos brokers Kafka (normalmente na porta 9999)
Acesso à rede do coletor para os brokers Kafka:
- Porta do servidor bootstrap (normalmente 9092)
- Porta JMX (normalmente 9999)
Etapa 1: Instalar o OpenTelemetry Collector
Baixe e instale o binário OpenTelemetry Collector Contrib para o sistema operacional do seu host em OpenTelemetry Collector releases.
Etapa 2: Baixe o JMX scraper
O scraper JMX coleta métricas detalhadas dos MBeans do broker Kafka:
$# Create directory in user home (no sudo needed)$mkdir -p ~/opentelemetry$curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \> https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jarImportante
Compatibilidade de versão: Este guia usa JMX Scraper 1.52.0. Versões mais antigas do OpenTelemetry Collector podem não incluir o hash deste scraper em sua lista de compatibilidade. Para obter os melhores resultados, use a versão mais recente do OpenTelemetry Collector, que inclui suporte para esta versão do JMX Scraper.
Etapa 3: Crie a configuração de métricas personalizadas JMX
Crie um arquivo de configuração JMX personalizado para coletar métricas Kafka adicionais não incluídas no sistema de destino padrão.
Crie o arquivo ~/opentelemetry/kafka-jmx-config.yaml com a seguinte configuração:
---rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages in per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)Dica
Personalize a coleta de métricas: Você pode coletar métricas Kafka adicionais adicionando regras MBean personalizadas ao arquivo kafka-jmx-config.yaml:
Encontre os nomes de MBean disponíveis na documentação de monitoramento do Kafka
Isso permite que você colete qualquer métrica JMX exposta pelos brokers Kafka com base em suas necessidades específicas de monitoramento.
Passo 4: Crie a configuração do coletor
Crie a configuração principal do OpenTelemetry Collector em ~/opentelemetry/config.yaml.
receivers: # Kafka metrics receiver for cluster-level metrics kafkametrics: brokers: - ${env:KAFKA_BROKER_ADDRESS} protocol_version: 2.8.0 scrapers: - brokers - topics - consumers collection_interval: 30s topic_match: ".*" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
# JMX receiver for broker-specific metrics jmx/kafka_broker-1: jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS} target_system: kafka collection_interval: 30s jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml resource_attributes: broker.id: "1" broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors: batch/aggregation: send_batch_size: 1024 timeout: 30s
resourcedetection: detectors: [env, ec2, system] system: resource_attributes: host.name: enabled: true host.id: enabled: true
resource: attributes: - action: insert key: kafka.cluster.name value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id: metric_statements: - context: resource statements: - delete_key(attributes, "broker.id")
filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
transform/des_units: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [ topic ] aggregation_type: sum
exporters: otlp/newrelic: endpoint: https://otlp.nr-data.net:4317 headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} compression: gzip timeout: 30s
service: pipelines: metrics/brokers-cluster-topics: receivers: [jmx/kafka_broker-1, kafkametrics] processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation] exporters: [otlp/newrelic]
metrics/jmx-cluster: receivers: [jmx/kafka_broker-1] processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation] exporters: [otlp/newrelic]Observações de configuração:
- Endpoint OTLP: Usa
https://otlp.nr-data.net:4317(região dos EUA) ouhttps://otlp.eu01.nr-data.net:4317(região da UE). Consulte Configure seu endpoint OTLP para outras regiões
Importante
Para vários brokers, adicione receptores JMX adicionais com diferentes endpoints e IDs de broker para monitorar cada broker em seu cluster.
Etapa 5: Definir variáveis de ambiente
Definir as variáveis de ambiente necessárias:
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BROKER_ADDRESS="localhost:9092"$export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"Substituir:
YOUR_LICENSE_KEYcom sua chave de licença New Relicmy-kafka-clustercom um nome exclusivo para seu cluster Kafkalocalhost:9092com o endereço do seu servidor bootstrap Kafkalocalhost:9999com seu endpoint JMX do broker Kafka
Etapa 6: Inicie o coletor
Execute o coletor diretamente (sem necessidade de sudo):
$# Start the collector with your config$otelcol-contrib --config ~/opentelemetry/config.yamlO coletor começará a enviar métricas do Kafka para o New Relic em alguns minutos.
Crie um serviço systemd para execução persistente (requer sudo para configuração única):
$# Create systemd service file$sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF$[Unit]$Description=OpenTelemetry Collector for Kafka$After=network.target$
$[Service]$Type=simple$User=$USER$WorkingDirectory=$HOME/opentelemetry$ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml$Restart=on-failure$Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"$Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"$Environment="KAFKA_BROKER_ADDRESS=localhost:9092"$Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"$
$[Install]$WantedBy=multi-user.target$EOFSubstitua YOUR_LICENSE_KEY e outros valores e, em seguida, habilite e inicie o serviço:
$sudo systemctl daemon-reload$sudo systemctl enable otelcol-contrib$sudo systemctl start otelcol-contrib$sudo systemctl status otelcol-contribEtapa 7: (Opcional) Instrumente aplicativos de produtor ou consumidor
Para coletar telemetria em nível de aplicativo de seus aplicativos produtores e consumidores Kafka, use o OpenTelemetry Java Agent:
Baixe o agente Java:
bash$mkdir -p ~/otel-java$curl -L -o ~/otel-java/opentelemetry-javaagent.jar \>https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarInicie seu aplicativo com o agente:
bash$java \>-javaagent:~/otel-java/opentelemetry-javaagent.jar \>-Dotel.service.name="kafka-producer-1" \>-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \>-Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \>-Dotel.exporter.otlp.protocol="grpc" \>-Dotel.metrics.exporter="otlp" \>-Dotel.traces.exporter="otlp" \>-Dotel.logs.exporter="otlp" \>-Dotel.instrumentation.kafka.experimental-span-attributes="true" \>-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \>-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \>-Dotel.instrumentation.kafka.enabled="true" \>-jar your-kafka-application.jar
Substituir:
kafka-producer-1com um nome exclusivo para seu aplicativo produtor ou consumidormy-kafka-clustercom o mesmo nome de cluster usado na configuração do coletorhttps://otlp.nr-data.net:4317com seu endpoint New Relic OTLP (usehttps://otlp.eu01.nr-data.net:4317para a região da UE). Para outros endpoints e opções de configuração, consulte Configure seu endpoint OTLP.
O Java Agent fornece instrumentação Kafka pronta para uso com zero alterações de código, capturando:
- Latências de solicitação
- Métricas de throughput
- Taxas de erro
- Rastreamento distribuído
Para configuração avançada, consulte a documentação de instrumentação do Kafka.
Etapa 6: (Opcional) Encaminhar logs do broker Kafka
Para coletar logs do broker Kafka de seus hosts e enviá-los para New Relic, configure o receptor de log de arquivo em seu OpenTelemetry Collector.
Encontre seus dados
Após alguns minutos, suas métricas Kafka devem aparecer no New Relic. Consulte Encontre seus dados para obter instruções detalhadas sobre como explorar suas métricas Kafka em diferentes visualizações na interface do usuário do New Relic.
Você também pode consultar seus dados com NRQL:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'Resolução de problemas
Próximos passos
- Explorar métricas Kafka - Veja a referência completa de métricas
- Criar dashboards personalizados - Criar visualizações para seus dados do Kafka
- Configure alertas - Monitore métricas críticas como atraso do consumidor e partições sub-replicadas