Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion crates/validation/src/materialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn walk_materialization<C: Connectors>(
let models::MaterializationDef {
on_incompatible_schema_change,
source: sources,
target_naming,
mut target_naming,
endpoint,
bindings: bindings_model,
mut shards,
Expand All @@ -124,6 +124,23 @@ async fn walk_materialization<C: Connectors>(
reset,
} = model;

// Model fix: Promote source.target_naming == WithSchema to top-level
// MatchSourceStructure. These represent identical behavior, and this
// progressive upgrade lets us eventually remove the legacy field.
if target_naming.is_none() {
if let Some(source) = &sources {
if source.to_normalized_def().target_naming == models::TargetNaming::WithSchema {
target_naming = Some(models::TargetNamingStrategy::MatchSourceStructure {
table_template: None,
schema_template: None,
});
model_fixes.push(
"promoted source.targetNaming 'withSchema' to top-level targetNaming 'matchSourceStructure'".to_string(),
);
}
}
}

indexed::walk_name(
scope,
"materialization",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
---
source: crates/validation/tests/transition_tests.rs
expression: "(&outcome.built_materializations[0].model,\n&outcome.built_materializations[0].model_fixes)"
---
(
Some(
MaterializationDef {
source: Some(
Configured(
SourceDef {
capture: Some(
Capture(
"the/capture",
),
),
target_naming: WithSchema,
delta_updates: false,
fields_recommended: Bool(
true,
),
},
),
),
target_naming: Some(
SingleSchema {
schema: "my_schema",
table_template: None,
},
),
on_incompatible_schema_change: Backfill,
endpoint: Connector(
ConnectorConfig {
image: "other/image",
config: RawValue(
{"a":"config"},
),
},
),
bindings: [
MaterializationBinding {
resource: RawValue(
{"_meta":{"path":["table","path"]},"table":"bar"},
),
source: Collection(
Collection(
"the/collection",
),
),
disable: false,
priority: 0,
fields: MaterializationFields {
group_by: [],
require: {
Field(
"F1",
): RawValue(
{},
),
Field(
"f_two",
): RawValue(
{},
),
},
exclude: [
Field(
"F2",
),
],
recommended: Bool(
true,
),
},
backfill: 0,
on_incompatible_schema_change: None,
},
],
shards: ShardTemplate {
disable: false,
min_txn_duration: None,
max_txn_duration: None,
hot_standbys: None,
ring_buffer_size: None,
read_channel_size: None,
log_level: None,
flags: {},
},
expect_pub_id: None,
triggers: None,
delete: false,
reset: false,
},
),
[
"removed dropped exclude projection FY of source collection the/collection",
"removed dropped exclude projection does/not/exist of source collection the/collection",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
---
source: crates/validation/tests/transition_tests.rs
expression: "(&outcome.built_materializations[0].model,\n&outcome.built_materializations[0].model_fixes)"
---
(
Some(
MaterializationDef {
source: Some(
Configured(
SourceDef {
capture: Some(
Capture(
"the/capture",
),
),
target_naming: WithSchema,
delta_updates: false,
fields_recommended: Bool(
true,
),
},
),
),
target_naming: Some(
MatchSourceStructure {
table_template: None,
schema_template: None,
},
),
on_incompatible_schema_change: Backfill,
endpoint: Connector(
ConnectorConfig {
image: "other/image",
config: RawValue(
{"a":"config"},
),
},
),
bindings: [
MaterializationBinding {
resource: RawValue(
{"_meta":{"path":["table","path"]},"table":"bar"},
),
source: Collection(
Collection(
"the/collection",
),
),
disable: false,
priority: 0,
fields: MaterializationFields {
group_by: [],
require: {
Field(
"F1",
): RawValue(
{},
),
Field(
"f_two",
): RawValue(
{},
),
},
exclude: [
Field(
"F2",
),
],
recommended: Bool(
true,
),
},
backfill: 0,
on_incompatible_schema_change: None,
},
],
shards: ShardTemplate {
disable: false,
min_txn_duration: None,
max_txn_duration: None,
hot_standbys: None,
ring_buffer_size: None,
read_channel_size: None,
log_level: None,
flags: {},
},
expect_pub_id: None,
triggers: None,
delete: false,
reset: false,
},
),
[
"promoted source.targetNaming 'withSchema' to top-level targetNaming 'matchSourceStructure'",
"removed dropped exclude projection FY of source collection the/collection",
"removed dropped exclude projection does/not/exist of source collection the/collection",
],
)
45 changes: 45 additions & 0 deletions crates/validation/tests/transition_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,51 @@ driver:
));
}

#[test]
fn test_target_naming_promotion() {
// When source.targetNaming is "withSchema" and no top-level targetNaming is set,
// validation should promote it to MatchSourceStructure as a model fix.
let outcome = common::run(
MODEL_YAML,
r#"
test://example/catalog.yaml:
materializations:
the/materialization:
source:
capture: the/capture
targetNaming: withSchema
"#,
);
insta::assert_debug_snapshot!((
&outcome.built_materializations[0].model,
&outcome.built_materializations[0].model_fixes
));
}

#[test]
fn test_target_naming_no_promotion_when_set() {
// When top-level targetNaming is already set, source.targetNaming should not
// trigger promotion (the top-level value takes precedence).
let outcome = common::run(
MODEL_YAML,
r#"
test://example/catalog.yaml:
materializations:
the/materialization:
source:
capture: the/capture
targetNaming: withSchema
targetNaming:
strategy: singleSchema
schema: my_schema
"#,
);
insta::assert_debug_snapshot!((
&outcome.built_materializations[0].model,
&outcome.built_materializations[0].model_fixes
));
}

#[test]
fn test_manual_redact_salt_override() {
// Test that manually specified redact_salt overrides existing salt
Expand Down
Loading