Skip to content

API Reference

setup() — recommended entry point

from helixobs.setup import setup

tel = setup(
    service_name,
    *,
    instrument_id=None,
    endpoint="localhost:4317",
    insecure=True,
    otlp=False,
    log_endpoint=None,
    process_name=None,
    credential=None,
    auth_endpoint=None,
    instrument_class=Instrument,
)

Configures logging and returns a ready-to-use Instrument stamped with the same service_name.

Parameter Type Default Description
service_name str OTel service name for traces and logs
instrument_id str Short instrument identifier (e.g. "MY_INST"). Required unless instrument_class owns its own ID
endpoint str "localhost:4317" Herald gRPC address
insecure bool True Disable TLS (set False in production with TLS)
otlp bool False Ship logs via OTLP instead of stdout JSON
log_endpoint str\|None None OTel Collector address for logs. Falls back to OTEL_EXPORTER_OTLP_ENDPOINT, then http://localhost:4317
process_name str\|None None Pipeline process name for the Pipeline Logs dashboard. Use INST_ID/pipeline/stage convention
credential str\|Callable\|None None Registration secret or callable returning one. Required when herald auth is enabled
auth_endpoint str\|None None Herald auth token endpoint. Required when credential is set
instrument_class type Instrument Subclass to instantiate instead of base Instrument

Instrument

Instrument.create(stage, *, id, parents=None) → Token

Returns a Token for a new entity. Works as a context manager, decorator, or explicit API — see Tracking Entities.

# Context manager (recommended)
with tel.create("ingest", id="block-001", parents=["upstream"]) as token:
    token.set_attribute("size_mb", 42)
    # complete() called automatically on exit

# Decorator
@tel.create("ingest", id=lambda block_id, **_: block_id)
def ingest(block_id):
    ...

# Explicit API
token = tel.create("ingest", id="block-001", parents=["upstream"])
token.start()
token.complete()

Instrument.operate(operation, *, entity_id=None) → Token

Returns a Token for work on an existing entity. Writes to entity_operations, not entities. Same three usage patterns as create().

with tel.operate("archive", entity_id="event-7") as token:
    write_archive()
    token.set_attribute("path", "/data/event-7.h5")

Deferred entity ID. entity_id is optional. Omit it to open the trace immediately — before the entity is known — then call token.set_entity_id(id) once discovered. All logs emitted before the call share the same otelTraceID and are reachable from the Entity Inspector via the operation's trace link.

with tel.operate("stage-deletion") as token:
    items = fetch_work()          # logged under this trace
    if not items:
        return                    # no entity → span forwarded as plain trace
    token.set_entity_id(items[0].dataset_name)
    process(items)

If the span closes without entity_id a WARNING is logged and no entity_operations row is written.

Instrument.trace(name, *, attributes=None)

Context manager for a plain OTel span with no entity semantics. Use for infrastructure work — HTTP handlers, background loops, daemon iterations — where log correlation by trace ID is useful but no entity is being tracked. The herald forwards the span unchanged.

with tel.trace("handle-request", attributes={"method": "POST"}):
    with tel.child_span("validate"):
        validate(request)
    with tel.child_span("write-db"):
        write(request)

child_span() calls inside a tel.trace() block automatically inherit the trace context and share its otelTraceID. Log lines emitted anywhere inside carry otel_trace_id, filterable in Loki without needing an entity ID.

Instrument.child_span(name, *, parent_id=None, attributes=None)

Context manager for a child span that appears in the Tempo trace waterfall but does not create an entity row. Use for internal sub-steps within an entity's processing.

with tel.create("process", id="block-001") as token:
    with tel.child_span("filter", attributes={"filter.type": "bandpass"}):
        apply_filter()

Instrument.shutdown()

Flushes pending spans and shuts down the exporter. Call at application exit if you need a clean flush — not required if the process exits normally.


Token

token.start() → Token

Starts the OTel span. Returns self for chaining. Called automatically when entering a with block.

token.complete(metadata=None)

Ends the span in success state. metadata is a dict of JSON-serialisable values stored in TimescaleDB.

token.error(metadata=None)

Records a helix.error span event, marks the span as failed, and ends the span. Use for hard failures — the operation cannot continue.

token.error({"message": "timeout", "stage": "archive"})

Triggers configured notifications (Slack, GitHub Issues).

token.add_error(metadata=None)

Records a helix.error span event and marks the span as failed, but leaves the span open. Use for soft/recoverable failures where the operation continues and you will call complete() or error() later.

with tel.operate("post-process", entity_id=product_id) as token:
    try:
        write_header()
    except Exception as e:
        token.add_error({"stage": "write-header", "message": type(e).__name__})
    # context manager calls complete() on clean exit

Write a concise message — don't dump raw exceptions

The message field drives notification deduplication. Every occurrence of the same error class must produce the same message string so they converge on a single GitHub issue.

# Good — stable, human-readable error kind:
token.add_error({"message": "db_id_overflow", "stage": "dump_header"})
token.add_error({"message": type(e).__name__, "stage": "dump_header"})

# Bad — str(e) may embed per-entity payload data (tracebacks, floats, etc.)
# causing every entity to open its own issue:
token.add_error({"message": str(e)})

Full exception detail belongs in the log line (automatically correlated to this entity's trace via otelTraceID) or in a separate "detail" metadata key, not in message.

token.add_event(name, attributes=None)

Records a named span event. Events named helix.event.* are stored in entity_events and appear in the Entity Inspector timeline.

token.add_event("helix.event.classified", attributes={"label": "candidate", "confidence": "0.97"})

token.set_entity_id(entity_id)

Sets the entity ID mid-operation. Use with deferred entity_id on operate().

with tel.operate("stage-replication") as token:
    dataset = fetch_next()
    if dataset:
        token.set_entity_id(dataset.name)

Equivalent to token.set_attribute("helix.entity.id", entity_id).

token.set_attribute(key, value)

Sets a span attribute. Values are coerced to strings.

token.set_attribute("output.path", "/data/result.h5")

configure_logging() — direct logging setup

from helixobs.logging import configure_logging

configure_logging(otlp=False, service_name=None)

Prefer setup() for the common case. Use this directly only when you need logs without traces.

Parameter Default Description
otlp False Ship logs via OTLP gRPC instead of stdout JSON
service_name None Required when otlp=True

Safe to call multiple times — subsequent calls are no-ops.


install_context_fields() — inject fields only

from helixobs.logging import install_context_fields
install_context_fields()

Injects helix context fields into log records without adding or modifying any handlers. Use this when your pipeline already has its own logging setup and you only want helix_entity_id, otel_trace_id, etc. available in your existing format string.