-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathrequests.go
More file actions
428 lines (386 loc) · 16 KB
/
requests.go
File metadata and controls
428 lines (386 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
// The structure of how to add new method types to the system.
// -----------------------------------------------------------
// All methods need 3 things:
// - A type definition
// - The type needs a getKind method
// - The type needs a handler method
// Overall structure example shown below.
//
// ---
// type methodCommandCLICommandRequest struct {
// commandOrEvent CommandOrEvent
// }
//
// func (m methodCommandCLICommandRequest) getKind() CommandOrEvent {
// return m.commandOrEvent
// }
//
// func (m methodCommandCLICommandRequest) handler(s *server, message Message, node string) ([]byte, error) {
// ...
// ...
// ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
// return ackMsg, nil
// }
//
// ---
// You also need to make a constant for the Method, and add
// that constant as the key in the MethodsAvailable map, where
// the value is the actual type you want to map it to with a
// handler method. You also specify if it is a Command or Event,
// and if it is ACK or NACK.
//
// Requests used in sub processes should always start with the
// naming REQSUB. Since the method of a sub process are defined
// within the method handler of the owning reqest type we should
// use the methodSUB for these types. The methodSUB handler
// does nothing.
//
// Check out the existing code below for more examples.
package ctrl
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
)
// Method is used to specify the actual function/method that
// is represented in a typed manner.
type Method string
// ------------------------------------------------------------
// The constants that will be used throughout the system for
// when specifying what kind of Method to send or work with.
const (
// Initial parent method used to start other processes.
Initial Method = "initial"
// Get a list of all the running processes.
OpProcessList Method = "opProcessList"
// Start up a process.
OpProcessStart Method = "opProcessStart"
// Stop up a process.
OpProcessStop Method = "opProcessStop"
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
// if it was delivered succesfully. The output of the command
// ran will be delivered back to the node where it was initiated
// as a new message.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
CliCommand Method = "cliCommand"
// REQCliCommandCont same as normal Cli command, but can be used
// when running a command that will take longer time and you want
// to send the output of the command continually back as it is
// generated, and not wait until the command is finished.
CliCommandCont Method = "cliCommandCont"
// Send text to be logged to the console.
// The data field is a slice of strings where the first string
// value should be the command, and the following the arguments.
Console Method = "console"
// Send text logging to some host by appending the output to a
// file, if the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the log file.
FileAppend Method = "fileAppend"
// Send text to some host by overwriting the existing content of
// the fileoutput to a file. If the file do not exist we create it.
// A file with the full subject+hostName will be created on
// the receiving end.
// The data field is a slice of strings where the values of the
// slice will be written to the file.
File Method = "file"
// Initiated by the user.
CopySrc Method = "copySrc"
// Initial request for file copying.
// Generated by the source to send initial information to the destination.
CopyDst Method = "copyDst"
// Read the source file to be copied to some node.
SUBCopySrc Method = "subCopySrc"
// Write the destination copied to some node.
SUBCopyDst Method = "subCopyDst"
PortSrc Method = "portSrc"
PortDst Method = "PortDst"
SUBPortSrc Method = "subPortSrc"
SUBPortDst Method = "subPortDst"
DummySrc Method = "dummySrc"
DummyDst Method = "dummyDst"
SUBDummySrc Method = "subDummySrc"
SUBDummyDst Method = "subDummyDst"
// Hello I'm here message.
Hello Method = "hello"
HelloPublisher Method = "helloPublisher"
// Error log methods to centralError node.
ErrorLog Method = "errorLog"
// Http Get
HttpGet Method = "httpGet"
// Tail file
TailFile Method = "tailFile"
// REQNone is used when there should be no reply.
None Method = "none"
// REQTest is used only for testing to be able to grab the output
// of messages.
Test Method = "test"
// REQPublicKey will get the public ed25519 key from a node.
PublicKey Method = "publicKey"
// KeysUpdateRequest will receive all the messages of the current hash of all public keys
// a node have stored, and send out an update if needed..
KeysUpdateRequest Method = "keysUpdateRequest"
// KeysUpdateReceive,deliver the public key from central to a node.
KeysUpdateReceive Method = "keysUpdateReceive"
// REQKeysAllow
KeysAllow Method = "keysAllow"
// REQKeysDelete
KeysDelete Method = "keysDelete"
// REQAclRequestUpdate will get all node acl's from central if an update is available.
AclRequestUpdate Method = "aclRequestUpdate"
// REQAclDeliverUpdate will deliver the acl from central to a node.
AclDeliverUpdate Method = "aclDeliverUpdate"
// REQAclAddCommand
AclAddCommand = "aclAddCommand"
// REQAclDeleteCommand
AclDeleteCommand = "aclDeleteCommand"
// REQAclDeleteSource
AclDeleteSource = "aclDeleteSource"
// REQGroupNodesAddNode
AclGroupNodesAddNode = "aclGroupNodesAddNode"
// REQAclGroupNodesDeleteNode
AclGroupNodesDeleteNode = "aclGroupNodesDeleteNode"
// REQAclGroupNodesDeleteGroup
AclGroupNodesDeleteGroup = "aclGroupNodesDeleteGroup"
// REQAclGroupCommandsAddCommand
AclGroupCommandsAddCommand = "aclGroupCommandsAddCommand"
// REQAclGroupCommandsDeleteCommand
AclGroupCommandsDeleteCommand = "aclGroupCommandsDeleteCommand"
// REQAclGroupCommandsDeleteGroup
AclGroupCommandsDeleteGroup = "aclGroupCommandsDeleteGroup"
// REQAclExport
AclExport = "aclExport"
// REQAclImport
AclImport = "aclImport"
// WebUI is used to send messages to the web ui.
WebUI Method = "webUI"
// GraphAddNode
GraphAddNode Method = "graphAddNode"
// GraphGetNode
GraphGetNode Method = "graphGetNode"
)
type Handler func(proc process, message Message, node string) ([]byte, error)
// The mapping of all the method constants specified, what type
// it references.
// The primary use of this table is that messages are not able to
// pass the actual type of the request since it is sent as a string,
// so we use the below table to find the actual type based on that
// string type.
func (m Method) GetMethodsAvailable() MethodsAvailable {
ma := MethodsAvailable{
Methodhandlers: map[Method]Handler{
Initial: Handler(methodInitial),
OpProcessList: Handler(methodOpProcessList),
OpProcessStart: Handler(methodOpProcessStart),
OpProcessStop: Handler(methodOpProcessStop),
CliCommand: Handler(methodCliCommand),
CliCommandCont: Handler(methodCliCommandCont),
Console: Handler(methodConsole),
FileAppend: Handler(methodFileAppend),
File: Handler(methodToFile),
CopySrc: Handler(methodCopySrc),
CopyDst: Handler(methodCopyDst),
SUBCopySrc: Handler(methodSUB),
SUBCopyDst: Handler(methodSUB),
PortSrc: Handler(methodPortSrc),
PortDst: Handler(methodPortDst),
SUBPortSrc: Handler(methodSUB),
SUBPortDst: Handler(methodSUB),
Hello: Handler(methodHello),
// The hello publisher will not subscribe for messages, it will
// only start a procFunc, so we we don't need a handler with a method,
// so we set it to nil.
HelloPublisher: Handler(nil),
ErrorLog: Handler(methodErrorLog),
HttpGet: Handler(methodHttpGet),
TailFile: Handler(methodTailFile),
PublicKey: Handler(methodPublicKey),
KeysUpdateRequest: Handler(methodKeysUpdateRequest),
KeysUpdateReceive: Handler(methodKeysUpdateReceive),
KeysAllow: Handler(methodKeysAllow),
KeysDelete: Handler(methodKeysDelete),
AclRequestUpdate: Handler(methodAclRequestUpdate),
AclDeliverUpdate: Handler(methodAclDeliverUpdate),
AclAddCommand: Handler(methodAclAddCommand),
AclDeleteCommand: Handler(methodAclDeleteCommand),
AclDeleteSource: Handler(methodAclDeleteSource),
AclGroupNodesAddNode: Handler(methodAclGroupNodesAddNode),
AclGroupNodesDeleteNode: Handler(methodAclGroupNodesDeleteNode),
AclGroupNodesDeleteGroup: Handler(methodAclGroupNodesDeleteGroup),
AclGroupCommandsAddCommand: Handler(methodAclGroupCommandsAddCommand),
AclGroupCommandsDeleteCommand: Handler(methodAclGroupCommandsDeleteCommand),
AclGroupCommandsDeleteGroup: Handler(methodAclGroupCommandsDeleteGroup),
AclExport: Handler(methodAclExport),
AclImport: Handler(methodAclImport),
Test: Handler(methodTest),
WebUI: Handler(nil),
GraphAddNode: Handler(methodGraphAddNode),
GraphGetNode: Handler(methodGraphGetNode),
},
}
return ma
}
// getHandler will check the methodsAvailable map, and return the
// method handler for the method given
// as input argument.
func (m Method) getHandler(method Method) Handler {
ma := m.GetMethodsAvailable()
mh, _ := ma.CheckIfExists(method)
// mh := ma.Methodhandlers[method]
return mh
}
// getContextForMethodTimeout, will return a context with cancel function
// with the timeout set to the method timeout in the message.
// If the value of timeout is set to -1, we don't want it to stop, so we
// return a context with a timeout set to 200 years.
func getContextForMethodTimeout(ctx context.Context, message Message) (context.Context, context.CancelFunc) {
// If methodTimeout == -1, which means we don't want a timeout, set the
// time out to 200 years.
if message.MethodTimeout == -1 {
return context.WithTimeout(ctx, time.Hour*time.Duration(8760*200))
}
return context.WithTimeout(ctx, time.Second*time.Duration(message.MethodTimeout))
}
// ----
// Initial parent method used to start other processes.
func methodInitial(proc process, message Message, node string) ([]byte, error) {
// proc.procFuncCh <- message
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
// place holder method used for sub processes.
// Methods used in sub processes are defined within the requests
// they are spawned in, so this type is primarily for us to use the
// same logic with sub process requests as we do with normal requests.
func methodSUB(proc process, message Message, node string) ([]byte, error) {
// proc.procFuncCh <- message
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
// MethodsAvailable holds a map of all the different method types and the
// associated handler to that method type.
type MethodsAvailable struct {
Methodhandlers map[Method]Handler
}
// Check if exists will check if the Method is defined. If true the bool
// value will be set to true, and the methodHandler function for that type
// will be returned.
func (ma MethodsAvailable) CheckIfExists(m Method) (Handler, bool) {
// First check if it is a sub process.
if strings.HasPrefix(string(m), "sub") {
// Strip of the uuid after the method name.
sp := strings.Split(string(m), ".")
m = Method(sp[0])
}
mFunc, ok := ma.Methodhandlers[m]
if ok {
return mFunc, true
} else {
return nil, false
}
}
// newReplyMessage will create and send a reply message back to where
// the original provided message came from. The primary use of this
// function is to report back to a node who sent a message with the
// result of the request method of the original message.
//
// The method to use for the reply message when reporting back should
// be specified within a message in the replyMethod field. We will
// pick up that value here, and use it as the method for the new
// request message. If no replyMethod is set we default to the
// REQToFileAppend method type.
//
// There will also be a copy of the original message put in the
// previousMessage field. For the copy of the original message the data
// field will be set to nil before the whole message is put in the
// previousMessage field so we don't copy around the original data in
// the reply response when it is not needed anymore.
func newReplyMessage(proc process, message Message, outData []byte) {
// If REQNone is specified, we don't want to send a reply message
// so we silently just return without sending anything.
if message.ReplyMethod == None || message.IsReply {
return
}
// If no replyMethod is set we default to writing to writing to
// a log file.
if message.ReplyMethod == "" {
message.ReplyMethod = FileAppend
}
// Make a copy of the message as it is right now to use
// in the previous message field, but set the data field
// to nil so we don't copy around the original data when
// we don't need to for the reply message.
thisMsg := message
thisMsg.Data = nil
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
// The ToNodes field is not needed since it is only a concept that exists when messages
// are injected f.ex. on a socket, and there they are directly converted into separate
// node messages. With other words a message in the system are only for single nodes,
// so we don't have to worry about the ToNodes field when creating reply messages.
FromNode: message.ToNode,
Data: outData,
Method: message.ReplyMethod,
MethodArgs: message.ReplyMethodArgs,
MethodTimeout: message.ReplyMethodTimeout,
IsReply: true,
RetryWait: message.RetryWait,
ACKTimeout: message.ReplyACKTimeout,
Retries: message.ReplyRetries,
Directory: message.Directory,
FileName: message.FileName,
MethodInstructions: message.ReplyMethodInstructions,
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
PreviousMessage: &thisMsg,
}
proc.newMessagesCh <- newMsg
}
// selectFileNaming will figure out the correct naming of the file
// structure to use for the reply data.
// It will return the filename, and the tree structure for the folders
// to create.
func selectFileNaming(message Message, proc process) (string, string) {
var fileName string
// As default we set the folder tree to what is specified in the
// message.Directory field. If we don't want that in the checks
// done later we then replace the value with what we want.
folderTree := message.Directory
checkPrefix := func(s string) bool {
if strings.HasPrefix(s, "./") || strings.HasPrefix(s, "/") {
return true
}
return false
}
switch {
case message.PreviousMessage == nil:
// If this was a direct request there are no previous message to take
// information from, so we use the one that are in the current mesage.
fileName = message.FileName
if !checkPrefix(message.Directory) {
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.Directory, string(message.ToNode))
}
case message.PreviousMessage.ToNode != "":
fileName = message.PreviousMessage.FileName
if !checkPrefix(message.PreviousMessage.Directory) {
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.PreviousMessage.ToNode))
}
case message.PreviousMessage.ToNode == "":
fileName = message.PreviousMessage.FileName
if !checkPrefix(message.PreviousMessage.Directory) {
folderTree = filepath.Join(proc.configuration.SubscribersDataFolder, message.PreviousMessage.Directory, string(message.FromNode))
}
}
return fileName, folderTree
}