Monitoring

This page covers the three pillars of observability for kube-based controllers: structured logging, distributed tracing, and metrics. It also covers health checks.

Structured Logging

kube-rs uses the tracing crate. Use tracing-subscriber to configure output format and filters.

// Requires the "env-filter" and "json" features of tracing-subscriber
tracing_subscriber::fmt()
    .with_env_filter("kube=info,my_controller=debug")
    .json() // structured JSON logging
    .init();

Spans Automatically Added by Controller

The Controller automatically creates a tracing span for each reconcile invocation. The span includes the following information:

Field          Content
object.ref     ObjectRef (includes name and namespace)
object.reason  ReconcileReason (object updated, reconciler requested retry, etc.)

When you call tracing::info!() or similar inside the reconciler, this span context is automatically included.

RUST_LOG Filtering

# kube internal HTTP request logging
RUST_LOG=kube=debug,my_controller=info

# individual watch event logging (very verbose)
RUST_LOG=kube=trace

# suppress HTTP-level noise
RUST_LOG=kube=info,hyper=warn,tower=warn

In production, it is common to restrict kube to warn (or a less verbose level) and enable info or debug only for the controller's own modules.
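To make the directive strings above easier to reason about, here is a simplified, std-only model of how a comma-separated `target=level` list could be evaluated. It is not the actual tracing-subscriber implementation: the `Level` enum, `parse_directives`, and `is_enabled` are hypothetical names, and the sketch assumes that a directive applies to every target it is a string prefix of, with the longest matching prefix winning.

```rust
/// Log levels ordered from least to most verbose.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

/// Parse a level name, ignoring case.
fn parse_level(s: &str) -> Option<Level> {
    match s.to_ascii_lowercase().as_str() {
        "error" => Some(Level::Error),
        "warn" => Some(Level::Warn),
        "info" => Some(Level::Info),
        "debug" => Some(Level::Debug),
        "trace" => Some(Level::Trace),
        _ => None,
    }
}

/// Parse `target=level` pairs, skipping anything malformed.
fn parse_directives(spec: &str) -> Vec<(String, Level)> {
    spec.split(',')
        .filter_map(|d| {
            let (target, level) = d.trim().split_once('=')?;
            Some((target.to_string(), parse_level(level)?))
        })
        .collect()
}

/// Simplified model: the longest directive target that prefixes the event's
/// target decides; in this model, events with no matching directive are dropped.
fn is_enabled(directives: &[(String, Level)], target: &str, level: Level) -> bool {
    directives
        .iter()
        .filter(|(t, _)| target.starts_with(t.as_str()))
        .max_by_key(|(t, _)| t.len())
        .map_or(false, |(_, max)| level <= *max)
}
```

Under this model, `kube=info,hyper=warn,my_controller=debug` lets an info event from a `kube_client` module through, drops a debug event from the same module, and drops info events from `hyper`.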

Distributed Tracing

By connecting OpenTelemetry with an OTLP exporter, you can visualize reconcile invocations in distributed tracing systems (Jaeger, Tempo, etc.).

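use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::SpanExporter;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

// Builder API as found in recent opentelemetry releases;
// names differ in older versions, so check the version you depend on.
let exporter = SpanExporter::builder().with_tonic().build()?;
let provider = SdkTracerProvider::builder()
    .with_batch_exporter(exporter)
    .build();
let tracer = provider.tracer("my_controller");

let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

tracing_subscriber::registry()
    .with(telemetry)
    .with(tracing_subscriber::fmt::layer())
    .init();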

Client's TraceLayer

As discussed in Client Internal Architecture, the TraceLayer sits at the top of the Tower stack. Spans are automatically added to all HTTP requests.

Information included in the span:

  • HTTP method (GET, PATCH, etc.)
  • URL path
  • Response status code
  • Request duration

These HTTP spans are linked under the reconciler span, so you can directly see which API requests were made during a single reconcile invocation in the tracing UI.

Using the #[instrument] Macro

Applying #[instrument] to the reconciler lets you automatically add function arguments and custom fields to the span:

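use std::{sync::Arc, time::Duration};
use kube::runtime::controller::Action;
use opentelemetry::trace::TraceContextExt as _;
use tracing::{field, instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt as _;

#[instrument(skip(ctx), fields(trace_id = field::Empty))]
async fn reconcile(obj: Arc<MyResource>, ctx: Arc<Context>) -> Result<Action, Error> {
    // Look up the OpenTelemetry trace ID attached to the current tracing span
    let trace_id = Span::current().context().span().span_context().trace_id();
    // Record it in the trace_id field declared on the span above
    Span::current().record("trace_id", field::display(&trace_id));

    // trace_id is automatically included in subsequent tracing::info!() calls
    tracing::info!("reconciling");
    // ...
    Ok(Action::requeue(Duration::from_secs(300)))
}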

When using the JSON formatter in tracing-subscriber, the trace_id field is included in structured logs, allowing you to search for all related logs by trace_id in log systems (Loki, CloudWatch, etc.).

Metrics

Metrics are collected by consuming the results of the Controller stream.

Metric                      Type       Description
reconcile_total             Counter    Total reconcile count (with success/failure labels)
reconcile_duration_seconds  Histogram  Reconcile duration in seconds
reconcile_errors_total      Counter    Error count (with error type labels)
reconcile_queue_depth       Gauge      Number of items pending in the scheduler

Collection Pattern

use futures::StreamExt; // provides for_each on the Controller stream

let metrics = ctx.metrics.clone();
Controller::new(api, wc)
    .run(reconcile, error_policy, ctx)
    .for_each(|result| {
        let metrics = metrics.clone();
        async move {
            match result {
                Ok((_obj_ref, _action)) => {
                    metrics.reconcile_success.inc();
                }
                Err(_err) => {
                    metrics.reconcile_errors.inc();
                }
            }
        }
    })
    .await;
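The pattern above increments two plain counters, while the table lists labelled ones. As a rough, std-only sketch of how the labels could be derived from each stream item, the following tallies `reconcile_total` by outcome and `reconcile_errors_total` by error type. `ReconcileError`, its `label` method, and `Tally` are hypothetical stand-ins; a real controller would use its own error type and a metrics crate rather than a `Mutex<HashMap>`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical error type standing in for the controller's own `Error`.
#[derive(Debug)]
enum ReconcileError {
    Api,
    Finalizer,
}

impl ReconcileError {
    /// Short, low-cardinality label for the error variant.
    fn label(&self) -> &'static str {
        match self {
            ReconcileError::Api => "api",
            ReconcileError::Finalizer => "finalizer",
        }
    }
}

/// Labelled counters: reconcile_total{outcome} and reconcile_errors_total{error}.
#[derive(Default)]
struct Tally {
    reconcile_total: Mutex<HashMap<&'static str, u64>>,
    reconcile_errors_total: Mutex<HashMap<&'static str, u64>>,
}

impl Tally {
    /// Record one item taken from the controller stream.
    fn record<T>(&self, result: &Result<T, ReconcileError>) {
        let outcome = if result.is_ok() { "success" } else { "failure" };
        *self.reconcile_total.lock().unwrap().entry(outcome).or_insert(0) += 1;
        if let Err(err) = result {
            let mut errors = self.reconcile_errors_total.lock().unwrap();
            *errors.entry(err.label()).or_insert(0) += 1;
        }
    }

    /// Current value of reconcile_total for one outcome label.
    fn total(&self, outcome: &str) -> u64 {
        self.reconcile_total.lock().unwrap().get(outcome).copied().unwrap_or(0)
    }

    /// Current value of reconcile_errors_total for one error label.
    fn errors(&self, label: &str) -> u64 {
        self.reconcile_errors_total.lock().unwrap().get(label).copied().unwrap_or(0)
    }
}
```

Keeping the label set small and fixed (variant names rather than error messages) keeps the number of time series bounded.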

The stream items carry no timing information, so to record reconcile duration, time the call inside the reconciler function:

async fn reconcile(obj: Arc<MyResource>, ctx: Arc<Context>) -> Result<Action, Error> {
    let start = std::time::Instant::now();
    let result = reconcile_inner(&obj, &ctx).await;
    ctx.metrics.reconcile_duration.observe(start.elapsed().as_secs_f64());
    result
}

Exposing Metrics

Expose a /metrics endpoint via a separate HTTP server. Use the prometheus or metrics + metrics-exporter-prometheus crates.

use axum::{routing::get, Router};
use prometheus::TextEncoder;

async fn metrics_handler() -> String {
    let encoder = TextEncoder::new();
    let metric_families = prometheus::gather();
    encoder.encode_to_string(&metric_families).unwrap()
}

let app = Router::new().route("/metrics", get(metrics_handler));
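For orientation, the body returned by such an endpoint is plain text in the Prometheus exposition format. The sketch below hand-renders a labelled counter in that format using only the standard library; `render_counter` is a hypothetical helper written for illustration, not part of the prometheus crate, whose encoder does this work for you.

```rust
/// Render one counter family in the Prometheus text exposition format.
/// Each sample is (label name, label value, count).
fn render_counter(name: &str, help: &str, samples: &[(&str, &str, u64)]) -> String {
    // Metadata lines: a HELP line followed by a TYPE line
    let mut out = format!("# HELP {0} {1}\n# TYPE {0} counter\n", name, help);
    for (label, value, count) in samples {
        // One line per label combination: name{label="value"} count
        out.push_str(&format!("{}{{{}=\"{}\"}} {}\n", name, label, value, count));
    }
    out
}
```

Calling it with `reconcile_total` and the samples `("outcome", "success", 3)` and `("outcome", "failure", 1)` yields the two metadata lines followed by one line per outcome.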

Health Check

Expose the controller's readiness and liveness as Kubernetes probes.

Readiness

As discussed in Reflector and Store, the Store is empty on creation and only gets populated when the watcher stream is polled. The readiness probe checks whether the Store has completed its initial list load.

use axum::{http::StatusCode, routing::get, Router};
use kube::runtime::reflector;

let (reader, writer) = reflector::store();

// Pass the reader to the health server
let health_reader = reader.clone();
tokio::spawn(async move {
    let app = Router::new()
        .route("/readyz", get(move || async move {
            // Resolves once the initial list has been applied to the Store;
            // fails only if the writer has been dropped
            match health_reader.wait_until_ready().await {
                Ok(()) => (StatusCode::OK, "ready"),
                Err(_) => (StatusCode::SERVICE_UNAVAILABLE, "not ready"),
            }
        }));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
    axum::serve(listener, app).await.unwrap();
});
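Because the handler above awaits wait_until_ready(), a probe that arrives before the initial list has loaded waits instead of failing fast. One possible alternative, sketched here with only the standard library, is a shared flag that the probe reads without waiting. The `Readiness` type and its methods are hypothetical names, and the status codes are returned as plain integers to keep the sketch independent of any HTTP framework.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Shared readiness flag; starts out not ready.
#[derive(Clone, Default)]
struct Readiness(Arc<AtomicBool>);

impl Readiness {
    /// Call once the initial list has been loaded into the Store.
    fn mark_ready(&self) {
        self.0.store(true, Ordering::Release);
    }

    /// Status code and body a /readyz handler could return immediately.
    fn probe(&self) -> (u16, &'static str) {
        if self.0.load(Ordering::Acquire) {
            (200, "ready")
        } else {
            (503, "not ready")
        }
    }
}
```

A background task could await wait_until_ready() once and then call mark_ready(), while the HTTP handler only calls probe().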

Liveness

The liveness probe checks whether the reconcile loop is still making progress. Track the time of the last successful reconcile, and report the controller as unhealthy once that time is older than a chosen threshold.

use std::sync::atomic::{AtomicI64, Ordering};

struct Context {
    client: Client,
    last_reconcile: AtomicI64, // Unix timestamp in seconds
}

async fn reconcile(obj: Arc<MyResource>, ctx: Arc<Context>) -> Result<Action, Error> {
    // reconcile logic...
    // Record the completion time only after the logic above has succeeded
    ctx.last_reconcile.store(
        jiff::Timestamp::now().as_second(),
        Ordering::Relaxed,
    );
    Ok(Action::requeue(Duration::from_secs(300)))
}

The liveness endpoint returns 503 if the time elapsed since the last reconcile exceeds the threshold.
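The threshold check itself can be sketched as a small pure function. This is a minimal illustration using only the standard library; `liveness_status` and `threshold_secs` are hypothetical names, and the current time is passed in as a Unix timestamp in seconds so the logic stays easy to test.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Returns the HTTP status a liveness handler could respond with:
/// 503 once more than `threshold_secs` have passed since the last
/// recorded reconcile, 200 otherwise.
fn liveness_status(last_reconcile: &AtomicI64, now: i64, threshold_secs: i64) -> u16 {
    let last = last_reconcile.load(Ordering::Relaxed);
    if now.saturating_sub(last) > threshold_secs {
        503
    } else {
        200
    }
}
```

Two design points follow from this. The timestamp is best initialized to the startup time rather than 0, otherwise the controller reports unhealthy until its first reconcile. And since the example reconciler requeues every 300 seconds, the threshold should be comfortably larger than that interval so that a quiet cluster is not mistaken for a stuck loop.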

Execution Structure

Run the health server and metrics server alongside the Controller:

// health + metrics server
tokio::spawn(health_and_metrics_server(reader.clone()));

// Run the Controller
Controller::new(api, wc)
    .run(reconcile, error_policy, ctx)
    .for_each(|res| async move {
        match res {
            Ok(o) => tracing::info!("reconciled {:?}", o),
            Err(e) => tracing::error!("reconcile error: {:?}", e),
        }
    })
    .await;

The typical pattern is to run the HTTP server as a separate task using tokio::spawn, while the Controller runs on the main task.