forked from andyhuynh3/target-jsonl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtarget_jsonl.py
More file actions
executable file
·101 lines (78 loc) · 3.05 KB
/
target_jsonl.py
File metadata and controls
executable file
·101 lines (78 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
import argparse
import io
import json
import os
import sys
from datetime import datetime
import singer
from jsonschema import Draft4Validator, FormatChecker
from decimal import Decimal
# Module-level logger obtained from the singer library; the functions in this
# file use it for debug, warning and error output.
logger = singer.get_logger()
def emit_state(state):
    """Write *state* to standard output as a single JSON line.

    Nothing is written when *state* is ``None``. Otherwise the value is
    serialized with ``json.dumps``, logged at debug level, written to
    ``sys.stdout`` followed by a newline, and the stream is flushed.
    """
    if state is None:
        return

    serialized = json.dumps(state)
    logger.debug('Emitting state {}'.format(serialized))

    out = sys.stdout
    out.write("{}\n".format(serialized))
    out.flush()
def float_to_decimal(value):
    """Return a copy of *value* in which every float is replaced by a Decimal.

    Dicts and lists are traversed recursively and rebuilt as new plain dicts
    and lists; floats are converted through their ``str()`` form; any other
    value is returned unchanged.
    """
    if isinstance(value, dict):
        return {key: float_to_decimal(item) for key, item in value.items()}
    if isinstance(value, list):
        return list(map(float_to_decimal, value))
    if isinstance(value, float):
        # Going through str() keeps the short repr ("0.1") rather than the
        # exact binary expansion that Decimal(float) would produce.
        return Decimal(str(value))
    return value
def persist_messages(messages, destination_path, do_timestamp_file=True):
    """Consume Singer messages and append each record to a per-stream JSONL file.

    Args:
        messages: Iterable of raw Singer message lines; each item is handed
            to ``singer.parse_message``.
        destination_path: Directory in which the ``<stream>.jsonl`` files are
            created or appended to. A leading ``~`` is expanded.
        do_timestamp_file: When true, a ``-YYYYmmddTHHMMSS`` suffix (local
            time, computed once per call) is inserted before ``.jsonl``.

    Returns:
        The value of the most recent STATE message, provided no RECORD was
        written after it; otherwise ``None``.

    Raises:
        json.decoder.JSONDecodeError: Re-raised, after logging the offending
            line, when a message cannot be parsed.
        Exception: When a RECORD arrives for a stream that has not yet been
            described by a SCHEMA message.

    Errors raised by schema validation or by file I/O are not caught here and
    propagate to the caller.
    """
    state = None
    schemas = {}
    key_properties = {}  # recorded per stream; not read again in this function
    validators = {}

    # Computed once so that every file written by this call shares one suffix.
    timestamp_file_part = '-' + datetime.now().strftime('%Y%m%dT%H%M%S') if do_timestamp_file else ''

    for message in messages:
        try:
            o = singer.parse_message(message).asdict()
        except json.decoder.JSONDecodeError:
            logger.error("Unable to parse:\n{}".format(message))
            raise

        message_type = o['type']
        if message_type == 'RECORD':
            stream = o['stream']
            if stream not in schemas:
                # The trailing space keeps the stream name from running into
                # the rest of the sentence.
                raise Exception(
                    "A record for stream {} "
                    "was encountered before a corresponding schema".format(stream)
                )

            # Validate a Decimal-converted copy; the record itself is written
            # out unmodified below.
            validators[stream].validate(float_to_decimal(o['record']))

            filename = stream + timestamp_file_part + '.jsonl'
            filename = os.path.expanduser(os.path.join(destination_path, filename))

            # Opened in append mode and closed again for every record, so the
            # line is handed to the OS before the next message is processed.
            with open(filename, 'a', encoding='utf-8') as json_file:
                json_file.write(json.dumps(o['record']) + '\n')

            # A STATE seen earlier is discarded once a newer record has been
            # written; only a STATE with no record after it is returned.
            state = None
        elif message_type == 'STATE':
            logger.debug('Setting state to {}'.format(o['value']))
            state = o['value']
        elif message_type == 'SCHEMA':
            stream = o['stream']
            # Convert once and share the result between the lookup table and
            # the validator.
            schema = float_to_decimal(o['schema'])
            schemas[stream] = schema
            validators[stream] = Draft4Validator(schema)
            key_properties[stream] = o['key_properties']
        else:
            logger.warning("Unknown message type {} in message {}".format(o['type'], o))

    return state
def main():
    """Command-line entry point.

    Parses the optional ``-c/--config`` argument, loads the JSON config file
    if one was given (an empty config is used otherwise), reads Singer
    messages from standard input as UTF-8, persists them with
    ``persist_messages`` and finally emits the resulting state.

    Config keys read here: ``destination_path`` (default ``''``) and
    ``do_timestamp_file`` (default ``True``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', help='Config file')
    args = parser.parse_args()

    if args.config:
        # Read the config as UTF-8 explicitly rather than with the platform's
        # locale encoding, matching how stdin and the output files are handled.
        with open(args.config, encoding='utf-8') as input_json:
            config = json.load(input_json)
    else:
        config = {}

    # Re-wrap the raw stdin buffer so decoding does not depend on the locale.
    input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    state = persist_messages(
        input_messages,
        config.get('destination_path', ''),
        config.get('do_timestamp_file', True),
    )

    emit_state(state)
    logger.debug("Exiting normally")
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()