Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
!/config/config.yaml
/.idea
/.env
.vscode/settings.json
23 changes: 20 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/romus204/loggerator/internal/config"
"github.com/romus204/loggerator/internal/kube"
"github.com/romus204/loggerator/internal/mattermost"
"github.com/romus204/loggerator/internal/notifier"
"github.com/romus204/loggerator/internal/telegram"
)

Expand All @@ -23,13 +25,28 @@ func main() {
log.Fatal(err)
}

bot := telegram.NewBot(ctx, config.Telegram)
kubeClient := kube.NewCubeClient(ctx, config.Kube, bot)
var targets []notifier.Notifier
for _, name := range config.Notifiers {
switch name {
case "telegram":
targets = append(targets, telegram.NewBot(ctx, config.Telegram))
case "mattermost":
targets = append(targets, mattermost.NewClient(ctx, config.Mattermost))
default:
log.Fatalf("unknown notifier %q: expected \"telegram\" or \"mattermost\"", name)
}
}
if len(targets) == 0 {
log.Fatal("no notifiers configured")
}
n := notifier.NewMultiNotifier(targets...)

kubeClient := kube.NewCubeClient(ctx, config.Kube, n)

wg := sync.WaitGroup{}

kubeClient.Subscribe(&wg)
bot.StartSendWorker(&wg)
n.StartSendWorker(&wg)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
Expand Down
22 changes: 18 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,25 @@ import (
)

type Config struct {
Telegram Telegram `yaml:"telegram"` // telegram chat id
Kube Kube `yaml:"kube"` // kube
Notifiers []string `yaml:"notifiers"` // "telegram", "mattermost"
Telegram Telegram `yaml:"telegram"`
Mattermost Mattermost `yaml:"mattermost"`
Kube Kube `yaml:"kube"`
}

type Telegram struct {
Token string `yaml:"token"` // telegram bot token
Chat int `yaml:"chat"` // cat ids
Token string `yaml:"token"`
Chat int `yaml:"chat"`
Topics map[string]int `yaml:"topics"`
}

type Mattermost struct {
ServerURL string `yaml:"server_url"`
Token string `yaml:"token"`
ChannelID string `yaml:"channel_id"`
Channels map[string]string `yaml:"channels"`
}

type Kube struct {
Target []Target `yaml:"target"` // pod and containers name
KubeConfig string `yaml:"config"` // path to kube config
Expand Down Expand Up @@ -57,6 +66,11 @@ func (c *Config) processEnvVars() {
// Process Telegram fields
c.Telegram.Token = substituteEnvVars(c.Telegram.Token)

// Process Mattermost fields
c.Mattermost.ServerURL = substituteEnvVars(c.Mattermost.ServerURL)
c.Mattermost.Token = substituteEnvVars(c.Mattermost.Token)
c.Mattermost.ChannelID = substituteEnvVars(c.Mattermost.ChannelID)

// Process Kube fields
c.Kube.KubeConfig = substituteEnvVars(c.Kube.KubeConfig)
c.Kube.Namespace = substituteEnvVars(c.Kube.Namespace)
Expand Down
6 changes: 3 additions & 3 deletions internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
corev1 "k8s.io/api/core/v1"

"github.com/romus204/loggerator/internal/config"
"github.com/romus204/loggerator/internal/telegram"
"github.com/romus204/loggerator/internal/notifier"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type Kube struct {
kubeClient *kubernetes.Clientset
cfg config.Kube
bot *telegram.Telegram
bot notifier.Notifier
ctx context.Context
FilterRegex []*regexp.Regexp
Replacements []Replacement
Expand All @@ -37,7 +37,7 @@ type PodContainer struct {
container []string
}

func NewCubeClient(ctx context.Context, cfg config.Kube, bot *telegram.Telegram) *Kube {
func NewCubeClient(ctx context.Context, cfg config.Kube, bot notifier.Notifier) *Kube {
clientset, err := kubernetes.NewForConfig(cfg.Rest)
if err != nil {
log.Fatalf("Error creating kubernetes client: %v", err)
Expand Down
134 changes: 134 additions & 0 deletions internal/mattermost/mattermost.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package mattermost

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

"github.com/romus204/loggerator/internal/config"
)

type postPayload struct {
ChannelID string `json:"channel_id"`
Message string `json:"message"`
}

type sendRequest struct {
payload postPayload
}

type Mattermost struct {
ctx context.Context
apiURL string
token string
channelID string
channels map[string]string
queue chan sendRequest
ticker *time.Ticker
}

func NewClient(ctx context.Context, cfg config.Mattermost) *Mattermost {
return &Mattermost{
ctx: ctx,
apiURL: cfg.ServerURL + "/api/v4/posts",
token: cfg.Token,
channelID: cfg.ChannelID,
channels: cfg.Channels,
queue: make(chan sendRequest, 1000),
ticker: time.NewTicker(time.Minute / 20),
}
}

func (m *Mattermost) StartSendWorker(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()

for req := range m.queue {
<-m.ticker.C
m.sendToAPI(req.payload)
}
}()

wg.Add(1)
go func() {
defer wg.Done()

<-m.ctx.Done()
if m.ticker != nil {
m.ticker.Stop()
}
close(m.queue)
}()
}

func (m *Mattermost) Send(msg string, container string) {
var text string
if isJSON(msg) {
text = fmt.Sprintf("```json\n%s\n```", prettyPrintJSON(msg))
} else {
text = fmt.Sprintf("```\n%s\n```", msg)
}

channelID := m.channelID
if ch, ok := m.channels[container]; ok {
channelID = ch
}

select {
case <-m.ctx.Done():
return
case m.queue <- sendRequest{payload: postPayload{ChannelID: channelID, Message: text}}:
}
}

func (m *Mattermost) sendToAPI(payload postPayload) {
data, err := json.Marshal(payload)
if err != nil {
fmt.Println("mattermost: error marshalling JSON:", err)
return
}

req, err := http.NewRequest(http.MethodPost, m.apiURL, bytes.NewBuffer(data))
if err != nil {
fmt.Println("mattermost: error creating request:", err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", "Bearer "+m.token)

resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Println("mattermost: error sending request:", err)
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
fmt.Println("mattermost: unexpected response:", resp.Status)
}
}

func isJSON(str string) bool {
var js json.RawMessage
return json.Unmarshal([]byte(str), &js) == nil
}

func prettyPrintJSON(str string) string {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(str), &obj); err != nil {
return str
}

prettyJSON, err := json.MarshalIndent(obj, "", " ")
if err != nil {
return str
}

return string(prettyJSON)
}
23 changes: 23 additions & 0 deletions internal/notifier/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package notifier

import "sync"

type MultiNotifier struct {
notifiers []Notifier
}

func NewMultiNotifier(notifiers ...Notifier) *MultiNotifier {
return &MultiNotifier{notifiers: notifiers}
}

func (m *MultiNotifier) Send(msg string, container string) {
for _, n := range m.notifiers {
n.Send(msg, container)
}
}

func (m *MultiNotifier) StartSendWorker(wg *sync.WaitGroup) {
for _, n := range m.notifiers {
n.StartSendWorker(wg)
}
}
8 changes: 8 additions & 0 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package notifier

import "sync"

type Notifier interface {
Send(msg string, container string)
StartSendWorker(wg *sync.WaitGroup)
}