spec.connections
Each entry in spec.connections represents one arrow on the canvas. Under the hood it is three things at once: an output topic the upstream component publishes to, a named durable subscription the downstream component consumes from, and a route identity that ties them together for operational traceability.
What is a connection?
A single visual arrow maps to three distinct broker objects:
- Output topic — the Pulsar/Kafka topic the upstream component's wrapper publishes the MPPM envelope to.
- Subscription — the named durable subscription the downstream wrapper consumes from. The subscription name is stable and version-controlled — if it changes between deployments, messages are silently dropped.
- Route — the logical pairing with its own ID, used for operational dashboards, error routing, and tap inspection.
When you draw an arrow in the IDE, all three are auto-generated from the naming convention. Every auto-generated value is editable.
Full annotated example
connections: - id: conn-enricher-to-validator # auto: {from-id}--{to-id} from: component: customer-enricher port: output # typed port from component manifest to: component: inventory-validator port: input route: topic: logistics.orders.order-fulfillment.customer-enricher.out # auto subscription: logistics.orders.order-fulfillment.inventory-validator.sub # auto subscription_type: Shared # inherits spec.defaults delivery_guarantee: at_least_once # inherits spec.defaults source_schema_ref: logistics.schemas.order-enriched-customer:1.0.0 target_schema_ref: logistics.schemas.order-to-validate:1.0.0 retention_policy: ttl_hours: 24 max_size_mb: 512 transform: # present only for Mode 2 UTL-X connections type: utlx mode: ref ref: logistics.mappings.enrich-to-validate:1.2.0 retry: max_attempts: 3 # inherits spec.defaults strategy: exponential_backoff initial_delay_ms: 500 max_delay_ms: 30000 error_routing: topic: logistics.orders.order-fulfillment.customer-enricher.err # auto subscription: logistics.orders.order-fulfillment.order-error-handler.err-sub # auto
from / to
| Field | Type | Req | Description |
|---|---|---|---|
| from.component | string | required | ID of the upstream component. Must match an entry in spec.components. |
| from.port | string | required | Port name on the upstream component. Typically output, error, or a named output port declared in the component manifest. Error-routing connections use error. |
| to.component | string | required | ID of the downstream component. |
| to.port | string | required | Port name on the downstream component. Typically input. |
route stanza
| Field | Type | Req | Description |
|---|---|---|---|
| topic | string | auto | Pulsar/Kafka topic name. Auto-generated as {ns}.{pipeline}.{from-component}.out. Must be stable across deployments. |
| subscription | string | auto | Named durable subscription. Auto-generated as {ns}.{pipeline}.{to-component}.sub. Renaming drops unprocessed messages — treat as immutable in production. |
| subscription_type | enum | optional | One of Shared, Exclusive, Failover, Key_Shared. Default Shared (correct for horizontally-scaled components). See subscription types below. |
| delivery_guarantee | enum | optional | at_most_once | at_least_once | effectively_once. Inherits from spec.defaults. |
| source_schema_ref | string | optional | Schema of the payload arriving on this topic (what the upstream component produces). Used for IDE compatibility checks and UTL-X transform input validation. Format: namespace.schemas.name:version. |
| target_schema_ref | string | optional | Schema the downstream component expects to receive. Present when a Mode 2 UTL-X transform is declared on this connection. The transform bridges source → target. |
| schema_ref | string | optional | Shorthand when source and target schemas match (Mode 1 — no transform). Use source_schema_ref + target_schema_ref when a transform is present. |
| retention_policy.ttl_hours | integer | optional | Topic retention time in hours. Inherits from spec.defaults.retention_policy. |
| retention_policy.max_size_mb | integer | optional | Maximum topic backlog size in MB before oldest messages are dropped. |
Subscription types
| Type | Behaviour | When to use |
|---|---|---|
| Shared | Messages distributed round-robin across all consumers in the subscription. Multiple replicas share the load. | Default. Correct for horizontally-scaled components (replicas > 1). |
| Exclusive | Only one consumer at a time. Others are on standby. Fails if consumer disconnects. | Ordered processing where only one replica should run. |
| Failover | One active consumer; standby takes over automatically on disconnect. Message order preserved. | Active/passive high-availability with ordering requirement. |
| Key_Shared | Messages with the same key always routed to the same consumer. Multiple consumers, ordered per key. | Stateful per-entity processing (e.g. all events for one customer to the same pod). Required for scatter-gather reply collection. |
retry stanza
| Field | Type | Description |
|---|---|---|
| max_attempts | integer | Maximum delivery attempts before the message is routed to the error topic. Default 3. Set to 1 for error-routing connections (no retry on the error handler). |
| strategy | enum | immediate | fixed | exponential_backoff. Controls the delay between retry attempts. |
| initial_delay_ms | integer | Delay before the first retry (ms). Used by fixed and exponential_backoff. |
| max_delay_ms | integer | Maximum backoff cap (ms). Applies to exponential_backoff only. |
error_routing stanza
After max_attempts are exhausted, the wrapper routes the failed envelope (with error metadata appended) to the error topic. A separate error-handler component in the pipeline subscribes to this topic. If no error handler is connected, the IDE shows a canvas warning.
| Field | Type | Description |
|---|---|---|
| topic | string | Error topic name. Auto-generated as {ns}.{pipeline}.{from-component}.err. |
| subscription | string | Subscription name on the error topic for the error-handler component. Auto-generated. |
The error topic is a standard Pulsar/Kafka topic — other pipeline components can subscribe to it. The DLQ is the final destination after all retries and error-routing attempts are exhausted. Configure the pipeline-level DLQ in error_handling.
Fan-out & fan-in
Fan-out (one output topic, multiple consumers) is expressed as two separate connections that share the same route.topic but have different route.subscription names. In the canvas this appears as two arrows leaving the same output port.
connections: - id: conn-enricher-to-validator from: { component: customer-enricher, port: output } to: { component: inventory-validator, port: input } route: topic: logistics.orders.order-fulfillment.customer-enricher.out subscription: logistics.orders.order-fulfillment.inventory-validator.sub - id: conn-enricher-to-audit from: { component: customer-enricher, port: output } to: { component: audit-logger, port: input } route: topic: logistics.orders.order-fulfillment.customer-enricher.out subscription: logistics.orders.order-fulfillment.audit-logger.sub # different sub, same topic
Fan-in from the same MPPM correlation chain is handled by the UTL-X transform.inputs[] N-input mechanism — see the transforms reference. Fan-in from independent correlation chains requires a Mode 3 stateful-join component.
Naming convention
All auto-generated topic and subscription names follow this pattern, derived from the pipeline namespace, pipeline name, and component IDs:
# Output topic {namespace}.{pipeline-name}.{from-component-id}.out # Subscription {namespace}.{pipeline-name}.{to-component-id}.sub # Error topic {namespace}.{pipeline-name}.{from-component-id}.err # Log topic (on component, not connection) {namespace}.{pipeline-name}.{component-id}.log # DLQ topic {namespace}.{pipeline-name}.{component-id}.dlq # Example — logistics.orders pipeline, customer-enricher → inventory-validator: logistics.orders.order-fulfillment.customer-enricher.out logistics.orders.order-fulfillment.inventory-validator.sub logistics.orders.order-fulfillment.customer-enricher.err