Concepts

MPPM Envelope

Every message in an Open-M pipeline is wrapped in an MPPM envelope, the universal message contract. It carries the business payload, a correlation ID that traces the message across all hops, a sliding step window of recent payloads, schema references, and delivery metadata. The envelope is the platform concern; the payload inside it is the business concern.

What is MPPM?

MPPM stands for Multi-Part Pipeline Message. The name reflects its defining characteristic: unlike a simple point-to-point message that carries only a current payload, an MPPM envelope carries a structured history, a sliding window of the last N payloads the message passed through. Every component in the pipeline sees not just what just arrived, but what was produced by recent upstream steps.

This design eliminates a class of integration problems that plague traditional middleware: a downstream component needing data from two hops back either has to receive it carried forward explicitly in the payload (coupling upstream and downstream schema concerns) or has to make a lookup call (adding latency and a failure point). With MPPM, the step window makes recent context available as a first-class platform feature, with no business logic involvement.

📦

The MPPM envelope is always the transport wrapper. The envelope schema is platform-defined and validated by every Open-M wrapper. The payload inside the envelope is the business data: its schema is governed by the Schema Registry and declared in the pipeline YAML.

Envelope anatomy

An MPPM envelope is a structured JSON (or Protobuf/Avro) object with three top-level sections: header (identity and routing metadata), payload (the current business data), and steps (the sliding history window).

json โ€” MPPM envelope structure
{
  "envelope_version": "1.0",

  // ── HEADER ──────────────────────────────────────────
  "correlation_id":  "550e8400-e29b-41d4-a716-446655440000",  // traces full journey
  "pipeline_id":     "logistics.orders.order-fulfillment",
  "component_id":    "customer-enricher",                       // last producer
  "instance_id":     "customer-enricher-pod-7f4b9d",
  "cluster":         "k8s-prod-eu-west",
  "timestamp":       "2026-04-14T09:23:01.443Z",
  "schema_ref":      "logistics.schemas.order-enriched-customer:1.0.0",
  "content_type":    "application/json",
  "delivery_guarantee": "at_least_once",
  "step_depth":      3,

  // ── PAYLOAD ─────────────────────────────────────────
  "payload": {
    "orderId":    "ORD-2026-9921",
    "customer":   { "id": "CUST-4471", "tier": "gold" },
    "lineItems":  [ /* ... */ ]
  },

  // ── STEP WINDOW ─────────────────────────────────────
  "steps": [
    {
      // steps[0]: oldest retained hop (the raw HTTP body)
      "component_id": "http-gateway",
      "schema_ref":   "logistics.schemas.http-inbound:1.0.0",
      "timestamp":    "2026-04-14T09:23:00.801Z",
      "payload":      { /* raw HTTP body */ }
    },
    {
      // steps[1]: most recent, what arrived at customer-enricher (the raw order)
      "component_id": "inbound-order-receiver",
      "schema_ref":   "logistics.schemas.order-received:1.0.0",
      "timestamp":    "2026-04-14T09:23:00.991Z",
      "payload":      { "orderId": "ORD-2026-9921", /* raw inbound */ }
    }
  ]
}

Field reference

Header fields

Field | Type | Req | Description
envelope_version string required Envelope schema version ("1.0"). Allows forward-compatible parsing if the envelope schema evolves.
correlation_id UUID required Immutable identifier generated at pipeline entry and carried unchanged through every hop. Used for DLQ replay, log correlation, scatter-gather aggregation, and the IDE's message inspector. Never changes during the message's lifetime.
pipeline_id string required Fully-qualified pipeline identifier in namespace.name format. Set at entry and immutable.
component_id string required ID of the component that produced this envelope version. Updated by each wrapper on publish.
instance_id string required Pod/process identifier of the specific component instance that produced the envelope. Useful for diagnosing issues in horizontally-scaled deployments.
cluster string required Kubernetes cluster reference where this envelope was produced. Relevant in multi-cluster deployments.
timestamp ISO 8601 required UTC timestamp of when this envelope was produced by the component. Each wrapper sets a fresh timestamp on publish.
schema_ref string required Schema registry reference for the current payload field, in namespace.schemas.name:version format. Validated by the receiving wrapper before business logic runs.
content_type string required MIME type of the payload bytes, e.g. application/json, application/xml, text/csv. Tells the wrapper how to deserialise the payload before passing it to business logic.
delivery_guarantee enum required One of at_most_once, at_least_once, effectively_once. Inherited from the pipeline YAML connection stanza and governs wrapper ack behaviour.
step_depth integer required Maximum number of step entries in the steps array. Default 3; a value of 0 means unlimited (no trimming). Set at pipeline level; wrappers trim the steps array when it exceeds this depth.
reply_to_topic string optional Ephemeral reply inbox topic for synchronous request/reply patterns. Present only when a component is waiting for a response. TTL is set to 2× the configured sync_timeout.
scatter_index integer optional Branch index in scatter-gather patterns (0-based). Present only when the envelope is one of N branches in a scatter operation.
scatter_count integer optional Total number of branches in a scatter-gather. The gather component uses this to know when all branches have returned before emitting a merged envelope.
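The scatter fields are enough for a gather component to detect branch completeness without any extra coordination. A sketch of that bookkeeping, under the assumption that seen branches are tracked per correlation_id in memory (the function and structure names are illustrative):

```python
from collections import defaultdict

# correlation_id -> set of scatter_index values seen so far
_seen: dict[str, set[int]] = defaultdict(set)

def gather_ready(header: dict) -> bool:
    """Record one scatter branch; return True once all branches have arrived."""
    cid = header["correlation_id"]
    _seen[cid].add(header["scatter_index"])
    return len(_seen[cid]) == header["scatter_count"]

# Branches may arrive in any order; only the last one completes the set.
done = [
    gather_ready({"correlation_id": "c1", "scatter_index": i, "scatter_count": 3})
    for i in (0, 2, 1)
]
```

A production gather would also need a timeout and state eviction for correlation IDs that never complete; that is out of scope for this sketch.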

Payload field

The payload field contains the actual business data. It is typed bytes with a declared content_type and validated against schema_ref on receive. The envelope does not prescribe an encoding: payload can be JSON, XML, CSV, or any registered format. For pipeline-internal messages, JSON is recommended because the IDE's message inspector renders it without a schema lookup.
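The content_type field drives deserialisation before business logic runs. A minimal dispatch sketch; the set of decoders shown is an assumption, and a real wrapper would also consult schema_ref for validation:

```python
import csv
import io
import json

def decode_payload(raw: bytes, content_type: str):
    """Decode payload bytes according to the envelope's content_type."""
    if content_type == "application/json":
        return json.loads(raw)
    if content_type == "text/csv":
        return list(csv.reader(io.StringIO(raw.decode("utf-8"))))
    raise ValueError(f"no decoder registered for {content_type}")

order = decode_payload(b'{"orderId": "ORD-2026-9921"}', "application/json")
rows  = decode_payload(b"a,b\n1,2", "text/csv")
```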

The step window

The steps array is a sliding window of the last N payloads the message passed through, ordered from oldest (index 0) to most recent. Each wrapper appends a new step entry on publish and drops the oldest entry if the array exceeds step_depth.
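The publish-side bookkeeping can be sketched as follows, assuming the oldest-first ordering described above; the `push_step` helper name is illustrative, not SDK API:

```python
def push_step(envelope: dict, step_depth: int) -> dict:
    """Move the current payload into the step window and trim to depth.
    A step_depth of 0 is treated as unlimited (no trimming)."""
    steps = list(envelope.get("steps", []))
    steps.append({
        "component_id": envelope["component_id"],
        "schema_ref":   envelope["schema_ref"],
        "timestamp":    envelope["timestamp"],
        "payload":      envelope["payload"],
    })
    if step_depth > 0:
        steps = steps[-step_depth:]   # drop oldest entries first
    return {**envelope, "steps": steps}

env = {"component_id": "a", "schema_ref": "s:1.0.0",
       "timestamp": "t0", "payload": {"n": 0}, "steps": []}
# Five hops with depth 3: only the last three payloads survive.
for i in range(5):
    env = push_step({**env, "payload": {"n": i}}, step_depth=3)
```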

Pipeline flow

http-gateway → inbound-order-receiver → customer-enricher (← you are here) → inventory-validator

Step window carried by the envelope leaving customer-enricher (depth 3):

steps[2] (most recent): customer-enricher output
  schema: order-enriched-customer:1.0.0
steps[1]: inbound-order-receiver output
  schema: order-received:1.0.0
steps[0] (oldest): http-gateway output
  schema: http-inbound:1.0.0

Each step entry records the producing component, the schema reference of the payload at that point, a timestamp, and the payload bytes themselves. This gives any downstream component, or any UTL-X mapping, read access to recent upstream states without any component needing to explicitly pass that data forward.

Step depth configuration

The default depth is 3. It is configured at pipeline level in the YAML spec.defaults stanza and can be overridden per component:

yaml โ€” step depth configuration
spec:
  defaults:
    step_depth: 3       # default: suitable for most pipelines

  components:
    - id: compliance-archiver
      config:
        step_depth: 0   # unlimited: full audit trail

    - id: high-throughput-router
      config:
        step_depth: 1   # minimal: only current payload, reduces message size
💡

Why not use message headers for upstream context? Headers are size-limited, untyped, and not schema-governed. By embedding the step window as a first-class part of the message body, Open-M provides full schema validation at every hop, consistent serialisation, and straightforward debugging: reading any message on any topic shows the full recent history.

Log envelope

Alongside the business MPPM envelope, every component publishes structured log events to a dedicated log topic. The log envelope is lighter than a full MPPM envelope (it does not carry the step window) but it carries the correlation_id, which is the bridge between the business data stream and the log stream.

json โ€” log envelope
{
  "correlation_id": "550e8400-e29b-41d4-a716-446655440000",
  "pipeline_id":    "logistics.orders.order-fulfillment",
  "component_id":   "customer-enricher",
  "instance_id":    "customer-enricher-pod-7f4b9d",
  "cluster":        "k8s-prod-eu-west",
  "timestamp":      "2026-04-14T09:23:01.443Z",
  "level":          "INFO",
  "message":        "Customer lookup successful",
  "context": {
    "customer_id": "CUST-4471",
    "cache_hit":   true,
    "lookup_ms":   12
  }
}

Log topics follow the naming convention {pipeline-namespace}.{pipeline-name}.{component-id}.log. A developer investigating a DLQ failure takes the correlation_id from the failed envelope and queries the log topics of each component to reconstruct the full trace of that specific message's journey. In the IDE's live test mode, clicking the log icon on a component node opens an inline log tail panel filtered to the selected correlation ID.
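Reconstructing a message's journey is then just a filter over the per-component log streams. A toy sketch of that correlation; log events here are in-memory dicts, whereas in production they would be consumed from the .log topics:

```python
def trace(events: list[dict], correlation_id: str) -> list[str]:
    """All log messages for one correlation ID, oldest first."""
    hits = [e for e in events if e["correlation_id"] == correlation_id]
    return [e["message"] for e in sorted(hits, key=lambda e: e["timestamp"])]

# Events gathered from several components' log topics, interleaved.
events = [
    {"correlation_id": "c1", "timestamp": "09:23:01", "message": "lookup ok"},
    {"correlation_id": "c2", "timestamp": "09:23:00", "message": "other msg"},
    {"correlation_id": "c1", "timestamp": "09:23:00", "message": "received"},
]
journey = trace(events, "c1")
```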

Delivery guarantees

Each connection in a pipeline declares a delivery guarantee, which governs how the wrapper acknowledges messages to the Pulsar/Kafka broker. The guarantee is carried in the envelope's delivery_guarantee field and is set from the pipeline YAML connection stanza.

at_most_once
At most once
Message is acknowledged before processing. If the component crashes mid-processing, the message is lost. No retry.
Use for telemetry, metrics, or non-critical notifications where duplicate avoidance matters more than loss.
at_least_once
At least once
Message is acknowledged only after successful processing. On failure, the broker redelivers. Duplicates are possible under failure scenarios.
Default. Suitable for most integration scenarios where idempotent downstream processing is possible.
effectively_once
Effectively once
Combines at-least-once delivery with deduplication at the broker level (Pulsar transactions / Kafka exactly-once). No duplicates under any failure mode.
Required for financial transactions, fulfillment dispatch, or any action with real-world side effects that must not be repeated.
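The operational difference between the first two guarantees is purely ack ordering relative to processing. A sketch with a fake broker log that records the order of operations; the interface is illustrative, not a real client API:

```python
def handle(broker_log: list, guarantee: str, process) -> None:
    """Ack before or after processing, depending on the delivery guarantee."""
    if guarantee == "at_most_once":
        broker_log.append("ack")      # message lost if process() crashes after this
        process()
    elif guarantee == "at_least_once":
        process()                     # crash before ack means redelivery (duplicates)
        broker_log.append("ack")
    else:
        raise ValueError(f"unsupported guarantee: {guarantee}")

amo, alo = [], []
handle(amo, "at_most_once",  lambda: amo.append("process"))
handle(alo, "at_least_once", lambda: alo.append("process"))
```

effectively_once is omitted here because it requires broker-side transactions or deduplication, not just ack ordering in the wrapper.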
⚠️

A schema validation failure, where the incoming payload does not conform to the declared schema_ref, is always treated as a permanent error, routed directly to the DLQ with no retry regardless of the configured delivery guarantee. Schema violations indicate a contract breach, not a transient failure.

Error routing & DLQ

Open-M distinguishes between two failure destinations: the error topic and the DLQ. They serve different purposes and different consumers.

The error topic is a standard Pulsar/Kafka topic. Other pipeline components can subscribe to it. An error-handler component in the pipeline can consume from it and implement retry logic, alerting, compensation flows, or manual intervention queues. Error topics follow the naming convention {pipeline-namespace}.{pipeline-name}.{component-id}.err.

The DLQ is the final destination after all retries and error-routing attempts are exhausted. It is for operations teams, not pipeline logic. DLQ topics follow the naming convention {pipeline-namespace}.{pipeline-name}.{component-id}.dlq. Every DLQ event surfaces in the IDE's operations dashboard as a red node and triggers a configured alert.
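Both conventions, along with the log topic convention above, follow one pattern, so topic names can be derived mechanically. A small helper; it is illustrative, not a published CLI or SDK function, and assumes a well-formed namespace.name pipeline ID:

```python
def component_topic(pipeline_id: str, component_id: str, kind: str) -> str:
    """Build a per-component topic name; kind is 'err', 'dlq', or 'log'."""
    if kind not in {"err", "dlq", "log"}:
        raise ValueError(f"unknown topic kind: {kind}")
    return f"{pipeline_id}.{component_id}.{kind}"

dlq = component_topic("logistics.orders.order-fulfillment", "customer-enricher", "dlq")
err = component_topic("logistics.orders.order-fulfillment", "customer-enricher", "err")
```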

yaml โ€” error routing and DLQ configuration
connections:
  - id: conn-enricher-to-validator
    from: { component: customer-enricher, port: output }
    to:   { component: inventory-validator, port: input }
    route:
      topic:             logistics.orders.order-fulfillment.customer-enricher.out
      subscription:      logistics.orders.order-fulfillment.inventory-validator.sub
      delivery_guarantee: at_least_once
    retry:
      max_attempts:    3
      strategy:        exponential_backoff
      initial_delay_ms: 500
      max_delay_ms:    30000
    error_routing:
      # After max_attempts are exhausted, route here before the DLQ
      topic:        logistics.orders.order-fulfillment.customer-enricher.err
      subscription: logistics.orders.order-fulfillment.order-error-handler.err-sub

error_handling:
  dlq_strategy:     per_component
  replay_enabled:   true
  alert_on_dlq:     true
  alert_channel:    ops-pagerduty

When a component fails to process an envelope, the wrapper appends error metadata to the envelope before routing it to the error topic. The error envelope retains the full original payload and step window, plus an error section containing the failure reason, component, timestamp, and attempt count. This makes the failed message fully inspectable from the DLQ without having to reconstruct what went wrong.
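This page does not specify the exact shape of that error section, so the sketch below shows one plausible way a wrapper could attach it while preserving the payload and step window; the field names under error are assumptions:

```python
from datetime import datetime, timezone

def to_error_envelope(envelope: dict, component_id: str,
                      reason: str, attempt: int) -> dict:
    """Return a copy of the envelope with error metadata attached.
    The original payload and step window are retained untouched."""
    return {
        **envelope,                     # payload and steps carried over as-is
        "error": {
            "reason":       reason,
            "component_id": component_id,
            "timestamp":    datetime.now(timezone.utc).isoformat(),
            "attempt":      attempt,
        },
    }

failed = to_error_envelope(
    {"payload": {"orderId": "ORD-1"}, "steps": []},
    "customer-enricher", "customer lookup timed out", 3)
```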

Wrapper lifecycle

Every Open-M component is a containerised microservice with two layers: the Open-M wrapper and the business logic engine. The wrapper is responsible for all envelope handling. The business logic engine never touches the broker or the envelope structure directly.

1
Subscribe
On startup, the wrapper subscribes to its designated input subscription on the Pulsar/Kafka broker using the subscription name and type declared in the pipeline YAML.
2
Deserialise
On message receipt, the wrapper deserialises the MPPM envelope. It validates envelope_version and checks the header fields for completeness.
3
Validate schema
The wrapper validates the payload against the declared schema_ref using the Schema Registry. A validation failure is a permanent error โ€” the envelope is routed to the DLQ immediately with no retry.
4
Execute UTL-X transform (if Mode 2)
If the incoming connection carries a UTL-X transform stanza, the wrapper executes it in-process before passing the payload to business logic. The transform is pure and stateless. Named inputs are assembled from the envelope and step window at this point.
5
Extract & pass to business logic
The wrapper extracts current_payload (and optionally the step window, if the component declares it needs historical context) and passes them to the business logic engine. The engine never sees the full envelope.
6
Receive output
The business logic engine returns an output payload. The wrapper validates it against the component's declared output port schema.
7
Construct new envelope
The wrapper builds the outbound MPPM envelope: appends a new step entry (containing the previous current_payload), trims the oldest entries so the steps array stays within step_depth, sets the new payload, and updates component_id, instance_id, timestamp, and schema_ref.
8
Publish & acknowledge
The wrapper publishes the new envelope to the output topic and acknowledges the input message to the broker according to the configured delivery_guarantee. For at_least_once, ack happens after successful publish. For at_most_once, ack happens before processing.
9
Publish log event
The wrapper publishes a structured log event to the component's dedicated log topic, carrying the correlation_id, processing result, and any context data the business logic engine emitted.

Schema evolution

The MPPM envelope schema itself is versioned separately from payload schemas, in open-m-envelope-proto. Envelope schema changes follow semver compatibility rules and require all wrappers to be updated before the new version activates. The control plane coordinates this via a staged rollout.

Payload schema evolution on connections is governed by the Schema Registry. The control plane enforces compatibility at deploy time:

Version bump | Schema change allowed | Deployment model
PATCH No schema change. Documentation or metadata only. Rolling restart.
MINOR Backward-compatible only: new optional fields with defaults, no field removal, no type changes. Old envelopes remain readable by the new consumer. Rolling restart. Old and new envelopes coexist.
MAJOR Any schema change permitted, including breaking changes. Blue/green deployment required. Old version drains before new version activates.
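The deploy-time check reduces to a small decision function over the kind of schema change. A sketch of the rule table above, with the change-kind names chosen for illustration:

```python
def required_bump(change: str) -> str:
    """Map a schema-change kind to the minimum version bump it requires."""
    if change == "none":                      # documentation or metadata only
        return "patch"
    if change == "add_optional_field":        # backward-compatible addition
        return "minor"
    if change in {"remove_field", "change_type"}:
        return "major"                        # breaking: blue/green required
    raise ValueError(f"unknown change kind: {change}")

bumps = [required_bump(c) for c in
         ("none", "add_optional_field", "remove_field")]
```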

Replay

Because Pulsar and Kafka retain messages by offset for a configurable retention period, any envelope can be replayed by correlation_id. The Open-M agent exposes a Replay(correlation_id, from_step) command that the IDE or operations dashboard can invoke. This enables a surgical re-run of a specific failed message without replaying the entire pipeline backlog.

cli โ€” replay a failed envelope
# Replay a specific correlation ID from the DLQ, starting at the customer-enricher step
open-m replay \
  --pipeline  logistics.orders.order-fulfillment \
  --correlation-id  550e8400-e29b-41d4-a716-446655440000 \
  --from-component  customer-enricher \
  --env  production
ℹ️

In the IDE's live test mode, replay is available directly from the message tap inspector or the DLQ panel: select any envelope by correlation ID and click Replay. The platform seeks to the original broker offset and reprocesses without touching other messages in the queue.