diff --git a/runtime/parser/parse_alert.go b/runtime/parser/parse_alert.go index 12ba3937beb..e82d1be2da7 100644 --- a/runtime/parser/parse_alert.go +++ b/runtime/parser/parse_alert.go @@ -12,6 +12,7 @@ import ( "github.com/rilldata/rill/runtime/drivers/slack" "github.com/rilldata/rill/runtime/pkg/duration" "github.com/rilldata/rill/runtime/pkg/pbutil" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" ) @@ -138,11 +139,13 @@ func (p *Parser) parseAlert(node *Node) error { if !isLegacyQuery { var refs []ResourceName - resolver, resolverProps, refs, err = p.parseDataYAML(tmp.Data, node.Connector) + var connector string + resolver, resolverProps, connector, refs, err = p.parseDataYAML(tmp.Data, node.Connector) if err != nil { return fmt.Errorf(`failed to parse "data": %w`, err) } node.Refs = append(node.Refs, refs...) + node.addPostParseHook(connector, p.addConnectorRef(connector)) // Query for: validate only one of user_id, user_email, or attributes is set n := 0 @@ -270,7 +273,7 @@ func (p *Parser) parseAlert(node *Node) error { } // Track alert - r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_api.go b/runtime/parser/parse_api.go index 58848f1e478..7ae62e38b61 100644 --- a/runtime/parser/parse_api.go +++ b/runtime/parser/parse_api.go @@ -7,6 +7,7 @@ import ( "unicode" "github.com/rilldata/rill/runtime/pkg/openapiutil" + "golang.org/x/exp/maps" ) // APIYAML is the raw structure of a API resource defined in YAML (does not include common fields) @@ -92,11 +93,12 @@ func (p *Parser) parseAPI(node *Node) error { } // Parse the resolver and its properties from the DataYAML - resolver, resolverProps, resolverRefs, err := p.parseDataYAML(&tmp.DataYAML, node.Connector) + resolver, resolverProps, connector, resolverRefs, err := p.parseDataYAML(&tmp.DataYAML, node.Connector) if err != nil { return err } node.Refs = append(node.Refs, resolverRefs...) + node.addPostParseHook(connector, p.addConnectorRef(connector)) securityRules, err := tmp.Security.Proto() if err != nil { @@ -108,7 +110,7 @@ func (p *Parser) parseAPI(node *Node) error { } } - r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindAPI, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_canvas.go b/runtime/parser/parse_canvas.go index 517bdd345fa..834200fd8d9 100644 --- a/runtime/parser/parse_canvas.go +++ b/runtime/parser/parse_canvas.go @@ -256,7 +256,7 @@ func (p *Parser) parseCanvas(node *Node) error { } // Track canvas - r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindCanvas, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } @@ -287,7 +287,7 @@ func (p *Parser) parseCanvas(node *Node) error { // Track inline components for _, def := range inlineComponentDefs { - r, err := p.insertResource(ResourceKindComponent, def.name, node.Paths, def.refs...) + r, err := p.insertResource(ResourceKindComponent, def.name, node.Paths, def.refs, nil) if err != nil { // Normally we could return the error, but we can't do that here because we've already inserted the canvas. // Since the component has been validated with insertDryRun in parseCanvasItemComponent, this error should never happen in practice. diff --git a/runtime/parser/parse_component.go b/runtime/parser/parse_component.go index 41ef60a75d3..2a7dfc6ad78 100644 --- a/runtime/parser/parse_component.go +++ b/runtime/parser/parse_component.go @@ -6,6 +6,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/santhosh-tekuri/jsonschema/v5" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" _ "embed" @@ -72,7 +73,7 @@ func (p *Parser) parseComponent(node *Node) error { node.Refs = append(node.Refs, refs...) // Track component - r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindComponent, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_connector.go b/runtime/parser/parse_connector.go index f38008866e9..ae10394b384 100644 --- a/runtime/parser/parse_connector.go +++ b/runtime/parser/parse_connector.go @@ -3,6 +3,7 @@ package parser import ( "fmt" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" "gopkg.in/yaml.v3" ) @@ -65,7 +66,7 @@ func (p *Parser) parseConnector(node *Node) error { } // Insert the connector - r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindConnector, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_explore.go b/runtime/parser/parse_explore.go index 198308083e5..faae11fb710 100644 --- a/runtime/parser/parse_explore.go +++ b/runtime/parser/parse_explore.go @@ -265,7 +265,7 @@ func (p *Parser) parseExplore(node *Node) error { } // Track explore - r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_metrics_view.go b/runtime/parser/parse_metrics_view.go index 062b4cb17ef..5ef7569f6fe 100644 --- a/runtime/parser/parse_metrics_view.go +++ b/runtime/parser/parse_metrics_view.go @@ -727,6 +727,26 @@ func (p *Parser) parseMetricsView(node *Node) error { return err } node.Refs = append(node.Refs, securityRefs...) + node.addPostParseHook(node.Connector, func(r *Resource) bool { + // check if the model is actually a resource in which case no need to add a ref to the connector + // model's connector can be different and a ref to model will ensure correct DAG link + if tmp.Model != "" { + _, ok := p.Resources[ResourceName{Kind: ResourceKindModel, Name: tmp.Model}.Normalized()] + if ok { + // clear older refs to connector, if any + for i, ref := range r.Refs { + if ref.Kind == ResourceKindConnector && ref.Name == node.Connector { + // okay to modify r.Refs here as we return immediately after + r.Refs = append(r.Refs[:i], r.Refs[i+1:]...) + return true + } + } + return false + } + } + f := p.addConnectorRef(node.Connector) + return f(r) + }) var cacheTTLDuration time.Duration if tmp.Cache.KeyTTL != "" { @@ -743,7 +763,7 @@ func (p *Parser) parseMetricsView(node *Node) error { } // insert metrics view resource immediately after parsing the inline explore as it inserts the explore resource so we should not return an error now - r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindMetricsView, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { // If we fail to insert the metrics view, we must delete the inline explore if it was created. if exploreRes != nil { @@ -829,7 +849,7 @@ func (p *Parser) parseMetricsView(node *Node) error { if tmp.DefaultTheme != "" { refs = append(refs, ResourceName{Kind: ResourceKindTheme, Name: tmp.DefaultTheme}) } - e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs...) + e, err := p.insertResource(ResourceKindExplore, node.Name, node.Paths, refs, maps.Values(node.postParseHooks)) if err != nil { // We mustn't error because we have already emitted one resource. // Since this probably means an explore has been defined separately, we can just ignore this error. @@ -1014,7 +1034,7 @@ func (p *Parser) parseAndInsertInlineExplore(tmp *MetricsViewYAML, mvName string name = tmp.Explore.Name } // Track explore - r, err := p.insertResource(ResourceKindExplore, name, mvPaths, refs...) + r, err := p.insertResource(ResourceKindExplore, name, mvPaths, refs, nil) if err != nil { return false, nil, err } diff --git a/runtime/parser/parse_migration.go b/runtime/parser/parse_migration.go index 64846fca3e1..527faaf515b 100644 --- a/runtime/parser/parse_migration.go +++ b/runtime/parser/parse_migration.go @@ -2,6 +2,8 @@ package parser import ( "strings" + + "golang.org/x/exp/maps" ) // MigrationYAML is the raw structure of a Migration resource defined in YAML (does not include common fields) @@ -19,7 +21,7 @@ func (p *Parser) parseMigration(node *Node) error { } // Add resource - r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindMigration, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_model.go b/runtime/parser/parse_model.go index 52ab4c14e51..e0f6626bc7e 100644 --- a/runtime/parser/parse_model.go +++ b/runtime/parser/parse_model.go @@ -11,6 +11,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime/pkg/duckdbsql" "github.com/rilldata/rill/runtime/pkg/fileutil" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" "gopkg.in/yaml.v3" ) @@ -116,6 +117,8 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { inputProps = map[string]any{} } + node.addPostParseHook(inputConnector, p.addConnectorRef(inputConnector)) + // Special handling for adding SQL to the input properties if sql := strings.TrimSpace(node.SQL); sql != "" { refs, err := p.inferSQLRefs(node) @@ -154,6 +157,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { if outputConnector == "" { outputConnector = p.defaultOLAPConnector() } + node.addPostParseHook(outputConnector, p.addConnectorRef(outputConnector)) outputProps := tmp.Output.Properties // Backwards compatibility: materialize can be specified outside of the output properties @@ -178,11 +182,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { var incrementalStateResolverProps *structpb.Struct if tmp.State != nil { var refs []ResourceName - incrementalStateResolver, incrementalStateResolverProps, refs, err = p.parseDataYAML(tmp.State, outputConnector) + var connector string + incrementalStateResolver, incrementalStateResolverProps, connector, refs, err = p.parseDataYAML(tmp.State, outputConnector) if err != nil { return fmt.Errorf(`failed to parse "state": %w`, err) } node.Refs = append(node.Refs, refs...) + node.addPostParseHook(connector, p.addConnectorRef(connector)) } // Parse partitions resolver @@ -196,11 +202,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } if tmp.Partitions != nil { var refs []ResourceName - partitionsResolver, partitionsResolverProps, refs, err = p.parseDataYAML(tmp.Partitions, inputConnector) + var connector string + partitionsResolver, partitionsResolverProps, connector, refs, err = p.parseDataYAML(tmp.Partitions, inputConnector) if err != nil { return fmt.Errorf(`failed to parse "partitions": %w`, err) } node.Refs = append(node.Refs, refs...) + node.addPostParseHook(connector, p.addConnectorRef(connector)) // As a small convenience, automatically set the watermark field for resolvers where we know a good default if tmp.PartitionsWatermark == "" { @@ -214,12 +222,13 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { var modelTests []*runtimev1.ModelTest for i := range tmp.Tests { t := tmp.Tests[i] - modelTest, refs, err := p.parseModelTest(t.Name, &t.DataYAML, outputConnector, node.Name, t.Assert) + modelTest, connector, refs, err := p.parseModelTest(t.Name, &t.DataYAML, outputConnector, node.Name, t.Assert) if err != nil { return fmt.Errorf(`failed to parse test %q: %w`, t.Name, err) } modelTests = append(modelTests, modelTest) node.Refs = append(node.Refs, refs...) + node.addPostParseHook(connector, p.addConnectorRef(connector)) } var retryDelay *uint32 @@ -233,7 +242,7 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } // Insert the model - r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindModel, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } @@ -281,10 +290,10 @@ func (p *Parser) parseModel(ctx context.Context, node *Node) error { } // parseModelTests parses the model tests from the YAML file -func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelName, assert string) (*runtimev1.ModelTest, []ResourceName, error) { +func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelName, assert string) (*runtimev1.ModelTest, string, []ResourceName, error) { // Validate required name field if name == "" { - return nil, nil, fmt.Errorf(`test must have a "name" defined`) + return nil, "", nil, fmt.Errorf(`test must have a "name" defined`) } hasSQL := data.SQL != "" @@ -293,24 +302,24 @@ func (p *Parser) parseModelTest(name string, data *DataYAML, connector, modelNam // Validate that exactly one of "sql" or "assert" is provided switch { case hasSQL && hasAssertion: - return nil, nil, fmt.Errorf(`test %q must not have both "sql" and "assert" defined`, name) + return nil, "", nil, fmt.Errorf(`test %q must not have both "sql" and "assert" defined`, name) case !hasSQL && !hasAssertion: - return nil, nil, fmt.Errorf(`test %q must have either "sql" or "assert" defined`, name) + return nil, "", nil, fmt.Errorf(`test %q must have either "sql" or "assert" defined`, name) case hasAssertion: // Wrap assertion condition in a SQL query following SQLMesh audit pattern // Query for rows that violate the assertion (bad data) data.SQL = fmt.Sprintf("SELECT * FROM %s WHERE NOT (%s)", modelName, assert) } - resolver, props, refs, err := p.parseDataYAML(data, connector) + resolver, props, connector, refs, err := p.parseDataYAML(data, connector) if err != nil { - return nil, nil, err + return nil, "", nil, err } return &runtimev1.ModelTest{ Name: name, Resolver: resolver, ResolverProperties: props, - }, refs, nil + }, connector, refs, nil } // inferSQLRefs attempts to infer table references from the node's SQL. diff --git a/runtime/parser/parse_node.go b/runtime/parser/parse_node.go index 4e950135a35..ad29155c49c 100644 --- a/runtime/parser/parse_node.go +++ b/runtime/parser/parse_node.go @@ -29,6 +29,17 @@ type Node struct { SQLPath string SQLAnnotations map[string]any SQLUsesTemplating bool + + postParseHooks map[string]postParseHook +} + +func (n *Node) addPostParseHook(key string, hook postParseHook) { + if n.postParseHooks == nil { + n.postParseHooks = make(map[string]postParseHook) + } + if _, ok := n.postParseHooks[key]; !ok { + n.postParseHooks[key] = hook + } } // parseNode multiplexes to the appropriate parse function based on the node kind. diff --git a/runtime/parser/parse_partial_data.go b/runtime/parser/parse_partial_data.go index 0890bdc9ece..9e276db489c 100644 --- a/runtime/parser/parse_partial_data.go +++ b/runtime/parser/parse_partial_data.go @@ -22,10 +22,10 @@ type DataYAML struct { // parseDataYAML parses a data resolver and its properties from a DataYAML. // The contextualConnector argument is optional; if provided and the resolver supports a connector, it becomes the default connector for the resolver. // It returns the resolver name, its properties, and refs found in the resolver props. -func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (string, *structpb.Struct, []ResourceName, error) { +func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (string, *structpb.Struct, string, []ResourceName, error) { // Parse the resolver and its properties var count int - var resolver string + var resolver, connector string var refs []ResourceName resolverProps := make(map[string]any) @@ -35,10 +35,11 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin resolver = "sql" resolverProps["sql"] = raw.SQL if raw.Connector != "" { - resolverProps["connector"] = raw.Connector + connector = raw.Connector } else if contextualConnector != "" { - resolverProps["connector"] = contextualConnector + connector = contextualConnector } + resolverProps["connector"] = connector } // Handle metrics SQL resolver @@ -69,7 +70,7 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin props = make(map[string]any) err := raw.Glob.Decode(props) if err != nil { - return "", nil, nil, fmt.Errorf("failed to parse glob properties: %w", err) + return "", nil, "", nil, fmt.Errorf("failed to parse glob properties: %w", err) } } @@ -87,17 +88,17 @@ func (p *Parser) parseDataYAML(raw *DataYAML, contextualConnector string) (strin // Validate there was exactly one resolver if count == 0 { - return "", nil, nil, fmt.Errorf(`the API definition does not specify a resolver (for example, "sql:", "metrics_sql:", ...)`) + return "", nil, "", nil, fmt.Errorf(`the API definition does not specify a resolver (for example, "sql:", "metrics_sql:", ...)`) } if count > 1 { - return "", nil, nil, fmt.Errorf(`the API definition specifies more than one resolver`) + return "", nil, "", nil, fmt.Errorf(`the API definition specifies more than one resolver`) } // Convert resolver properties to structpb.Struct resolverPropsPB, err := structpb.NewStruct(resolverProps) if err != nil { - return "", nil, nil, fmt.Errorf("encountered invalid property type: %w", err) + return "", nil, "", nil, fmt.Errorf("encountered invalid property type: %w", err) } - return resolver, resolverPropsPB, refs, nil + return resolver, resolverPropsPB, connector, refs, nil } diff --git a/runtime/parser/parse_report.go b/runtime/parser/parse_report.go index a6debea5545..ccba36f49a4 100644 --- a/runtime/parser/parse_report.go +++ b/runtime/parser/parse_report.go @@ -12,6 +12,7 @@ import ( "github.com/rilldata/rill/runtime/drivers/slack" "github.com/rilldata/rill/runtime/pkg/duration" "github.com/rilldata/rill/runtime/pkg/pbutil" + "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/structpb" ) @@ -179,7 +180,7 @@ func (p *Parser) parseReport(node *Node) error { } // Track report - r, err := p.insertResource(ResourceKindReport, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindReport, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parse_theme.go b/runtime/parser/parse_theme.go index 574e5ea7f14..d40faf8f5e8 100644 --- a/runtime/parser/parse_theme.go +++ b/runtime/parser/parse_theme.go @@ -5,6 +5,7 @@ import ( "github.com/mazznoer/csscolorparser" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "golang.org/x/exp/maps" ) // ThemeYAML is the raw structure of a Theme for the UI in YAML (does not include common fields) @@ -116,7 +117,7 @@ func (p *Parser) parseTheme(node *Node) error { return err } - r, err := p.insertResource(ResourceKindTheme, node.Name, node.Paths, node.Refs...) + r, err := p.insertResource(ResourceKindTheme, node.Name, node.Paths, node.Refs, maps.Values(node.postParseHooks)) if err != nil { return err } diff --git a/runtime/parser/parser.go b/runtime/parser/parser.go index f5f25d4fbd7..1cc8e003afd 100644 --- a/runtime/parser/parser.go +++ b/runtime/parser/parser.go @@ -168,6 +168,10 @@ type Diff struct { Deleted []ResourceName } +// postParseHook is run after all resources have been parsed. +// It returns true if the resource was modified. +type postParseHook func(r *Resource) bool + // Parser parses a Rill project directory into a set of resources. // After the initial parse, the parser can be used to incrementally reparse a subset of files. // Parser is not concurrency safe. @@ -185,12 +189,14 @@ type Parser struct { Errors []*runtimev1.ParseError // Internal state - resourcesForPath map[string][]*Resource // Reverse index of Resource.Paths - resourcesForUnspecifiedRef map[string][]*Resource // Reverse index of Resource.rawRefs where kind=ResourceKindUnspecified - resourceNamesForDataPaths map[string][]ResourceName // Index of local data files to resources that depend on them - insertedResources []*Resource - updatedResources []*Resource - deletedResources []*Resource + resourcesForPath map[string][]*Resource // Reverse index of Resource.Paths + resourcesForUnspecifiedRef map[string][]*Resource // Reverse index of Resource.rawRefs where kind=ResourceKindUnspecified + resourceNamesForDataPaths map[string][]ResourceName // Index of local data files to resources that depend on them + insertedResources []*Resource + updatedResources []*Resource + deletedResources []*Resource + postParseHooks map[ResourceName][]postParseHook + resourceNamesForInferredRef map[string]map[ResourceName]bool } // ParseRillYAML parses only the project's rill.yaml (or rill.yml) file. @@ -332,6 +338,8 @@ func (p *Parser) reload(ctx context.Context) error { p.resourceNamesForDataPaths = make(map[string][]ResourceName) p.resourcesForPath = make(map[string][]*Resource) p.resourcesForUnspecifiedRef = make(map[string][]*Resource) + p.postParseHooks = make(map[ResourceName][]postParseHook) + p.resourceNamesForInferredRef = make(map[string]map[ResourceName]bool) p.insertedResources = nil p.updatedResources = nil p.deletedResources = nil @@ -354,11 +362,15 @@ func (p *Parser) reload(ctx context.Context) error { return err } + seenResources := make(map[ResourceName]bool) + // Infer unspecified refs for all inserted resources for _, r := range p.insertedResources { + seenResources[r.Name.Normalized()] = true p.inferUnspecifiedRefs(r) } + p.runPostParseHooks(seenResources) return nil } @@ -533,6 +545,9 @@ func (p *Parser) reparseExceptRillYAML(ctx context.Context, paths []string) (*Di } } + // Run post-parse hooks + p.runPostParseHooks(inferRefsSeen) + // Phase 3: Build the diff using p.insertedResources, p.updatedResources and p.deletedResources diff := &Diff{ ModifiedDotEnv: modifiedDotEnv, @@ -815,7 +830,7 @@ func (p *Parser) insertDryRun(kind ResourceKind, name string) error { // insertResource inserts a resource in the parser's internal state. // After calling insertResource, the caller can directly modify the returned resource's spec. -func (p *Parser) insertResource(kind ResourceKind, name string, paths []string, refs ...ResourceName) (*Resource, error) { +func (p *Parser) insertResource(kind ResourceKind, name string, paths []string, refs []ResourceName, postParseHooks []postParseHook) (*Resource, error) { // Create the resource if not already present (ensures the spec for its kind is never nil) rn := ResourceName{Kind: kind, Name: name} _, ok := p.Resources[rn.Normalized()] @@ -890,6 +905,11 @@ func (p *Parser) insertResource(kind ResourceKind, name string, paths []string, } } + // Track post-parse hooks + if len(postParseHooks) > 0 { + p.postParseHooks[r.Name.Normalized()] = postParseHooks + } + return r, nil } @@ -959,6 +979,24 @@ func (p *Parser) deleteResource(r *Resource) { } } + // Remove from postParseHooks + delete(p.postParseHooks, r.Name.Normalized()) + + // Remove from resourcesForInferredRef + // 1. If it is the resource which was inferred + delete(p.resourceNamesForInferredRef, strings.ToLower(r.Name.Name)) + // 2. If it is a resource which had an inferred ref to another resource + for k, resMap := range p.resourceNamesForInferredRef { + if _, ok := resMap[r.Name.Normalized()]; !ok { + continue + } + if len(resMap) == 1 { + delete(p.resourceNamesForInferredRef, k) + } else { + delete(resMap, r.Name.Normalized()) + } + } + // Track in deleted resources (unless it was in insertedResources, in which case it's not a real deletion) if !foundInInserted { p.deletedResources = append(p.deletedResources, r) @@ -1047,6 +1085,65 @@ func (p *Parser) isDev() bool { return strings.EqualFold(p.Environment, "dev") } +// addConnectorRef returns a post-parse hook to conditionally add a ref to the connector. +// A connector ref can't be added until after all resources have been parsed, because the connector resource +// may not be explicitly defined. +func (p *Parser) addConnectorRef(connector string) postParseHook { + return func(r *Resource) bool { + // check if the connector exists + n := ResourceName{ResourceKindConnector, connector} + if _, ok := p.Resources[n.Normalized()]; !ok { + return false + } + + c := strings.ToLower(connector) + if res, ok := p.resourceNamesForInferredRef[c]; ok { + if _, ok := res[r.Name.Normalized()]; ok { + // already has a ref + return false + } + } + + // add a ref to the connector if it doesn't already exist (due to an explicit ref) + for _, ref := range r.Refs { + if ref.Normalized() == n.Normalized() { + return false + } + } + r.Refs = append(r.Refs, n) + + // index in resourceNamesForInferredRef + resourceMap, ok := p.resourceNamesForInferredRef[c] + if !ok { + resourceMap = make(map[ResourceName]bool) + p.resourceNamesForInferredRef[c] = resourceMap + } + resourceMap[r.Name.Normalized()] = true + return true + } +} + +func (p *Parser) runPostParseHooks(seenResources map[ResourceName]bool) { + for rn, hooks := range p.postParseHooks { + for _, hook := range hooks { + r := p.Resources[rn] + if r == nil { + continue + } + if !hook(r) { + continue + } + // If the hook modified the resource, track it in updatedResources + // check if the resource is already in updatedResources/insertedResources + if _, ok := seenResources[r.Name.Normalized()]; ok { + continue + } + // otherwise, add to updatedResources + p.updatedResources = append(p.updatedResources, r) + } + } +} + // pathIsSQL returns true if the path is a SQL file func pathIsSQL(path string) bool { return strings.HasSuffix(path, ".sql") diff --git a/runtime/parser/parser_test.go b/runtime/parser/parser_test.go index dcfaec8ca44..8b509a64062 100644 --- a/runtime/parser/parser_test.go +++ b/runtime/parser/parser_test.go @@ -255,6 +255,7 @@ schema: default { Name: ResourceName{Kind: ResourceKindModel, Name: "s1"}, Paths: []string{"/sources/s1.yaml"}, + Refs: []ResourceName{{Kind: ResourceKindConnector, Name: "s3"}}, ModelSpec: &runtimev1.ModelSpec{ InputConnector: "s3", OutputConnector: "duckdb", @@ -269,6 +270,7 @@ schema: default { Name: ResourceName{Kind: ResourceKindModel, Name: "s2"}, Paths: []string{"/sources/s2.sql"}, + Refs: []ResourceName{{Kind: ResourceKindConnector, Name: "postgres"}}, ModelSpec: &runtimev1.ModelSpec{ InputConnector: "postgres", OutputConnector: "duckdb", @@ -848,6 +850,49 @@ func TestRefInferrence(t *testing.T) { }, diff) } +func TestConnectorRef(t *testing.T) { + ctx := context.Background() + files := map[string]string{ + // rill.yaml + `rill.yaml`: ``, + // connector duckdb + `connectors/duckdb.yaml`: ` +driver: duckdb +`, + // model m1 + `models/m1.sql`: ` +SELECT 1 +`, + } + resources := []*Resource{ + // m1 + { + Name: ResourceName{Kind: ResourceKindModel, Name: "m1"}, + Paths: []string{"/models/m1.sql"}, + Refs: []ResourceName{{Kind: ResourceKindConnector, Name: "duckdb"}}, + ModelSpec: &runtimev1.ModelSpec{ + RefreshSchedule: &runtimev1.Schedule{RefUpdate: true}, + InputConnector: "duckdb", + InputProperties: must(structpb.NewStruct(map[string]any{"sql": strings.TrimSpace(files["models/m1.sql"])})), + OutputConnector: "duckdb", + ChangeMode: runtimev1.ModelChangeMode_MODEL_CHANGE_MODE_RESET, + }, + }, + // duckdb connector + { + Name: ResourceName{Kind: ResourceKindConnector, Name: "duckdb"}, + Paths: []string{"/connectors/duckdb.yaml"}, + ConnectorSpec: &runtimev1.ConnectorSpec{ + Driver: "duckdb", + }, + }, + } + repo := makeRepo(t, files) + p, err := Parse(ctx, repo, "", "", "duckdb") + require.NoError(t, err) + requireResourcesAndErrors(t, p, resources, nil) +} + func BenchmarkReparse(b *testing.B) { ctx := context.Background() files := map[string]string{