From 27af6d24804ca9a8fa6b3b0aac78bdb378d50d15 Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 5 Oct 2025 12:13:51 +0700 Subject: [PATCH 1/2] Add template_processor --- internal/impl/pure/processor_template.go | 165 +++++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 internal/impl/pure/processor_template.go diff --git a/internal/impl/pure/processor_template.go b/internal/impl/pure/processor_template.go new file mode 100644 index 000000000..6f5ad1b00 --- /dev/null +++ b/internal/impl/pure/processor_template.go @@ -0,0 +1,165 @@ +// Copyright 2025 Redpanda Data, Inc. + +package pure + +import ( + "bytes" + "context" + "errors" + "text/template" + + "github.com/redpanda-data/benthos/v4/internal/bundle" + "github.com/redpanda-data/benthos/v4/internal/component/interop" + "github.com/redpanda-data/benthos/v4/public/bloblang" + "github.com/redpanda-data/benthos/v4/public/service" +) + +func tmplProcConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Beta(). + Categories("Utility"). + Summary("Executes a Go text/template template on the message content."). + Description(`This processor allows you to apply Go text/template templates to the structured content of messages. The template can access the message data as a structured object. Optionally, a Bloblang mapping can be applied first to transform the data before templating. + +For more information on the template syntax, see https://pkg.go.dev/text/template#hdr-Actions`). + Example( + "Execute template", + `This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`, + ` +input: + generate: + count: 1 + mapping: root.foo = "bar" + processors: + - template: + code: "{{ .foo }}" +`). + Example( + "Execute template with mapping", + `This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`, + ` +input: + generate: + count: 1 + mapping: root.foo = "bar" + processors: + - template: + code: "{{ .value }}" + mapping: "root.value = this.foo" +`). + Example( + "Execute template from file", + `This example loads a template from a file and applies it to the message.`, + ` +input: + generate: + count: 1 + mapping: root.foo = "bar" + processors: + - template: + code: | + {{ template "greeting" . }} + files: ["./templates/greeting.tmpl"] +`). + Fields( + service.NewStringField("code"). + Description("The template code to execute. This should be a valid Go text/template string."). + Example("{{.name}}"). + Optional(), + service.NewStringListField("files"). + Description("A list of file paths containing template definitions. Templates from these files will be parsed and available for execution."). + Optional(), + service.NewBloblangField("mapping"). + Description("An optional xref:guides:bloblang/about.adoc[Bloblang] mapping to apply to the message before executing the template. This allows you to transform the data structure before templating."). + Optional(), + ) +} + +func init() { + service.MustRegisterProcessor( + "template", + tmplProcConfig(), + func(conf *service.ParsedConfig, res *service.Resources) (service.Processor, error) { + mgr := interop.UnwrapManagement(res) + return templateFromParsed(conf, mgr) + }, + ) +} + +type tmplProc struct { + tmpl *template.Template + exec *bloblang.Executor +} + +func templateFromParsed(conf *service.ParsedConfig, mgr bundle.NewManagement) (*tmplProc, error) { + code, err := conf.FieldString("code") + if err != nil { + return nil, err + } + + files, err := conf.FieldStringList("files") + if err != nil { + return nil, err + } + + if code == "" && len(files) == 0 { + return nil, errors.New("code or files param must be specified") + } + + t := &tmplProc{tmpl: &template.Template{}} + if len(files) > 0 { + if t.tmpl, err = t.tmpl.ParseFiles(files...); err != nil { + return nil, err + } + } + + if code != "" { + if t.tmpl, err = t.tmpl.New("code").Parse(code); err != nil { + return nil, err + } + } + + if conf.Contains("mapping") { + if t.exec, err = conf.FieldBloblang("mapping"); err != nil { + return nil, err + } + } + + return t, nil +} + +func (t *tmplProc) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { + var data any + var err error + if t.exec != nil { + mapRes, err := msg.BloblangQuery(t.exec) + if err != nil { + return nil, err + } + + data, err = mapRes.AsStructured() + if err != nil { + return nil, err + } + } else { + data, err = msg.AsStructured() + if err != nil { + return nil, err + } + } + + var buf bytes.Buffer + if err := t.tmpl.Execute(&buf, data); err != nil { + return nil, err + } + + msg.SetBytes(buf.Bytes()) + + return service.MessageBatch{msg}, nil +} + +func (t *tmplProc) Close(ctx context.Context) error { + _, err := t.tmpl.Clone() + + return err +} From 25e9537275f84bdc3a7a182f785583f61551566e Mon Sep 17 00:00:00 2001 From: Artem Klevtsov Date: Sun, 5 Oct 2025 12:32:54 +0700 Subject: [PATCH 2/2] Add glob support to the template processor --- internal/impl/pure/processor_template.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/internal/impl/pure/processor_template.go b/internal/impl/pure/processor_template.go index 6f5ad1b00..f9f600220 100644 --- a/internal/impl/pure/processor_template.go +++ b/internal/impl/pure/processor_template.go @@ -39,10 +39,10 @@ input: `This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`, ` input: - generate: + generate: count: 1 mapping: root.foo = "bar" - processors: + processors: - template: code: "{{ .value }}" mapping: "root.value = this.foo" @@ -52,13 +52,13 @@ input: `This example loads a template from a file and applies it to the message.`, ` input: - generate: + generate: count: 1 mapping: root.foo = "bar" - processors: - - template: - code: | - {{ template "greeting" . }} + processors: + - template: + code: | + {{ template "greeting" . }} files: ["./templates/greeting.tmpl"] `). Fields( @@ -67,7 +67,7 @@ input: Example("{{.name}}"). Optional(), service.NewStringListField("files"). - Description("A list of file paths containing template definitions. Templates from these files will be parsed and available for execution."). + Description("A list of file paths containing template definitions. Templates from these files will be parsed and available for execution. Glob patterns are supported, including super globs (double star)."). Optional(), service.NewBloblangField("mapping"). Description("An optional xref:guides:bloblang/about.adoc[Bloblang] mapping to apply to the message before executing the template. This allows you to transform the data structure before templating."). @@ -108,8 +108,10 @@ func templateFromParsed(conf *service.ParsedConfig, mgr bundle.NewManagement) (* t := &tmplProc{tmpl: &template.Template{}} if len(files) > 0 { - if t.tmpl, err = t.tmpl.ParseFiles(files...); err != nil { - return nil, err + for _, f := range files { + if t.tmpl, err = t.tmpl.ParseGlob(f); err != nil { + return nil, err + } } }