Pipeline YAML Reference

spec.connections

Each entry in spec.connections represents one arrow on the canvas. Under the hood it is three things at once: an output topic the upstream component publishes to, a named durable subscription the downstream component consumes from, and a route identity that ties them together for operational traceability.

What is a connection?

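A single visual arrow maps to three distinct broker objects:

- Output topic (route.topic): where the upstream component publishes.
- Durable subscription (route.subscription): the named subscription the downstream component consumes from.
- Route identity (the connection id): the name that ties the topic and subscription together for operational traceability.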

When you draw an arrow in the IDE, all three are auto-generated from the naming convention. Every auto-generated value is editable.

Full annotated example

yaml — complete connection stanza
connections:
  - id:   conn-enricher-to-validator            # auto: {from-id}--{to-id}
    from:
      component: customer-enricher
      port:      output                              # typed port from component manifest
    to:
      component: inventory-validator
      port:      input

    route:
      topic:             logistics.orders.order-fulfillment.customer-enricher.out  # auto
      subscription:      logistics.orders.order-fulfillment.inventory-validator.sub # auto
      subscription_type: Shared              # inherits spec.defaults
      delivery_guarantee: at_least_once      # inherits spec.defaults
      source_schema_ref: logistics.schemas.order-enriched-customer:1.0.0
      target_schema_ref: logistics.schemas.order-to-validate:1.0.0
      retention_policy:
        ttl_hours:    24
        max_size_mb:  512

    transform:                                  # present only for Mode 2 UTL-X connections
      type: utlx
      mode: ref
      ref:  logistics.mappings.enrich-to-validate:1.2.0

    retry:
      max_attempts:     3                     # inherits spec.defaults
      strategy:         exponential_backoff
      initial_delay_ms: 500
      max_delay_ms:     30000

    error_routing:
      topic:        logistics.orders.order-fulfillment.customer-enricher.err       # auto
      subscription: logistics.orders.order-fulfillment.order-error-handler.err-sub # auto

from / to

| Field | Type | Req | Description |
|---|---|---|---|
| from.component | string | required | ID of the upstream component. Must match an entry in spec.components. |
| from.port | string | required | Port name on the upstream component. Typically output, error, or a named output port declared in the component manifest. Error-routing connections use error. |
| to.component | string | required | ID of the downstream component. |
| to.port | string | required | Port name on the downstream component. Typically input. |

route stanza

| Field | Type | Req | Description |
|---|---|---|---|
| topic | string | auto | Pulsar/Kafka topic name. Auto-generated as {ns}.{pipeline}.{from-component}.out. Must be stable across deployments. |
| subscription | string | auto | Named durable subscription. Auto-generated as {ns}.{pipeline}.{to-component}.sub. Renaming drops unprocessed messages, so treat it as immutable in production. |
| subscription_type | enum | optional | One of Shared, Exclusive, Failover, Key_Shared. Default Shared (correct for horizontally-scaled components). See subscription types below. |
| delivery_guarantee | enum | optional | One of at_most_once, at_least_once, effectively_once. Inherits from spec.defaults. |
| source_schema_ref | string | optional | Schema of the payload arriving on this topic (what the upstream component produces). Used for IDE compatibility checks and UTL-X transform input validation. Format: namespace.schemas.name:version. |
| target_schema_ref | string | optional | Schema the downstream component expects to receive. Present when a Mode 2 UTL-X transform is declared on this connection. The transform bridges source → target. |
| schema_ref | string | optional | Shorthand when source and target schemas match (Mode 1, no transform). Use source_schema_ref + target_schema_ref when a transform is present. |
| retention_policy.ttl_hours | integer | optional | Topic retention time in hours. Inherits from spec.defaults.retention_policy. |
| retention_policy.max_size_mb | integer | optional | Maximum topic backlog size in MB before oldest messages are dropped. |
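
For a Mode 1 connection (no transform), the schema_ref shorthand might look like the sketch below. It reuses the customer-enricher and audit-logger components and the order-enriched-customer schema from the examples on this page; pairing that schema with audit-logger is an assumption made for illustration.

```yaml
connections:
  - id: conn-enricher-to-audit
    from: { component: customer-enricher, port: output }
    to:   { component: audit-logger, port: input }
    route:
      topic:        logistics.orders.order-fulfillment.customer-enricher.out
      subscription: logistics.orders.order-fulfillment.audit-logger.sub
      # Mode 1: source and target schemas match, so one reference covers both
      schema_ref:   logistics.schemas.order-enriched-customer:1.0.0
    # No transform stanza, because Mode 1 declares no transform
```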

Subscription types

| Type | Behaviour | When to use |
|---|---|---|
| Shared | Messages distributed round-robin across all consumers in the subscription. Multiple replicas share the load. | Default. Correct for horizontally-scaled components (replicas > 1). |
| Exclusive | Only one consumer at a time. Others are on standby. Fails if consumer disconnects. | Ordered processing where only one replica should run. |
| Failover | One active consumer; standby takes over automatically on disconnect. Message order preserved. | Active/passive high-availability with ordering requirement. |
| Key_Shared | Messages with the same key always routed to the same consumer. Multiple consumers, ordered per key. | Stateful per-entity processing (e.g. all events for one customer to the same pod). Required for scatter-gather reply collection. |
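
As an illustration, a connection that needs per-key ordering could override the inherited default as sketched below. The component and topic names are reused from the examples on this page; how the message key is chosen is not part of this stanza and is not shown.

```yaml
connections:
  - id: conn-enricher-to-validator
    from: { component: customer-enricher, port: output }
    to:   { component: inventory-validator, port: input }
    route:
      topic:             logistics.orders.order-fulfillment.customer-enricher.out
      subscription:      logistics.orders.order-fulfillment.inventory-validator.sub
      subscription_type: Key_Shared   # overrides the Shared default inherited from spec.defaults
```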

retry stanza

| Field | Type | Description |
|---|---|---|
| max_attempts | integer | Maximum delivery attempts before the message is routed to the error topic. Default 3. Set to 1 for error-routing connections (no retry on the error handler). |
| strategy | enum | One of immediate, fixed, exponential_backoff. Controls the delay between retry attempts. |
| initial_delay_ms | integer | Delay before the first retry (ms). Used by fixed and exponential_backoff. |
| max_delay_ms | integer | Maximum backoff cap (ms). Applies to exponential_backoff only. |
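
A sketch of a retry stanza using the fixed strategy, with illustrative values. It assumes that under fixed the value of initial_delay_ms is reused as the delay before every retry; max_delay_ms is left out because it applies to exponential_backoff only.

```yaml
# Belongs under a single entry in spec.connections
retry:
  max_attempts:     5       # illustrative value; the default is 3
  strategy:         fixed
  initial_delay_ms: 2000    # assumed to be the constant delay between attempts
```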

error_routing stanza

After max_attempts delivery attempts are exhausted, the wrapper routes the failed envelope (with error metadata appended) to the error topic. A separate error-handler component in the pipeline subscribes to this topic. If no error handler is connected, the IDE shows a canvas warning.

| Field | Type | Description |
|---|---|---|
| topic | string | Error topic name. Auto-generated as {ns}.{pipeline}.{from-component}.err. |
| subscription | string | Subscription name on the error topic for the error-handler component. Auto-generated. |

Note: The error topic is a standard Pulsar/Kafka topic, so other pipeline components can subscribe to it. The DLQ is the final destination after all retries and error-routing attempts are exhausted. Configure the pipeline-level DLQ in error_handling.
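
Putting these pieces together, the connection that feeds the error handler might be declared as in the sketch below. The connection id and the handler's input port name are assumptions; the order-error-handler component name is inferred from the subscription name in the full annotated example, and the topic and subscription names are taken from it.

```yaml
connections:
  - id: conn-enricher-to-error-handler                     # hypothetical id
    from: { component: customer-enricher, port: error }    # error-routing connections use the error port
    to:   { component: order-error-handler, port: input }  # port name assumed
    route:
      topic:        logistics.orders.order-fulfillment.customer-enricher.err
      subscription: logistics.orders.order-fulfillment.order-error-handler.err-sub
    retry:
      max_attempts: 1                                      # no retry on the error handler
```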

Fan-out & fan-in

Fan-out (one output topic, multiple consumers) is expressed as separate connections, one per consumer, that share the same route.topic but have different route.subscription names. In the canvas this appears as multiple arrows leaving the same output port.

yaml — fan-out: one topic, two subscribers
connections:
  - id:   conn-enricher-to-validator
    from: { component: customer-enricher, port: output }
    to:   { component: inventory-validator, port: input  }
    route:
      topic:        logistics.orders.order-fulfillment.customer-enricher.out
      subscription: logistics.orders.order-fulfillment.inventory-validator.sub

  - id:   conn-enricher-to-audit
    from: { component: customer-enricher, port: output }
    to:   { component: audit-logger,         port: input  }
    route:
      topic:        logistics.orders.order-fulfillment.customer-enricher.out
      subscription: logistics.orders.order-fulfillment.audit-logger.sub   # different sub, same topic

Fan-in from the same MPPM correlation chain is handled by the UTL-X transform.inputs[] N-input mechanism — see the transforms reference. Fan-in from independent correlation chains requires a Mode 3 stateful-join component.

Naming convention

All auto-generated topic and subscription names follow this pattern, derived from the pipeline namespace, pipeline name, and component IDs:

pattern
# Output topic
{namespace}.{pipeline-name}.{from-component-id}.out

# Subscription
{namespace}.{pipeline-name}.{to-component-id}.sub

# Error topic
{namespace}.{pipeline-name}.{from-component-id}.err

# Log topic (on component, not connection)
{namespace}.{pipeline-name}.{component-id}.log

# DLQ topic
{namespace}.{pipeline-name}.{component-id}.dlq

# Example: namespace logistics.orders, pipeline order-fulfillment, customer-enricher → inventory-validator:
logistics.orders.order-fulfillment.customer-enricher.out
logistics.orders.order-fulfillment.inventory-validator.sub
logistics.orders.order-fulfillment.customer-enricher.err