diff --git a/cmd/config/configuration.go b/cmd/config/configuration.go
index 337e0af3..32eefb76 100644
--- a/cmd/config/configuration.go
+++ b/cmd/config/configuration.go
@@ -1,15 +1,20 @@
package config
import (
+ "fmt"
"os"
"path"
"path/filepath"
+ "slices"
"strconv"
+ "strings"
"text/template"
"time"
"github.com/YuukanOO/seelf/cmd/serve"
"github.com/YuukanOO/seelf/internal/deployment/domain"
+ "github.com/YuukanOO/seelf/pkg/bus"
+ "github.com/YuukanOO/seelf/pkg/bus/embedded"
"github.com/YuukanOO/seelf/pkg/bytesize"
"github.com/YuukanOO/seelf/pkg/config"
"github.com/YuukanOO/seelf/pkg/crypto"
@@ -18,11 +23,13 @@ import (
"github.com/YuukanOO/seelf/pkg/monad"
"github.com/YuukanOO/seelf/pkg/must"
"github.com/YuukanOO/seelf/pkg/ostools"
+ "github.com/YuukanOO/seelf/pkg/storage"
"github.com/YuukanOO/seelf/pkg/validate"
- "github.com/YuukanOO/seelf/pkg/validate/numbers"
+ "github.com/YuukanOO/seelf/pkg/validate/arrays"
)
var (
+ dotEnvFilenames = []string{".env", ".env.local"}
userConfigDir = must.Panic(os.UserConfigDir())
generatedSecretKey = must.Panic(crypto.RandomKey[string](64))
defaultDataDirectory = filepath.Join(userConfigDir, "seelf")
@@ -31,16 +38,16 @@ var (
)
const (
- databaseConnectionString = "seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate&_synchronous=NORMAL"
- defaultConfigFilename = "conf.yml"
- defaultPort = 8080
- defaultHost = ""
- defaultRunnersPollInterval = "4s"
- defaultRunnersDeploymentCount = 4
- defaultCleanupDeploymentCount = 2
- defaultBalancerDomain = "http://docker.localhost"
- defaultDeploymentDirTemplate = "{{ .Environment }}"
- defaultSourceArchiveMaxSize = "32mb"
+ databaseConnectionString = "seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate&_synchronous=NORMAL"
+ defaultConfigFilename = "conf.yml"
+ defaultPort = 8080
+ defaultHost = ""
+ defaultRunnersPollInterval = "4s"
+ defaultDeploymentWorkersCount uint8 = 4
+ defaultGeneralWorkersCount uint8 = 2
+ defaultBalancerDomain = "http://docker.localhost"
+ defaultDeploymentDirTemplate = "{{ .Environment }}"
+ defaultSourceArchiveMaxSize = "32mb"
)
type (
@@ -60,20 +67,16 @@ type (
Data dataConfiguration
Source sourceConfiguration
Http httpConfiguration
- Runners runnersConfiguration
+ Runners runnersConfiguration `env:"RUNNERS_CONFIGURATION"`
Private internalConfiguration `yaml:"-"`
-
- appExposedUrl monad.Maybe[domain.Url]
- pollInterval time.Duration
- deploymentDirTemplate *template.Template
- logLevel log.Level
- logFormat log.OutputFormat
- sourceArchiveMaxSize int64
}
logConfiguration struct {
Level string `env:"LOG_LEVEL"`
Format string `env:"LOG_FORMAT"`
+
+ level log.Level
+ format log.OutputFormat
}
httpConfiguration struct {
@@ -87,10 +90,14 @@ type (
dataConfiguration struct {
Path string `env:"DATA_PATH"`
DeploymentDirTemplate string `env:"DEPLOYMENT_DIR_TEMPLATE" yaml:"deployment_dir_template"`
+
+ deploymentDirTemplate *template.Template
}
sourceArchiveConfiguration struct {
MaxSize string `env:"SOURCE_ARCHIVE_MAX_SIZE" yaml:"max_size"`
+
+ maxSize int64
}
// Contains configuration related to deployment sources
@@ -98,11 +105,13 @@ type (
Archive sourceArchiveConfiguration
}
- // Configuration related to the async jobs runners.
- runnersConfiguration struct {
- PollInterval string `env:"RUNNERS_POLL_INTERVAL" yaml:"poll_interval"`
- Deployment int `env:"RUNNERS_DEPLOYMENT_COUNT" yaml:"deployment"`
- Cleanup int `env:"RUNNERS_CLEANUP_COUNT" yaml:"cleanup"`
+ // Configuration related to the background runners.
+ runnerConfiguration struct {
+ PollInterval string `yaml:"poll_interval"`
+ Count uint8 `yaml:"count"`
+ Jobs []string `yaml:"jobs,omitempty"`
+
+ pollInterval time.Duration
}
// internalConfiguration fields not read from the configuration file and use only during specific steps
@@ -110,6 +119,8 @@ type (
Email string `env:"SEELF_ADMIN_EMAIL,ADMIN_EMAIL"`
Password string `env:"SEELF_ADMIN_PASSWORD,ADMIN_PASSWORD"`
ExposedOn string `env:"EXPOSED_ON"` // Container name and default target url (ie. http://seelf@docker.localhost)
+
+ exposedUrl monad.Maybe[domain.Url]
}
)
@@ -134,11 +145,11 @@ func Default(builders ...ConfigurationBuilder) Configuration {
Port: defaultPort,
Secret: generatedSecretKey,
},
- Runners: runnersConfiguration{
- PollInterval: defaultRunnersPollInterval,
- Deployment: defaultRunnersDeploymentCount,
- Cleanup: defaultCleanupDeploymentCount,
- },
+ Runners: defaultRunnersConfiguration(
+ defaultRunnersPollInterval,
+ defaultDeploymentWorkersCount,
+ defaultGeneralWorkersCount,
+ ),
}
for _, builder := range builders {
@@ -153,27 +164,30 @@ func Default(builders ...ConfigurationBuilder) Configuration {
}
func (c *configuration) Initialize(logger log.ConfigurableLogger, path string) error {
- exists, err := config.Load(path, c)
+ var configFileFound bool
- if err != nil {
+ if err := config.Load(c,
+ config.FromYAML(path, &configFileFound),
+ config.FromEnvironment(dotEnvFilenames...),
+ ); err != nil {
return err
}
- if err = c.validate(); err != nil {
+ if err := c.validate(); err != nil {
return err
}
// Make sure the data path exists
- if err = ostools.MkdirAll(c.Data.Path); err != nil {
+ if err := ostools.MkdirAll(c.Data.Path); err != nil {
return err
}
// Update logger based on loaded configuration
- if err = logger.Configure(c.logFormat, c.logLevel); err != nil {
+ if err := logger.Configure(c.Log.format, c.Log.level); err != nil {
return err
}
- if exists {
+ if configFileFound {
logger.Infow("configuration loaded",
"path", path)
} else {
@@ -193,17 +207,64 @@ func (c *configuration) Initialize(logger log.ConfigurableLogger, path string) e
return nil
}
-func (c *configuration) DataDir() string { return c.Data.Path }
-func (c *configuration) DeploymentDirTemplate() *template.Template { return c.deploymentDirTemplate }
-func (c *configuration) AppExposedUrl() monad.Maybe[domain.Url] { return c.appExposedUrl }
-func (c *configuration) DefaultEmail() string { return c.Private.Email }
-func (c *configuration) DefaultPassword() string { return c.Private.Password }
-func (c *configuration) Secret() []byte { return []byte(c.Http.Secret) }
-func (c *configuration) RunnersPollInterval() time.Duration { return c.pollInterval }
-func (c *configuration) RunnersDeploymentCount() int { return c.Runners.Deployment }
-func (c *configuration) RunnersCleanupCount() int { return c.Runners.Cleanup }
-func (c *configuration) IsDebug() bool { return c.logLevel == log.DebugLevel }
-func (c *configuration) MaxDeploymentArchiveFileSize() int64 { return c.sourceArchiveMaxSize }
+func (c *configuration) DataDir() string { return c.Data.Path }
+func (c *configuration) DeploymentDirTemplate() *template.Template {
+ return c.Data.deploymentDirTemplate
+}
+func (c *configuration) AppExposedUrl() monad.Maybe[domain.Url] { return c.Private.exposedUrl }
+func (c *configuration) DefaultEmail() string { return c.Private.Email }
+func (c *configuration) DefaultPassword() string { return c.Private.Password }
+func (c *configuration) Secret() []byte { return []byte(c.Http.Secret) }
+func (c *configuration) IsDebug() bool { return c.Log.level == log.DebugLevel }
+func (c *configuration) MaxDeploymentArchiveFileSize() int64 {
+ return c.Source.Archive.maxSize
+}
+
+func (c *configuration) RunnersDefinitions(mapper *storage.DiscriminatedMapper[bus.AsyncRequest]) ([]embedded.RunnerDefinition, error) {
+ definitions := make([]embedded.RunnerDefinition, len(c.Runners))
+ unhandledMessages := mapper.Keys()
+
+ for i, r := range c.Runners {
+ // No specific messages set, handle all messages not seen already.
+ // Since the validate function ensure only the last worker can have an empty list,
+ // we should be good.
+ if len(r.Jobs) == 0 {
+ r.Jobs = unhandledMessages
+ unhandledMessages = nil
+ }
+
+ messages := make([]bus.AsyncRequest, len(r.Jobs))
+
+ for j, msg := range r.Jobs {
+ // Remove the msg from the unhandledMessages
+ msgIdx := slices.Index(unhandledMessages, msg)
+
+ if msgIdx != -1 {
+ unhandledMessages = slices.Delete(unhandledMessages, msgIdx, msgIdx+1)
+ }
+
+ req, err := mapper.From(msg, "{}")
+
+ if err != nil {
+ return nil, fmt.Errorf("unknown job name: %s, must be one of %s", msg, strings.Join(mapper.Keys(), ", "))
+ }
+
+ messages[j] = req
+ }
+
+ definitions[i] = embedded.RunnerDefinition{
+ PollInterval: r.pollInterval,
+ WorkersCount: r.Count,
+ Messages: messages,
+ }
+ }
+
+ if len(unhandledMessages) > 0 {
+ return nil, fmt.Errorf("some background jobs are not handled: %s, please fix your configuration by adding a worker to handle them", strings.Join(unhandledMessages, ", "))
+ }
+
+ return definitions, nil
+}
func (c *configuration) IsSecure() bool {
// If secure has been explicitly isSet, returns it
@@ -211,7 +272,7 @@ func (c *configuration) IsSecure() bool {
return secure
}
- if defaultTargetUrl, isSet := c.appExposedUrl.TryGet(); isSet {
+ if defaultTargetUrl, isSet := c.Private.exposedUrl.TryGet(); isSet {
return defaultTargetUrl.UseSSL()
}
@@ -229,14 +290,21 @@ func (c *configuration) ListenAddress() string {
}
func (c *configuration) validate() error {
+ lastRunnerIdx := len(c.Runners) - 1
+
return validate.Struct(validate.Of{
- "log.level": validate.Value(c.Log.Level, &c.logLevel, log.ParseLevel),
- "log.format": validate.Value(c.Log.Format, &c.logFormat, log.ParseFormat),
- "source.archive.max_size": validate.Value(c.Source.Archive.MaxSize, &c.sourceArchiveMaxSize, bytesize.Parse),
- "data.deployment_dir_template": validate.Value(c.Data.DeploymentDirTemplate, &c.deploymentDirTemplate, template.New("").Parse),
- "runners.poll_interval": validate.Value(c.Runners.PollInterval, &c.pollInterval, time.ParseDuration),
- "runners.deployment": validate.Field(c.Runners.Deployment, numbers.Min(1)),
- "runners.cleanup": validate.Field(c.Runners.Cleanup, numbers.Min(1)),
+ "log.level": validate.Value(c.Log.Level, &c.Log.level, log.ParseLevel),
+ "log.format": validate.Value(c.Log.Format, &c.Log.format, log.ParseFormat),
+ "source.archive.max_size": validate.Value(c.Source.Archive.MaxSize, &c.Source.Archive.maxSize, bytesize.Parse),
+ "data.deployment_dir_template": validate.Value(c.Data.DeploymentDirTemplate, &c.Data.deploymentDirTemplate, template.New("").Parse),
+ "runners": validate.Array(c.Runners, func(runner runnerConfiguration, idx int) error {
+ return validate.Struct(validate.Of{
+ "poll_interval": validate.Value(runner.PollInterval, &c.Runners[idx].pollInterval, time.ParseDuration),
+ "jobs": validate.If(idx != lastRunnerIdx, func() error {
+ return validate.Field(runner.Jobs, arrays.Required)
+ }),
+ })
+ }),
"exposed_as": validate.If(c.Private.ExposedOn != "", func() error {
url, err := domain.UrlFrom(c.Private.ExposedOn)
@@ -244,7 +312,7 @@ func (c *configuration) validate() error {
return err
}
- c.appExposedUrl.Set(url)
+ c.Private.exposedUrl.Set(url)
return nil
}),
diff --git a/cmd/config/configuration_test.go b/cmd/config/configuration_test.go
new file mode 100644
index 00000000..5d5ec3d3
--- /dev/null
+++ b/cmd/config/configuration_test.go
@@ -0,0 +1,217 @@
+package config_test
+
+import (
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/YuukanOO/seelf/cmd/config"
+ "github.com/YuukanOO/seelf/pkg/assert"
+ "github.com/YuukanOO/seelf/pkg/bus"
+ "github.com/YuukanOO/seelf/pkg/log"
+ "github.com/YuukanOO/seelf/pkg/must"
+ "github.com/YuukanOO/seelf/pkg/storage"
+)
+
+var (
+ someOtherMessage = message{name: "some.other.message"}
+ anotherMessage = message{name: "and.another.message"}
+ deployCommand = message{name: "deployment.command.deploy"}
+)
+
+func Test_Configuration(t *testing.T) {
+
+ t.Run("should correctly load a configuration from a yaml file", func(t *testing.T) {
+ mapper := buildMapper(someOtherMessage, anotherMessage, deployCommand)
+ logger, conf, err := loadConfiguration("valid-config.yml", nil)
+
+ assert.Nil(t, err)
+ assert.Equal(t, log.DebugLevel, logger.level)
+ assert.Equal(t, log.OutputJSON, logger.format)
+ assert.True(t, conf.IsDebug())
+ assert.True(t, conf.IsSecure())
+ assert.Equal(t, "localhost:5000", conf.ListenAddress())
+ assert.Equal(t, "file:testdata/seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate&_synchronous=NORMAL", conf.ConnectionString())
+ assert.Equal(t, "testdata", conf.DataDir())
+ assert.Equal(t, 10485760, conf.MaxDeploymentArchiveFileSize())
+ assert.DeepEqual(t, []byte("top_secret"), conf.Secret())
+
+ runners, err := conf.RunnersDefinitions(mapper)
+ assert.Nil(t, err)
+ assert.HasLength(t, 2, runners)
+
+ runner := runners[0]
+ assert.Equal(t, 5*time.Second, runner.PollInterval)
+ assert.Equal(t, 4, runner.WorkersCount)
+ assert.DeepEqual(t, []bus.AsyncRequest{
+ deployCommand,
+ }, runner.Messages)
+
+ runner = runners[1]
+ assert.Equal(t, 10*time.Second, runner.PollInterval)
+ assert.Equal(t, 5, runner.WorkersCount)
+ assert.ArrayEqualFunc(t, []bus.AsyncRequest{
+ someOtherMessage,
+ anotherMessage,
+ }, runner.Messages, compareRequestFunc)
+ })
+
+ t.Run("should correctly handle deprecated runners configuration", func(t *testing.T) {
+ mapper := buildMapper(someOtherMessage, anotherMessage, deployCommand)
+ _, conf, err := loadConfiguration("deprecated-runners-config.yml", nil)
+
+ assert.Nil(t, err)
+
+ runners, err := conf.RunnersDefinitions(mapper)
+ assert.Nil(t, err)
+ assert.HasLength(t, 2, runners)
+
+ runner := runners[0]
+ assert.Equal(t, 10*time.Second, runner.PollInterval)
+ assert.Equal(t, 5, runner.WorkersCount)
+ assert.DeepEqual(t, []bus.AsyncRequest{
+ deployCommand,
+ }, runner.Messages)
+
+ runner = runners[1]
+ assert.Equal(t, 10*time.Second, runner.PollInterval)
+ assert.Equal(t, 3, runner.WorkersCount)
+ assert.ArrayEqualFunc(t, []bus.AsyncRequest{
+ someOtherMessage,
+ anotherMessage,
+ }, runner.Messages, compareRequestFunc)
+ })
+
+ t.Run("should correctly handle environment values as taking precedence", func(t *testing.T) {
+ mapper := buildMapper(someOtherMessage, anotherMessage, deployCommand)
+ logger, conf, err := loadConfiguration("config-to-override.yml", map[string]string{
+ "LOG_LEVEL": "info",
+ "LOG_FORMAT": "console",
+ "DATA_PATH": "testdata",
+ "HTTP_HOST": "192.0.1.68",
+ "HTTP_PORT": "8080",
+ "HTTP_SECRET": "my_secret",
+ "ADMIN_EMAIL": "admin@example.com",
+ "ADMIN_PASSWORD": "mypassword",
+ "SOURCE_ARCHIVE_MAX_SIZE": "20mb",
+ "EXPOSED_ON": "https://seelf.somewhere.com",
+ "RUNNERS_CONFIGURATION": "4s;2;deployment.command.deploy|4s;3;",
+ })
+
+ assert.Nil(t, err)
+ assert.Equal(t, log.InfoLevel, logger.level)
+ assert.Equal(t, log.OutputConsole, logger.format)
+ assert.False(t, conf.IsDebug())
+ assert.True(t, conf.IsSecure())
+ assert.Equal(t, "192.0.1.68:8080", conf.ListenAddress())
+ assert.Equal(t, "file:testdata/seelf.db?_journal=WAL&_timeout=5000&_foreign_keys=yes&_txlock=immediate&_synchronous=NORMAL", conf.ConnectionString())
+ assert.Equal(t, "testdata", conf.DataDir())
+ assert.Equal(t, 20971520, conf.MaxDeploymentArchiveFileSize())
+ assert.DeepEqual(t, []byte("my_secret"), conf.Secret())
+ assert.Equal(t, "admin@example.com", conf.DefaultEmail())
+ assert.Equal(t, "mypassword", conf.DefaultPassword())
+ assert.True(t, conf.AppExposedUrl().HasValue())
+ assert.Equal(t, "https://seelf.somewhere.com", conf.AppExposedUrl().MustGet().String())
+
+ runners, err := conf.RunnersDefinitions(mapper)
+ assert.Nil(t, err)
+ assert.HasLength(t, 2, runners)
+
+ runner := runners[0]
+ assert.Equal(t, 4*time.Second, runner.PollInterval)
+ assert.Equal(t, 2, runner.WorkersCount)
+ assert.DeepEqual(t, []bus.AsyncRequest{
+ deployCommand,
+ }, runner.Messages)
+
+ runner = runners[1]
+ assert.Equal(t, 4*time.Second, runner.PollInterval)
+ assert.Equal(t, 3, runner.WorkersCount)
+ assert.ArrayEqualFunc(t, []bus.AsyncRequest{
+ someOtherMessage,
+ anotherMessage,
+ }, runner.Messages, compareRequestFunc)
+ })
+
+ t.Run("should fail to build runners definition if some jobs are not handled", func(t *testing.T) {
+ mapper := buildMapper(someOtherMessage, anotherMessage, deployCommand)
+ _, config, err := loadConfiguration("valid-config.yml", map[string]string{
+ "RUNNERS_CONFIGURATION": "4s;2;deployment.command.deploy,some.other.message",
+ })
+
+ assert.Nil(t, err)
+
+ _, err = config.RunnersDefinitions(mapper)
+
+ assert.NotNil(t, err)
+ assert.Equal(t, "some background jobs are not handled: and.another.message, please fix your configuration by adding a worker to handle them", err.Error())
+ })
+
+ t.Run("should fail to build runners definition if some jobs does not exist", func(t *testing.T) {
+ mapper := buildMapper(someOtherMessage, anotherMessage, deployCommand)
+ _, config, err := loadConfiguration("valid-config.yml", map[string]string{
+ "RUNNERS_CONFIGURATION": "4s;2;non.existent.job|4s;2;",
+ })
+
+ assert.Nil(t, err)
+
+ _, err = config.RunnersDefinitions(mapper)
+
+ assert.NotNil(t, err)
+ assert.Match(t, "unknown job name: non.existent.job, must be one of .*,.*", err.Error())
+ })
+}
+
+func loadConfiguration(configName string, envValues map[string]string) (*testLogger, config.Configuration, error) {
+ confFilename := filepath.Join("testdata", configName)
+ logger := &testLogger{
+ ConfigurableLogger: must.Panic(log.NewLogger()),
+ }
+
+ os.Clearenv()
+
+ for k, v := range envValues {
+ os.Setenv(k, v)
+ }
+
+ conf := config.Default()
+ err := conf.Initialize(logger, confFilename)
+
+ return logger, conf, err
+}
+
+func buildMapper(messages ...message) *storage.DiscriminatedMapper[bus.AsyncRequest] {
+ mapper := storage.NewDiscriminatedMapper(func(a bus.AsyncRequest) string { return a.Name_() })
+
+ for _, m := range messages {
+ mapper.Register(m, func(s string) (bus.AsyncRequest, error) { return m, nil })
+ }
+
+ return mapper
+}
+
+type testLogger struct {
+ log.ConfigurableLogger
+ format log.OutputFormat
+ level log.Level
+}
+
+func (t *testLogger) Configure(format log.OutputFormat, level log.Level) error {
+ t.format = format
+ t.level = level
+
+ return t.ConfigurableLogger.Configure(format, level)
+}
+
+type message struct {
+ bus.AsyncCommand
+
+ name string
+}
+
+func (m message) Name_() string { return m.name }
+func (m message) Group() string { return "" }
+
+func compareRequestFunc(a, b bus.AsyncRequest) int { return strings.Compare(a.Name_(), b.Name_()) }
diff --git a/cmd/config/runners.go b/cmd/config/runners.go
new file mode 100644
index 00000000..9ee21b56
--- /dev/null
+++ b/cmd/config/runners.go
@@ -0,0 +1,124 @@
+package config
+
+import (
+ "errors"
+ "strconv"
+ "strings"
+
+ "github.com/YuukanOO/seelf/internal/deployment/app/deploy"
+ "gopkg.in/yaml.v3"
+)
+
+const (
+ runnersEnvExpectedParts = 3
+ runnersEnvPartSeparator = ";"
+ runnersEnvJobNamesSeparator = ","
+ runnersEnvSeparator = "|"
+)
+
+type (
+ runnersConfiguration []runnerConfiguration
+
+ deprecatedRunnerConfiguration struct {
+ PollInterval string `yaml:"poll_interval"`
+ Deployment int `yaml:"deployment"`
+ Cleanup int `yaml:"cleanup"`
+ }
+)
+
+var ErrRunnersEnvParseFailed = errors.New("failed to parse runners configuration from environment")
+
+func (r *runnersConfiguration) UnmarshalYAML(value *yaml.Node) error {
+ initialErr := r.tryDecodeCurrentFormat(value)
+
+ if initialErr == nil {
+ return nil
+ }
+
+ if err := r.tryDecodeDeprecatedFormat(value); err != nil {
+ // If there's still an error, just returns the initial one as it's probably the one we want
+ return initialErr
+ }
+
+ return nil
+}
+
+func (r *runnersConfiguration) UnmarshalEnvironmentValue(data string) error {
+ // No need to go further if the value is empty
+ if data == "" {
+ return ErrRunnersEnvParseFailed
+ }
+
+ runners := strings.Split(data, runnersEnvSeparator)
+
+ *r = make(runnersConfiguration, len(runners))
+
+ for i, runnerStr := range runners {
+ parts := strings.SplitN(runnerStr, runnersEnvPartSeparator, runnersEnvExpectedParts)
+
+ if len(parts) != runnersEnvExpectedParts {
+ return ErrRunnersEnvParseFailed
+ }
+
+ conf := runnerConfiguration{
+ PollInterval: parts[0],
+ }
+
+ if parts[2] != "" {
+ conf.Jobs = strings.Split(parts[2], runnersEnvJobNamesSeparator)
+ }
+
+ workersCount, err := strconv.Atoi(parts[1])
+
+ if err != nil {
+ return ErrRunnersEnvParseFailed
+ }
+
+ conf.Count = uint8(workersCount)
+
+ (*r)[i] = conf
+ }
+
+ return nil
+}
+
+func (r *runnersConfiguration) tryDecodeCurrentFormat(value *yaml.Node) error {
+ var runnersConf []runnerConfiguration
+
+ if err := value.Decode(&runnersConf); err != nil {
+ return err
+ }
+
+ *r = runnersConf
+ return nil
+}
+
+func (r *runnersConfiguration) tryDecodeDeprecatedFormat(value *yaml.Node) error {
+ var oldRunnersConfiguration deprecatedRunnerConfiguration
+
+ if err := value.Decode(&oldRunnersConfiguration); err != nil {
+ return err
+ }
+
+ // Migrate to the new configuration
+ *r = defaultRunnersConfiguration(
+ oldRunnersConfiguration.PollInterval,
+ uint8(oldRunnersConfiguration.Deployment),
+ uint8(oldRunnersConfiguration.Cleanup),
+ )
+ return nil
+}
+
+func defaultRunnersConfiguration(pollInterval string, deploymentsCount uint8, generalCount uint8) runnersConfiguration {
+ return runnersConfiguration{
+ {
+ PollInterval: pollInterval,
+ Count: deploymentsCount,
+ Jobs: []string{deploy.Command{}.Name_()},
+ },
+ {
+ PollInterval: pollInterval,
+ Count: generalCount,
+ },
+ }
+}
diff --git a/cmd/config/testdata/config-to-override.yml b/cmd/config/testdata/config-to-override.yml
new file mode 100644
index 00000000..bd50d7c8
--- /dev/null
+++ b/cmd/config/testdata/config-to-override.yml
@@ -0,0 +1,20 @@
+log:
+ level: debug
+ format: json
+data:
+ path: "somewhere_else"
+ deployment_dir_template: "{{ .Environment }}"
+source:
+ archive:
+ max_size: 10mb
+http:
+ host: localhost
+ port: 5000
+ secret: top_secret
+runners:
+ - poll_interval: 5s
+ count: 4
+ jobs:
+ - deployment.command.deploy
+ - poll_interval: 10s
+ count: 5
diff --git a/cmd/config/testdata/deprecated-runners-config.yml b/cmd/config/testdata/deprecated-runners-config.yml
new file mode 100644
index 00000000..206ff7d6
--- /dev/null
+++ b/cmd/config/testdata/deprecated-runners-config.yml
@@ -0,0 +1,18 @@
+log:
+ level: debug
+ format: json
+data:
+ path: "testdata"
+ deployment_dir_template: "{{ .Environment }}"
+source:
+ archive:
+ max_size: 10mb
+http:
+ host: localhost
+ port: 5000
+ secret: top_secret
+ secure: true
+runners:
+ poll_interval: 10s
+ deployment: 5
+ cleanup: 3
diff --git a/cmd/config/testdata/valid-config.yml b/cmd/config/testdata/valid-config.yml
new file mode 100644
index 00000000..66d491fc
--- /dev/null
+++ b/cmd/config/testdata/valid-config.yml
@@ -0,0 +1,21 @@
+log:
+ level: debug
+ format: json
+data:
+ path: "testdata"
+ deployment_dir_template: "{{ .Environment }}"
+source:
+ archive:
+ max_size: 10mb
+http:
+ host: localhost
+ port: 5000
+ secret: top_secret
+ secure: true
+runners:
+ - poll_interval: 5s
+ count: 4
+ jobs:
+ - deployment.command.deploy
+ - poll_interval: 10s
+ count: 5
diff --git a/cmd/startup/server.go b/cmd/startup/server.go
index 8d78d617..14bc76b4 100644
--- a/cmd/startup/server.go
+++ b/cmd/startup/server.go
@@ -2,15 +2,10 @@ package startup
import (
"context"
- "time"
"github.com/YuukanOO/seelf/internal/auth/app/create_first_account"
auth "github.com/YuukanOO/seelf/internal/auth/domain"
authinfra "github.com/YuukanOO/seelf/internal/auth/infra"
- "github.com/YuukanOO/seelf/internal/deployment/app/cleanup_app"
- "github.com/YuukanOO/seelf/internal/deployment/app/cleanup_target"
- "github.com/YuukanOO/seelf/internal/deployment/app/configure_target"
- "github.com/YuukanOO/seelf/internal/deployment/app/deploy"
"github.com/YuukanOO/seelf/internal/deployment/app/expose_seelf_container"
deployment "github.com/YuukanOO/seelf/internal/deployment/domain"
deploymentinfra "github.com/YuukanOO/seelf/internal/deployment/infra"
@@ -19,6 +14,7 @@ import (
bussqlite "github.com/YuukanOO/seelf/pkg/bus/sqlite"
"github.com/YuukanOO/seelf/pkg/log"
"github.com/YuukanOO/seelf/pkg/monad"
+ "github.com/YuukanOO/seelf/pkg/storage"
"github.com/YuukanOO/seelf/pkg/storage/sqlite"
)
@@ -29,17 +25,15 @@ type (
AppExposedUrl() monad.Maybe[deployment.Url]
DefaultEmail() string
DefaultPassword() string
- RunnersPollInterval() time.Duration
- RunnersDeploymentCount() int
- RunnersCleanupCount() int
ConnectionString() string
+ RunnersDefinitions(*storage.DiscriminatedMapper[bus.AsyncRequest]) ([]embedded.RunnerDefinition, error)
}
ServerRoot struct {
- bus bus.Bus
- logger log.Logger
- db *sqlite.Database
- scheduler *embedded.Runner
+ bus bus.Bus
+ logger log.Logger
+ db *sqlite.Database
+ orchestrator *embedded.Orchestrator
}
)
@@ -75,21 +69,6 @@ func Server(options ServerOptions, logger log.Logger) (root *ServerRoot, err err
return
}
- s.scheduler = embedded.NewRunner(jobsStore, s.logger, s.bus, options.RunnersPollInterval(),
- embedded.WorkerGroup{
- Size: options.RunnersDeploymentCount(),
- Requests: []bus.AsyncRequest{deploy.Command{}},
- },
- embedded.WorkerGroup{
- Size: options.RunnersCleanupCount(),
- Requests: []bus.AsyncRequest{
- cleanup_app.Command{},
- cleanup_target.Command{},
- configure_target.Command{},
- },
- },
- )
-
// Setup auth infrastructure
if err = authinfra.Setup(s.logger, s.db, s.bus); err != nil {
return
@@ -106,6 +85,15 @@ func Server(options ServerOptions, logger log.Logger) (root *ServerRoot, err err
return
}
+ // Build the background jobs orchestrator
+ runners, err := options.RunnersDefinitions(bus.Marshallable)
+
+ if err != nil {
+ return
+ }
+
+ s.orchestrator = embedded.NewOrchestrator(jobsStore, s.bus, s.logger, runners...)
+
// Create the first account if needed
uid, err := bus.Send(s.bus, context.Background(), create_first_account.Command{
Email: options.DefaultEmail(),
@@ -131,7 +119,7 @@ func Server(options ServerOptions, logger log.Logger) (root *ServerRoot, err err
}
}
- s.scheduler.Start()
+ s.orchestrator.Start()
root = s
return
}
@@ -139,7 +127,7 @@ func Server(options ServerOptions, logger log.Logger) (root *ServerRoot, err err
func (s *ServerRoot) Cleanup() error {
s.logger.Debug("cleaning server services")
- s.scheduler.Stop()
+ s.orchestrator.Stop()
return s.db.Close()
}
diff --git a/docs/guide/configuration.md b/docs/guide/configuration.md
index c229f51d..a7d4f49c 100644
--- a/docs/guide/configuration.md
+++ b/docs/guide/configuration.md
@@ -23,9 +23,64 @@ Environment variables can also be defined in a `.env` or `.env.local` file in th
| http.port
HTTP_PORT,PORT | Port to listen to | 8080 |
| http.secure
HTTP_SECURE | Wether or not the web server is served over https. If omitted, determine this information from the `EXPOSED_ON` variable. It controls wether or not cookie are set with the `Secure` flag and the scheme used on the `Location` header of created resources | false |
| http.secret
HTTP_SECRET | Secret key to use when signing cookies | <generated if empty> |
-| runners.poll_interval
RUNNERS_POLL_INTERVAL | Interval at which [background jobs](/reference/jobs) are picked. Should be parsable by [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) | 4s |
-| runners.deployment
RUNNERS_DEPLOYMENT_COUNT | How many deployment jobs could be run simultaneously | 4 |
-| runners.cleanup
RUNNERS_CLEANUP_COUNT | How many cleanup jobs could be run simultaneously | 2 |
+| runners
RUNNERS_CONFIGURATION | Array of runners configuration, [see below](#runners) for more information | |
| -
ADMIN_EMAIL | Email of the first user account to create (mandatory if no user account exists yet) | |
| -
ADMIN_PASSWORD | Password of the first user account to create (mandatory if no user account exists yet) | |
| -
EXPOSED_ON | Url at which the seelf container [will be exposed](/guide/installation#exposing-seelf) and default target url. In the form `://@` | |
+
+### Runners configuration {#runners}
+
+::: warning
+Since version `2.5.0`, the runners configuration has changed. **seelf** will try its best to understand your configuration, but you **should** use the newer format:
+
+```yml
+runners:
+ poll_interval: 4s // [!code --]
+ deployment: 4 // [!code --]
+ cleanup: 2 // [!code --]
+ - poll_interval: 4s // [!code ++]
+ count: 4 // [!code ++]
+ jobs: // [!code ++]
+ - deployment.command.deploy // [!code ++]
+ - poll_interval: 4s // [!code ++]
+ count: 2 // [!code ++]
+```
+
+:::
+
+You have full control over how [background jobs](/reference/jobs) are processed as long as each application job is at least handled by one runner. Each **runner** will run in parallel and poll for jobs assigned to it, passing them to their respective **workers**.
+
+The goal of this configuration is to tell **seelf** how many runners should live to treat each job and at which rate they poll for new ones. With this level of configuration, you can make seelf suit your particular infrastructure.
+
+| yaml path | Description |
+| ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| poll_interval | Interval at which [background jobs](/reference/jobs) are picked. Should be parsable by [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) |
+| count | Number of workers, which means how many parallel jobs are allowed inside this runner |
+| jobs | Array of job names handled by this runner. The last runner definition could leave it empty to handle all jobs not already handled), should be one of `deployment.command.deploy`, `deployment.command.cleanup_app`, `deployment.command.configure_target`, `deployment.command.cleanup_target` |
+
+::: info Configuring runners using the environment variable
+You can also use the `RUNNERS_CONFIGURATION` variable to configure runners using the format `;;` with runners separated by `|`.
+
+Here is an example representing the default configuration:
+
+```sh
+RUNNERS_CONFIGURATION="4s;4;deployment.command.deploy|4s;2;" seelf serve
+```
+
+:::
+
+If not specified, the default configuration is as follow:
+
+```yml
+runners:
+ - poll_interval: 4s
+ count: 4
+ jobs:
+ - deployment.command.deploy
+ - poll_interval: 4s
+ count: 2
+```
+
+Meaning **deployment jobs** will be processed by **4** workers (hence deploying a maximum of **4** apps at a time), checking every **4 seconds**, and every other jobs will be handled by **2** workers.
+
+With this in place, deployment jobs will not prevent **non-related jobs** to be processed.
diff --git a/docs/reference/jobs.md b/docs/reference/jobs.md
index 4e21a8f5..d04a09e2 100644
--- a/docs/reference/jobs.md
+++ b/docs/reference/jobs.md
@@ -5,13 +5,19 @@ A number of **background tasks** are managed by **seelf**. You can check the job
For the vast majority of cases, you may never have to look at them as they are processed without issues.
::: info
-By default, **jobs in error** state are retried every **15 seconds**. This is because some errors (such as the `target_configuration_in_progress`) are expected and will delay the job.
+Some jobs can be delayed if some pre-conditions are not met. For example, when deploying an application, if the [target](/reference/targets) is being configured, the deploy job will be postponed.
:::
+## Retrying
+
+Sometimes, things can go wrong.
+
+Jobs in error can be retried manually by pressing the **retry button**.
+
## Cancellation
-Since a target on which you have, in the past, successfully deployed something can be destroyed from your side, **seelf** provides the ability to **cancel some tasks**.
+Since a target on which you have, in the past, successfully deployed something can be destroyed from your side, **seelf** provides the ability to **dismiss some tasks**.
-From the **seelf** perspective, it has effectively deployed something and when, for example, deleting an application, **seelf** will queue a cleanup job which cannot succeed because a target is not reachable anymore.
+From the **seelf** perspective, it has effectively deployed something and when, for example, deleting an application, **seelf** will queue a cleanup job which cannot succeed because a target may not be reachable anymore.
-For that **particular case**, you can press the **cancel button** on a job to allow the deletion to proceed without cleaning up resources.
+For that **particular case**, you can press the **dismiss button** on a job to allow the deletion to proceed without cleaning up resources.
diff --git a/pkg/assert/assert.go b/pkg/assert/assert.go
index dcc54ee1..fbb32896 100644
--- a/pkg/assert/assert.go
+++ b/pkg/assert/assert.go
@@ -1,11 +1,13 @@
package assert
import (
+ "cmp"
"errors"
"fmt"
"os"
"reflect"
"regexp"
+ "slices"
"testing"
"unicode/utf8"
@@ -59,6 +61,28 @@ func Equal[T comparable](t testing.TB, expected, actual T, formatAndMessage ...a
failed(t, "should have been equal", expected, actual, formatAndMessage)
}
+// Assets that the given slices contains the same elements, not necessarily in the same order.
+func ArrayEqual[T cmp.Ordered](t testing.TB, expected, actual []T, formatAndMessage ...any) {
+ ArrayEqualFunc(t, expected, actual, cmp.Compare, formatAndMessage...)
+}
+
+// Same as ArrayEqual but for elements not implementing the cmp.Ordered.
+func ArrayEqualFunc[T any](t testing.TB, expected, actual []T, equal func(T, T) int, formatAndMessage ...any) {
+ if len(expected) != len(actual) {
+ failed(t, "should have same length", len(expected), len(actual), formatAndMessage)
+ return
+ }
+
+ slices.SortFunc(expected, equal)
+ slices.SortFunc(actual, equal)
+
+ if reflect.DeepEqual(expected, actual) {
+ return
+ }
+
+ failed(t, "should have been equal", expected, actual, formatAndMessage)
+}
+
// Asserts that the given values are not equal
func NotEqual[T comparable](t testing.TB, expected, actual T, formatAndMessage ...any) {
if expected != actual {
diff --git a/pkg/assert/assert_test.go b/pkg/assert/assert_test.go
index 2b07e50b..c50f8f86 100644
--- a/pkg/assert/assert_test.go
+++ b/pkg/assert/assert_test.go
@@ -130,6 +130,120 @@ false`)
})
}
+func Test_ArrayEqual(t *testing.T) {
+ t.Run("should correctly fail given wrong number of elements", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqual(mock, []int{1, 2, 3}, []int{1, 2}, "with wrong number of elements")
+
+ shouldHaveFailed(t, mock, `should have same length - with wrong number of elements
+ expected:
+3
+
+ got:
+2`)
+ })
+
+ t.Run("should correctly fail given different elements", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqual(mock, []int{1, 2}, []int{3, 4}, "with different elements")
+
+ shouldHaveFailed(t, mock, `should have been equal - with different elements
+ expected:
+[]int{1, 2}
+
+ got:
+[]int{3, 4}`)
+ })
+
+ t.Run("should correctly pass given same elements in different order", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqual(mock, []int{1, 2}, []int{2, 1}, "with same elements in different order")
+
+ shouldHaveSucceeded(t, mock)
+ })
+
+ t.Run("should correctly pass given same elements", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqual(mock, []int{1, 2}, []int{1, 2}, "with same elements")
+
+ shouldHaveSucceeded(t, mock)
+ })
+
+ t.Run("should correctly pass given empty slices", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqual(mock, []int{}, []int{}, "with empty slices")
+
+ shouldHaveSucceeded(t, mock)
+ })
+}
+
+func Test_ArrayEqualFunc(t *testing.T) {
+ type customType struct {
+ value int
+ }
+
+ compareFunc := func(a, b customType) int {
+ return a.value - b.value
+ }
+
+ t.Run("should correctly fail given different length slices", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqualFunc(mock,
+ []customType{{1}, {2}}, []customType{{1}},
+ compareFunc, "with different lengths")
+
+ shouldHaveFailed(t, mock, `should have same length - with different lengths
+ expected:
+2
+
+ got:
+1`)
+ })
+
+ t.Run("should correctly fail given different slices", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqualFunc(mock, []customType{{1}, {2}}, []customType{{1}, {3}}, compareFunc, "with different values")
+
+ shouldHaveFailed(t, mock, `should have been equal - with different values
+ expected:
+[]assert_test.customType{assert_test.customType{value:1}, assert_test.customType{value:2}}
+
+ got:
+[]assert_test.customType{assert_test.customType{value:1}, assert_test.customType{value:3}}`)
+ })
+
+ t.Run("should correctly pass given same slices in different order", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqualFunc(mock, []customType{{2}, {1}}, []customType{{1}, {2}}, compareFunc, "with same values in different order")
+
+ shouldHaveSucceeded(t, mock)
+ })
+
+ t.Run("should correctly pass given same slices", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqualFunc(mock, []customType{{1}, {2}}, []customType{{1}, {2}}, compareFunc, "with same values")
+
+ shouldHaveSucceeded(t, mock)
+ })
+
+ t.Run("should correctly pass given empty slices", func(t *testing.T) {
+ mock := new(mockT)
+
+ assert.ArrayEqualFunc(mock, []customType{}, []customType{}, compareFunc, "with empty slices")
+
+ shouldHaveSucceeded(t, mock)
+ })
+}
+
func Test_NotEqual(t *testing.T) {
t.Run("should correctly fail given the expected value", func(t *testing.T) {
mock := new(mockT)
diff --git a/pkg/bus/embedded/orchestrator.go b/pkg/bus/embedded/orchestrator.go
new file mode 100644
index 00000000..747838fe
--- /dev/null
+++ b/pkg/bus/embedded/orchestrator.go
@@ -0,0 +1,104 @@
+package embedded
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/YuukanOO/seelf/pkg/bus"
+ "github.com/YuukanOO/seelf/pkg/log"
+)
+
+type (
+ Job interface {
+ ID() string
+ Command() bus.AsyncRequest
+ }
+
+ // Should be raised by the store when a job has been dismissed by a user.
+ JobDismissed struct {
+ bus.Notification
+
+ ID string
+ Command bus.AsyncRequest
+ }
+
+ JobsStore interface {
+ GetNextPendingJobs(context.Context, ...string) ([]Job, error)
+ Failed(context.Context, Job, error) error
+ Delay(context.Context, Job) error
+ Done(context.Context, Job) error
+ }
+
+ // Manage multiple embedded runners.
+ Orchestrator struct {
+ started bool
+ store JobsStore
+ dispatcher bus.Dispatcher
+ logger log.Logger
+ runners []*runner
+ }
+
+ RunnerDefinition struct {
+ PollInterval time.Duration // Interval at which messages are polled from the store
+ WorkersCount uint8 // Number of workers to process incoming messages
+ Messages []bus.AsyncRequest // List of messages types processed by this runner
+ }
+)
+
+func (JobDismissed) Name_() string { return "bus.event.job_dismissed" }
+
+func NewOrchestrator(
+ store JobsStore,
+ dispatcher bus.Dispatcher,
+ logger log.Logger,
+ definitions ...RunnerDefinition,
+) *Orchestrator {
+ o := &Orchestrator{
+ store: store,
+ dispatcher: dispatcher,
+ logger: logger,
+ runners: make([]*runner, len(definitions)),
+ }
+
+ for i, def := range definitions {
+ o.runners[i] = newRunner(o, def)
+ }
+
+ return o
+}
+
+func (o *Orchestrator) Start() {
+ if o.started {
+ return
+ }
+
+ o.started = true
+
+ o.logger.Debug("starting background services")
+
+ for _, r := range o.runners {
+ go r.start()
+ }
+}
+
+func (o *Orchestrator) Stop() {
+ if !o.started {
+ return
+ }
+
+ var wg sync.WaitGroup
+
+ o.logger.Info("waiting for background services to complete")
+
+ for _, r := range o.runners {
+ wg.Add(1)
+ go func(r *runner) {
+ defer wg.Done()
+ r.stop()
+ }(r)
+ }
+
+ wg.Wait()
+ o.started = false
+}
diff --git a/pkg/bus/embedded/runner.go b/pkg/bus/embedded/runner.go
index f2c95e37..af440f6e 100644
--- a/pkg/bus/embedded/runner.go
+++ b/pkg/bus/embedded/runner.go
@@ -6,194 +6,120 @@ import (
"time"
"github.com/YuukanOO/seelf/pkg/bus"
- "github.com/YuukanOO/seelf/pkg/log"
)
type (
- Job interface {
- ID() string
- Command() bus.AsyncRequest
- }
-
- // Should be raised by the store when a job has been dismissed by a user.
- JobDismissed struct {
- bus.Notification
-
- ID string
- Command bus.AsyncRequest
- }
-
- JobsStore interface {
- GetNextPendingJobs(context.Context) ([]Job, error)
- Failed(context.Context, Job, error) error
- Delay(context.Context, Job) error
- Done(context.Context, Job) error
- }
- Runner struct {
- dispatcher bus.Dispatcher
- pollInterval time.Duration
- started bool
- store JobsStore
- logger log.Logger
- done chan struct{}
- exitGroup sync.WaitGroup
- groups []*workerGroup
- messageNameToWorkerIdx map[string]int
- }
-
- workerGroup struct {
- jobs chan Job
- size int
- }
-
- // Represents a worker group configuration used by a scheduler to spawn the appropriate
- // workers.
- WorkerGroup struct {
- Size int // Number of workers to start
- Requests []bus.AsyncRequest // List of message types to handle
+ // In-process runner which process commands in specific worker groups using
+ // goroutines.
+ runner struct {
+ orchestrator *Orchestrator
+ pollInterval time.Duration
+ messages []string
+ workersCount uint8
+ exitGroup sync.WaitGroup
+ jobs chan Job
+ done chan struct{}
}
)
-func (JobDismissed) Name_() string { return "bus.event.job_dismissed" }
-
-// In-process runner which process commands in specific worker groups using
-// goroutines.
-func NewRunner(
- store JobsStore,
- logger log.Logger,
- dispatcher bus.Dispatcher,
- pollInterval time.Duration,
- groups ...WorkerGroup,
-) *Runner {
- s := &Runner{
- dispatcher: dispatcher,
- pollInterval: pollInterval,
- store: store,
- logger: logger,
- groups: make([]*workerGroup, len(groups)),
- messageNameToWorkerIdx: make(map[string]int),
+func newRunner(
+ orchestrator *Orchestrator,
+ definition RunnerDefinition,
+) *runner {
+ s := &runner{
+ orchestrator: orchestrator,
+ pollInterval: definition.PollInterval,
+ workersCount: definition.WorkersCount,
}
- for i, g := range groups {
- // Should always have at least one worker
- if g.Size < 1 {
- g.Size = 1
- }
+ // Should always have at least one worker
+ if s.workersCount < 1 {
+ s.workersCount = 1
+ }
- s.groups[i] = &workerGroup{
- size: g.Size,
- }
+ s.messages = make([]string, len(definition.Messages))
- for _, msg := range g.Requests {
- s.messageNameToWorkerIdx[msg.Name_()] = i
- }
+ for i, msg := range definition.Messages {
+ s.messages[i] = msg.Name_()
}
return s
}
-func (s *Runner) Start() {
- if s.started {
- return
- }
-
- s.started = true
-
- s.done = make(chan struct{}, 1)
+func (s *runner) start() {
+ s.done = make(chan struct{})
+ s.jobs = make(chan Job)
- for _, g := range s.groups {
- g.jobs = make(chan Job)
- }
+ s.orchestrator.logger.Debugw("starting runner",
+ "poll", s.pollInterval,
+ "workers", s.workersCount,
+ "messages", s.messages)
- s.startGroupRunners()
- s.startPolling()
+ s.startWorkers()
+ go s.startPolling()
}
-func (s *Runner) Stop() {
- if !s.started {
- return
- }
-
- s.started = false
-
- s.logger.Info("waiting for current jobs to finish")
+func (s *runner) stop() {
+ s.orchestrator.logger.Debug("waiting for current jobs to finish")
close(s.done)
-
- for _, j := range s.groups {
- close(j.jobs)
- }
+ close(s.jobs)
s.exitGroup.Wait()
}
-// Tiny helper to run a function in a goroutine and keep track of done channels.
-func (s *Runner) run(fn func()) {
+func (s *runner) startPolling() {
s.exitGroup.Add(1)
- go func() {
- defer s.exitGroup.Done()
- fn()
- }()
-}
-
-func (s *Runner) startPolling() {
- s.run(func() {
- var (
- delay time.Duration
- lastRun time.Time = time.Now()
- )
+ defer s.exitGroup.Done()
- for {
- delay = s.pollInterval - time.Since(lastRun)
-
- select {
- case <-s.done:
- return
- case <-time.After(delay):
- }
+ var (
+ delay time.Duration
+ lastRun time.Time = time.Now()
+ )
- lastRun = time.Now()
+ for {
+ delay = s.pollInterval - time.Since(lastRun)
- jobs, err := s.store.GetNextPendingJobs(context.Background())
+ select {
+ case <-s.done:
+ return
+ case <-time.After(delay):
+ }
- if err != nil {
- s.logger.Errorw("error while retrieving pending jobs",
- "error", err)
- continue
- }
+ lastRun = time.Now()
- for _, job := range jobs {
- idx, handled := s.messageNameToWorkerIdx[job.Command().Name_()]
+ jobs, err := s.orchestrator.store.GetNextPendingJobs(context.Background(), s.messages...)
- if !handled {
- s.handleJobReturn(context.Background(), job, bus.AsyncResultProcessed, bus.ErrNoHandlerRegistered)
- continue
- }
+ if err != nil {
+ s.orchestrator.logger.Errorw("error while retrieving pending jobs",
+ "error", err)
+ continue
+ }
- s.groups[idx].jobs <- job
- }
+ for _, job := range jobs {
+ s.jobs <- job
}
- })
+ }
}
-func (s *Runner) startGroupRunners() {
- for _, g := range s.groups {
- group := g
- for i := 0; i < group.size; i++ {
- s.run(func() {
- for job := range group.jobs {
- ctx := context.Background()
- result, err := bus.Send(s.dispatcher, ctx, job.Command())
-
- s.handleJobReturn(ctx, job, result, err)
- }
- })
- }
+func (s *runner) startWorkers() {
+ for range s.workersCount {
+ s.exitGroup.Add(1)
+ go func() {
+ defer s.exitGroup.Done()
+
+ for job := range s.jobs {
+ ctx := context.Background()
+ result, err := bus.Send(s.orchestrator.dispatcher, ctx, job.Command())
+
+ s.handleJobReturn(ctx, job, result, err)
+ }
+ }()
}
}
-func (s *Runner) handleJobReturn(ctx context.Context, job Job, result bus.AsyncResult, err error) {
+func (s *runner) handleJobReturn(ctx context.Context, job Job, result bus.AsyncResult, err error) {
var storeErr error
defer func() {
@@ -201,7 +127,7 @@ func (s *Runner) handleJobReturn(ctx context.Context, job Job, result bus.AsyncR
return
}
- s.logger.Errorw("error while updating job status",
+ s.orchestrator.logger.Errorw("error while updating job status",
"job", job.ID(),
"name", job.Command().Name_(),
"error", storeErr,
@@ -209,8 +135,8 @@ func (s *Runner) handleJobReturn(ctx context.Context, job Job, result bus.AsyncR
}()
if err != nil {
- storeErr = s.store.Failed(ctx, job, err)
- s.logger.Errorw("error while processing job",
+ storeErr = s.orchestrator.store.Failed(ctx, job, err)
+ s.orchestrator.logger.Errorw("error while processing job",
"job", job.ID(),
"name", job.Command().Name_(),
"error", err,
@@ -219,9 +145,9 @@ func (s *Runner) handleJobReturn(ctx context.Context, job Job, result bus.AsyncR
}
if result == bus.AsyncResultDelay {
- storeErr = s.store.Delay(ctx, job)
+ storeErr = s.orchestrator.store.Delay(ctx, job)
return
}
- storeErr = s.store.Done(ctx, job)
+ storeErr = s.orchestrator.store.Done(ctx, job)
}
diff --git a/pkg/bus/embedded/runner_test.go b/pkg/bus/embedded/runner_test.go
index 7cec0684..3cc215b4 100644
--- a/pkg/bus/embedded/runner_test.go
+++ b/pkg/bus/embedded/runner_test.go
@@ -3,6 +3,7 @@ package embedded_test
import (
"context"
"errors"
+ "slices"
"sync"
"testing"
@@ -17,28 +18,28 @@ import (
func Test_Runner(t *testing.T) {
logger := must.Panic(log.NewLogger())
b := embedded.NewBus()
- // Register an handler which will just return the inner cmd error to test how the scheduler behave.
+ // Register an handler which will just return the inner cmd error to test how runners behave.
bus.Register(b, func(_ context.Context, cmd returnCommand) (bus.AsyncResult, error) {
return cmd.result, cmd.err
})
- t.Run("should fail the job if no handler is registered", func(t *testing.T) {
+ t.Run("should handle all jobs if no message types are given", func(t *testing.T) {
var (
adapter adapter
cmd bus.AsyncRequest = unhandledCommand{}
)
- runner := embedded.NewRunner(&adapter, logger, b, 0, embedded.WorkerGroup{
- Requests: []bus.AsyncRequest{returnCommand{}},
+ assert.Nil(t, adapter.Queue(context.Background(), cmd))
+
+ runner := embedded.NewOrchestrator(&adapter, b, logger, embedded.RunnerDefinition{
+ PollInterval: 0,
+ WorkersCount: 1,
})
runner.Start()
defer runner.Stop()
- assert.Nil(t, adapter.Queue(context.Background(), cmd))
-
adapter.wait()
-
assert.HasLength(t, 0, adapter.done)
assert.HasLength(t, 0, adapter.delayed)
assert.HasLength(t, 1, adapter.failed)
@@ -53,15 +54,17 @@ func Test_Runner(t *testing.T) {
cmd bus.AsyncRequest = returnCommand{err: jobErr}
)
- runner := embedded.NewRunner(&adapter, logger, b, 0, embedded.WorkerGroup{
- Requests: []bus.AsyncRequest{returnCommand{}},
+ assert.Nil(t, adapter.Queue(context.Background(), cmd))
+
+ runner := embedded.NewOrchestrator(&adapter, b, logger, embedded.RunnerDefinition{
+ PollInterval: 0,
+ WorkersCount: 1,
+ Messages: []bus.AsyncRequest{returnCommand{}},
})
runner.Start()
defer runner.Stop()
- assert.Nil(t, adapter.Queue(context.Background(), cmd))
-
adapter.wait()
assert.HasLength(t, 0, adapter.done)
@@ -77,17 +80,18 @@ func Test_Runner(t *testing.T) {
cmd bus.AsyncRequest = returnCommand{result: bus.AsyncResultDelay}
)
- runner := embedded.NewRunner(&adapter, logger, b, 0, embedded.WorkerGroup{
- Requests: []bus.AsyncRequest{returnCommand{}},
+ assert.Nil(t, adapter.Queue(context.Background(), cmd))
+
+ runner := embedded.NewOrchestrator(&adapter, b, logger, embedded.RunnerDefinition{
+ PollInterval: 0,
+ WorkersCount: 1,
+ Messages: []bus.AsyncRequest{returnCommand{}},
})
runner.Start()
defer runner.Stop()
- assert.Nil(t, adapter.Queue(context.Background(), cmd))
-
adapter.wait()
-
assert.HasLength(t, 0, adapter.done)
assert.HasLength(t, 1, adapter.delayed)
assert.HasLength(t, 0, adapter.failed)
@@ -101,17 +105,18 @@ func Test_Runner(t *testing.T) {
cmd bus.AsyncRequest = returnCommand{}
)
- runner := embedded.NewRunner(&adapter, logger, b, 0, embedded.WorkerGroup{
- Requests: []bus.AsyncRequest{returnCommand{}},
+ assert.Nil(t, adapter.Queue(context.Background(), cmd))
+
+ runner := embedded.NewOrchestrator(&adapter, b, logger, embedded.RunnerDefinition{
+ PollInterval: 0,
+ WorkersCount: 1,
+ Messages: []bus.AsyncRequest{returnCommand{}},
})
runner.Start()
defer runner.Stop()
- assert.Nil(t, adapter.Queue(context.Background(), cmd))
-
adapter.wait()
-
assert.HasLength(t, 1, adapter.done)
assert.HasLength(t, 0, adapter.delayed)
assert.HasLength(t, 0, adapter.failed)
@@ -188,14 +193,18 @@ func (a *adapter) Failed(ctx context.Context, j embedded.Job, jobErr error) erro
return nil
}
-func (a *adapter) GetNextPendingJobs(context.Context) ([]embedded.Job, error) {
- j := make([]embedded.Job, len(a.jobs))
+func (a *adapter) GetNextPendingJobs(_ context.Context, messageNames ...string) ([]embedded.Job, error) {
+ var j []embedded.Job
- for i, job := range a.jobs {
- j[i] = job
- }
+ for i := 0; i < len(a.jobs); i++ {
+ if len(messageNames) > 0 && !slices.Contains(messageNames, a.jobs[i].Command().Name_()) {
+ continue
+ }
- a.jobs = nil
+ j = append(j, a.jobs[i])
+ a.jobs = append(a.jobs[:i], a.jobs[i+1:]...)
+ i--
+ }
return j, nil
}
diff --git a/pkg/bus/sqlite/jobs.go b/pkg/bus/sqlite/jobs.go
index 81c1085b..2e05827e 100644
--- a/pkg/bus/sqlite/jobs.go
+++ b/pkg/bus/sqlite/jobs.go
@@ -152,7 +152,7 @@ func (s *JobsStore) Queue(
return nil
}
-func (s *JobsStore) GetNextPendingJobs(ctx context.Context) ([]embedded.Job, error) {
+func (s *JobsStore) GetNextPendingJobs(ctx context.Context, messageNames ...string) ([]embedded.Job, error) {
// This query will lock the database to make sure we can't retrieved the same job twice.
return builder.
Query[embedded.Job](`
@@ -161,7 +161,9 @@ func (s *JobsStore) GetNextPendingJobs(ctx context.Context) ([]embedded.Job, err
WHERE id IN (SELECT id FROM (
SELECT id, MIN(not_before) FROM [scheduler.scheduled_jobs] sj
WHERE
- sj.retrieved = false
+ sj.retrieved = false`).
+ S(builder.Array("AND sj.message_name IN", messageNames)).
+ F(`
AND sj.errcode IS NULL
AND sj.not_before <= DATETIME('now')
AND sj.[group] NOT IN (SELECT DISTINCT [group] FROM [scheduler.scheduled_jobs] WHERE retrieved = true)
diff --git a/pkg/config/loader.go b/pkg/config/loader.go
index 8d144c12..e0fe2fc5 100644
--- a/pkg/config/loader.go
+++ b/pkg/config/loader.go
@@ -12,30 +12,26 @@ import (
"gopkg.in/yaml.v3"
)
-var dotenvFilenames = []string{".env", ".env.local"}
-
-// Load the configuration into the target from a yaml file and environment variables.
-//
-// The boolean returned is true if the config file has been found, false otherwise.
-//
-// It will look for dotenv files in the current directory. If no dotenvFiles are given,
-// default ones will be used: .env and .env.local.
-// target can implement the Processable interface to do any stuff after the config has been loaded.
-func Load(configFilePath string, target any, dotenvFiles ...string) (exists bool, err error) {
- if exists, err = loadFromYaml(configFilePath, target); err != nil {
- return
- }
+var ErrNoLoadersGiven = errors.New("no loaders given")
+
+type Loader func(any) error
- if len(dotenvFiles) == 0 {
- dotenvFiles = dotenvFilenames
+// Load the configuration into the target using given loaders.
+func Load(target any, loaders ...Loader) error {
+ if len(loaders) == 0 {
+ return ErrNoLoadersGiven
}
- err = loadFromEnvironment(dotenvFiles, target)
+ for _, loader := range loaders {
+ if err := loader(target); err != nil {
+ return err
+ }
+ }
- return
+ return nil
}
-// Save the given config data in the given file path.
+// Save the given config data in the given yaml file.
func Save(configFilePath string, data any) error {
b, err := yaml.Marshal(data)
@@ -46,27 +42,42 @@ func Save(configFilePath string, data any) error {
return ostools.WriteFile(configFilePath, b)
}
-func loadFromYaml(path string, target any) (bool, error) {
- if _, err := os.Stat(path); errors.Is(err, fs.ErrNotExist) {
- return false, nil
- }
+// Load the configuration from the given yaml file.
+// Update the found argument to true if the file was found.
+func FromYAML(path string, found *bool) Loader {
+ return func(target any) error {
+ if _, err := os.Stat(path); err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ *found = false
+ return nil
+ }
- data, err := os.ReadFile(path)
+ return err
+ }
- if err != nil {
- return true, err
- }
+ *found = true
- return true, yaml.Unmarshal(data, target)
-}
+ data, err := os.ReadFile(path)
-func loadFromEnvironment(filenames []string, target any) error {
- for _, filename := range filenames {
- if err := godotenv.Load(filename); err != nil && !os.IsNotExist(err) {
+ if err != nil {
return err
}
+
+ return yaml.Unmarshal(data, target)
}
+}
- _, err := nenv.UnmarshalFromEnviron(target)
- return err
+// Load the configuration from environment variables, trying to read given
+// .env filenames before.
+func FromEnvironment(dotEnvFilenames ...string) Loader {
+ return func(target any) error {
+ for _, filename := range dotEnvFilenames {
+ if err := godotenv.Load(filename); err != nil && !os.IsNotExist(err) {
+ return err
+ }
+ }
+
+ _, err := nenv.UnmarshalFromEnviron(target)
+ return err
+ }
}
diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go
index 0adff6d3..1727791f 100644
--- a/pkg/config/loader_test.go
+++ b/pkg/config/loader_test.go
@@ -1,6 +1,7 @@
package config_test
import (
+ "errors"
"fmt"
"os"
"testing"
@@ -33,127 +34,159 @@ type (
)
func Test_Load(t *testing.T) {
- // Since for some tests, the monad has the initial value set to true but the
- // env removes it (setting the monad hasValue to false but keeping the initial value)
- unsetMonad := monad.Value(true)
- unsetMonad.Unset()
-
- tests := []struct {
- name string
- conf string
- env string
- expected configuration
- }{
- {
- name: "configuration-only",
- conf: `verbose: true
+ t.Run("should returns an error if no loaders given", func(t *testing.T) {
+ var configuration configuration
+
+ err := config.Load(&configuration)
+
+ assert.ErrorIs(t, config.ErrNoLoadersGiven, err)
+ })
+
+ t.Run("should correctly returns a loader error", func(t *testing.T) {
+ var configuration configuration
+ expectedErr := errors.New("expected error")
+
+ err := config.Load(&configuration, func(a any) error {
+ return expectedErr
+ })
+
+ assert.ErrorIs(t, expectedErr, err)
+ })
+
+ t.Run("should ignore inexistent files from yaml and env loaders", func(t *testing.T) {
+ var (
+ configuration configuration
+ existing bool
+ )
+
+ err := config.Load(&configuration, config.FromYAML("inexistent.yaml", &existing), config.FromEnvironment("inexistent.env"))
+
+ assert.Nil(t, err)
+ assert.False(t, existing)
+ })
+
+ t.Run("should correctly load the configuration from multiple loaders", func(t *testing.T) {
+ // Since for some tests, the monad has the initial value set to true but the
+ // env removes it (setting the monad hasValue to false but keeping the initial value)
+ unsetMonad := monad.Value(true)
+ unsetMonad.Unset()
+
+ tests := []struct {
+ name string
+ conf string
+ env string
+ expected configuration
+ }{
+ {
+ name: "configuration-only",
+ conf: `verbose: true
http:
host: 192.168.1.1
port: 7777`,
- expected: configuration{
- Verbose: true,
- Http: httpConfiguration{
- Host: "192.168.1.1",
- Port: 7777,
+ expected: configuration{
+ Verbose: true,
+ Http: httpConfiguration{
+ Host: "192.168.1.1",
+ Port: 7777,
+ },
},
},
- },
- {
- name: "configuration-and-env",
- conf: `verbose: true
+ {
+ name: "configuration-and-env",
+ conf: `verbose: true
http:
host: 192.168.1.1
port: 7777`,
- env: `BALANCER_DOMAIN=https://some.domain
+ env: `BALANCER_DOMAIN=https://some.domain
ACME_EMAIL=admin@example.com
PORT=9999`,
- expected: configuration{
- Verbose: true,
- Http: httpConfiguration{
- Host: "192.168.1.1",
- Port: 9999,
- },
- Balancer: balancerConfiguration{
- Domain: "https://some.domain",
- AcmeEmail: "admin@example.com",
+ expected: configuration{
+ Verbose: true,
+ Http: httpConfiguration{
+ Host: "192.168.1.1",
+ Port: 9999,
+ },
+ Balancer: balancerConfiguration{
+ Domain: "https://some.domain",
+ AcmeEmail: "admin@example.com",
+ },
},
},
- },
- {
- name: "env-only",
- env: `BALANCER_DOMAIN=https://some.domain
+ {
+ name: "env-only",
+ env: `BALANCER_DOMAIN=https://some.domain
ACME_EMAIL=admin@example.com
PORT=9999`,
- expected: configuration{
- Http: httpConfiguration{
- Port: 9999,
- },
- Balancer: balancerConfiguration{
- Domain: "https://some.domain",
- AcmeEmail: "admin@example.com",
+ expected: configuration{
+ Http: httpConfiguration{
+ Port: 9999,
+ },
+ Balancer: balancerConfiguration{
+ Domain: "https://some.domain",
+ AcmeEmail: "admin@example.com",
+ },
},
},
- },
- {
- name: "conf-with-maybe",
- conf: `http:
+ {
+ name: "conf-with-maybe",
+ conf: `http:
secure: true
http_two: false`,
- expected: configuration{
- Http: httpConfiguration{Secure: monad.Value(true), HttpTwo: monad.Value(false)},
+ expected: configuration{
+ Http: httpConfiguration{Secure: monad.Value(true), HttpTwo: monad.Value(false)},
+ },
},
- },
- {
- name: "env-with-maybe",
- env: `HTTP_SECURE=true
+ {
+ name: "env-with-maybe",
+ env: `HTTP_SECURE=true
HTTP_TWO=false`,
- expected: configuration{
- Http: httpConfiguration{Secure: monad.Value(true), HttpTwo: monad.Value(false)},
+ expected: configuration{
+ Http: httpConfiguration{Secure: monad.Value(true), HttpTwo: monad.Value(false)},
+ },
},
- },
- {
- name: "conf-and-env-with-maybe",
- conf: `http:
+ {
+ name: "conf-and-env-with-maybe",
+ conf: `http:
secure: true
http_two: false`,
- env: `HTTP_SECURE=
+ env: `HTTP_SECURE=
HTTP_TWO=true`,
- expected: configuration{
- Http: httpConfiguration{Secure: unsetMonad, HttpTwo: monad.Value(true)},
+ expected: configuration{
+ Http: httpConfiguration{Secure: unsetMonad, HttpTwo: monad.Value(true)},
+ },
},
- },
- }
+ }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- confFilename := fmt.Sprintf("%s.yml", tt.name)
- envFilename := fmt.Sprintf(".%s.env", tt.name)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ confFilename := fmt.Sprintf("%s.yml", tt.name)
+ envFilename := fmt.Sprintf(".%s.env", tt.name)
- t.Cleanup(func() {
- os.Remove(envFilename)
- os.Remove(confFilename)
- })
+ t.Cleanup(func() {
+ os.Remove(envFilename)
+ os.Remove(confFilename)
+ })
- os.Clearenv()
+ os.Clearenv()
- if tt.conf != "" {
- err := ostools.WriteFile(confFilename, []byte(tt.conf))
- assert.Nil(t, err)
- }
+ if tt.conf != "" {
+ assert.Nil(t, ostools.WriteFile(confFilename, []byte(tt.conf)))
+ }
- if tt.env != "" {
- err := ostools.WriteFile(envFilename, []byte(tt.env))
- assert.Nil(t, err)
- }
+ if tt.env != "" {
+ assert.Nil(t, ostools.WriteFile(envFilename, []byte(tt.env)))
+ }
- var conf configuration
+ var conf configuration
- exists, err := config.Load(confFilename, &conf, envFilename)
- assert.Nil(t, err)
- assert.Equal(t, tt.conf != "", exists)
- assert.DeepEqual(t, tt.expected, conf)
- })
- }
+ var exists bool
+ err := config.Load(&conf, config.FromYAML(confFilename, &exists), config.FromEnvironment(envFilename))
+ assert.Nil(t, err)
+ assert.Equal(t, tt.conf != "", exists)
+ assert.DeepEqual(t, tt.expected, conf)
+ })
+ }
+ })
}
func Test_Save(t *testing.T) {
diff --git a/pkg/http/helpers.go b/pkg/http/helpers.go
index c49c7b88..27afee29 100644
--- a/pkg/http/helpers.go
+++ b/pkg/http/helpers.go
@@ -8,6 +8,7 @@ import (
"github.com/YuukanOO/seelf/pkg/apperr"
"github.com/YuukanOO/seelf/pkg/log"
+ "github.com/YuukanOO/seelf/pkg/storage"
"github.com/gin-gonic/gin"
)
@@ -25,7 +26,7 @@ func Bind[TIn any](s Server, handler func(*gin.Context, TIn) error) gin.HandlerF
var cmd TIn
if err := ctx.ShouldBind(&cmd); err != nil {
- ctx.AbortWithError(http.StatusUnprocessableEntity, err)
+ _ = ctx.AbortWithError(http.StatusUnprocessableEntity, err)
return
}
@@ -86,7 +87,9 @@ func HandleError(s Server, ctx *gin.Context, err error) {
)
// Translates the error type to the appropriate HTTP status code
- if _, isAppErr := apperr.As[apperr.Error](err); isAppErr {
+ if errors.Is(err, storage.ErrConcurrencyUpdate) {
+ status = http.StatusConflict
+ } else if _, isAppErr := apperr.As[apperr.Error](err); isAppErr {
status = http.StatusBadRequest // Default to HTTP 400
data = err
@@ -97,7 +100,7 @@ func HandleError(s Server, ctx *gin.Context, err error) {
s.Logger().Errorw(err.Error(), "error", err)
}
- ctx.Error(err)
+ _ = ctx.Error(err)
ctx.AbortWithStatusJSON(status, data)
}
diff --git a/pkg/storage/discriminated.go b/pkg/storage/discriminated.go
index 8c87197b..a774a5cc 100644
--- a/pkg/storage/discriminated.go
+++ b/pkg/storage/discriminated.go
@@ -1,5 +1,10 @@
package storage
+import (
+ "maps"
+ "slices"
+)
+
type (
// Function used to map from a raw value to a discriminated type.
DiscriminatedMapperFunc[T any] func(string) (T, error)
@@ -53,3 +58,6 @@ func (m *DiscriminatedMapper[T]) From(discriminator, value string) (T, error) {
return mapper(value)
}
+
+// Retrieve the list of known keys available to the mapper.
+func (m *DiscriminatedMapper[T]) Keys() []string { return slices.Collect(maps.Keys(m.known)) }
diff --git a/pkg/storage/discriminated_test.go b/pkg/storage/discriminated_test.go
index dc9dc321..5df8ae2c 100644
--- a/pkg/storage/discriminated_test.go
+++ b/pkg/storage/discriminated_test.go
@@ -48,6 +48,10 @@ func Test_Discriminated(t *testing.T) {
assert.ErrorIs(t, err, storage.ErrCouldNotUnmarshalGivenType)
})
+ t.Run("should return registered keys", func(t *testing.T) {
+ assert.ArrayEqual(t, []string{"type1", "type2"}, mapper.Keys())
+ })
+
t.Run("should return the correct type", func(t *testing.T) {
t1, err := mapper.From("type1", "data1")
diff --git a/pkg/validate/arrays/arrays.go b/pkg/validate/arrays/arrays.go
new file mode 100644
index 00000000..7347caab
--- /dev/null
+++ b/pkg/validate/arrays/arrays.go
@@ -0,0 +1,12 @@
+package arrays
+
+import "github.com/YuukanOO/seelf/pkg/apperr"
+
+var ErrRequired = apperr.New("required")
+
+func Required[T any](value []T) error {
+ if len(value) == 0 {
+ return ErrRequired
+ }
+ return nil
+}
diff --git a/pkg/validate/arrays/arrays_test.go b/pkg/validate/arrays/arrays_test.go
new file mode 100644
index 00000000..e2991d55
--- /dev/null
+++ b/pkg/validate/arrays/arrays_test.go
@@ -0,0 +1,19 @@
+package arrays_test
+
+import (
+ "testing"
+
+ "github.com/YuukanOO/seelf/pkg/assert"
+ "github.com/YuukanOO/seelf/pkg/validate/arrays"
+)
+
+func Test_Required(t *testing.T) {
+ t.Run("should fail on empty arrays", func(t *testing.T) {
+ assert.ErrorIs(t, arrays.ErrRequired, arrays.Required([]string{}))
+ assert.ErrorIs(t, arrays.ErrRequired, arrays.Required[string](nil))
+ })
+
+ t.Run("should succeed on non-empty arrays", func(t *testing.T) {
+ assert.Nil(t, arrays.Required([]string{"good!"}))
+ })
+}
diff --git a/pkg/validate/validate.go b/pkg/validate/validate.go
index 491e9a63..c6b8398d 100644
--- a/pkg/validate/validate.go
+++ b/pkg/validate/validate.go
@@ -1,6 +1,7 @@
package validate
import (
+ "strconv"
"strings"
"github.com/YuukanOO/seelf/pkg/apperr"
@@ -34,7 +35,7 @@ func (e FieldErrors) Flatten() FieldErrors {
}
// Builds a new validation error with given invalid fields.
-// Wraps the FieldErrors inside the ErrValidationFailed.
+// Wraps the FieldErrors inside the ErrValidationFailed one.
func NewError(fieldErrs FieldErrors) error {
return apperr.Wrap(ErrValidationFailed, fieldErrs)
}
@@ -82,6 +83,23 @@ func Field[T any](value T, validators ...Validator[T]) error {
return nil
}
+// Validates an array of values using the given function.
+func Array[T any](values []T, fn func(T, int) error) error {
+ fieldErrs := make(FieldErrors)
+
+ for i, value := range values {
+ if err := fn(value, i); err != nil {
+ fieldErrs[strconv.Itoa(i)] = err
+ }
+ }
+
+ if len(fieldErrs) > 0 {
+ return fieldErrs
+ }
+
+ return nil
+}
+
// Validate object values by calling their factory and writing to the target in
// the same call. It makes it easy to validates and instantiates with one call.
func Value[TRaw, TTarget any](value TRaw, target *TTarget, factory func(TRaw) (TTarget, error)) error {
diff --git a/pkg/validate/validate_test.go b/pkg/validate/validate_test.go
index 35afcc34..0d3b5af9 100644
--- a/pkg/validate/validate_test.go
+++ b/pkg/validate/validate_test.go
@@ -119,6 +119,33 @@ func Test_Struct(t *testing.T) {
})
}
+func Test_Array(t *testing.T) {
+ t.Run("collects validation errors and returns field errors", func(t *testing.T) {
+ err := validate.Array([]string{"john", "doe"}, func(value string, index int) error {
+ if value == "doe" {
+ return errAlwaysFail
+ }
+
+ return nil
+ })
+
+ validationErr, ok := apperr.As[validate.FieldErrors](err)
+
+ assert.True(t, ok)
+ assert.DeepEqual(t, validate.FieldErrors{
+ "1": errAlwaysFail,
+ }, validationErr)
+ })
+
+ t.Run("returns nil if no error exists", func(t *testing.T) {
+ err := validate.Array([]string{"john", "doe"}, func(value string, index int) error {
+ return nil
+ })
+
+ assert.Nil(t, err)
+ })
+}
+
func Test_If(t *testing.T) {
t.Run("return the validation error only if expression is true", func(t *testing.T) {
err := validate.Struct(validate.Of{