From 4d43e9e5fa8bf7e47bd7d81c1253c22602d6615c Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Mon, 11 May 2026 23:41:08 +0700 Subject: [PATCH 1/2] Add Mattermost notifier via REST API --- .gitignore | 1 + cmd/main.go | 14 +++- internal/config/config.go | 21 ++++- internal/kube/kube.go | 6 +- internal/mattermost/mattermost.go | 134 ++++++++++++++++++++++++++++++ internal/notifier/notifier.go | 8 ++ 6 files changed, 174 insertions(+), 10 deletions(-) create mode 100644 internal/mattermost/mattermost.go create mode 100644 internal/notifier/notifier.go diff --git a/.gitignore b/.gitignore index 6b616b0..0eda732 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ !/config/config.yaml /.idea /.env +.vscode/settings.json diff --git a/cmd/main.go b/cmd/main.go index dff94cd..31d5b50 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,6 +12,8 @@ import ( "github.com/romus204/loggerator/internal/config" "github.com/romus204/loggerator/internal/kube" + "github.com/romus204/loggerator/internal/mattermost" + "github.com/romus204/loggerator/internal/notifier" "github.com/romus204/loggerator/internal/telegram" ) @@ -23,13 +25,19 @@ func main() { log.Fatal(err) } - bot := telegram.NewBot(ctx, config.Telegram) - kubeClient := kube.NewCubeClient(ctx, config.Kube, bot) + var n notifier.Notifier + if config.Mattermost.Token != "" { + n = mattermost.NewClient(ctx, config.Mattermost) + } else { + n = telegram.NewBot(ctx, config.Telegram) + } + + kubeClient := kube.NewCubeClient(ctx, config.Kube, n) wg := sync.WaitGroup{} kubeClient.Subscribe(&wg) - bot.StartSendWorker(&wg) + n.StartSendWorker(&wg) quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) diff --git a/internal/config/config.go b/internal/config/config.go index d140ff8..a59d9cc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,16 +11,24 @@ import ( ) type Config struct { - Telegram Telegram `yaml:"telegram"` // telegram chat id - Kube Kube `yaml:"kube"` // kube + Telegram Telegram `yaml:"telegram"` + Mattermost Mattermost `yaml:"mattermost"` + Kube Kube `yaml:"kube"` } type Telegram struct { - Token string `yaml:"token"` // telegram bot token - Chat int `yaml:"chat"` // cat ids + Token string `yaml:"token"` + Chat int `yaml:"chat"` Topics map[string]int `yaml:"topics"` } +type Mattermost struct { + ServerURL string `yaml:"server_url"` + Token string `yaml:"token"` + ChannelID string `yaml:"channel_id"` + Channels map[string]string `yaml:"channels"` +} + type Kube struct { Target []Target `yaml:"target"` // pod and containers name KubeConfig string `yaml:"config"` // path to kube config @@ -57,6 +65,11 @@ func (c *Config) processEnvVars() { // Process Telegram fields c.Telegram.Token = substituteEnvVars(c.Telegram.Token) + // Process Mattermost fields + c.Mattermost.ServerURL = substituteEnvVars(c.Mattermost.ServerURL) + c.Mattermost.Token = substituteEnvVars(c.Mattermost.Token) + c.Mattermost.ChannelID = substituteEnvVars(c.Mattermost.ChannelID) + // Process Kube fields c.Kube.KubeConfig = substituteEnvVars(c.Kube.KubeConfig) c.Kube.Namespace = substituteEnvVars(c.Kube.Namespace) diff --git a/internal/kube/kube.go b/internal/kube/kube.go index 17e16f6..4d6bf3b 100644 --- a/internal/kube/kube.go +++ b/internal/kube/kube.go @@ -13,7 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" "github.com/romus204/loggerator/internal/config" - "github.com/romus204/loggerator/internal/telegram" + "github.com/romus204/loggerator/internal/notifier" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -21,7 +21,7 @@ import ( type Kube struct { kubeClient *kubernetes.Clientset cfg config.Kube - bot *telegram.Telegram + bot notifier.Notifier ctx context.Context FilterRegex []*regexp.Regexp Replacements []Replacement @@ -37,7 +37,7 @@ type PodContainer struct { container []string } -func NewCubeClient(ctx context.Context, cfg config.Kube, bot *telegram.Telegram) *Kube { +func NewCubeClient(ctx context.Context, cfg config.Kube, bot notifier.Notifier) *Kube { clientset, err := kubernetes.NewForConfig(cfg.Rest) if err != nil { log.Fatalf("Error creating kubernetes client: %v", err) diff --git a/internal/mattermost/mattermost.go b/internal/mattermost/mattermost.go new file mode 100644 index 0000000..3d88701 --- /dev/null +++ b/internal/mattermost/mattermost.go @@ -0,0 +1,134 @@ +package mattermost + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/romus204/loggerator/internal/config" +) + +type postPayload struct { + ChannelID string `json:"channel_id"` + Message string `json:"message"` +} + +type sendRequest struct { + payload postPayload +} + +type Mattermost struct { + ctx context.Context + apiURL string + token string + channelID string + channels map[string]string + queue chan sendRequest + ticker *time.Ticker +} + +func NewClient(ctx context.Context, cfg config.Mattermost) *Mattermost { + return &Mattermost{ + ctx: ctx, + apiURL: cfg.ServerURL + "/api/v4/posts", + token: cfg.Token, + channelID: cfg.ChannelID, + channels: cfg.Channels, + queue: make(chan sendRequest, 1000), + ticker: time.NewTicker(time.Minute / 20), + } +} + +func (m *Mattermost) StartSendWorker(wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + + for req := range m.queue { + <-m.ticker.C + m.sendToAPI(req.payload) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + <-m.ctx.Done() + if m.ticker != nil { + m.ticker.Stop() + } + close(m.queue) + }() +} + +func (m *Mattermost) Send(msg string, container string) { + var text string + if isJSON(msg) { + text = fmt.Sprintf("```json\n%s\n```", prettyPrintJSON(msg)) + } else { + text = fmt.Sprintf("```\n%s\n```", msg) + } + + channelID := m.channelID + if ch, ok := m.channels[container]; ok { + channelID = ch + } + + select { + case <-m.ctx.Done(): + return + case m.queue <- sendRequest{payload: postPayload{ChannelID: channelID, Message: text}}: + } +} + +func (m *Mattermost) sendToAPI(payload postPayload) { + data, err := json.Marshal(payload) + if err != nil { + fmt.Println("mattermost: error marshalling JSON:", err) + return + } + + req, err := http.NewRequest(http.MethodPost, m.apiURL, bytes.NewBuffer(data)) + if err != nil { + fmt.Println("mattermost: error creating request:", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + req.Header.Set("Authorization", "Bearer "+m.token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Println("mattermost: error sending request:", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + fmt.Println("mattermost: unexpected response:", resp.Status) + } +} + +func isJSON(str string) bool { + var js json.RawMessage + return json.Unmarshal([]byte(str), &js) == nil +} + +func prettyPrintJSON(str string) string { + var obj map[string]interface{} + if err := json.Unmarshal([]byte(str), &obj); err != nil { + return str + } + + prettyJSON, err := json.MarshalIndent(obj, "", " ") + if err != nil { + return str + } + + return string(prettyJSON) +} diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go new file mode 100644 index 0000000..da0cf73 --- /dev/null +++ b/internal/notifier/notifier.go @@ -0,0 +1,8 @@ +package notifier + +import "sync" + +type Notifier interface { + Send(msg string, container string) + StartSendWorker(wg *sync.WaitGroup) +} From 3e0e5d1a98f264ea0620a285b1c3c98340c08ca4 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Mon, 11 May 2026 23:59:16 +0700 Subject: [PATCH 2/2] Add multi-notifier support with Telegram and Mattermost --- cmd/main.go | 19 ++++++++++++++----- internal/config/config.go | 1 + internal/notifier/multi.go | 23 +++++++++++++++++++++++ 3 files changed, 38 insertions(+), 5 deletions(-) create mode 100644 internal/notifier/multi.go diff --git a/cmd/main.go b/cmd/main.go index 31d5b50..ad17082 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -25,12 +25,21 @@ func main() { log.Fatal(err) } - var n notifier.Notifier - if config.Mattermost.Token != "" { - n = mattermost.NewClient(ctx, config.Mattermost) - } else { - n = telegram.NewBot(ctx, config.Telegram) + var targets []notifier.Notifier + for _, name := range config.Notifiers { + switch name { + case "telegram": + targets = append(targets, telegram.NewBot(ctx, config.Telegram)) + case "mattermost": + targets = append(targets, mattermost.NewClient(ctx, config.Mattermost)) + default: + log.Fatalf("unknown notifier %q: expected \"telegram\" or \"mattermost\"", name) + } } + if len(targets) == 0 { + log.Fatal("no notifiers configured") + } + n := notifier.NewMultiNotifier(targets...) kubeClient := kube.NewCubeClient(ctx, config.Kube, n) diff --git a/internal/config/config.go b/internal/config/config.go index a59d9cc..3aef569 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,6 +11,7 @@ import ( ) type Config struct { + Notifiers []string `yaml:"notifiers"` // "telegram", "mattermost" Telegram Telegram `yaml:"telegram"` Mattermost Mattermost `yaml:"mattermost"` Kube Kube `yaml:"kube"` diff --git a/internal/notifier/multi.go b/internal/notifier/multi.go new file mode 100644 index 0000000..e72c93f --- /dev/null +++ b/internal/notifier/multi.go @@ -0,0 +1,23 @@ +package notifier + +import "sync" + +type MultiNotifier struct { + notifiers []Notifier +} + +func NewMultiNotifier(notifiers ...Notifier) *MultiNotifier { + return &MultiNotifier{notifiers: notifiers} +} + +func (m *MultiNotifier) Send(msg string, container string) { + for _, n := range m.notifiers { + n.Send(msg, container) + } +} + +func (m *MultiNotifier) StartSendWorker(wg *sync.WaitGroup) { + for _, n := range m.notifiers { + n.StartSendWorker(wg) + } +}