Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
5cbce01
dwconnect - tuning, hashed sk, scd bypass
mscranton-CH Oct 12, 2022
e32db11
s3 load, sysdate
mscranton-CH Oct 18, 2022
b6b5b03
bump version
mscranton-CH Oct 20, 2022
421867c
add back code removed erroneously
mscranton-CH Oct 20, 2022
1badf24
add back incorrectly removed code
mscranton-CH Oct 24, 2022
b46e3a0
bug fixes, comments
mscranton-CH Oct 25, 2022
a3dd99e
GitHub action publish-postgres
mscranton-CH Oct 25, 2022
bea9996
move cd to correct spot (I think)
mscranton-CH Oct 25, 2022
395b6d2
trying workflow things
mscranton-CH Oct 25, 2022
2840aea
trying workflow things
mscranton-CH Oct 25, 2022
2cfc109
trying workflow things
mscranton-CH Oct 25, 2022
1e13340
trying workflow things
mscranton-CH Oct 25, 2022
f0a6977
trying workflow things
mscranton-CH Oct 25, 2022
55d06c6
trying workflow things
mscranton-CH Oct 25, 2022
b83c005
trying workflow things
mscranton-CH Oct 25, 2022
abf2cb8
trying workflow things
mscranton-CH Oct 25, 2022
1ce11aa
trying workflow things
mscranton-CH Oct 25, 2022
7569376
trying workflow things
mscranton-CH Oct 25, 2022
b3d83f7
trying workflow things
mscranton-CH Oct 25, 2022
98897fa
trying workflow things
mscranton-CH Oct 25, 2022
229365b
trying workflow things
mscranton-CH Oct 25, 2022
61f4104
fix for supported config check
mscranton-CH Oct 25, 2022
da87a09
seems a comment broke a SQL statement
mscranton-CH Oct 25, 2022
1c1a973
more comments in SQL, probably should have remembered the character f…
mscranton-CH Oct 26, 2022
f3bdc92
Reverting to NPM for package registry
mscranton-CH Oct 27, 2022
8cef900
add beta to version
mscranton-CH Oct 28, 2022
9de767f
* bumped version
Oct 28, 2022
5e05797
ETL rework for high volume scaling
mscranton-CH Nov 7, 2022
9a0dbd7
Merge branch 'redshift-load-optimization' of github.com:LeoPlatform/c…
mscranton-CH Nov 7, 2022
2703950
Fix version after conflict
mscranton-CH Nov 7, 2022
554a6be
moving audit date to proper location
mscranton-CH Nov 8, 2022
76d0728
Various fixes and cleanup
mscranton-CH Dec 15, 2022
cec0732
Bump version
mscranton-CH Dec 15, 2022
1ba3a40
etl fixes and version bump
mscranton-CH Jan 19, 2023
a71ce57
reverting uneeded changes
mscranton-CH Jan 19, 2023
d733d75
sortkey type handling and d_time fix
mscranton-CH Jan 31, 2023
d41d75b
change TIMEFORMAT to auto on copy command
Feb 6, 2023
c3adc4b
bump leo-connector-postgres version to 4.0.12-beta
Feb 7, 2023
c5e8fed
fix for SCD columns
mscranton-CH Feb 10, 2023
b5e6cdd
missed the package-lock
mscranton-CH Feb 10, 2023
e0a64f8
Fix for _deleted on fact tables
mscranton-CH Feb 28, 2023
d443802
fact fix and newline fix
mscranton-CH Mar 1, 2023
6bf2587
package version bump
mscranton-CH Mar 1, 2023
2ea9d69
Fixing bug with related to posting errors to a different queue when l…
czirker Mar 3, 2023
8a74705
fix bad checkpointing
czirker Mar 3, 2023
c3f9811
Newline fix and version bump
mscranton-CH Mar 6, 2023
b7e095b
Merge branch 'redshift-load-optimization' of github.com:LeoPlatform/c…
mscranton-CH Mar 6, 2023
4cb98bf
Add bug fix from development
mscranton-CH Mar 20, 2023
87bd935
version bump
mscranton-CH Mar 20, 2023
29091c0
Version bump again for leo-connector-common
mscranton-CH Mar 20, 2023
96ed636
update schema management for hash keys
mscranton-CH Apr 6, 2023
d4ce694
Fixing postgress loader to pass along errors that occurred before the…
czirker Aug 9, 2023
26051fe
increment postgres package version
czirker Aug 9, 2023
caf9e7d
RSTREAM-209 Fixing issue with sortKey when creating the stating table…
czirker Oct 18, 2023
11f7f62
updated version to 4.0.21-beta
czirker Nov 29, 2023
79e81c4
ES-2352 - bumped version # again
jgrantr Jan 31, 2025
75eed1d
Merge pull request #224 from LeoPlatform/feature/s3-entity-table-loading
jgrantr Jan 31, 2025
e5d9c96
ES-2516 - reset deleted flag on update or insert
jgrantr Jan 31, 2025
833d65d
ES-2516 - updating version tags
jgrantr Jan 31, 2025
9063ae2
Merge branch 'development' into redshift-load-optimization
jgrantr Jan 31, 2025
c343531
Merge pull request #180 from LeoPlatform/redshift-load-optimization
jgrantr Jan 31, 2025
5a9ebb5
Merge branch 'development' into feature/ES-2516-fix-deleted
jgrantr Jan 31, 2025
6f3535a
ES-2516 - fix for new natural key loading
jgrantr Jan 31, 2025
2d7c03e
Merge pull request #225 from LeoPlatform/feature/ES-2516-fix-deleted
jgrantr Jan 31, 2025
516e252
ES-2516 - ran NPM install
jgrantr Jan 31, 2025
f40c904
ES-2516 - ensure we are setting _deleted to false when it is updated
jgrantr Jan 31, 2025
90830af
ES-2516 - don't write deletes to the CSV file/staging table
jgrantr Feb 6, 2025
6662c11
ES-2516 - updated version #'s
jgrantr Feb 6, 2025
9c44258
ES-2516 - use new version of the common connector
jgrantr Feb 6, 2025
1a4a4c8
Merge pull request #226 from LeoPlatform/feature/ES-2516-more-delete-…
jgrantr Feb 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/publish-postgres.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# This workflow does a clean install of node dependencies, builds the source code (if a build script is present), and publishes the postgres package whenever a GitHub release is published
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-nodejs-with-github-actions

