docs: materializations (#4121)

Open
mdibaiee wants to merge 4 commits into main from mahdi/materialization-docs

Conversation

@mdibaiee (Member):

Description:

(Describe the high level scope of new or changed features)

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)


Note that here, `configSchema` is the main configuration that is provided to the connector in order to set up the connection, etc., while `resourceConfigSchema` describes the configuration of each _resource_ in the destination, this is usually a table, a folder, a messaging queue topic, etc.
Contributor:

configSchema is the main configuration

resourceConfigSchema describes the configuration of each resource in the destination

This reads as though configSchema is provided once, and resourceConfigSchema is provided at-least-once. However the example Spec data structure shows just a single resourceConfigSchema object, and I notice that both are typed as JSON blobs in the protobuf.

Maybe the word "each" is confusing me?

Contributor:

Oh, resourceConfigSchema is just a schema, and that schema determines the structure of Request.Validate.Binding.resourceConfig which is the type of repeated field Request.Validate.bindings.

Contributor:

This comment doesn't recommend any change, I'm just providing newbie feedback while I still can.

FYI the following terms are not defined here, and that will cause new engineers to pause.

  • control plane: yes, we know what a control plane is, but not as clearly as we know what, for example, a message queue is
  • connector: simply an input or output plugin/module/capability, a bit more difficult when used with derivations
  • runtime: this term is a difficult overload, I have some ideas for a future doc to help

OTOH, the following terms are understood correctly by new engineers because internal use matches external use.

  • database
  • warehouse
  • file storage
  • message queue


# Request.Validate

After the connector has been configured by knowing what inputs it takes, and having the user provide those configurations, the control plane and the runtime validate the new configuration, and the configured resources (e.g. tables, topics) that the user would like to materialize in their destination.
Contributor:

Suggested change
After the connector has been configured by knowing what inputs it takes, and having the user provide those configurations, the control plane and the runtime validate the new configuration, and the configured resources (e.g. tables, topics) that the user would like to materialize in their destination.
After a user configures a materialize connector within the constraints of `Response.Spec`, the control plane validates the proposed configuration through `Request.Validate`.
The proposal includes the configured resources (e.g. tables, folders, message queues) that the user would like to materialize in their destination, and the materialization performs checks to ensure the proposed configuration will not fail in subsequent `Request.(Apply|Open)` calls.

Drop "and the runtime" because IIUC only the control plane invokes Validate.

Member Author:

The control plane asks the runtime to invoke Validate. Invocations of connectors are always done by the runtime. There are constraints such as IP allowlisting or other networking setups, of which only the data plane is part.


Note that here, `configSchema` is the main configuration that is provided to the connector in order to set up the connection, etc., while `resourceConfigSchema` describes the configuration of each _resource_ in the destination, this is usually a table, a folder, a messaging queue topic, etc.
Contributor:

Suggested change
Note that here, `configSchema` is the main configuration that is provided to the connector in order to set up the connection, etc., while `resourceConfigSchema` describes the configuration of each _resource_ in the destination, this is usually a table, a folder, a messaging queue topic, etc.
Note that `configSchema` and `resourceConfigSchema` are schemas satisfied by the UI, whose constructed forms are passed by `Request.(Validate|Apply|Open)`.
`configSchema` configures the connection _between Flow and the connector_, while `resourceConfigSchema` configures _each destination resource_ (e.g. a table, folder, messaging queue).


The key range provided in `Request.Open.range.key_begin` and `Request.Open.range.key_end` lets the connector know that it is being given part (or all) of the key range of the collections it is materializing. The runtime can distribute work among multiple instances of the connector, so for example two materialize-postgres instances may be spawned with key ranges `[00000000, 80000000)` and `[80000000, FFFFFFFF]`. The connectors do not need to _filter_ incoming documents — the runtime already distributes them correctly — however it is important for a connector to know which key range it is processing, in order to avoid stepping on another instance's toes. For example:

A connector creates staging tables into which data is loaded before being committed to the destination table. These staging tables are named `flow_stage_<target_table_name>`. With this naming pattern, if two instances are spawned, both will try to create and write to the same table, mixing their data. Instead, the connector should create its staging tables as `flow_stage_<keyBegin>_<target_table_name>`.
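A minimal sketch of that naming scheme (the helper name and the `flow_stage_` prefix follow the example above and are illustrative only; real connectors use different prefixes):

```go
package main

import "fmt"

// stagingTableName is a hypothetical helper that scopes a staging table
// to this connector instance's key range, so that two instances
// materializing disjoint ranges never create or write to the same table.
func stagingTableName(keyBegin uint32, target string) string {
	// %08x renders the range start as fixed-width hex, matching the
	// [00000000, 80000000) style ranges described above.
	return fmt.Sprintf("flow_stage_%08x_%s", keyBegin, target)
}

func main() {
	fmt.Println(stagingTableName(0x00000000, "orders")) // flow_stage_00000000_orders
	fmt.Println(stagingTableName(0x80000000, "orders")) // flow_stage_80000000_orders
}
```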
Contributor:

Is this true for all materialization connectors, including those that follow patterns other than "Recovery Log with Idempotent Apply"?

For example, SQLite and Postgres execute multiple INSERT and UPDATE statements directly on the target table.

Also, I don't see any staging tables named flow_stage_... but several named flow_temp_... and Snowflake uses flow_staging_....

I'd also suggest that this paragraph be reduced to just mention the creation of temporary resources, and leave the details for later on in the document.

Member Author:

No this is not for all connectors, this is an example of using key ranges to separate concerns of separate instances operating on different key ranges.


## Request.Load

Flow wants to apply [reductions](https://docs.estuary.dev/reference/reduction-strategies/) on top of existing documents, as well as gather information on whether certain keys already exist in the destination or not. In order to achieve this, it sends `Request.Load` messages which point to a binding index as well as the key of the document. The connector then must look for this document in the destination and return the full document as part of `Response.Loaded`. This process gathers information that is later provided back to the connector during `Request.Store`, explained below.
Contributor:

Please state that the runtime sends Request.Load once per document, and the connector sends Response.Loaded once per document iff the document is found in the destination.


Once all load requests are sent, the runtime sends `Request.Flush`, to which the connector responds with `Response.Flushed`, carrying a connector state. The use case for this message is quite niche, but explained below.

Consider a store like materialize-elastic. Elastic is a document DB suited for point lookups and point updates. It doesn’t provide a meaningful bulk query API for reads or writes (its "batch" operations don’t scale to the degree we’d need), so the basic strategy is to stream out Load’d documents, and stream in Store’d documents. We throw our hands up and say "this connector is at-least-once".
Contributor:

This paragraph introduces Store and at-least-once without explaining them.

Contributor:

I'm confused by the rest of this ## section. Are you proposing a new behavior, or describing existing behavior?

Member Author:

This is a "niche" hypothetical use case for the Flushed message having a state.



Compare to delta updates mode: collection documents -1, 3, and 2 are combined to store a delta-update document of 4. The next transaction starts anew, and 6, -7, and -1 combine to arrive at a delta-update document of -2. These delta updates are a windowed combine over documents seen in the current transaction only, and unlike before are not a full reduction of the document. If delta updates were written to pub/sub, note that a subscriber could further reduce over each delta update to recover the fully reduced document of 2.

Note that many use cases require only `lastWriteWins` reduction behavior, and for these use cases delta updates does the "right thing" by trivially re-writing each document with its most recent version. This matches the behavior of Kafka Connect, for example.
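The arithmetic above can be sketched with a numeric sum reduction (illustrative only; real reductions follow each field's declared strategy):

```go
package main

import "fmt"

// combine performs the windowed combine over documents seen within one
// transaction; for this sketch the reduction strategy is a numeric sum.
func combine(docs []int) int {
	sum := 0
	for _, d := range docs {
		sum += d
	}
	return sum
}

func main() {
	delta1 := combine([]int{-1, 3, 2})  // first transaction's delta update
	delta2 := combine([]int{6, -7, -1}) // second transaction's delta update
	// A downstream subscriber further reducing over each delta update
	// recovers the fully reduced document.
	fmt.Println(delta1, delta2, delta1+delta2) // 4 -2 2
}
```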
Contributor:

I notice that, for example, materialize-postgres does not accommodate the cited use case. With delta updates enabled (it.Exists == false), the connector:

  1. Creates target tables without primary key.
  2. Strictly delivers insert statements, never updates.

```go
if converted, err := b.target.ConvertAll(it.Key, it.Values, it.RawJSON); err != nil {
	return nil, fmt.Errorf("converting store parameters: %w", err)
} else if it.Exists {
	batch.Queue(b.storeUpdateSQL, converted...)
} else {
	batch.Queue(b.storeInsertSQL, converted...)
}
```

```
{{- if not $.DeltaUpdates }},
PRIMARY KEY (
{{- range $ind, $key := $.Keys }}
{{- if $ind }}, {{end -}}
{{$key.Identifier}}
{{- end -}}
)
{{- end }}
```

Member Author:

I believe what is meant is that if the destination system itself has a sort of lastWriteWins way of reducing documents (such as Kafka Connect), then the documents will end up with the fully reduced document when using delta updates as well, not that our materializations will have the same outcome with delta updates.


### Push-only Endpoints & Delta Updates

Some systems, such as APIs, Webhooks, and Pub/Sub, are push-only in nature. Estuary's materializations can run in a "delta updates" mode, where loads are always skipped and Estuary does not attempt to store fully-reduced documents. Instead, during the store phase, the runtime sends delta updates which reflect the combined roll-up of collection documents processed only within this transaction.
Contributor:

Much attention is given to the "combined roll-up" process. IIUC:

  1. The process combines multiple changes for a key, described by multiple documents referencing that key, into a single change represented by a single document.
  2. This process is implemented far upstream of materialization connectors, and the materialize connectors are not exposed to side effects of the process - documents are just documents.

It would be helpful to move these details into a final paragraph such that it is not so distracting. Without it, the delta update feature is trivial to understand, and the intermediate roll-up is a helpful FYI, not critical to understanding or implementing materializations.

Member Author:

Can you give your explanation of delta updates without the roll-up being involved? In my mind they are tangled


These three messages are highly related, and depending on the various patterns for writing a materialization connector, they will be used very differently. So here we explain some of the various patterns, use cases, scale-out strategies, limitations and nuances for each.

Before diving into the patterns, we must explain one general constraint that largely dictates them: transactionality. Flow sends documents to materializations as part of a transaction, which must either all be committed, or none at all.
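A sketch of what that constraint can look like for a SQL-like destination, with a hypothetical `Txn` interface standing in for a real transaction handle (e.g. `*sql.Tx` from `database/sql`) and a fake implementation so the all-or-nothing behavior is observable:

```go
package main

import (
	"errors"
	"fmt"
)

// Txn is a hypothetical stand-in for a destination transaction handle.
type Txn interface {
	Exec(stmt string) error
	Commit() error
	Rollback() error
}

type Doc struct{ Stmt string }

// storeAll writes every document of a Flow transaction inside a single
// destination transaction: either every statement commits together, or
// the rollback discards them all.
func storeAll(begin func() (Txn, error), docs []Doc) error {
	tx, err := begin()
	if err != nil {
		return err
	}
	for _, d := range docs {
		if err := tx.Exec(d.Stmt); err != nil {
			tx.Rollback() // partial writes are discarded
			return err
		}
	}
	return tx.Commit()
}

// fakeTxn records outcomes for demonstration purposes.
type fakeTxn struct {
	applied   []string
	committed bool
}

func (t *fakeTxn) Exec(stmt string) error {
	if stmt == "bad" {
		return errors.New("exec failed")
	}
	t.applied = append(t.applied, stmt)
	return nil
}
func (t *fakeTxn) Commit() error   { t.committed = true; return nil }
func (t *fakeTxn) Rollback() error { t.applied = nil; return nil }

func main() {
	tx := &fakeTxn{}
	err := storeAll(func() (Txn, error) { return tx, nil }, []Doc{{"a"}, {"bad"}, {"c"}})
	fmt.Println(err != nil, len(tx.applied), tx.committed) // true 0 false
}
```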
Contributor:

must either all be committed, or none at all

I don't believe this is generally true.

Member Author:

@jacobmarble what do you mean? it is not true where / when?

This document's aim is to explain in as much detail as possible the materialization protocol (i.e. how does Flow runtime interact with a materialization connector), and explain the various constraints and considerations necessary when implementing a connector.

The actual protocol is defined as protobuf messages [here](https://github.com/estuary/flow/blob/master/go/protocols/materialize/materialize.proto). Here we will reference some of those message types (in this doc these message types are denoted with a `Request.` or `Response.` prefix) and will explain their role. This document also references the [materialize-boilerplate](./materialize-boilerplate) implementation to create a real connection between the raw protocol and the actual boilerplate library we use to develop materializations.

Contributor:

This is a good place to "set the table" with some definitions that could be clearer (enhance these suggestions with human or artificial intelligence):

  • collection: the storage container from which documents are transported to materialization connectors
  • document: a record belonging to a collection, received by the materializer; sometimes delivered as a complete record ("standard updates"), sometimes as a partial record ("delta updates")
  • transaction: a batch of documents to be delivered to the destination, ideally exactly-once, in certain cases at-least-once
  • recovery log: the durable Flow log, a record of every action taken by materializers, managed by the runtime

Contributor:

What do you think about a new format:

# Definitions

  • control plane, data plane
  • collection, connector, runtime

# The materialize relationship
What is the runtime doing, and what does it expect, when it sends Load/Flush/Store/StartCommit/Ack? We don't need runtime implementation details; we need contractual details.

## Definitions

  • Flow recovery log
  • transaction (in the materialize sense)
  • materialization protocol: streaming gRPC service defined at ...
  • connector state

## Spec and Validate: the Control Plane check

## Spec, Apply, Open: the Data Plane handshake

## Load and Flush: the Data Plane READ handler

## Store and StartCommit: the Data Plane WRITE handler

eg "After the runtime receives StartedCommit, it knows that A, B and C, so it commits X, Y, and Z to the recovery log. Now the transaction is (describe the state), so it notifies the materialization connector by sending Acknowledge."

## Acknowledge: the Data Plane WRITE OK signal

# Transactional patterns and delivery semantics

## Definitions

  • exactly-once, at-least-once
    (Table of the four patterns, column per attribute; processing guarantee, checkpoint authority, ...)

## Remote Store is Authoritative

## ...

Member Author:

I would like to defer to other docs for definitions of those parts. For connectors and collections we have https://docs.estuary.dev/concepts/, and for control plane and data plane, I assume Joaquin's docs on data plane and operations will include some docs on what a data plane is and we can refer to it.

I will add references to existing docs for as many of these words as do have an available reference the first time they are mentioned 👍🏽

mdibaiee and others added 4 commits April 10, 2026 15:54
Co-authored-by: Jacob Marble <jacobmarble@gmail.com>
Co-authored-by: Jacob Marble <jacobmarble@gmail.com>
@mdibaiee force-pushed the mahdi/materialization-docs branch from e46ddf5 to 3142387 (April 10, 2026 15:30)