Monitor your Kafka cluster running on Kubernetes with the Strimzi operator by deploying the OpenTelemetry Collector. The collector automatically discovers Kafka broker pods and collects comprehensive metrics.
Before you begin

Make sure you have:

- A New Relic account with a license key
- A Kubernetes cluster with kubectl access
- Kafka deployed via the Strimzi operator with JMX enabled

Enable JMX in Strimzi Kafka

Make sure your Kafka cluster has JMX enabled in the Strimzi Kafka resource:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    jmxOptions: {} # Enables JMX with default settings
    # ...other broker configuration
```

Step 1: Create a namespace
Create a dedicated namespace for the OpenTelemetry Collector (or use your existing Kafka namespace):

```shell
kubectl create namespace kafka
```

Step 2: Create a secret with the license key

Store your New Relic license key as a Kubernetes secret:

```shell
kubectl create secret generic nr-license-key \
  --from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \
  -n kafka
```

Replace YOUR_LICENSE_KEY with your actual New Relic license key.
Step 3: Deploy the OpenTelemetry Collector

3.1 Create a custom collector image

Create a custom OpenTelemetry Collector image with a Java runtime and the JMX scraper.

Important

Version compatibility: This guide uses JMX Scraper 1.52.0 and OpenTelemetry Collector 0.143.1. Older collector versions may not include this scraper's hash in their compatibility list. For best results, use the latest versions, as shown in this guide.
Target architecture: Check the OpenTelemetry Collector releases page to find the correct binary for your system architecture (for example, linux_amd64, linux_arm64, darwin_amd64). Update the TARGETARCH variable in the Dockerfile accordingly.
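If you are unsure which architecture your build machine uses, a quick check (mapping x86_64 to linux_amd64 and aarch64 to linux_arm64) is:

```shell
# Print the kernel name and machine hardware name, e.g. "Linux x86_64".
#   x86_64  -> linux_amd64
#   aarch64 -> linux_arm64
uname -s -m
```

On the cluster itself, the standard node label kubernetes.io/arch carries the same information for each node.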
Save it as Dockerfile:
```dockerfile
# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver
# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep

# OpenTelemetry Collector Binary
ARG OTEL_VERSION=0.143.1
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol

# JMX Scraper JAR (for JMX receiver with YAML-based configuration)
ARG JMX_SCRAPER_JAR_VERSION=1.52.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar

# Set permissions for nonroot user (uid 65532)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar

# Final minimal image with Java runtime
FROM openjdk:17-jre-slim

# Copy artifacts from the prep stage
COPY --from=prep /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib

EXPOSE 4317 4318 8888
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/conf/otel-agent-config.yaml"]
```

Build and push the image:

```shell
docker build -t your-registry/otel-collector-kafka:latest .
docker push your-registry/otel-collector-kafka:latest
```

3.2 Create the JMX custom metrics ConfigMap
First, create a ConfigMap with the custom JMX metrics configuration. Save it as jmx-kafka-config.yaml:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: jmx-kafka-config
  namespace: kafka
data:
  jmx-kafka-config.yaml: |
    ---
    rules:
      # Per-topic custom metrics using custom MBean commands
      - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
        mapping:
          Count:
            metric: kafka.prod.msg.count
            type: counter
            desc: The number of messages in per topic
            unit: "{message}"

      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(in)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By

      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(out)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By

      # Cluster-level metrics using controller-based MBeans
      - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
        mapping:
          Value:
            metric: kafka.cluster.topic.count
            type: gauge
            desc: The total number of global topics in the cluster
            unit: "{topic}"

      - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
        mapping:
          Value:
            metric: kafka.cluster.partition.count
            type: gauge
            desc: The total number of global partitions in the cluster
            unit: "{partition}"

      - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
        mapping:
          Value:
            metric: kafka.broker.fenced.count
            type: gauge
            desc: The number of fenced brokers in the cluster
            unit: "{broker}"

      - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
        mapping:
          Value:
            metric: kafka.partition.non_preferred_leader
            type: gauge
            desc: The count of topic partitions for which the leader is not the preferred leader
            unit: "{partition}"

      # Broker-level metrics using ReplicaManager MBeans
      - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
        mapping:
          Value:
            metric: kafka.partition.under_min_isr
            type: gauge
            desc: The number of partitions where the number of in-sync replicas is less than the minimum
            unit: "{partition}"

      # Broker uptime metric using JVM Runtime
      - bean: java.lang:type=Runtime
        mapping:
          Uptime:
            metric: kafka.broker.uptime
            type: gauge
            desc: Broker uptime in milliseconds
            unit: ms

      # Leader count per broker
      - bean: kafka.server:type=ReplicaManager,name=LeaderCount
        mapping:
          Value:
            metric: kafka.broker.leader.count
            type: gauge
            desc: Number of partitions for which this broker is the leader
            unit: "{partition}"

      # JVM metrics
      - bean: java.lang:type=GarbageCollector,name=*
        mapping:
          CollectionCount:
            metric: jvm.gc.collections.count
            type: counter
            unit: "{collection}"
            desc: total number of collections that have occurred
            metricAttribute:
              name: param(name)
          CollectionTime:
            metric: jvm.gc.collections.elapsed
            type: counter
            unit: ms
            desc: the approximate accumulated collection elapsed time in milliseconds
            metricAttribute:
              name: param(name)

      - bean: java.lang:type=Memory
        unit: By
        prefix: jvm.memory.
        dropNegativeValues: true
        mapping:
          HeapMemoryUsage.committed:
            metric: heap.committed
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.max:
            metric: heap.max
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.used:
            metric: heap.used
            desc: current heap usage
            type: gauge

      - bean: java.lang:type=Threading
        mapping:
          ThreadCount:
            metric: jvm.thread.count
            type: gauge
            unit: "{thread}"
            desc: Total thread count (Kafka typical range 100-300 threads)

      - bean: java.lang:type=OperatingSystem
        prefix: jvm.
        dropNegativeValues: true
        mapping:
          SystemLoadAverage:
            metric: system.cpu.load_1m
            type: gauge
            unit: "{run_queue_item}"
            desc: System load average (1 minute) - alert if > CPU count
          AvailableProcessors:
            metric: cpu.count
            type: gauge
            unit: "{cpu}"
            desc: Number of processors available
          ProcessCpuLoad:
            metric: cpu.recent_utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for JVM process (0.0 to 1.0)
          SystemCpuLoad:
            metric: system.cpu.utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for whole system (0.0 to 1.0)
          OpenFileDescriptorCount:
            metric: file_descriptor.count
            type: gauge
            unit: "{file_descriptor}"
            desc: Number of open file descriptors - alert if > 80% of ulimit

      - bean: java.lang:type=ClassLoading
        mapping:
          LoadedClassCount:
            metric: jvm.class.count
            type: gauge
            unit: "{class}"
            desc: Currently loaded class count

      - bean: java.lang:type=MemoryPool,name=*
        type: gauge
        unit: By
        metricAttribute:
          name: param(name)
        mapping:
          Usage.used:
            metric: jvm.memory.pool.used
            desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
          Usage.max:
            metric: jvm.memory.pool.max
            desc: Maximum memory pool size
          CollectionUsage.used:
            metric: jvm.memory.pool.used_after_last_gc
            desc: Memory used after last GC (shows retained memory baseline)
```

Tip
Customize metric collection: You can collect additional Kafka metrics by adding custom MBean rules to the jmx-kafka-config.yaml file:

Find the available MBean names in the Kafka monitoring documentation

This lets you collect any JMX metric exposed by the Kafka brokers, based on your specific monitoring needs.
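For example, a rule like the following could be appended under the existing rules: list. The MBean is a standard Kafka broker metric; the resulting metric name and unit shown here are illustrative choices, not fixed names:

```yaml
# Hypothetical extra rule: total fetch requests handled by this broker.
# Append under `rules:` in jmx-kafka-config.yaml.
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
  mapping:
    Count:
      metric: kafka.request.fetch.count # metric name is your choice
      type: counter
      desc: Total fetch requests received by this broker
      unit: "{request}"
```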
Apply the JMX ConfigMap:

```shell
kubectl apply -f jmx-kafka-config.yaml
```

3.3 Create the collector ConfigMap

Create a ConfigMap with the OpenTelemetry Collector configuration. Save it as otel-kafka-config.yaml:
```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: otel-collector-config
  namespace: kafka
  labels:
    app: otel-collector
data:
  otel-collector-config.yaml: |
    receivers:
      # Kafka cluster-level metrics (runs once per OTEL collector)
      kafkametrics/cluster:
        brokers:
          - "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
        protocol_version: 2.8.0
        scrapers:
          - brokers
          - topics
          - consumers
        collection_interval: 30s
        metrics:
          kafka.topic.min_insync_replicas:
            enabled: true
          kafka.topic.replication_factor:
            enabled: true
          kafka.partition.replicas:
            enabled: false
          kafka.partition.oldest_offset:
            enabled: false
          kafka.partition.current_offset:
            enabled: false

      # Receiver creator for dynamic per-broker JMX receivers
      receiver_creator:
        watch_observers: [k8s_observer]
        receivers:
          # JMX receiver template (created per discovered broker pod)
          jmx:
            rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka"
            config:
              endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi'
              jar_path: /opt/opentelemetry-jmx-scraper.jar
              target_system: kafka
              jmx_configs: /conf-jmx/jmx-kafka-config.yaml
              collection_interval: 30s
            # Set dynamic resource attributes from discovered pod
            resource_attributes:
              broker.endpoint: '`endpoint`'

    exporters:
      otlp:
        endpoint: https://otlp.nr-data.net:4317
        tls:
          insecure: false
        sending_queue:
          num_consumers: 12
          queue_size: 5000
        retry_on_failure:
          enabled: true
        headers:
          api-key: ${NEW_RELIC_LICENSE_KEY}

    processors:
      # Batch processor for efficiency
      batch/aggregation:
        send_batch_size: 1024
        timeout: 30s

      # Memory limiter to prevent OOM
      memory_limiter:
        limit_percentage: 80
        spike_limit_percentage: 30
        check_interval: 1s

      # Detect system resources
      resourcedetection:
        detectors: [env, docker, system]
        timeout: 5s
        override: false

      # Add Kafka cluster metadata
      resource/kafka_metadata:
        attributes:
          - key: kafka.cluster.name
            value: my-cluster
            action: upsert

      # Extract Kubernetes attributes
      k8sattributes:
        auth_type: serviceAccount
        passthrough: false
        extract:
          metadata:
            - k8s.pod.name
            - k8s.pod.uid
            - k8s.namespace.name
            - k8s.node.name
          labels:
            - tag_name: strimzi.cluster
              key: strimzi.io/cluster
              from: pod
            - tag_name: strimzi.kind
              key: strimzi.io/kind
              from: pod

      # Transform metrics for New Relic UI
      transform:
        metric_statements:
          - context: metric
            statements:
              # Clean up descriptions and units
              - set(description, "") where description != ""
              - set(unit, "") where unit != ""
          - context: resource
            statements:
              # Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit)
              - set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil

      # Remove broker.id for cluster-level metrics
      transform/remove_broker_id:
        metric_statements:
          - context: resource
            statements:
              - delete_key(attributes, "broker.id")
              - delete_key(attributes, "broker.endpoint")
              - delete_key(attributes, "k8s.pod.name")

      # Topic sum aggregation for replicas_in_sync
      metricstransform/kafka_topic_sum_aggregation:
        transforms:
          - include: kafka.partition.replicas_in_sync
            action: insert
            new_name: kafka.partition.replicas_in_sync.total
            operations:
              - action: aggregate_labels
                label_set: [ topic ]
                aggregation_type: sum

      # Filter to include only cluster-level metrics
      filter/include_cluster_metrics:
        metrics:
          include:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"

      # Filter to exclude cluster-level metrics from broker pipeline
      filter/exclude_cluster_metrics:
        metrics:
          exclude:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"

      # Convert cumulative metrics to delta for New Relic
      cumulativetodelta:

    extensions:
      # K8s observer extension
      k8s_observer:
        auth_type: serviceAccount
        observe_pods: true
        observe_nodes: false

    service:
      extensions: [k8s_observer]
      pipelines:
        # Per-broker metrics pipeline (with broker.id)
        metrics/broker:
          receivers:
            - receiver_creator
            - kafkametrics/cluster
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/exclude_cluster_metrics
            - transform
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]

        # Cluster-level metrics pipeline (without broker.id, aggregated)
        metrics/cluster:
          receivers:
            - receiver_creator
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/include_cluster_metrics
            - transform/remove_broker_id
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]
```

Configuration notes:
- Replace my-cluster-kafka-bootstrap with the name of your Strimzi Kafka service
- Replace my-cluster in rule and in kafka.cluster.name with the name of your cluster
- Update the namespace if it differs from kafka
- OTLP endpoint: Use https://otlp.nr-data.net:4317 (US region) or https://otlp.eu01.nr-data.net:4317 (EU region). See Configure your OTLP endpoint for other regions
- The receiver_creator automatically discovers Kafka broker pods using Strimzi labels
Apply the ConfigMap:

```shell
kubectl apply -f otel-kafka-config.yaml
```

3.4 Deploy the collector

Create the deployment. Save it as otel-collector-deployment.yaml:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: otel-collector
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: otel-collector
  template:
    metadata:
      labels:
        app: otel-collector
    spec:
      serviceAccountName: otel-collector
      containers:
        - name: otel-collector
          image: your-registry/otel-collector-kafka:latest
          env:
            - name: NEW_RELIC_LICENSE_KEY
              valueFrom:
                secretKeyRef:
                  name: nr-license-key
                  key: NEW_RELIC_LICENSE_KEY
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
            requests:
              cpu: "500m"
              memory: "1Gi"
          volumeMounts:
            - name: vol-kafka-test-cluster
              mountPath: /conf
            - name: jmx-config
              mountPath: /conf-jmx
          ports:
            - containerPort: 4317 # OTLP gRPC
            - containerPort: 4318 # OTLP HTTP
            - containerPort: 8888 # Metrics
      volumes:
        - name: vol-kafka-test-cluster
          configMap:
            name: otel-collector-config
            items:
              - key: otel-collector-config.yaml
                path: otel-agent-config.yaml
        - name: jmx-config
          configMap:
            name: jmx-kafka-config
            items:
              - key: jmx-kafka-config.yaml
                path: jmx-kafka-config.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: otel-collector
  namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: otel-collector
rules:
  - apiGroups: [""]
    resources: ["pods", "nodes"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: otel-collector
subjects:
  - kind: ServiceAccount
    name: otel-collector
    namespace: kafka
roleRef:
  kind: ClusterRole
  name: otel-collector
  apiGroup: rbac.authorization.k8s.io
```

Resource configuration:

- The resource limits above are suitable for medium-sized Kafka clusters (5-10 brokers, 20-100 topics)

Apply the deployment:

```shell
kubectl apply -f otel-collector-deployment.yaml
```

Verify that the collector is running:

```shell
kubectl get pods -n kafka -l app=otel-collector
kubectl logs -n kafka -l app=otel-collector -f
```

Step 4: (Optional) Instrument producer or consumer applications
To collect application-level telemetry from Kafka producer and consumer applications running on Kubernetes, instrument them with the OpenTelemetry Java Agent.

Instrument your Kafka application

To instrument your Kafka producer or consumer applications, add the OpenTelemetry Java Agent to your existing deployment:

Download the Java agent: Add an init container to download the agent JAR:
```yaml
initContainers:
  - name: download-otel-agent
    image: busybox:latest
    command:
      - sh
      - -c
      - |
        wget -O /otel/opentelemetry-javaagent.jar \
          https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
    volumeMounts:
      - name: otel-agent
        mountPath: /otel
```

Configure the Java agent: Add environment variables to your application container:

```yaml
env:
  - name: JAVA_TOOL_OPTIONS
    value: >-
      -javaagent:/otel/opentelemetry-javaagent.jar
      -Dotel.service.name="kafka-producer"
      -Dotel.resource.attributes="kafka.cluster.name=my-cluster"
      -Dotel.exporter.otlp.endpoint="http://localhost:4317"
      -Dotel.exporter.otlp.protocol="grpc"
      -Dotel.metrics.exporter="otlp"
      -Dotel.traces.exporter="otlp"
      -Dotel.logs.exporter="otlp"
      -Dotel.instrumentation.kafka.experimental-span-attributes="true"
      -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"
      -Dotel.instrumentation.kafka.producer-propagation.enabled="true"
      -Dotel.instrumentation.kafka.enabled="true"
volumeMounts:
  - name: otel-agent
    mountPath: /otel
```

Add the volume: Include the volume definition:

```yaml
volumes:
  - name: otel-agent
    emptyDir: {}
```

Replace:

- kafka-producer with a unique name for your application
- my-cluster with the name of your Kafka cluster
Tip

The configuration above sends telemetry to an OpenTelemetry Collector running at localhost:4317. Deploy your own collector with this configuration:
```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

exporters:
  otlp/newrelic:
    endpoint: https://otlp.nr-data.net:4317
    headers:
      api-key: "${NEW_RELIC_LICENSE_KEY}"
    compression: gzip
    timeout: 30s

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    metrics:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    logs:
      receivers: [otlp]
      exporters: [otlp/newrelic]
```

This lets you customize processing, add filters, or route telemetry to multiple backends. For other endpoint configurations, see Configure your OTLP endpoint.
The Java Agent provides out-of-the-box Kafka instrumentation with zero code changes, capturing:

- Request latencies
- Throughput metrics
- Error rates
- Distributed tracing

For advanced configuration, see the Kafka instrumentation documentation.
Step 5: (Optional) Forward Kafka broker logs

To collect Kafka broker logs from your Kubernetes pods and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.
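As a minimal sketch under stated assumptions: the log path glob below is hypothetical and depends on your cluster's log layout, and the collector pod would additionally need a hostPath volume mounting the node's pod log directory. The processors and exporter names reuse those already defined in this guide's collector config:

```yaml
# Hypothetical sketch: tail Kafka broker pod logs and forward them via OTLP.
receivers:
  filelog/kafka:
    include:
      - /var/log/pods/kafka_my-cluster-kafka-*/*/*.log # adjust to your cluster
    start_at: end

service:
  pipelines:
    logs:
      receivers: [filelog/kafka]
      processors: [memory_limiter, batch/aggregation]
      exporters: [otlp]
```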
Find your data

After a few minutes, your Kafka metrics should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka metrics across the different views in the New Relic UI.

You can also query your data with NRQL:

```sql
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-cluster'
```

Troubleshooting
Next steps

- Explore Kafka metrics - See the complete metrics reference
- Create custom dashboards - Build visualizations for your Kafka data
- Configure alerts - Monitor critical metrics such as consumer lag and under-replicated partitions