forked from LeoPlatform/Nodejs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
127 lines (118 loc) · 4.41 KB
/
index.js
File metadata and controls
127 lines (118 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"use strict";
let leoconfig = require("leo-config");
let ls = require("./lib/stream/leo-stream");
let logging = require("./lib/logging.js");
let LeoConfiguration = require("./lib/configuration.js");
let aws = require("./lib/leo-aws");
const fs = require("fs");
const ini = require('ini');
const execSync = require("child_process").execSync;
/**
 * Resolve the credentials object to use for a named AWS profile.
 *
 * When `~/.aws/config` has a `[profile <name>]` section with both `mfa_serial`
 * and `role_arn`, the credentials are read from the AWS CLI cache file for that
 * profile/role, and the CLI is invoked to refresh the cache when the cached
 * entry is missing, unreadable, or not provably unexpired. In every other case
 * the profile is handed to `aws.SharedIniFileCredentials`.
 *
 * @param {Object} awsConfig - AWS settings; `awsConfig.profile` must be set
 * @return {Object} Credentials object built by the `aws` helper module
 */
function resolveProfileCredentials(awsConfig) {
	const profile = awsConfig.profile;
	const awsDir = `${process.env.HOME || process.env.HOMEPATH}/.aws`;
	const configFile = `${awsDir}/config`;

	let section;
	if (fs.existsSync(configFile)) {
		section = ini.parse(fs.readFileSync(configFile, 'utf-8'))[`profile ${profile}`];
	}

	// The cache file name is derived from role_arn, so an MFA profile without a
	// role cannot use the cache path; fall back to the shared ini credentials
	// instead of crashing on an undefined role_arn.
	if (!section || !section.mfa_serial || typeof section.role_arn !== "string") {
		console.log("Switching AWS Profile", profile);
		return new aws.SharedIniFileCredentials(awsConfig);
	}

	const roleKey = section.role_arn.replace(/:/g, '_').replace(/[^A-Za-z0-9\-_]/g, '-');
	const cacheFile = `${awsDir}/cli/cache/${profile}--${roleKey}.json`;

	let cached;
	try {
		cached = JSON.parse(fs.readFileSync(cacheFile));
	} catch (e) {
		// A missing or corrupt cache file is expected; it just forces a refresh below.
		cached = null;
	}
	console.log("Using cached AWS credentials", profile);

	// NaN when there is no usable expiration, which makes the comparison below
	// fail and triggers a refresh rather than trusting the entry forever.
	const expiresAt = cached && cached.Credentials
		? new Date(cached.Credentials.Expiration).getTime()
		: NaN;
	if (!(Date.now() < expiresAt)) {
		// NOTE(review): presumably this call exists only so the AWS CLI re-prompts
		// for MFA and rewrites its cache file - confirm against the installed CLI.
		execSync('aws sts get-caller-identity --duration-seconds 28800 --profile ' + profile);
		cached = JSON.parse(fs.readFileSync(cacheFile));
	}
	return new aws.STS().credentialsFrom(cached, cached);
}

/**
 * Build an SDK object bound to one configuration.
 *
 * The returned value is itself a function: calling it with `(id, data)` builds
 * another SDK the same way. It also carries the streaming helpers as properties.
 *
 * @param {string|Object} id - The id of the bot; when this is not a string it is
 *   treated as `data` and the id is taken from `data.id` (default "default_bot")
 * @param {Object} [data] - Options passed to the configuration; a truthy
 *   `data.logging` enables the logger
 * @return {function} SDK factory function with the helper properties attached
 */
function SDK(id, data) {
	let botId = id;
	let options = data;
	if (typeof botId !== "string") {
		options = id;
		// Guard the lookup so SDK(), SDK(null) and SDK(false) all get the default id.
		botId = (options && options.id) || "default_bot";
	}

	const configuration = new LeoConfiguration(options);
	const awsConfig = leoconfig.leoaws || configuration.aws;
	if (awsConfig.profile) {
		configuration.credentials = resolveProfileCredentials(awsConfig);
	}

	let logger = null;
	if (options && options.logging) {
		logger = logging(botId, configuration);
	}

	const leoStream = ls(configuration);
	return Object.assign((id, data) => {
		return new SDK(id, data);
	}, {
		configuration: configuration,
		/**
		 * Flush and close the logger, if one was created.
		 * @param {function} callback - Called once shutdown is finished; invoked
		 *   immediately when logging is not enabled
		 */
		destroy: (callback) => {
			if (logger) {
				logger.end(callback);
			} else if (typeof callback === "function") {
				// Without a logger there is nothing to wait for, but the caller
				// must still be notified or it would wait forever.
				callback();
			}
		},
		/**
		 * Stream for writing events to a queue
		 * @param {string} id - The id of the bot
		 * @param {string} outQueue - The queue into which events will be written
		 * @param {Object} config - An object that contains config values that control the flow of events to outQueue
		 * @return {stream} Stream
		 */
		load: leoStream.load,
		/**
		 * Process events from a queue.
		 * @param {Object} opts
		 * @param {string} opts.id - The id of the bot
		 * @param {string} opts.inQueue - The queue from which events will be read
		 * @param {Object} opts.config - An object that contains config values that control the flow of events from inQueue
		 * @param {function} opts.batch - A function to batch data from inQueue (optional)
		 * @param {function} opts.each - A function to transform data from inQueue or from batch function, and offload from the platform
		 * @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { }
		 * @return {stream} Stream
		 */
		offload: leoStream.offload,
		/**
		 * Enrich events from one queue to another.
		 * @param {Object} opts
		 * @param {string} opts.id - The id of the bot
		 * @param {string} opts.inQueue - The queue from which events will be read
		 * @param {string} opts.outQueue - The queue into which events will be written
		 * @param {Object} opts.config - An object that contains config values that control the flow of events from inQueue and to outQueue
		 * @param {function} opts.transform - A function to transform data from inQueue to outQueue
		 * @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { }
		 * @return {stream} Stream
		 */
		enrich: leoStream.enrich,
		read: leoStream.fromLeo,
		write: leoStream.toLeo,
		/**
		 * Write a single payload to a queue and end the stream.
		 * Resolves `load` through `this`, so it must be called as a method of
		 * the SDK object.
		 * @param {string} bot_id - The id of the bot
		 * @param {string} queue - The queue into which the payload will be written
		 * @param {Object} payload - The value written to the stream
		 * @param {function} callback - Passed to `stream.end`
		 */
		put: function(bot_id, queue, payload, callback) {
			let stream = this.load(bot_id, queue, {
				kinesis: {
					records: 1
				}
			});
			stream.write(payload);
			stream.end(callback);
		},
		checkpoint: leoStream.toCheckpoint,
		streams: leoStream,
		bot: leoStream.cron,
		aws: {
			dynamodb: leoStream.dynamodb,
			s3: leoStream.s3,
			cloudformation: new aws.CloudFormation({
				region: configuration.aws.region,
				credentials: configuration.credentials
			})
		}
	});
}
// Export a ready-made SDK built with no explicit options; the exported value
// is also a factory that can be called with (id, data) to build another SDK.
const defaultSdk = new SDK(false);
module.exports = defaultSdk;