-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexporter.go
More file actions
335 lines (303 loc) · 9.57 KB
/
exporter.go
File metadata and controls
335 lines (303 loc) · 9.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package apex
import (
"context"
"errors"
"sync"
"time"
"github.com/microsoft/ApplicationInsights-Go/appinsights"
"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
trace "go.opentelemetry.io/otel/trace"
)
// AppInsightsExporter is an OpenTelemetry span exporter that converts spans
// into Application Insights telemetry and submits them through an
// Application Insights telemetry client.
type AppInsightsExporter struct {
	// client is the telemetry client every converted span is tracked with.
	client appinsights.TelemetryClient
	// mtx guards closed: ExportSpans holds the read lock, Shutdown the
	// write lock.
	mtx *sync.RWMutex
	// closed is set by Shutdown; ExportSpans rejects spans once it is true.
	closed bool
}

// Compile-time check that the exporter satisfies the SDK exporter interface.
var _ sdktrace.SpanExporter = (*AppInsightsExporter)(nil)
// NewExporter creates a new App Insights Exporter with an app insights
// telemetry client created from an instrumentation key. The exporter uses a
// logger function provided as a callback for logging events.
//
// A nil logger is allowed and means that no diagnostics listener is
// registered. The returned error is currently always nil.
func NewExporter(
	instrumentationKey string,
	logger func(msg string) error,
) (*AppInsightsExporter, error) {
	client := appinsights.NewTelemetryClient(instrumentationKey)
	// Skip registration for a nil callback: the listener presumably invokes
	// its handler for every diagnostics message, which would be a nil
	// function call.
	if logger != nil {
		appinsights.NewDiagnosticsMessageListener(logger)
	}
	return &AppInsightsExporter{
		client: client,
		mtx:    &sync.RWMutex{},
		closed: false,
	}, nil
}
// NewExporterFromConfig creates a new App Insights Exporter with an app
// insights telemetry client created from a telemetry configuration. The
// exporter uses a logger function provided as a callback for logging events.
//
// It returns an error when cfg is nil. A nil logger is allowed and means that
// no diagnostics listener is registered.
func NewExporterFromConfig(
	cfg *appinsights.TelemetryConfiguration,
	logger func(msg string) error,
) (*AppInsightsExporter, error) {
	if cfg == nil {
		return nil, errors.New("configuration is nil")
	}
	client := appinsights.NewTelemetryClientFromConfig(cfg)
	// Skip registration for a nil callback: the listener presumably invokes
	// its handler for every diagnostics message, which would be a nil
	// function call.
	if logger != nil {
		appinsights.NewDiagnosticsMessageListener(logger)
	}
	return &AppInsightsExporter{
		client: client,
		mtx:    &sync.RWMutex{},
		closed: false,
	}, nil
}
// ExportSpans converts every span of the batch into Application Insights
// telemetry and hands it to the telemetry client. It returns an error when
// the exporter has already been shut down; ctx is not consulted.
func (exp *AppInsightsExporter) ExportSpans(
	ctx context.Context,
	spans []sdktrace.ReadOnlySpan,
) error {
	// The read lock is held for the whole batch, so Shutdown (which takes
	// the write lock) cannot flip the closed flag mid-export.
	exp.mtx.RLock()
	defer exp.mtx.RUnlock()

	if exp.closed {
		return errors.New("exporter closed")
	}

	for _, span := range spans {
		exp.process(span)
	}
	return nil
}
// Shutdown closes the exporter and waits until the pending messages are sent
// with up to one minute grace period, or until the context is done.
// The grace period might become configurable in the future.
//
// Shutdown is idempotent: only the first call closes the telemetry channel,
// later calls return nil immediately. When the context ends before the
// channel reports completion, the context's own error is returned so callers
// can tell cancellation from an expired deadline with errors.Is.
func (exp *AppInsightsExporter) Shutdown(
	ctx context.Context,
) error {
	exp.mtx.Lock()
	defer exp.mtx.Unlock()
	// Never close the telemetry channel a second time.
	if exp.closed {
		return nil
	}
	exp.closed = true
	select {
	case <-exp.client.Channel().Close(time.Minute):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// processInternal builds an event telemetry item for an internal span and
// dispatches it to the application insights telemetry client.
//
// Application Insights specific fields are sourced from custom properties:
// Role = properties["service.name"]
//
// The consumed key is removed from properties; the remaining entries become
// the telemetry's custom properties.
func (exp *AppInsightsExporter) processInternal(
	sp sdktrace.ReadOnlySpan,
	properties map[string]string,
) {
	traceID := sp.SpanContext().TraceID().String()

	// A span without a parent is attached to the trace itself.
	parentID := traceID
	if parent := sp.Parent().SpanID(); parent.IsValid() {
		parentID = parent.String()
	}

	serviceKey := string(semconv.ServiceNameKey)
	role := "unknown-service"
	if name, found := properties[serviceKey]; found {
		delete(properties, serviceKey)
		role = name
	}

	tags := make(contracts.ContextTags)
	tags.Cloud().SetRole(role)
	operation := tags.Operation()
	operation.SetId(traceID)
	operation.SetParentId(parentID)
	operation.SetName(sp.Name())

	exp.client.Track(&appinsights.EventTelemetry{
		Name: sp.Name(),
		BaseTelemetry: appinsights.BaseTelemetry{
			Timestamp:  sp.StartTime(),
			Tags:       tags,
			Properties: properties,
		},
		BaseTelemetryMeasurements: appinsights.BaseTelemetryMeasurements{
			Measurements: map[string]float64{},
		},
	})
}
// processRequest builds the request telemetry for an incoming http request
// span and dispatches it to the application insights telemetry client.
//
// Application Insights specific fields are sourced from custom properties:
// Role = properties["service.name"]
// Url = properties["url"]
// ResponseCode = properties["responseCode"]
//
// Consumed keys are removed from properties; the remaining entries become
// the telemetry's custom properties.
func (exp *AppInsightsExporter) processRequest(
	sp sdktrace.ReadOnlySpan,
	success bool,
	properties map[string]string,
) {
	// pop removes key from properties and reports the value it held.
	pop := func(key string) (string, bool) {
		val, found := properties[key]
		if found {
			delete(properties, key)
		}
		return val, found
	}

	role := "unknown-service"
	if val, found := pop(string(semconv.ServiceNameKey)); found {
		role = val
	}
	url := ""
	if val, found := pop("url"); found {
		url = val
	}
	responseCode := "0"
	if val, found := pop("responseCode"); found {
		responseCode = val
	}

	traceID := sp.SpanContext().TraceID().String()
	// A span without a parent is attached to the trace itself.
	parentID := traceID
	if parent := sp.Parent().SpanID(); parent.IsValid() {
		parentID = parent.String()
	}

	tags := make(contracts.ContextTags)
	tags.Cloud().SetRole(role)
	operation := tags.Operation()
	operation.SetId(traceID)
	operation.SetParentId(parentID)
	operation.SetName(sp.Name())

	exp.client.Track(&appinsights.RequestTelemetry{
		Name:         sp.Name(),
		Url:          url,
		Id:           sp.SpanContext().SpanID().String(),
		Duration:     sp.EndTime().Sub(sp.StartTime()),
		ResponseCode: responseCode,
		Success:      success,
		BaseTelemetry: appinsights.BaseTelemetry{
			Timestamp:  sp.StartTime(),
			Tags:       tags,
			Properties: properties,
		},
		BaseTelemetryMeasurements: appinsights.BaseTelemetryMeasurements{
			Measurements: map[string]float64{},
		},
	})
}
// processEvent builds the request telemetry for an incoming event to be
// handled and dispatches it to the application insights telemetry client.
//
// Application Insights specific fields are sourced from custom properties:
// Role = properties["service.name"]
// Url = properties["key"]
// ResponseCode = properties["responseCode"]
//
// Consumed keys are removed from properties; the remaining entries become
// the telemetry's custom properties.
func (exp *AppInsightsExporter) processEvent(
	sp sdktrace.ReadOnlySpan,
	success bool,
	properties map[string]string,
) {
	// pop removes key from properties and reports the value it held.
	pop := func(key string) (string, bool) {
		val, found := properties[key]
		if found {
			delete(properties, key)
		}
		return val, found
	}

	role := "unknown-service"
	if val, found := pop(string(semconv.ServiceNameKey)); found {
		role = val
	}
	// The event key is reported in the request telemetry's Url field.
	url := ""
	if val, found := pop("key"); found {
		url = val
	}
	responseCode := "0"
	if val, found := pop("responseCode"); found {
		responseCode = val
	}

	traceID := sp.SpanContext().TraceID().String()
	// A span without a parent is attached to the trace itself.
	parentID := traceID
	if parent := sp.Parent().SpanID(); parent.IsValid() {
		parentID = parent.String()
	}

	tags := make(contracts.ContextTags)
	tags.Cloud().SetRole(role)
	operation := tags.Operation()
	operation.SetId(traceID)
	operation.SetParentId(parentID)
	operation.SetName(sp.Name())

	exp.client.Track(&appinsights.RequestTelemetry{
		Name:         sp.Name(),
		Url:          url,
		Id:           sp.SpanContext().SpanID().String(),
		Duration:     sp.EndTime().Sub(sp.StartTime()),
		ResponseCode: responseCode,
		Success:      success,
		BaseTelemetry: appinsights.BaseTelemetry{
			Timestamp:  sp.StartTime(),
			Tags:       tags,
			Properties: properties,
		},
		BaseTelemetryMeasurements: appinsights.BaseTelemetryMeasurements{
			Measurements: map[string]float64{},
		},
	})
}
// processDependency builds the remote dependency telemetry for an outgoing
// dependency span and dispatches it to the application insights telemetry
// client.
//
// Application Insights specific fields are sourced from custom properties:
// Role = properties["source"]
// Type = properties["type"]
// Target = properties["service.name"]
//
// Consumed keys are removed from properties; the remaining entries become
// the telemetry's custom properties.
func (exp *AppInsightsExporter) processDependency(
	sp sdktrace.ReadOnlySpan,
	success bool,
	properties map[string]string,
) {
	// pop removes key from properties and reports the value it held.
	pop := func(key string) (string, bool) {
		val, found := properties[key]
		if found {
			delete(properties, key)
		}
		return val, found
	}

	role := "unknown-service"
	if val, found := pop("source"); found {
		role = val
	}
	depType := ""
	if val, found := pop("type"); found {
		depType = val
	}
	target := "unknown-target"
	if val, found := pop(string(semconv.ServiceNameKey)); found {
		target = val
	}

	traceID := sp.SpanContext().TraceID().String()
	// A span without a parent is attached to the trace itself.
	parentID := traceID
	if parent := sp.Parent().SpanID(); parent.IsValid() {
		parentID = parent.String()
	}

	tags := make(contracts.ContextTags)
	tags.Cloud().SetRole(role)
	operation := tags.Operation()
	operation.SetId(traceID)
	operation.SetParentId(parentID)
	operation.SetName(sp.Name())

	exp.client.Track(&appinsights.RemoteDependencyTelemetry{
		Name:     sp.Name(),
		Id:       sp.SpanContext().SpanID().String(),
		Type:     depType,
		Target:   target,
		Duration: sp.EndTime().Sub(sp.StartTime()),
		Success:  success,
		BaseTelemetry: appinsights.BaseTelemetry{
			Timestamp:  sp.StartTime(),
			Tags:       tags,
			Properties: properties,
		},
		BaseTelemetryMeasurements: appinsights.BaseTelemetryMeasurements{
			Measurements: map[string]float64{},
		},
	})
}
// process flattens the span's resource and span attributes into a string
// property map and routes the span to the processing function matching its
// kind:
//
//	Unspecified, Internal -> processInternal
//	Server                -> processRequest
//	Client, Producer      -> processDependency
//	Consumer              -> processEvent
//
// Span attributes override resource attributes that share the same key. A
// span counts as successful unless its status code is Error; the default
// Unset status is therefore reported as success.
func (exp *AppInsightsExporter) process(sp sdktrace.ReadOnlySpan) {
	// Only an explicit Error marks a failure. Unset is the status of every
	// span whose instrumentation did not set one and must not be reported
	// as a failed operation.
	success := sp.Status().Code != codes.Error

	rattr := sp.Resource().Attributes()
	attr := sp.Attributes()
	props := make(map[string]string, len(rattr)+len(attr))
	// Emit renders a value of any attribute type as a string. AsString only
	// yields the content of string-typed values, so numeric and boolean
	// attributes would otherwise be exported as empty strings.
	for _, e := range rattr {
		props[string(e.Key)] = e.Value.Emit()
	}
	for _, e := range attr {
		props[string(e.Key)] = e.Value.Emit()
	}

	switch sp.SpanKind() {
	case trace.SpanKindUnspecified, trace.SpanKindInternal:
		exp.processInternal(sp, props)
	case trace.SpanKindServer:
		exp.processRequest(sp, success, props)
	case trace.SpanKindClient, trace.SpanKindProducer:
		exp.processDependency(sp, success, props)
	case trace.SpanKindConsumer:
		exp.processEvent(sp, success, props)
	}
}