From aff9d75269a1eafd9e20c271d9205722752d1b49 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Tue, 6 Jan 2026 11:36:05 +0530 Subject: [PATCH 1/6] set connector refs for models --- runtime/parser/parse_metrics_view.go | 3 ++- runtime/parser/parse_model.go | 6 ++++++ runtime/parser/parse_partial_data.go | 8 ++++++-- runtime/parser/parser.go | 9 ++++++++- runtime/parser/parser_test.go | 2 ++ 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/runtime/parser/parse_metrics_view.go b/runtime/parser/parse_metrics_view.go index 098091e8b0b..19b6ba88c8c 100644 --- a/runtime/parser/parse_metrics_view.go +++ b/runtime/parser/parse_metrics_view.go @@ -715,7 +715,8 @@ func (p *Parser) parseMetricsView(node *Node) error { return err } node.Refs = append(node.Refs, securityRefs...) - + // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later + node.Refs = append(node.Refs, ResourceName{Name: node.Connector, Kind: ResourceKindUnspecified}) var cacheTTLDuration time.Duration if tmp.Cache.KeyTTL != "" { cacheTTLDuration, err = time.ParseDuration(tmp.Cache.KeyTTL) diff --git a/runtime/parser/parse_model.go b/runtime/parser/parse_model.go index 52ab4c14e51..a7a8bfc7a0a 100644 --- a/runtime/parser/parse_model.go +++ b/runtime/parser/parse_model.go @@ -116,6 +116,9 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { inputProps = map[string]any{} } + // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later + node.Refs = append(node.Refs, ResourceName{Name: inputConnector, Kind: ResourceKindUnspecified}) + // Special handling for adding SQL to the input properties if sql := strings.TrimSpace(node.SQL); sql != "" { refs, err := p.inferSQLRefs(node) @@ -154,6 +157,9 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { if outputConnector == "" { outputConnector = p.defaultOLAPConnector() } + + // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later + node.Refs = append(node.Refs, ResourceName{Name: outputConnector, Kind: ResourceKindUnspecified}) outputProps := tmp.Output.Properties // Backwards compatibility: materialize can be specified outside of the output properties diff --git a/runtime/parser/parse_partial_data.go b/runtime/parser/parse_partial_data.go index 0890bdc9ece..0cb7e9284a8 100644 --- a/runtime/parser/parse_partial_data.go +++ b/runtime/parser/parse_partial_data.go @@ -34,11 +34,15 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin count++ resolver = "sql" resolverProps["sql"] = raw.SQL + var connector string if raw.Connector != "" { - resolverProps["connector"] = raw.Connector + connector = raw.Connector } else if contextualConnector != "" { - resolverProps["connector"] = contextualConnector + connector = contextualConnector } + resolverProps["connector"] = connector + // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later + refs = append(refs, ResourceName{Kind: ResourceKindUnspecified, Name: connector}) } // Handle metrics SQL resolver diff --git a/runtime/parser/parser.go b/runtime/parser/parser.go index f5f25d4fbd7..ad262b0d2e3 100644 --- a/runtime/parser/parser.go +++ b/runtime/parser/parser.go @@ -787,7 +787,14 @@ func (p *Parser) inferUnspecifiedRefs(r *Resource) { } } - // Rule 4: Skip it + // Rule 4: For any resource if there is a connector with that name, use it + n := ResourceName{Kind: ResourceKindConnector, Name: ref.Name} + if _, ok := p.Resources[n.Normalized()]; ok { + refs = append(refs, n) + continue + } + + // Rule 5: Skip it } slices.SortFunc(refs, func(a, b ResourceName) int { diff --git a/runtime/parser/parser_test.go b/runtime/parser/parser_test.go index dcfaec8ca44..048371d09c1 100644 --- a/runtime/parser/parser_test.go +++ b/runtime/parser/parser_test.go @@ -255,6 +255,7 @@ schema: default { Name: ResourceName{Kind: ResourceKindModel, Name: "s1"}, Paths: []string{"/sources/s1.yaml"}, + Refs: []ResourceName{{Kind: ResourceKindConnector, Name: "s3"}}, ModelSpec: &runtimev1.ModelSpec{ InputConnector: "s3", OutputConnector: "duckdb", @@ -269,6 +270,7 @@ schema: default { Name: ResourceName{Kind: ResourceKindModel, Name: "s2"}, Paths: []string{"/sources/s2.sql"}, + Refs: []ResourceName{{Kind: ResourceKindConnector, Name: "postgres"}}, ModelSpec: &runtimev1.ModelSpec{ InputConnector: "postgres", OutputConnector: "duckdb", From ce0379d4e73b14086e3927a7134ac53228ddac6e Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Wed, 7 Jan 2026 19:02:59 +0530 Subject: [PATCH 2/6] use post parse hooks --- runtime/parser/parse_alert.go | 6 ++- runtime/parser/parse_api.go | 5 ++- runtime/parser/parse_canvas.go | 4 +- runtime/parser/parse_component.go | 2 +- runtime/parser/parse_connector.go | 2 +- runtime/parser/parse_explore.go | 2 +- runtime/parser/parse_metrics_view.go | 29 ++++++++++--- runtime/parser/parse_migration.go | 2 +- runtime/parser/parse_model.go | 34 +++++++++------- runtime/parser/parse_node.go | 2 + runtime/parser/parse_partial_data.go | 18 ++++---- runtime/parser/parse_report.go | 2 +- runtime/parser/parse_theme.go | 2 +- runtime/parser/parser.go | 61 ++++++++++++++++++++++++---- runtime/parser/parser_test.go | 43 ++++++++++++++++++++ 15 files changed, 163 insertions(+), 51 deletions(-) diff --git a/runtime/parser/parse_alert.go b/runtime/parser/parse_alert.go index 12ba3937beb..3e1de8ba887 100644 --- a/runtime/parser/parse_alert.go +++ b/runtime/parser/parse_alert.go @@ -138,11 +138,13 @@ func (p *Parser) parseAlert(node *Node) error { if !isLegacyQuery { var refs []ResourceName - resolver, resolverProps, refs, err = p.parseDataYAML(tmp.Data, node.Connector) + var connector string + resolver, resolverProps, connector, refs, err = p.parseDataYAML(tmp.Data, node.Connector) if err != nil { return fmt.Errorf(`failed to parse "data": %w`, err) } node.Refs = append(node.Refs, refs...) + p.addConnectorRef(node, connector) // Query for: validate only one of user_id, user_email, or attributes is set n := 0 @@ -270,7 +272,7 @@ func (p *Parser) parseAlert(node *Node) error { } // Track alert - r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parse_api.go b/runtime/parser/parse_api.go index 58848f1e478..3cbdb3dfed8 100644 --- a/runtime/parser/parse_api.go +++ b/runtime/parser/parse_api.go @@ -92,11 +92,12 @@ func (p *Parser) parseAPI(node *Node) error { } // Parse the resolver and its properties from the DataYAML - resolver, resolverProps, resolverRefs, err := p.parseDataYAML(&tmp.DataYAML, node.Connector) + resolver, resolverProps, connector, resolverRefs, err := p.parseDataYAML(&tmp.DataYAML, node.Connector) if err != nil { return err } node.Refs = append(node.Refs, resolverRefs...) + p.addConnectorRef(node, connector) securityRules, err := tmp.Security.Proto() if err != nil { @@ -108,7 +109,7 @@ func (p *Parser) parseAPI(node *Node) error { } } - r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parse_canvas.go b/runtime/parser/parse_canvas.go index 517bdd345fa..fd93cd12e6d 100644 --- a/runtime/parser/parse_canvas.go +++ b/runtime/parser/parse_canvas.go @@ -256,7 +256,7 @@ func (p *Parser) parseCanvas(node *Node) error { } // Track canvas - r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } @@ -287,7 +287,7 @@ func (p *Parser) parseCanvas(node *Node) error { // Track inline components for _, def := range inlineComponentDefs { - r, err := p.insertResource(ResourceKindComponent, def.name, node.Paths, def.refs...) + r, err := p.insertResource(ResourceKindComponent, def.name, node.Paths, def.refs, nil) if err != nil { // Normally we could return the error, but we can't do that here because we've already inserted the canvas. // Since the component has been validated with insertDryRun in parseCanvasItemComponent, this error should never happen in practice. diff --git a/runtime/parser/parse_component.go b/runtime/parser/parse_component.go index 41ef60a75d3..f5ede1c9f8e 100644 --- a/runtime/parser/parse_component.go +++ b/runtime/parser/parse_component.go @@ -72,7 +72,7 @@ func (p *Parser) parseComponent(node *Node) error { node.Refs = append(node.Refs, refs...) // Track component - r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parse_connector.go b/runtime/parser/parse_connector.go index f38008866e9..127455741bd 100644 --- a/runtime/parser/parse_connector.go +++ b/runtime/parser/parse_connector.go @@ -65,7 +65,7 @@ func (p *Parser) parseConnector(node *Node) error { } // Insert the connector - r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parse_explore.go b/runtime/parser/parse_explore.go index 198308083e5..5cc334ffd78 100644 --- a/runtime/parser/parse_explore.go +++ b/runtime/parser/parse_explore.go @@ -265,7 +265,7 @@ func (p *Parser) parseExplore(node *Node) error { } // Track explore - r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parse_metrics_view.go b/runtime/parser/parse_metrics_view.go index 19b6ba88c8c..b9946fe0c91 100644 --- a/runtime/parser/parse_metrics_view.go +++ b/runtime/parser/parse_metrics_view.go @@ -715,8 +715,27 @@ func (p *Parser) parseMetricsView(node *Node) error { return err } node.Refs = append(node.Refs, securityRefs...) - // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later - node.Refs = append(node.Refs, ResourceName{Name: node.Connector, Kind: ResourceKindUnspecified}) + node.postParseHooks = append(node.postParseHooks, func(r *Resource) { + // check if the model is actually a resource in which case no need to add a ref to the connector + // model's connector can be different and a ref to model will ensure correct DAG link + if tmp.Model != "" { + _, ok := p.Resources[ResourceName{Kind: ResourceKindModel, Name: tmp.Model}.Normalized()] + if ok { + return + } + } + n := ResourceName{ResourceKindConnector, node.Connector}.Normalized() + if _, ok := p.Resources[n]; !ok { + return + } + for _, ref := range r.Refs { + if ref.Normalized() == n { + return + } + } + r.Refs = append(r.Refs, n) + }) + var cacheTTLDuration time.Duration if tmp.Cache.KeyTTL != "" { cacheTTLDuration, err = time.ParseDuration(tmp.Cache.KeyTTL) @@ -732,7 +751,7 @@ func (p *Parser) parseMetricsView(node *Node) error { } // insert metrics view resource immediately after parsing the inline explore as it inserts the explore resource so we should not return an error now - r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { // If we fail to insert the metrics view, we must delete the inline explore if it was created. if exploreRes != nil { @@ -818,7 +837,7 @@ func (p *Parser) parseMetricsView(node *Node) error { if tmp.DefaultTheme != "" { refs = append(refs, ResourceName{Kind: ResourceKindTheme, Name: tmp.DefaultTheme}) } - e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs...) + e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs, node.postParseHooks) if err != nil { // We mustn't error because we have already emitted one resource. // Since this probably means an explore has been defined separately, we can just ignore this error. @@ -1003,7 +1022,7 @@ func (p *Parser) parseAndInsertInlineExplore(tmp *MetricsViewYAML, mvName string name = tmp.Explore.Name } // Track explore - r, err := p.insertResource(ResourceKindExplore, name, mvPaths, refs...) + r, err := p.insertResource(ResourceKindExplore, name, mvPaths, refs, nil) if err != nil { return false, nil, err } diff --git a/runtime/parser/parse_migration.go b/runtime/parser/parse_migration.go index 64846fca3e1..6f4389053e9 100644 --- a/runtime/parser/parse_migration.go +++ b/runtime/parser/parse_migration.go @@ -19,7 +19,7 @@ func (p *Parser) parseMigration(node *Node) error { } // Add resource - r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parse_model.go b/runtime/parser/parse_model.go index a7a8bfc7a0a..5b0e64d1725 100644 --- a/runtime/parser/parse_model.go +++ b/runtime/parser/parse_model.go @@ -116,8 +116,8 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { inputProps = map[string]any{} } - // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later - node.Refs = append(node.Refs, ResourceName{Name: inputConnector, Kind: ResourceKindUnspecified}) + // all connectors are not explicit and may not exist as resource + p.addConnectorRef(node, inputConnector) // Special handling for adding SQL to the input properties if sql := strings.TrimSpace(node.SQL); sql != "" { @@ -158,8 +158,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { outputConnector = p.defaultOLAPConnector() } - // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later - node.Refs = append(node.Refs, ResourceName{Name: outputConnector, Kind: ResourceKindUnspecified}) + p.addConnectorRef(node, outputConnector) outputProps := tmp.Output.Properties // Backwards compatibility: materialize can be specified outside of the output properties @@ -184,11 +183,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { var incrementalStateResolverProps *structpb.Struct if tmp.State != nil { var refs []ResourceName - incrementalStateResolver, incrementalStateResolverProps, refs, err = p.parseDataYAML(tmp.State, outputConnector) + var connector string + incrementalStateResolver, incrementalStateResolverProps, connector, refs, err = p.parseDataYAML(tmp.State, outputConnector) if err != nil { return fmt.Errorf(`failed to parse "state": %w`, err) } node.Refs = append(node.Refs, refs...) + p.addConnectorRef(node, connector) } // Parse partitions resolver @@ -202,11 +203,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } if tmp.Partitions != nil { var refs []ResourceName - partitionsResolver, partitionsResolverProps, refs, err = p.parseDataYAML(tmp.Partitions, inputConnector) + var connector string + partitionsResolver, partitionsResolverProps, connector, refs, err = p.parseDataYAML(tmp.Partitions, inputConnector) if err != nil { return fmt.Errorf(`failed to parse "partitions": %w`, err) } node.Refs = append(node.Refs, refs...) + p.addConnectorRef(node, connector) // As a small convenience, automatically set the watermark field for resolvers where we know a good default if tmp.PartitionsWatermark == "" { @@ -220,12 +223,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { var modelTests []*runtimev1.ModelTest for i := range tmp.Tests { t := tmp.Tests[i] - modelTest, refs, err := p.parseModelTest(t.Name, &t.DataYAML, outputConnector, node.Name, t.Assert) + modelTest, connector, refs, err := p.parseModelTest(t.Name, &t.DataYAML, outputConnector, node.Name, t.Assert) if err != nil { return fmt.Errorf(`failed to parse test %q: %w`, t.Name, err) } modelTests = append(modelTests, modelTest) node.Refs = append(node.Refs, refs...) + p.addConnectorRef(node, connector) } var retryDelay *uint32 @@ -239,7 +243,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } // Insert the model - r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } @@ -287,10 +291,10 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } // parseModelTests parses the model tests from the YAML file -func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelName, assert string) (*runtimev1.ModelTest, []ResourceName, error) { +func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelName, assert string) (*runtimev1.ModelTest, string, []ResourceName, error) { // Validate required name field if name == "" { - return nil, nil, fmt.Errorf(`test must have a "name" defined`) + return nil, "", nil, fmt.Errorf(`test must have a "name" defined`) } hasSQL := data.SQL != "" @@ -299,24 +303,24 @@ func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelNam // Validate that exactly one of "sql" or "assert" is provided switch { case hasSQL && hasAssertion: - return nil, nil, fmt.Errorf(`test %q must not have both "sql" and "assert" defined`, name) + return nil, "", nil, fmt.Errorf(`test %q must not have both "sql" and "assert" defined`, name) case !hasSQL && !hasAssertion: - return nil, nil, fmt.Errorf(`test %q must have either "sql" or "assert" defined`, name) + return nil, "", nil, fmt.Errorf(`test %q must have either "sql" or "assert" defined`, name) case hasAssertion: // Wrap assertion condition in a SQL query following SQLMesh audit pattern // Query for rows that violate the assertion (bad data) data.SQL = fmt.Sprintf("SELECT * FROM %s WHERE NOT (%s)", modelName, assert) } - resolver, props, refs, err := p.parseDataYAML(data, connector) + resolver, props, connector, refs, err := p.parseDataYAML(data, connector) if err != nil { - return nil, nil, err + return nil, "", nil, err } return &runtimev1.ModelTest{ Name: name, Resolver: resolver, ResolverProperties: props, - }, refs, nil + }, connector, refs, nil } // inferSQLRefs attempts to infer table references from the node's SQL. diff --git a/runtime/parser/parse_node.go b/runtime/parser/parse_node.go index 4e950135a35..bfd1de79d27 100644 --- a/runtime/parser/parse_node.go +++ b/runtime/parser/parse_node.go @@ -29,6 +29,8 @@ type Node struct { SQLPath string SQLAnnotations map[string]any SQLUsesTemplating bool + + postParseHooks []postParseHook } // parseNode multiplexes to the appropriate parse function based on the node kind. diff --git a/runtime/parser/parse_partial_data.go b/runtime/parser/parse_partial_data.go index 0cb7e9284a8..05434655f72 100644 --- a/runtime/parser/parse_partial_data.go +++ b/runtime/parser/parse_partial_data.go @@ -22,10 +22,10 @@ type DataYAML struct { // parseDataYAML parses a data resolver and its properties from a DataYAML. // The contextualConnector argument is optional; if provided and the resolver supports a connector, it becomes the default connector for the resolver. // It returns the resolver name, its properties, and refs found in the resolver props. -func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (string, *structpb.Struct, []ResourceName, error) { +func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (string, *structpb.Struct, string, []ResourceName, error) { // Parse the resolver and its properties var count int - var resolver string + var resolver, connector string var refs []ResourceName resolverProps := make(map[string]any) @@ -34,15 +34,13 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin count++ resolver = "sql" resolverProps["sql"] = raw.SQL - var connector string if raw.Connector != "" { connector = raw.Connector } else if contextualConnector != "" { connector = contextualConnector } resolverProps["connector"] = connector - // kind is unspecified because all connectors are not explicit and may not exist as resource, will be resolved later - refs = append(refs, ResourceName{Kind: ResourceKindUnspecified, Name: connector}) + } // Handle metrics SQL resolver @@ -73,7 +71,7 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin props = make(map[string]any) err := raw.Glob.Decode(props) if err != nil { - return "", nil, nil, fmt.Errorf("failed to parse glob properties: %w", err) + return "", nil, "", nil, fmt.Errorf("failed to parse glob properties: %w", err) } } @@ -91,17 +89,17 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin // Validate there was exactly one resolver if count == 0 { - return "", nil, nil, fmt.Errorf(`the API definition does not specify a resolver (for example, "sql:", "metrics_sql:", ...)`) + return "", nil, "", nil, fmt.Errorf(`the API definition does not specify a resolver (for example, "sql:", "metrics_sql:", ...)`) } if count > 1 { - return "", nil, nil, fmt.Errorf(`the API definition specifies more than one resolver`) + return "", nil, "", nil, fmt.Errorf(`the API definition specifies more than one resolver`) } // Convert resolver properties to structpb.Struct resolverPropsPB, err := structpb.NewStruct(resolverProps) if err != nil { - return "", nil, nil, fmt.Errorf("encountered invalid property type: %w", err) + return "", nil, "", nil, fmt.Errorf("encountered invalid property type: %w", err) } - return resolver, resolverPropsPB, refs, nil + return resolver, resolverPropsPB, connector, refs, nil } diff --git a/runtime/parser/parse_report.go b/runtime/parser/parse_report.go index a6debea5545..95fdbda7b57 100644 --- a/runtime/parser/parse_report.go +++ b/runtime/parser/parse_report.go @@ -179,7 +179,7 @@ func (p *Parser) parseReport(node *Node) error { } // Track report - r, err := p.insertResource(ResourceKindReport, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindReport, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parse_theme.go b/runtime/parser/parse_theme.go index 574e5ea7f14..4473ed3104b 100644 --- a/runtime/parser/parse_theme.go +++ b/runtime/parser/parse_theme.go @@ -116,7 +116,7 @@ func (p *Parser) parseTheme(node *Node) error { return err } - r, err := p.insertResource(ResourceKindTheme, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindTheme, node.Name, node.Paths, node.Refs, node.postParseHooks) if err != nil { return err } diff --git a/runtime/parser/parser.go b/runtime/parser/parser.go index ad262b0d2e3..76cb36e7c8d 100644 --- a/runtime/parser/parser.go +++ b/runtime/parser/parser.go @@ -191,6 +191,7 @@ type Parser struct { insertedResources []*Resource updatedResources []*Resource deletedResources []*Resource + postParseHooks map[ResourceName][]postParseHook } // ParseRillYAML parses only the project's rill.yaml (or rill.yml) file. @@ -332,6 +333,7 @@ func (p *Parser) reload(ctx context.Context) error { p.resourceNamesForDataPaths = make(map[string][]ResourceName) p.resourcesForPath = make(map[string][]*Resource) p.resourcesForUnspecifiedRef = make(map[string][]*Resource) + p.postParseHooks = make(map[ResourceName][]postParseHook) p.insertedResources = nil p.updatedResources = nil p.deletedResources = nil @@ -359,6 +361,16 @@ func (p *Parser) reload(ctx context.Context) error { p.inferUnspecifiedRefs(r) } + // run all post-parse hooks + for rn, hooks := range p.postParseHooks { + for _, hook := range hooks { + r := p.Resources[rn] + if r != nil { + hook(r) + } + } + } + return nil } @@ -533,6 +545,16 @@ func (p *Parser) reparseExceptRillYAML(ctx context.Context, paths []string) (*Di } } + // run all post-parse hooks + for rn, hooks := range p.postParseHooks { + for _, hook := range hooks { + r := p.Resources[rn] + if r != nil { + hook(r) + } + } + } + // Phase 3: Build the diff using p.insertedResources, p.updatedResources and p.deletedResources diff := &Diff{ ModifiedDotEnv: modifiedDotEnv, @@ -787,14 +809,7 @@ func (p *Parser) inferUnspecifiedRefs(r *Resource) { } } - // Rule 4: For any resource if there is a connector with that name, use it - n := ResourceName{Kind: ResourceKindConnector, Name: ref.Name} - if _, ok := p.Resources[n.Normalized()]; ok { - refs = append(refs, n) - continue - } - - // Rule 5: Skip it + // Rule 4: Skip it } slices.SortFunc(refs, func(a, b ResourceName) int { @@ -822,7 +837,7 @@ func (p *Parser) insertDryRun(kind ResourceKind, name string) error { // insertResource inserts a resource in the parser's internal state. // After calling insertResource, the caller can directly modify the returned resource's spec. -func (p *Parser) insertResource(kind ResourceKind, name string, paths []string, refs ...ResourceName) (*Resource, error) { +func (p *Parser) insertResource(kind ResourceKind, name string, paths []string, refs []ResourceName, postParseHooks []postParseHook) (*Resource, error) { // Create the resource if not already present (ensures the spec for its kind is never nil) rn := ResourceName{Kind: kind, Name: name} _, ok := p.Resources[rn.Normalized()] @@ -897,6 +912,11 @@ func (p *Parser) insertResource(kind ResourceKind, name string, paths []string, } } + // Track post-parse hooks + if len(postParseHooks) > 0 { + p.postParseHooks[r.Name.Normalized()] = postParseHooks + } + return r, nil } @@ -966,6 +986,9 @@ func (p *Parser) deleteResource(r *Resource) { } } + // Remove from postParseHooks + delete(p.postParseHooks, r.Name.Normalized()) + // Track in deleted resources (unless it was in insertedResources, in which case it's not a real deletion) if !foundInInserted { p.deletedResources = append(p.deletedResources, r) @@ -1054,6 +1077,24 @@ func (p *Parser) isDev() bool { return strings.EqualFold(p.Environment, "dev") } +// addConnectorRef adds a post-parse hook to conditionally add a ref to the connector. +// The hook runs after all resources are parsed and only adds a reference if the connector exists, +// since connectors may not be explicitly defined as resources. +func (p *Parser) addConnectorRef(node *Node, connectorName string) { + node.postParseHooks = append(node.postParseHooks, func(r *Resource) { + n := ResourceName{ResourceKindConnector, connectorName}.Normalized() + if _, ok := p.Resources[n]; !ok { + return + } + for _, ref := range r.Refs { + if ref.Normalized() == n { + return + } + } + r.Refs = append(r.Refs, n) + }) +} + // pathIsSQL returns true if the path is a SQL file func pathIsSQL(path string) bool { return strings.HasSuffix(path, ".sql") @@ -1190,3 +1231,5 @@ func newDuckDBError(err error) error { }, } } + +type postParseHook func(r *Resource) diff --git a/runtime/parser/parser_test.go b/runtime/parser/parser_test.go index 048371d09c1..8b509a64062 100644 --- a/runtime/parser/parser_test.go +++ b/runtime/parser/parser_test.go @@ -850,6 +850,49 @@ func TestRefInferrence(t *testing.T) { }, diff) } +func TestConnectorRef(t *testing.T) { + ctx := context.Background() + files := map[string]string{ + // rill.yaml + `rill.yaml`: ``, + // connector duckdb + `connectors/duckdb.yaml`: ` +driver: duckdb +`, + // model m1 + `models/m1.sql`: ` +SELECT 1 +`, + } + resources := []*Resource{ + // m1 + { + Name: ResourceName{Kind: ResourceKindModel, Name: "m1"}, + Paths: []string{"/models/m1.sql"}, + Refs: []ResourceName{{Kind: ResourceKindConnector, Name: "duckdb"}}, + ModelSpec: &runtimev1.ModelSpec{ + RefreshSchedule: &runtimev1.Schedule{RefUpdate: true}, + InputConnector: "duckdb", + InputProperties: must(structpb.NewStruct(map[string]any{"sql": strings.TrimSpace(files["models/m1.sql"])})), + OutputConnector: "duckdb", + ChangeMode: runtimev1.ModelChangeMode_MODEL_CHANGE_MODE_RESET, + }, + }, + // duckdb connector + { + Name: ResourceName{Kind: ResourceKindConnector, Name: "duckdb"}, + Paths: []string{"/connectors/duckdb.yaml"}, + ConnectorSpec: &runtimev1.ConnectorSpec{ + Driver: "duckdb", + }, + }, + } + repo := makeRepo(t, files) + p, err := Parse(ctx, repo, "", "", "duckdb") + require.NoError(t, err) + requireResourcesAndErrors(t, p, resources, nil) +} + func BenchmarkReparse(b *testing.B) { ctx := context.Background() files := map[string]string{ From 2f3eb90cc3c9f6ec5db84bc7458a1226cace9dce Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Wed, 7 Jan 2026 19:30:28 +0530 Subject: [PATCH 3/6] lint fix --- runtime/parser/parse_partial_data.go | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/parser/parse_partial_data.go b/runtime/parser/parse_partial_data.go index 05434655f72..9e276db489c 100644 --- a/runtime/parser/parse_partial_data.go +++ b/runtime/parser/parse_partial_data.go @@ -40,7 +40,6 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin connector = contextualConnector } resolverProps["connector"] = connector - } // Handle metrics SQL resolver From cc65e71af07bd0048d2cf815546c3336e4726e7d Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Thu, 8 Jan 2026 12:02:49 +0530 Subject: [PATCH 4/6] extract to common function --- runtime/parser/parse_metrics_view.go | 1 + runtime/parser/parser.go | 33 ++++++++++++---------------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/runtime/parser/parse_metrics_view.go b/runtime/parser/parse_metrics_view.go index fb0e64e6693..5ade3b76434 100644 --- a/runtime/parser/parse_metrics_view.go +++ b/runtime/parser/parse_metrics_view.go @@ -733,6 +733,7 @@ func (p *Parser) parseMetricsView(node *Node) error { if tmp.Model != "" { _, ok := p.Resources[ResourceName{Kind: ResourceKindModel, Name: tmp.Model}.Normalized()] if ok { + r.Refs = nil return } } diff --git a/runtime/parser/parser.go b/runtime/parser/parser.go index 76cb36e7c8d..660d3faeca5 100644 --- a/runtime/parser/parser.go +++ b/runtime/parser/parser.go @@ -361,16 +361,7 @@ func (p *Parser) reload(ctx context.Context) error { p.inferUnspecifiedRefs(r) } - // run all post-parse hooks - for rn, hooks := range p.postParseHooks { - for _, hook := range hooks { - r := p.Resources[rn] - if r != nil { - hook(r) - } - } - } - + p.runPostParseHooks() return nil } @@ -545,15 +536,8 @@ func (p *Parser) reparseExceptRillYAML(ctx context.Context, paths []string) (*Di } } - // run all post-parse hooks - for rn, hooks := range p.postParseHooks { - for _, hook := range hooks { - r := p.Resources[rn] - if r != nil { - hook(r) - } - } - } + // Run post-parse hooks + p.runPostParseHooks() // Phase 3: Build the diff using p.insertedResources, p.updatedResources and p.deletedResources diff := &Diff{ @@ -1095,6 +1079,17 @@ func (p *Parser) addConnectorRef(node *Node, connectorName string) { }) } +func (p *Parser) runPostParseHooks() { + for rn, hooks := range p.postParseHooks { + for _, hook := range hooks { + r := p.Resources[rn] + if r != nil { + hook(r) + } + } + } +} + // pathIsSQL returns true if the path is a SQL file func pathIsSQL(path string) bool { return strings.HasSuffix(path, ".sql") From 233f3b4669ba4f5d49f7e9640043c42c56f3cc95 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Thu, 8 Jan 2026 12:54:27 +0530 Subject: [PATCH 5/6] connector ref logic change --- runtime/parser/parse_metrics_view.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/runtime/parser/parse_metrics_view.go b/runtime/parser/parse_metrics_view.go index 5ade3b76434..9a735918471 100644 --- a/runtime/parser/parse_metrics_view.go +++ b/runtime/parser/parse_metrics_view.go @@ -733,7 +733,13 @@ func (p *Parser) parseMetricsView(node *Node) error { if tmp.Model != "" { _, ok := p.Resources[ResourceName{Kind: ResourceKindModel, Name: tmp.Model}.Normalized()] if ok { - r.Refs = nil + // clear older refs to connector, if any + for i, ref := range r.Refs { + if ref.Kind == ResourceKindConnector && ref.Name == node.Connector { + r.Refs = append(r.Refs[:i], r.Refs[i+1:]...) + break + } + } return } } From 49f6ac970aa299acd9cf5b2ec549407cffafecdc Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Thu, 8 Jan 2026 16:34:42 +0530 Subject: [PATCH 6/6] handle resource updates --- runtime/parser/parse_alert.go | 5 +- runtime/parser/parse_api.go | 5 +- runtime/parser/parse_canvas.go | 2 +- runtime/parser/parse_component.go | 3 +- runtime/parser/parse_connector.go | 3 +- runtime/parser/parse_explore.go | 2 +- runtime/parser/parse_metrics_view.go | 23 +++--- runtime/parser/parse_migration.go | 4 +- runtime/parser/parse_model.go | 15 ++-- runtime/parser/parse_node.go | 11 ++- runtime/parser/parse_report.go | 3 +- runtime/parser/parse_theme.go | 3 +- runtime/parser/parser.go | 102 ++++++++++++++++++++------- 13 files changed, 121 insertions(+), 60 deletions(-) diff --git a/runtime/parser/parse_alert.go b/runtime/parser/parse_alert.go index 3e1de8ba887..e82d1be2da7 100644 --- a/runtime/parser/parse_alert.go +++ b/runtime/parser/parse_alert.go @@ -12,6 +12,7 @@ import ( "github.com/rilldata/rill/runtime/drivers/slack" "github.com/rilldata/rill/runtime/pkg/duration" "github.com/rilldata/rill/runtime/pkg/pbutil" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" ) @@ -144,7 +145,7 @@ func (p *Parser) parseAlert(node *Node) error { return fmt.Errorf(`failed to parse "data": %w`, err) } node.Refs = append(node.Refs, refs...) - p.addConnectorRef(node, connector) + node.addPostParseHook(connector, p.addConnectorRef(connector)) // Query for: validate only one of user_id, user_email, or attributes is set n := 0 @@ -272,7 +273,7 @@ func (p *Parser) parseAlert(node *Node) error { } // Track alert - r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_api.go b/runtime/parser/parse_api.go index 3cbdb3dfed8..7ae62e38b61 100644 --- a/runtime/parser/parse_api.go +++ b/runtime/parser/parse_api.go @@ -7,6 +7,7 @@ import ( "unicode" "github.com/rilldata/rill/runtime/pkg/openapiutil" + "golang.org/x/exp/maps" ) // APIYAML is the raw structure of a API resource defined in YAML (does not include common fields) @@ -97,7 +98,7 @@ func (p *Parser) parseAPI(node *Node) error { return err } node.Refs = append(node.Refs, resolverRefs...) - p.addConnectorRef(node, connector) + node.addPostParseHook(connector, p.addConnectorRef(connector)) securityRules, err := tmp.Security.Proto() if err != nil { @@ -109,7 +110,7 @@ func (p *Parser) parseAPI(node *Node) error { } } - r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_canvas.go b/runtime/parser/parse_canvas.go index fd93cd12e6d..834200fd8d9 100644 --- a/runtime/parser/parse_canvas.go +++ b/runtime/parser/parse_canvas.go @@ -256,7 +256,7 @@ func (p *Parser) parseCanvas(node *Node) error { } // Track canvas - r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_component.go b/runtime/parser/parse_component.go index f5ede1c9f8e..2a7dfc6ad78 100644 --- a/runtime/parser/parse_component.go +++ b/runtime/parser/parse_component.go @@ -6,6 +6,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/santhosh-tekuri/jsonschema/v5" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" _ "embed" @@ -72,7 +73,7 @@ func (p *Parser) parseComponent(node *Node) error { node.Refs = append(node.Refs, refs...) // Track component - r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_connector.go b/runtime/parser/parse_connector.go index 127455741bd..ae10394b384 100644 --- a/runtime/parser/parse_connector.go +++ b/runtime/parser/parse_connector.go @@ -3,6 +3,7 @@ package parser import ( "fmt" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" "gopkg.in/yaml.v3" ) @@ -65,7 +66,7 @@ func (p *Parser) parseConnector(node *Node) error { } // Insert the connector - r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_explore.go b/runtime/parser/parse_explore.go index 5cc334ffd78..faae11fb710 100644 --- a/runtime/parser/parse_explore.go +++ b/runtime/parser/parse_explore.go @@ -265,7 +265,7 @@ func (p *Parser) parseExplore(node *Node) error { } // Track explore - r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_metrics_view.go b/runtime/parser/parse_metrics_view.go index 9a735918471..5ef7569f6fe 100644 --- a/runtime/parser/parse_metrics_view.go +++ b/runtime/parser/parse_metrics_view.go @@ -727,7 +727,7 @@ func (p *Parser) parseMetricsView(node *Node) error { return err } node.Refs = append(node.Refs, securityRefs...) - node.postParseHooks = append(node.postParseHooks, func(r *Resource) { + node.addPostParseHook(node.Connector, func(r *Resource) bool { // check if the model is actually a resource in which case no need to add a ref to the connector // model's connector can be different and a ref to model will ensure correct DAG link if tmp.Model != "" { @@ -736,23 +736,16 @@ func (p *Parser) parseMetricsView(node *Node) error { // clear older refs to connector, if any for i, ref := range r.Refs { if ref.Kind == ResourceKindConnector && ref.Name == node.Connector { + // okay to modify r.Refs here as we return immediately after r.Refs = append(r.Refs[:i], r.Refs[i+1:]...) - break + return true } } - return - } - } - n := ResourceName{ResourceKindConnector, node.Connector}.Normalized() - if _, ok := p.Resources[n]; !ok { - return - } - for _, ref := range r.Refs { - if ref.Normalized() == n { - return + return false } } - r.Refs = append(r.Refs, n) + f := p.addConnectorRef(node.Connector) + return f(r) }) var cacheTTLDuration time.Duration @@ -770,7 +763,7 @@ func (p *Parser) parseMetricsView(node *Node) error { } // insert metrics view resource immediately after parsing the inline explore as it inserts the explore resource so we should not return an error now - r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { // If we fail to insert the metrics view, we must delete the inline explore if it was created. if exploreRes != nil { @@ -856,7 +849,7 @@ func (p *Parser) parseMetricsView(node *Node) error { if tmp.DefaultTheme != "" { refs = append(refs, ResourceName{Kind: ResourceKindTheme, Name: tmp.DefaultTheme}) } - e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs, node.postParseHooks) + e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs, maps.Values(node.postParseHooks)) if err != nil { // We mustn't error because we have already emitted one resource. // Since this probably means an explore has been defined separately, we can just ignore this error. diff --git a/runtime/parser/parse_migration.go b/runtime/parser/parse_migration.go index 6f4389053e9..527faaf515b 100644 --- a/runtime/parser/parse_migration.go +++ b/runtime/parser/parse_migration.go @@ -2,6 +2,8 @@ package parser import ( "strings" + + "golang.org/x/exp/maps" ) // MigrationYAML is the raw structure of a Migration resource defined in YAML (does not include common fields) @@ -19,7 +21,7 @@ func (p *Parser) parseMigration(node *Node) error { } // Add resource - r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_model.go b/runtime/parser/parse_model.go index 5b0e64d1725..e0f6626bc7e 100644 --- a/runtime/parser/parse_model.go +++ b/runtime/parser/parse_model.go @@ -11,6 +11,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/pkg/duckdbsql" "github.com/rilldata/rill/runtime/pkg/fileutil" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" "gopkg.in/yaml.v3" ) @@ -116,8 +117,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { inputProps = map[string]any{} } - // all connectors are not explicit and may not exist as resource - p.addConnectorRef(node, inputConnector) + node.addPostParseHook(inputConnector, p.addConnectorRef(inputConnector)) // Special handling for adding SQL to the input properties if sql := strings.TrimSpace(node.SQL); sql != "" { @@ -157,8 +157,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { if outputConnector == "" { outputConnector = p.defaultOLAPConnector() } - - p.addConnectorRef(node, outputConnector) + node.addPostParseHook(outputConnector, p.addConnectorRef(outputConnector)) outputProps := tmp.Output.Properties // Backwards compatibility: materialize can be specified outside of the output properties @@ -189,7 +188,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { return fmt.Errorf(`failed to parse "state": %w`, err) } node.Refs = append(node.Refs, refs...) - p.addConnectorRef(node, connector) + node.addPostParseHook(connector, p.addConnectorRef(connector)) } // Parse partitions resolver @@ -209,7 +208,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { return fmt.Errorf(`failed to parse "partitions": %w`, err) } node.Refs = append(node.Refs, refs...) - p.addConnectorRef(node, connector) + node.addPostParseHook(connector, p.addConnectorRef(connector)) // As a small convenience, automatically set the watermark field for resolvers where we know a good default if tmp.PartitionsWatermark == "" { @@ -229,7 +228,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } modelTests = append(modelTests, modelTest) node.Refs = append(node.Refs, refs...) - p.addConnectorRef(node, connector) + node.addPostParseHook(connector, p.addConnectorRef(connector)) } var retryDelay *uint32 @@ -243,7 +242,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } // Insert the model - r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_node.go b/runtime/parser/parse_node.go index bfd1de79d27..ad29155c49c 100644 --- a/runtime/parser/parse_node.go +++ b/runtime/parser/parse_node.go @@ -30,7 +30,16 @@ type Node struct { SQLAnnotations map[string]any SQLUsesTemplating bool - postParseHooks []postParseHook + postParseHooks map[string]postParseHook +} + +func (n *Node) addPostParseHook(key string, hook postParseHook) { + if n.postParseHooks == nil { + n.postParseHooks = make(map[string]postParseHook) + } + if _, ok := n.postParseHooks[key]; !ok { + n.postParseHooks[key] = hook + } } // parseNode multiplexes to the appropriate parse function based on the node kind. diff --git a/runtime/parser/parse_report.go b/runtime/parser/parse_report.go index 95fdbda7b57..ccba36f49a4 100644 --- a/runtime/parser/parse_report.go +++ b/runtime/parser/parse_report.go @@ -12,6 +12,7 @@ import ( "github.com/rilldata/rill/runtime/drivers/slack" "github.com/rilldata/rill/runtime/pkg/duration" "github.com/rilldata/rill/runtime/pkg/pbutil" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" ) @@ -179,7 +180,7 @@ func (p *Parser) parseReport(node *Node) error { } // Track report - r, err := p.insertResource(ResourceKindReport, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindReport, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_theme.go b/runtime/parser/parse_theme.go index 4473ed3104b..d40faf8f5e8 100644 --- a/runtime/parser/parse_theme.go +++ b/runtime/parser/parse_theme.go @@ -5,6 +5,7 @@ import ( "github.com/mazznoer/csscolorparser" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "golang.org/x/exp/maps" ) // ThemeYAML is the raw structure of a Theme for the UI in YAML (does not include common fields) @@ -116,7 +117,7 @@ func (p *Parser) parseTheme(node *Node) error { return err } - r, err := p.insertResource(ResourceKindTheme, node.Name, node.Paths, node.Refs, node.postParseHooks) + r, err := p.insertResource(ResourceKindTheme, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parser.go b/runtime/parser/parser.go index 660d3faeca5..1cc8e003afd 100644 --- a/runtime/parser/parser.go +++ b/runtime/parser/parser.go @@ -168,6 +168,10 @@ type Diff struct { Deleted []ResourceName } +// postParseHook is run after all resources have been parsed. +// It returns true if the resource was modified. +type postParseHook func(r *Resource) bool + // Parser parses a Rill project directory into a set of resources. // After the initial parse, the parser can be used to incrementally reparse a subset of files. // Parser is not concurrency safe. @@ -185,13 +189,14 @@ type Parser struct { Errors []*runtimev1.ParseError // Internal state - resourcesForPath map[string][]*Resource // Reverse index of Resource.Paths - resourcesForUnspecifiedRef map[string][]*Resource // Reverse index of Resource.rawRefs where kind=ResourceKindUnspecified - resourceNamesForDataPaths map[string][]ResourceName // Index of local data files to resources that depend on them - insertedResources []*Resource - updatedResources []*Resource - deletedResources []*Resource - postParseHooks map[ResourceName][]postParseHook + resourcesForPath map[string][]*Resource // Reverse index of Resource.Paths + resourcesForUnspecifiedRef map[string][]*Resource // Reverse index of Resource.rawRefs where kind=ResourceKindUnspecified + resourceNamesForDataPaths map[string][]ResourceName // Index of local data files to resources that depend on them + insertedResources []*Resource + updatedResources []*Resource + deletedResources []*Resource + postParseHooks map[ResourceName][]postParseHook + resourceNamesForInferredRef map[string]map[ResourceName]bool } // ParseRillYAML parses only the project's rill.yaml (or rill.yml) file. @@ -334,6 +339,7 @@ func (p *Parser) reload(ctx context.Context) error { p.resourcesForPath = make(map[string][]*Resource) p.resourcesForUnspecifiedRef = make(map[string][]*Resource) p.postParseHooks = make(map[ResourceName][]postParseHook) + p.resourceNamesForInferredRef = make(map[string]map[ResourceName]bool) p.insertedResources = nil p.updatedResources = nil p.deletedResources = nil @@ -356,12 +362,15 @@ func (p *Parser) reload(ctx context.Context) error { return err } + seenResources := make(map[ResourceName]bool) + // Infer unspecified refs for all inserted resources for _, r := range p.insertedResources { + seenResources[r.Name.Normalized()] = true p.inferUnspecifiedRefs(r) } - p.runPostParseHooks() + p.runPostParseHooks(seenResources) return nil } @@ -537,7 +546,7 @@ func (p *Parser) reparseExceptRillYAML(ctx context.Context, paths []string) (*Di } // Run post-parse hooks - p.runPostParseHooks() + p.runPostParseHooks(inferRefsSeen) // Phase 3: Build the diff using p.insertedResources, p.updatedResources and p.deletedResources diff := &Diff{ @@ -973,6 +982,21 @@ func (p *Parser) deleteResource(r *Resource) { // Remove from postParseHooks delete(p.postParseHooks, r.Name.Normalized()) + // Remove from resourcesForInferredRef + // 1. If it is the resource which was inferred + delete(p.resourceNamesForInferredRef, strings.ToLower(r.Name.Name)) + // 2. If it is a resource which had an inferred ref to another resource + for k, resMap := range p.resourceNamesForInferredRef { + if _, ok := resMap[r.Name.Normalized()]; !ok { + continue + } + if len(resMap) == 1 { + delete(p.resourceNamesForInferredRef, k) + } else { + delete(resMap, r.Name.Normalized()) + } + } + // Track in deleted resources (unless it was in insertedResources, in which case it's not a real deletion) if !foundInInserted { p.deletedResources = append(p.deletedResources, r) @@ -1061,31 +1085,61 @@ func (p *Parser) isDev() bool { return strings.EqualFold(p.Environment, "dev") } -// addConnectorRef adds a post-parse hook to conditionally add a ref to the connector. -// The hook runs after all resources are parsed and only adds a reference if the connector exists, -// since connectors may not be explicitly defined as resources. -func (p *Parser) addConnectorRef(node *Node, connectorName string) { - node.postParseHooks = append(node.postParseHooks, func(r *Resource) { - n := ResourceName{ResourceKindConnector, connectorName}.Normalized() - if _, ok := p.Resources[n]; !ok { - return +// addConnectorRef returns a post-parse hook to conditionally add a ref to the connector. +// A connector ref can't be added until after all resources have been parsed, because the connector resource +// may not be explicitly defined. +func (p *Parser) addConnectorRef(connector string) postParseHook { + return func(r *Resource) bool { + // check if the connector exists + n := ResourceName{ResourceKindConnector, connector} + if _, ok := p.Resources[n.Normalized()]; !ok { + return false + } + + c := strings.ToLower(connector) + if res, ok := p.resourceNamesForInferredRef[c]; ok { + if _, ok := res[r.Name.Normalized()]; ok { + // already has a ref + return false + } } + + // add a ref to the connector if it doesn't already exist (due to an explicit ref) for _, ref := range r.Refs { - if ref.Normalized() == n { - return + if ref.Normalized() == n.Normalized() { + return false } } r.Refs = append(r.Refs, n) - }) + + // index in resourceNamesForInferredRef + resourceMap, ok := p.resourceNamesForInferredRef[c] + if !ok { + resourceMap = make(map[ResourceName]bool) + p.resourceNamesForInferredRef[c] = resourceMap + } + resourceMap[r.Name.Normalized()] = true + return true + } } -func (p *Parser) runPostParseHooks() { +func (p *Parser) runPostParseHooks(seenResources map[ResourceName]bool) { for rn, hooks := range p.postParseHooks { for _, hook := range hooks { r := p.Resources[rn] - if r != nil { - hook(r) + if r == nil { + continue + } + if !hook(r) { + continue } + // If the hook modified the resource, track it in updatedResources + // check if the resource is already in updatedResources/insertedResources + if _, ok := seenResources[r.Name.Normalized()]; ok { + continue + } + // otherwise, add to updatedResources + p.updatedResources = append(p.updatedResources, r) } } } @@ -1226,5 +1280,3 @@ func newDuckDBError(err error) error { }, } } - -type postParseHook func(r *Resource)