This repository was archived by the owner on Jun 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathloki.go
More file actions
98 lines (77 loc) · 1.97 KB
/
loki.go
File metadata and controls
98 lines (77 loc) · 1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//go:build bd_all || bd_loki || loki
package blackdatura
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"go.uber.org/zap"
)
// Wire types for the JSON body that Write posts to the Loki endpoint.
type (
	// lokiRequest is the top-level payload, serialized as {"streams": [...]}.
	lokiRequest struct {
		Streams []lokiRequestStream `json:"streams"`
	}

	// lokiRequestStream is a single stream: its label set ("stream") and the
	// log entries that belong to it ("values").
	lokiRequestStream struct {
		Stream map[string]interface{}   `json:"stream"`
		Values []lokiRequestStreamValue `json:"values"`
	}

	// lokiRequestStreamValue is one log entry, serialized as a two-element
	// JSON array. Write fills it as [timestamp, log line].
	lokiRequestStreamValue [2]interface{}
)
// newLokiRequest returns a request that already contains exactly one stream.
// That stream's label map is allocated and empty; its Values slice is left
// nil for the caller to fill in.
func newLokiRequest() *lokiRequest {
	return &lokiRequest{
		Streams: []lokiRequestStream{
			{Stream: map[string]interface{}{}},
		},
	}
}
// LokiSink is a zap sink that forwards each written log entry to a Loki
// endpoint over HTTP.
type LokiSink struct {
	// Key lists the log-entry field names that Write copies into the
	// stream's label set when they are present in the entry.
	Key []string

	// apiAddr is the URL that Write sends its POST request to.
	apiAddr string

	// httpClient performs the POST request.
	httpClient *http.Client // TODO: client pool
}
// NewLoki builds a LokiSink and returns it as a zap.Sink.
//
// httpClient is the client used to send requests, Key is the list of
// log-entry field names to copy into stream labels, and apiAddr is the URL
// each entry is POSTed to. The arguments are stored as given, without
// validation or copying.
func NewLoki(httpClient *http.Client, Key []string, apiAddr string) zap.Sink {
	sink := new(LokiSink)
	sink.httpClient = httpClient
	sink.apiAddr = apiAddr
	sink.Key = Key
	return sink
}
// Sink has the shape of a zap sink factory (func(*url.URL) (zap.Sink, error)).
// It ignores the URL and returns the receiver itself with a nil error.
func (r LokiSink) Sink(_ *url.URL) (zap.Sink, error) {
	return r, nil
}
// Close implements the Close method of zap.Sink. It is a no-op and always
// returns nil.
func (r LokiSink) Close() error {
	return nil
}
// Write implements the Write method of zap.Sink. It treats b as one
// JSON-encoded log entry and pushes it to the Loki endpoint at r.apiAddr as a
// single stream entry.
//
// The entry timestamp comes from the "ts" field when it is a string in the
// layout "2006-01-02T15:04:05.999Z0700", which accepts numeric offsets such as
// +0800 as well as a literal Z for UTC. When "ts" is absent or not a string,
// the current time is used so the entry is still delivered. Every name in
// r.Key that is present in the entry (and not JSON null) becomes a stream
// label; non-string values are rendered with fmt.Sprint because Loki label
// values must be strings.
//
// Write returns len(b) and a nil error on success. It returns 0 and a non-nil
// error when b is not a JSON object, "ts" is a string that does not match the
// layout, the request cannot be built or sent, or the server answers with a
// non-2xx status.
func (r LokiSink) Write(b []byte) (n int, err error) {
	var entry map[string]interface{}
	if err = json.Unmarshal(b, &entry); err != nil {
		return 0, fmt.Errorf("loki: decoding log entry: %w", err)
	}

	ts := time.Now()
	if str, ok := entry["ts"].(string); ok {
		if ts, err = time.Parse("2006-01-02T15:04:05.999Z0700", str); err != nil {
			return 0, fmt.Errorf("loki: parsing ts %q: %w", str, err)
		}
	}

	req := newLokiRequest()
	stream := &req.Streams[0]
	// Loki's JSON push format takes the timestamp as a string holding the
	// Unix epoch in nanoseconds, not as a JSON number.
	stream.Values = []lokiRequestStreamValue{
		{strconv.FormatInt(ts.UnixNano(), 10), string(b)},
	}
	for _, key := range r.Key {
		val, has := entry[key]
		if !has {
			continue
		}
		switch v := val.(type) {
		case nil:
			// JSON null carries no usable label value.
			continue
		case string:
			stream.Stream[key] = v
		default:
			stream.Stream[key] = fmt.Sprint(v)
		}
	}

	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(req); err != nil {
		return 0, fmt.Errorf("loki: encoding push request: %w", err)
	}

	hr, err := http.NewRequest(http.MethodPost, r.apiAddr, &buf)
	if err != nil {
		return 0, fmt.Errorf("loki: building push request: %w", err)
	}
	// Without this header Loki interprets the body as snappy-compressed
	// protobuf rather than JSON.
	hr.Header.Set("Content-Type", "application/json")

	client := r.httpClient
	if client == nil {
		client = http.DefaultClient
	}
	resp, err := client.Do(hr)
	if err != nil {
		return 0, fmt.Errorf("loki: sending push request: %w", err)
	}
	defer resp.Body.Close()
	// Drain the body so the transport can reuse the connection; a failure
	// here does not affect whether the push itself succeeded.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return 0, fmt.Errorf("loki: push rejected with status %s", resp.Status)
	}
	return len(b), nil
}
// Sync implements the Sync method of zap.Sink. It is a no-op and always
// returns nil.
func (r LokiSink) Sync() error {
	return nil
}
// String returns the fixed name "loki" for this sink.
func (r LokiSink) String() string {
	const name = "loki"
	return name
}