#metrics #logging-tracing #tracing #logging #observability

emit_otlp

Emit diagnostic events to an OpenTelemetry-compatible collector

5 releases

new 0.11.0-alpha.5 Jun 13, 2024
0.11.0-alpha.4 Jun 12, 2024
0.11.0-alpha.3 Jun 8, 2024
0.11.0-alpha.2 Jun 7, 2024


MIT/Apache

415KB
7K SLoC


Emit diagnostic events via the OpenTelemetry Protocol (OTLP).

This library provides Otlp, an emit::Emitter that sends export requests directly to some remote OTLP receiver. If you need to integrate emit with the OpenTelemetry SDK, see emit-opentelemetry.

How it works

┌────────────────────────────────────────┐  ┌─────────────┐    ┌─────────────────────────────┐
│                caller                  │  │   channel   │    │     background worker       │
│                                        │  │             │    │                             │
│ emit::Event─┬─*─►is trace?──►Span──────┼──┼──►Trace─────┼─┐  │ ExportTraceServiceRequest   │
│             │                          │  │             │ │  │                             │
│             ├─*─►is metric?─►Metric────┼──┼──►Metrics───┼─┼──► ExportMetricsServiceRequest │
│             │                          │  │             │ │  │                             │
│             └─*─────────────►LogRecord─┼──┼──►Logs──────┼─┘  │ ExportLogsServiceRequest    │
└────────────────────────────────────────┘  └─────────────┘    └─────────────────────────────┘
* Only if the logs/trace/metrics signal is configured

The emitter is based on an asynchronous, batching channel. A diagnostic event makes its way from emit::emit! through to the remote OTLP receiver in the following key steps:

  1. Determine what kind of signal the event belongs to:
  • If the event carries emit::Kind::Span, and the trace signal is configured, then treat it as a span.
  • If the event carries emit::Kind::Metric, and the metrics signal is configured, then treat it as a metric.
  • In any other case, if the logs signal is configured, then treat it as a log record.
  2. Serialize the event into the OTLP data structure in the target format (JSON/protobuf).
  3. Put the serialized event into a channel. Each signal has its own internal queue in the channel.
  4. On a background worker, process the events in the channel by forming them up into OTLP export requests and sending them using the target protocol (HTTP/gRPC).

This library is based on hyper with tokio for HTTP, and rustls with ring for TLS. These dependencies are not configurable and can't be swapped for alternative implementations.

Getting started

Add emit and emit_otlp to your Cargo.toml:

[dependencies.emit]
version = "0.11.0-alpha.5"

[dependencies.emit_otlp]
version = "0.11.0-alpha.5"

Initialize emit at the start of your main.rs using an OTLP emitter:

fn main() {
    let rt = emit::setup()
        .emit_to(emit_otlp::new()
            // Add required resource properties for OTLP
            .resource(emit::props! {
                #[emit::key("service.name")]
                service_name: env!("CARGO_PKG_NAME"),
            })
            // Configure endpoints for logs/traces/metrics using gRPC + protobuf
            .logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
            .traces(emit_otlp::traces_grpc_proto("http://localhost:4319"))
            .metrics(emit_otlp::metrics_grpc_proto("http://localhost:4319"))
            .spawn()
            .unwrap())
        .init();

    // Your app code goes here

    rt.blocking_flush(std::time::Duration::from_secs(30));
}

The new method returns an OtlpBuilder, which can be configured with endpoints for the desired signals through its OtlpBuilder::logs, OtlpBuilder::traces, and OtlpBuilder::metrics methods.

You don't need to configure all signals, but you should at least configure OtlpBuilder::logs.

Once the builder is configured, call OtlpBuilder::spawn and pass the resulting Otlp to emit::Setup::emit_to.
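
Signals are configured independently, so they don't all need to use the same endpoint or protocol. As a sketch, assuming your receiver exposes both an HTTP and a gRPC port, you could send logs as HTTP+JSON while sending traces as gRPC+protobuf and leave the metrics signal unconfigured:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    // Logs over HTTP+JSON
    .logs(emit_otlp::logs_http_json("http://localhost:4318/v1/logs"))
    // Traces over gRPC+protobuf; metrics are simply not configured
    .traces(emit_otlp::traces_grpc_proto("http://localhost:4319"))
    .spawn()
    .unwrap()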

Where the background worker is spawned

The Otlp emitter doesn't do any work directly. That's all handled by a background worker created through OtlpBuilder::spawn. Where OtlpBuilder::spawn actually spawns that background worker depends on where it's called.

If OtlpBuilder::spawn is called within a tokio runtime, then the worker will spawn into that runtime:

// This will spawn in the active tokio runtime because of #[tokio::main]

#[tokio::main]
async fn main() {
    let rt = emit::setup()
        .emit_to(emit_otlp::new()
            .resource(emit::props! {
                #[emit::key("service.name")]
                service_name: env!("CARGO_PKG_NAME"),
            })
            .logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
            .spawn()
            .unwrap())
        .init();

    rt.blocking_flush(std::time::Duration::from_secs(30));
}

If OtlpBuilder::spawn is called outside a tokio runtime, then the worker will spawn on a background thread with a single-threaded executor on it:

// This will spawn on a background thread because there's no active tokio runtime

fn main() {
    let rt = emit::setup()
        .emit_to(emit_otlp::new()
            .resource(emit::props! {
                #[emit::key("service.name")]
                service_name: env!("CARGO_PKG_NAME"),
            })
            .logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
            .spawn()
            .unwrap())
        .init();

    rt.blocking_flush(std::time::Duration::from_secs(30));
}

Configuring for gRPC+protobuf

The logs_grpc_proto, traces_grpc_proto, and metrics_grpc_proto functions produce builders for gRPC+protobuf:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    .logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
    .traces(emit_otlp::traces_grpc_proto("http://localhost:4319"))
    .metrics(emit_otlp::metrics_grpc_proto("http://localhost:4319"))
    .spawn()
    .unwrap()

gRPC is based on HTTP and internally uses well-known URI paths to route RPC requests. These paths are appended automatically to the endpoint, so you don't need to specify them during configuration.
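
As a sketch of what that means in practice: the paths below are the standard OTLP gRPC service routes, and how requests are constructed is internal to the library, so they're shown here for illustration only.

// Configured endpoint (no path needed):
emit_otlp::logs_grpc_proto("http://localhost:4319")

// Requests are routed to the standard OTLP gRPC service paths, e.g.:
//   http://localhost:4319/opentelemetry.proto.collector.logs.v1.LogsService/Export
//   http://localhost:4319/opentelemetry.proto.collector.trace.v1.TraceService/Export
//   http://localhost:4319/opentelemetry.proto.collector.metrics.v1.MetricsService/Export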

Configuring for HTTP+JSON

The logs_http_json, traces_http_json, and metrics_http_json functions produce builders for HTTP+JSON:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    .logs(emit_otlp::logs_http_json("http://localhost:4318/v1/logs"))
    .traces(emit_otlp::traces_http_json("http://localhost:4318/v1/traces"))
    .metrics(emit_otlp::metrics_http_json("http://localhost:4318/v1/metrics"))
    .spawn()
    .unwrap()

Configuring for HTTP+protobuf

The logs_http_proto, traces_http_proto, and metrics_http_proto functions produce builders for HTTP+protobuf:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    .logs(emit_otlp::logs_http_proto("http://localhost:4318/v1/logs"))
    .traces(emit_otlp::traces_http_proto("http://localhost:4318/v1/traces"))
    .metrics(emit_otlp::metrics_http_proto("http://localhost:4318/v1/metrics"))
    .spawn()
    .unwrap()

Configuring TLS

If the tls Cargo feature is enabled and your endpoint uses the https:// scheme, then TLS is provided by rustls and rustls-native-certs.
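
As a sketch, assuming the tls feature is enabled and using a hypothetical collector hostname, pointing a signal at an https:// endpoint is all that's needed:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    // The https:// scheme triggers TLS via rustls, using native root certificates
    .logs(emit_otlp::logs_grpc_proto("https://my-otlp-collector.example.com:4319"))
    .spawn()
    .unwrap()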

Configuring compression

If the gzip Cargo feature is enabled then gzip compression will be applied automatically to all export requests.

You can disable any compression through an OtlpTransportBuilder:

emit_otlp::new()
    .logs(emit_otlp::logs_proto(emit_otlp::http("http://localhost:4318/v1/logs")
        .allow_compression(false))
    )

Customizing HTTP headers

You can specify custom headers to be used for HTTP or gRPC requests through an OtlpTransportBuilder:

emit_otlp::new()
    .logs(emit_otlp::logs_proto(emit_otlp::http("http://localhost:4318/v1/logs")
        .headers([
            ("X-ApiKey", "abcd"),
        ]))
    )

Configuring a resource

The OtlpBuilder::resource method configures the OTLP resource to send with each export request. Some OTLP receivers accept data without a resource, but the OpenTelemetry specification itself mandates one.

At a minimum, you should add the service.name property:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })

You should also consider setting other well-known resource properties:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
        #[emit::key("telemetry.sdk.language")]
        language: emit_otlp::telemetry_sdk_language(),
        #[emit::key("telemetry.sdk.name")]
        sdk: emit_otlp::telemetry_sdk_name(),
        #[emit::key("telemetry.sdk.version")]
        version: emit_otlp::telemetry_sdk_version(),
    })

Logs

All emit::Events can be represented as OTLP log records. You should at least configure the logs signal to make sure all diagnostics are captured in some way. A minimal logging configuration for gRPC+protobuf is:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    .logs(emit_otlp::logs_grpc_proto("http://localhost:4318"))
    .spawn()
    .unwrap()

The following diagnostic:

emit::info!("Hello, OTLP!");

will produce the following HTTP+JSON export request:

http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716804019165847000,
"observedTimeUnixNano": 1716804019165847000,
"body": {
"stringValue": "Hello, OTLP!"
},
"attributes": [],
"severityNumber": 9,
"severityText": "info"
}
]
}
]
}
]
}

When the traces signal is not configured, diagnostic events for spans are represented as regular OTLP log records. The following diagnostic:

#[emit::span("Compute {a} + {b}")]
fn add(a: i32, b: i32) -> i32 {
let r = a + b;

emit::info!("Produced {r}", r);

r
}

add(1, 3);

will produce the following HTTP+JSON export request:

http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716804240222377000,
"observedTimeUnixNano": 1716804240222377000,
"body": {
"stringValue": "Produced 4"
},
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "r",
"value": {
"intValue": 4
}
}
],
"severityNumber": 9,
"severityText": "info",
"traceId": "489571cc6b94414ceb4a32ccc2c7df09",
"spanId": "a93239061c12aa4c"
},
{
"timeUnixNano": 1716804240222675000,
"observedTimeUnixNano": 1716804240222675000,
"body": {
"stringValue": "Compute 1 + 3"
},
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "event_kind",
"value": {
"stringValue": "span"
}
},
{
"key": "span_name",
"value": {
"stringValue": "Compute {a} + {b}"
}
}
],
"severityNumber": 9,
"severityText": "info",
"traceId": "489571cc6b94414ceb4a32ccc2c7df09",
"spanId": "a93239061c12aa4c"
}
]
}
]
}
]
}

When the metrics signal is not configured, diagnostic events for metric samples are represented as regular OTLP log records. The following diagnostic:

emit::runtime::shared().emit(
    emit::Metric::new(
        emit::module!(),
        emit::Empty,
        "my_metric",
        "count",
        42,
        emit::Empty,
    )
);

will produce the following HTTP+JSON export request:

http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716876516012074000,
"observedTimeUnixNano": 1716876516012074000,
"body": {
"stringValue": "count of my_metric is 42"
},
"attributes": [
{
"key": "event_kind",
"value": {
"stringValue": "metric"
}
},
{
"key": "metric_agg",
"value": {
"stringValue": "count"
}
},
{
"key": "metric_name",
"value": {
"stringValue": "my_metric"
}
},
{
"key": "metric_value",
"value": {
"intValue": 42
}
}
],
"severityNumber": 9,
"severityText": "info"
}
]
}
]
}
]
}

Traces

When the traces signal is configured, emit::Events can be represented as OTLP spans so long as they satisfy the following conditions:

If any condition is not met, the event will be represented as an OTLP log record. If the logs signal is not configured then it will be discarded.

A minimal tracing configuration for gRPC+protobuf is:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    .traces(emit_otlp::traces_grpc_proto("http://localhost:4318"))
    .logs(emit_otlp::logs_grpc_proto("http://localhost:4318"))
    .spawn()
    .unwrap()

The following diagnostic:

#[emit::span("Compute {a} + {b}")]
fn add(a: i32, b: i32) -> i32 {
let r = a + b;

emit::info!("Produced {r}", r);

r
}

add(1, 3);

will produce the following HTTP+JSON export requests:

http://localhost:4318/v1/traces
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "my_app"
},
"spans": [
{
"name": "Compute {a} + {b}",
"kind": 0,
"startTimeUnixNano": 1716888416629816000,
"endTimeUnixNano": 1716888416630814000,
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
}
],
"traceId": "0a85ccaf666e11aaca6bd5d469e2850d",
"spanId": "2b9caa35eaefed3a"
}
]
}
]
}
]
}
http://localhost:4318/v1/logs
{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeLogs": [
{
"scope": {
"name": "my_app"
},
"logRecords": [
{
"timeUnixNano": 1716888416630507000,
"observedTimeUnixNano": 1716888416630507000,
"body": {
"stringValue": "Produced 4"
},
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "r",
"value": {
"intValue": 4
}
}
],
"severityNumber": 9,
"severityText": "info",
"traceId": "0a85ccaf666e11aaca6bd5d469e2850d",
"spanId": "2b9caa35eaefed3a"
}
]
}
]
}
]
}

If the emit::well_known::KEY_ERR property is set, then the resulting OTLP span will carry the semantic exception event:

#[emit::span(guard: span, "Compute {a} + {b}")]
fn add(a: i32, b: i32) -> i32 {
    let r = a + b;

    if r == 4 {
        span.complete_with(|evt| {
            emit::error!(
                evt,
                "Compute {a} + {b} failed",
                a,
                b,
                r,
                err: "Invalid result",
            );
        });
    }

    r
}

add(1, 3);
http://localhost:4318/v1/traces
{
"resourceSpans": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeSpans": [
{
"scope": {
"name": "my_app"
},
"spans": [
{
"name": "Compute {a} + {b}",
"kind": 0,
"startTimeUnixNano": 1716936430882852000,
"endTimeUnixNano": 1716936430883250000,
"attributes": [
{
"key": "a",
"value": {
"intValue": 1
}
},
{
"key": "b",
"value": {
"intValue": 3
}
},
{
"key": "r",
"value": {
"intValue": 4
}
}
],
"traceId": "6499bc190add060dad8822600ba65226",
"spanId": "b72c5152c32cc432",
"events": [
{
"name": "exception",
"timeUnixNano": 1716936430883250000,
"attributes": [
{
"key": "exception.message",
"value": {
"stringValue": "Invalid result"
}
}
]
}
],
"status": {
"message": "Invalid result",
"code": 2
}
}
]
}
]
}
]
}

Metrics

When the metrics signal is configured, emit::Events can be represented as OTLP metrics so long as they satisfy the following conditions:

If any condition is not met, the event will be represented as an OTLP log record. If the logs signal is not configured then it will be discarded.

A minimal metrics configuration for gRPC+protobuf is:

emit_otlp::new()
    .resource(emit::props! {
        #[emit::key("service.name")]
        service_name: env!("CARGO_PKG_NAME"),
    })
    .metrics(emit_otlp::metrics_grpc_proto("http://localhost:4318"))
    .logs(emit_otlp::logs_grpc_proto("http://localhost:4318"))
    .spawn()
    .unwrap()

If the metric aggregation is "count" then the resulting OTLP metric is a monotonic sum:

emit::runtime::shared().emit(
    emit::Metric::new(
        emit::module!(),
        emit::Empty,
        "my_metric",
        "count",
        42,
        emit::props! {
            a: true
        },
    )
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"sum": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716889540249854000,
"timeUnixNano": 1716889540249854000,
"value": 42
}
],
"aggregationTemporality": 2,
"isMonotonic": true
}
}
]
}
]
}
]
}

If the metric aggregation is "sum" then the resulting OTLP metric is a non-monotonic sum:

emit::runtime::shared().emit(
    emit::Metric::new(
        emit::module!(),
        emit::Empty,
        "my_metric",
        "sum",
        -8,
        emit::props! {
            a: true
        },
    )
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"sum": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716889891391075000,
"timeUnixNano": 1716889891391075000,
"value": -8
}
],
"aggregationTemporality": 2,
"isMonotonic": false
}
}
]
}
]
}
]
}

Any other aggregation will be represented as an OTLP gauge:

emit::runtime::shared().emit(
    emit::Metric::new(
        emit::module!(),
        emit::Empty,
        "my_metric",
        "last",
        615,
        emit::props! {
            a: true
        },
    )
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"gauge": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716890230856380000,
"timeUnixNano": 1716890230856380000,
"value": 615
}
]
}
}
]
}
]
}
]
}

If the metric aggregation is "count" or "sum", and the value is a sequence, then the values will be summed to produce a single data point:

emit::runtime::shared().emit(
    emit::Metric::new(
        emit::module!(),
        emit::Timestamp::from_unix(std::time::Duration::from_secs(1716890420))
            ..emit::Timestamp::from_unix(std::time::Duration::from_secs(1716890425)),
        "my_metric",
        "count",
        &[
            1.0,
            1.0,
            1.0,
            1.0,
            1.0,
        ],
        emit::props! {
            a: true
        },
    )
);
http://localhost:4318/v1/metrics
{
"resourceMetrics": [
{
"resource": {
"attributes": [
{
"key": "service.name",
"value": {
"stringValue": "my_app"
}
}
]
},
"scopeMetrics": [
{
"scope": {
"name": "my_app"
},
"metrics": [
{
"name": "my_metric",
"unit": null,
"sum": {
"dataPoints": [
{
"attributes": [
{
"key": "a",
"value": {
"boolValue": true
}
}
],
"startTimeUnixNano": 1716890420000000000,
"timeUnixNano": 1716890425000000000,
"value": 5
}
],
"aggregationTemporality": 1,
"isMonotonic": true
}
}
]
}
]
}
]
}

Limitations

This library is not an alternative to the OpenTelemetry SDK. It's specifically targeted at emitting diagnostic events to OTLP-compatible services. It has some intentional limitations:

  • No propagation. Propagating trace context across process boundaries is the responsibility of the application to manage.
  • No histogram metrics. emit's data model for metrics is simplistic compared to OpenTelemetry's, so it doesn't support histograms or exponential histograms.
  • No span events. Only the conventional exception event is supported. Standalone log events are not converted into span events. They're sent via the logs endpoint instead.
  • No tracestate. emit's data model for spans doesn't include the W3C tracestate.

Troubleshooting

If you're not seeing diagnostics appear in your OTLP receiver, you can rule out configuration issues in emit_otlp by configuring emit's internal logger and collecting metrics from the OTLP emitter:

use emit::metric::Source;

fn main() {
    // 1. Initialize the internal logger
    //    Diagnostics produced by `emit_otlp` itself will go here
    let internal = emit::setup()
        .emit_to(emit_term::stdout())
        .init_internal();

    let mut reporter = emit::metric::Reporter::new();

    let rt = emit::setup()
        .emit_to({
            let otlp = emit_otlp::new()
                .resource(emit::props! {
                    #[emit::key("service.name")]
                    service_name: env!("CARGO_PKG_NAME"),
                })
                .logs(emit_otlp::logs_grpc_proto("http://localhost:4319"))
                .traces(emit_otlp::traces_grpc_proto("http://localhost:4319"))
                .metrics(emit_otlp::metrics_grpc_proto("http://localhost:4319"))
                .spawn()
                .unwrap();

            // 2. Add `emit_otlp`'s metrics to a reporter so we can see what it's up to
            //    You can do this independently of the internal emitter
            reporter.add_source(otlp.metric_source());

            otlp
        })
        .init();

    // Your app code goes here

    rt.blocking_flush(std::time::Duration::from_secs(30));

    // 3. Report metrics after attempting to flush
    //    You could also do this periodically as your application runs
    reporter.emit_metrics(&internal.emitter());
}

The diagnostics report when batches are emitted, along with any failures observed along the way.

Dependencies

~8–20MB
~276K SLoC