name: Publish-Postgres

on:
release:
types: [published]
# branches: [ $default-branch ]

jobs:
publish-new-release:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./postgres
steps:
- uses: actions/checkout@v2
- name: Node 12
uses: actions/setup-node@v2
with:
node-version: '12.x'
registry-url: 'https://npm.pkg.github.com'
scope: '@leoplatform'
- run: npm ci
- run: npm run build --if-present
# - run: npm run test #Nothing is implemented here currently so this is omitted
# - run: git config user.email "githubactions@commercehub.com" #skipping this for now since we don't have a service account for LeoPlatform yet
- run: git config user.name "GitHub-Actions"
- run: npm version ${{ github.event.release.tag_name }}
- run: npm publish --access=public
env:
NODE_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
5 changes: 4 additions & 1 deletion common/datawarehouse/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ function combine(file) {
process.exit();
}
if (lastObj && id === lastId) {
if (data.__leo_delete__) {
if (data.__leo_delete__ || lastObj.__leo_delete__) {
// if our current row is a delete, then we throw away the previous updates
// if our previous row was a delete, but the current one is not, we throw away the delete, because it was updated
// after the delete, so it should NOT be deleted.
lastObj = data;
} else {
lastObj = merge(lastObj, data);
Expand Down
16 changes: 8 additions & 8 deletions common/datawarehouse/load.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const async = require('async');
const validate = require('./../utils/validation');
let errorStream;

module.exports = function (ID, source, client, tableConfig, stream, callback) {
module.exports = function(ID, source, client, tableConfig, stream, callback) {

// adding backwards compatibility for ID and source
if (!callback) {
Expand Down Expand Up @@ -39,7 +39,7 @@ module.exports = function (ID, source, client, tableConfig, stream, callback) {
});
});

let checkforDelete = ls.through(function (obj, done) {
let checkforDelete = ls.through(function(obj, done) {
if (obj.payload.type === 'delete') {
let data = obj.payload.data || {};
let ids = data.in || [];
Expand Down Expand Up @@ -68,7 +68,7 @@ module.exports = function (ID, source, client, tableConfig, stream, callback) {
}
});

let validateData = ls.through(function (obj, done) {
let validateData = ls.through(function(obj, done) {
let eventObj = obj.payload.data;
let invalid = false;

Expand Down Expand Up @@ -336,7 +336,7 @@ module.exports = function (ID, source, client, tableConfig, stream, callback) {
* @param eventObj {Object}
* @param error {string}
*/
function handleFailedValidation (ID, source, eventObj, error) {
function handleFailedValidation(ID, source, eventObj, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Incorrect Case-Sensitive Property Check Breaks Error Stream Initialization

Same incorrect property check as above. The code checks !errorStream.Writable but should check !errorStream.writable (lowercase w). This will cause the error stream initialization logic to be executed every time handleFailedValidation is called, potentially creating multiple pipelines for the same error stream.

Fix in Cursor Fix in Web

if (!errorStream || !errorStream.Writable) {
errorStream = streams.passthrough({
objectMode: true
Expand All @@ -345,11 +345,11 @@ function handleFailedValidation (ID, source, eventObj, error) {
streams.pipe(
errorStream,

ls.process(ID, obj => {
ls.process(ID, function(obj) {
return obj;
}),

leo.load(ID, `${source}_error`),
}, `${source}_error`),
ls.toLeo(ID),
ls.devnull(),

(err) => {
err && logger.error('GOT ERROR', err);
Expand Down
2 changes: 1 addition & 1 deletion common/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "leo-connector-common",
"version": "4.0.11-rc",
"version": "4.0.13-rc",
"description": "Common package for all Leo Platform database connectors",
"main": "index.js",
"directories": {},
Expand Down
2 changes: 1 addition & 1 deletion entity-table/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion entity-table/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "leo-connector-entity-table",
"version": "3.0.21-rc",
"version": "3.0.22-rc",
"description": "A Leo entity table connector",
"repository": {
"type": "git",
Expand Down
178 changes: 174 additions & 4 deletions postgres/lib/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ var copyFrom = require('pg-copy-streams').from;
var copyTo = require('pg-copy-streams').to;
let csv = require('fast-csv');

// need to confirm there is no issue adding these dependencies
const leo = require('leo-sdk');
const ls = leo.streams;

// Custom parser for pg type OID 1114 (`timestamp without time zone`): the raw
// text gets a trailing 'Z' so moment reads it as UTC, and the parsed value is
// returned as "<unix seconds> <UTC formatted timestamp>".
require('pg').types.setTypeParser(1114, (rawTimestamp) => {
	const utcTimestamp = `${rawTimestamp}Z`;
	logger.debug(utcTimestamp);
	const epochSeconds = moment(utcTimestamp).unix();
	const formattedUtc = moment(utcTimestamp).utc().format();
	return `${epochSeconds} ${formattedUtc}`;
});

const ls = require('leo-sdk').streams;

let queryCount = 0;
module.exports = function (config) {
const pool = new Pool(Object.assign({
Expand Down Expand Up @@ -571,8 +573,176 @@ function create (pool, parentCache) {
records: opts.records,
});
},
streamToTableFromS3: () => {
// opts = Object.assign({}, opts || {});
streamToTableFromS3: (table, config) => {
const ts = table.split('.');
let schema = 'public';
let shortTable = table;
if (ts.length > 1) {
schema = ts[0];
shortTable = ts[1];
}

let columns = [];
let stream;
let myClient = null;
let pending = null;
let ended = false;
let csvopts = { delimiter: '|' };
let keepS3Files = config.keepS3Files != null ? config.keepS3Files : false;

// Get prefix for S3 file path
let s3prefix =
config.s3prefix ||
process.env.AWS_LAMBDA_FUNCTION_NAME ||
'dw_redshift_ingest';
s3prefix = s3prefix.replace(/^\/*(.*?)\/*$/, '$1'); // Remove leading and trailing '/'

// clean audit date to use in S3 file path
let cleanAuditDate = client.auditdate
.replace(/'/g, '')
.replace(/:/g, '-');
let s3FileName = `files/${s3prefix}/${cleanAuditDate}/${table}.csv`;

client.connect().then(
c => {
client
.describeTable(shortTable, schema)
.then(result => {
if (ended) {
c.release(true);
return;
}
columns = result.map(f => f.column_name);
myClient = c;

stream = ls.toS3(leo.configuration.resources.LeoS3, s3FileName);
// copyFrom uses `end` but s3 `finish` so pipe finish to end
stream.on('finish', () => stream.emit('end'));

stream.on('error', function(err) {
logger.error(`COPY error: ${err.where}`, err);
process.exit();
});
if (pending) {
pending();
}
})
.catch(err => {
logger.error(err);
});
},
err => {
logger.error(err);
}
);

let count = 0;

function nonNull(v) {
if (v === '' || v === null || v === undefined) {
return '\\N';
} else if (typeof v === 'string' && (v.search(/\r/) !== -1 || v.search(/\n/) !== -1)) {
if (config.version !== 'redshift') {
return v.replace(/\r\n?/g, '\n');
} else {
return v.replace(/\r\n?/g, '\n').replace(/\n/g, `\\n`);
}
} else {
return v;
}
}


return ls.pipeline(
csv.createWriteStream({
...csvopts,
headers: false,
transform: (row, done) => {
if (!myClient) {
pending = () => {
done(
null,
columns.map(f => nonNull(row[f]))
);
};
} else {
done(
null,
columns.map(f => nonNull(row[f]))
);
}
},
}),
ls.write(
(r, done) => {
count++;
if (count % 10000 === 0) {
logger.info(table + ': ' + count);
}
if (!stream.write(r)) {
stream.once('drain', done);
} else {
done(null);
}
},
done => {
ended = true;
logger.debug(table + ': stream done');
if (stream) {
stream.on('end', err => {
logger.debug(table + ': stream ended', err || '');

// wrap done callback to release the connection
function innerDone(err) {
myClient.release(true);
logger.debug(table + ': stream client released', err || '');
done(err);
}

if (err) {
innerDone(err);
} else {
// Once the S3 file is complete run copy to load the staging table
let f = columns.map(f => `"${f}"`);
let file = `s3://${leo.configuration.s3}/${s3FileName}`;
let manifest = '';
let role = config.loaderRole;
myClient.query(
`copy ${table} (${f}) from '${file}' ${manifest} ${role ? `credentials 'aws_iam_role=${role}'` : ''
} NULL AS '\\\\N' format csv DELIMITER '|' ACCEPTINVCHARS TRUNCATECOLUMNS ACCEPTANYDATE TIMEFORMAT 'auto' COMPUPDATE OFF`,
copyErr => {
if (keepS3Files) {
innerDone(copyErr);
} else {
// Delete the S3 files when done
ls.s3.deleteObject(
{
Bucket: leo.configuration.s3,
Key: s3FileName,
},
deleteError => {
if (deleteError) {
logger.info(
'file failed to delete:',
s3FileName,
deleteError
);
}
innerDone(copyErr);
}
);
}
}
);
}
});
stream.end();
} else {
done();
}
}
)
);
},
};

Expand Down
Loading