From ab89dacebcf63f9a4d104cc281ef9c832fad941d Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Mon, 5 May 2025 11:44:16 -0400 Subject: [PATCH 1/3] Bug 1964290 - BMO ETL: Update export script to continue on with the max id for certain tables of the same day if the script stopped in the middle for any reason --- docker/bigquery/data.yaml | 4 + extensions/BMO/bin/export_bmo_etl.pl | 236 +++++++++++++++------------ extensions/BMO/t/bmo/bmo_etl.t | 10 -- 3 files changed, 137 insertions(+), 113 deletions(-) diff --git a/docker/bigquery/data.yaml b/docker/bigquery/data.yaml index 13b93aa469..84157bb3a6 100644 --- a/docker/bigquery/data.yaml +++ b/docker/bigquery/data.yaml @@ -85,6 +85,8 @@ projects: type: DATE - id: flags columns: + - name: id + type: INT64 - name: attachment_id type: INT64 - name: bug_id @@ -105,6 +107,8 @@ projects: type: DATE - id: tracking_flags columns: + - name: id + type: INT64 - name: bug_id type: INT64 - name: name diff --git a/extensions/BMO/bin/export_bmo_etl.pl b/extensions/BMO/bin/export_bmo_etl.pl index e25029d6aa..4105ebde5a 100644 --- a/extensions/BMO/bin/export_bmo_etl.pl +++ b/extensions/BMO/bin/export_bmo_etl.pl @@ -34,7 +34,7 @@ # BigQuery API cannot handle payloads larger than 10MB so # we will send data in blocks. -use constant API_BLOCK_COUNT => 1000; +use constant API_BLOCK_COUNT => 1; # Products which we should not send data to ETL such as Legal, etc. use constant EXCLUDE_PRODUCTS => ('Legal',); @@ -94,10 +94,6 @@ # Bugs that are private to one or more groups our %private_bugs = (); -# In order to avoid entering duplicate data, we will first query BigQuery -# to make sure other entries with this date are not already present. -check_for_duplicates(); - # Process each table to be sent to ETL process_bugs(); process_attachments(); @@ -144,13 +140,16 @@ sub process_bugs { my $sth = $dbh->prepare( - 'SELECT bug_id AS id, delta_ts AS modification_time FROM bugs ORDER BY bug_id LIMIT ? OFFSET ?' + 'SELECT bug_id AS id, delta_ts AS modification_time FROM bugs WHERE bug_id > ? ORDER BY bug_id LIMIT ? OFFSET ?' ); + # Retrieve the max ID from BQ in case we didn'complete last time + my $max_id = get_max_id($table_name); + while ($count < $total) { my @bugs = (); - $sth->execute(API_BLOCK_COUNT, $last_offset); + $sth->execute($max_id, API_BLOCK_COUNT, $last_offset); while (my ($id, $mod_time) = $sth->fetchrow_array()) { logger("Processing id $id with mod_time of $mod_time."); @@ -159,7 +158,8 @@ sub process_bugs { my $data = get_cache($id, $table_name, $mod_time); if (!$data) { - logger("$table_name id $id with time $mod_time not found in cache.", DEBUG_OUTPUT); + logger("$table_name id $id with time $mod_time not found in cache.", + DEBUG_OUTPUT); my $obj = Bugzilla::Bug->new($id); @@ -254,13 +254,16 @@ sub process_attachments { my $sth = $dbh->prepare( - 'SELECT attach_id, modification_time FROM attachments ORDER BY attach_id LIMIT ? OFFSET ?' + 'SELECT attach_id, modification_time FROM attachments WHERE attach_id > ? ORDER BY attach_id LIMIT ? OFFSET ?' ); + # Retrieve the max ID from BQ in case we didn'complete last time + my $max_id = get_max_id($table_name); + while ($count < $total) { my @results = (); - $sth->execute(API_BLOCK_COUNT, $last_offset); + $sth->execute($max_id, API_BLOCK_COUNT, $last_offset); while (my ($id, $mod_time) = $sth->fetchrow_array()) { logger("Processing id $id with mod_time of $mod_time."); @@ -269,7 +272,8 @@ sub process_attachments { my $data = get_cache($id, $table_name, $mod_time); if (!$data) { - logger("$table_name id $id with time $mod_time not found in cache." , DEBUG_OUTPUT); + logger("$table_name id $id with time $mod_time not found in cache.", + DEBUG_OUTPUT); my $obj = Bugzilla::Attachment->new($id); @@ -318,13 +322,17 @@ sub process_flags { my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM flags'); logger("Processing $total $table_name."); - my $sth = $dbh->prepare( - 'SELECT id, modification_date FROM flags ORDER BY id LIMIT ? OFFSET ?'); + my $max_id = get_max_id($table_name); + + my $sth + = $dbh->prepare( + 'SELECT id, modification_date FROM flags WHERE id > ? ORDER BY id LIMIT ? OFFSET ?' + ); while ($count < $total) { my @results = (); - $sth->execute(API_BLOCK_COUNT, $last_offset); + $sth->execute($max_id, API_BLOCK_COUNT, $last_offset); while (my ($id, $mod_time) = $sth->fetchrow_array()) { logger("Processing id $id with mod_time of $mod_time."); @@ -333,7 +341,8 @@ sub process_flags { my $data = get_cache($id, $table_name, $mod_time); if (!$data) { - logger("$table_name id $id with time $mod_time not found in cache." , DEBUG_OUTPUT); + logger("$table_name id $id with time $mod_time not found in cache.", + DEBUG_OUTPUT); my $obj = Bugzilla::Flag->new($id); @@ -343,6 +352,7 @@ sub process_flags { } $data = { + id => $obj->id, attachment_id => $obj->attach_id || undef, bug_id => $obj->bug_id, creation_ts => $obj->creation_date, @@ -402,7 +412,8 @@ sub process_flag_state_activity { my $data = get_cache($id, $table_name, $mod_time); if (!$data) { - logger("$table_name id $id with time $mod_time not found in cache.", DEBUG_OUTPUT); + logger("$table_name id $id with time $mod_time not found in cache.", + DEBUG_OUTPUT); my $obj = Bugzilla::Extension::Review::FlagStateActivity->new($id); @@ -452,27 +463,30 @@ sub process_tracking_flags { ); logger("Processing $total $table_name."); + my $max_id = get_max_id($table_name); + my $sth = $dbh->prepare( - 'SELECT tracking_flags.name, tracking_flags_bugs.bug_id, tracking_flags_bugs.value + 'SELECT tracking_flags_bugs.id, tracking_flags.name, tracking_flags_bugs.bug_id, tracking_flags_bugs.value FROM tracking_flags_bugs JOIN tracking_flags ON tracking_flags_bugs.tracking_flag_id = tracking_flags.id - ORDER BY tracking_flags_bugs.id LIMIT ? OFFSET ?' + WHERE tracking_flags_bugs.id > ? + ORDER BY tracking_flags_bugs.id LIMIT ? OFFSET ?' ); while ($count < $total) { my @results = (); - $sth->execute(API_BLOCK_COUNT, $last_offset); + $sth->execute($max_id, API_BLOCK_COUNT, $last_offset); - while (my ($name, $bug_id, $value) = $sth->fetchrow_array()) { + while (my ($id, $name, $bug_id, $value) = $sth->fetchrow_array()) { if ($excluded_bugs{$bug_id}) { $count++; next; } # Standard fields - my $data = {bug_id => $bug_id}; + my $data = {id => $id, bug_id => $bug_id}; # Fields that require custom values based on other criteria if (exists $private_bugs{$bug_id}) { @@ -501,6 +515,11 @@ sub process_keywords { my $count = 0; my $last_offset = 0; + if (check_duplicate_data($table_name)) { + logger("Skipping $table_name due to duplicate data"); + return; + } + my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM keywords'); logger("Processing $total $table_name."); @@ -601,13 +620,19 @@ sub process_users { my $sth = $dbh->prepare( - 'SELECT userid, modification_ts FROM profiles ORDER BY userid LIMIT ? OFFSET ?' + 'SELECT userid, modification_ts FROM profiles WHERE userid > ? ORDER BY userid LIMIT ? OFFSET ?' ); + my $max_id = get_max_id($table_name); + + logger("max id: $max_id", DEBUG_OUTPUT); + while ($count < $total) { my @users = (); - $sth->execute(API_BLOCK_COUNT, $last_offset); + logger("last offset: $last_offset", DEBUG_OUTPUT); + + $sth->execute($max_id, API_BLOCK_COUNT, $last_offset); while (my ($id, $mod_time) = $sth->fetchrow_array()) { logger("Processing id $id with mod_time of $mod_time."); @@ -620,16 +645,18 @@ sub process_users { my $data = get_cache($id, $table_name, $mod_time); if (!$data) { - logger("$table_name id $id with time $mod_time not found in cache.", DEBUG_OUTPUT); + logger("$table_name id $id with time $mod_time not found in cache.", + DEBUG_OUTPUT); my $obj = Bugzilla::User->new($id); # Standard fields $data = { id => $obj->id, - last_seen => ($obj->last_seen_date ? $obj->last_seen_date . ' 00:00:00' : undef), - email => $obj->email, - is_new => ($obj->is_new ? true : false), + last_seen => + ($obj->last_seen_date ? $obj->last_seen_date . ' 00:00:00' : undef), + email => $obj->email, + is_new => ($obj->is_new ? true : false), }; # Fields that require custom values based on criteria @@ -667,6 +694,11 @@ sub process_two_columns { my $columns_string = join ', ', @{$column_names}; my $order_by = $column_names->[0]; + if (check_duplicate_data($bq_name)) { + logger("Skipping $table_name due to duplicate data"); + return; + } + my $sth = $dbh->prepare( "SELECT $columns_string FROM $table_name ORDER BY $order_by LIMIT ? OFFSET ?"); @@ -705,23 +737,24 @@ sub get_cache { return undef; } - logger("Retreiving data from $table for $id with time $timestamp.", DEBUG_OUTPUT); + logger("Retreiving data from $table for $id with time $timestamp.", + DEBUG_OUTPUT); try { - # Retrieve compressed JSON from cache table if it exists - my $gzipped_data = $dbh->selectrow_array( - 'SELECT data FROM bmo_etl_cache WHERE id = ? AND table_name = ? AND snapshot_date = ?', - undef, $id, $table, $timestamp - ); - return undef if !$gzipped_data; + # Retrieve compressed JSON from cache table if it exists + my $gzipped_data = $dbh->selectrow_array( + 'SELECT data FROM bmo_etl_cache WHERE id = ? AND table_name = ? AND snapshot_date = ?', + undef, $id, $table, $timestamp + ); + return undef if !$gzipped_data; - # First uncompress the JSON and then decode it back to Perl data - my $data; - unless (gunzip \$gzipped_data => \$data) { - delete_lock(); - die "gunzip failed: $GunzipError\n"; - } - return decode_json($data); + # First uncompress the JSON and then decode it back to Perl data + my $data; + unless (gunzip \$gzipped_data => \$data) { + delete_lock(); + die "gunzip failed: $GunzipError\n"; + } + return decode_json($data); } catch { # Log the failure and return undef @@ -773,7 +806,8 @@ sub store_cache { sub send_data { my ($table, $all_rows, $current_count) = @_; - logger('Sending ' . scalar @{$all_rows} . " rows to table $table using BigQuery API"); + logger( + 'Sending ' . scalar @{$all_rows} . " rows to table $table using BigQuery API"); # Add the same snapshot date to every row sent foreach my $row (@{$all_rows}) { @@ -785,7 +819,7 @@ sub send_data { push @json_rows, {json => $row}; } - my $big_query = {rows => \@json_rows}; + my $query = {rows => \@json_rows}; if ($test) { my $filename @@ -797,7 +831,7 @@ sub send_data { logger("Writing data to $filename."); my $fh = path($filename)->open('>>'); - print $fh encode_json($big_query) . "\n"; + print $fh encode_json($query) . "\n"; unless (close $fh) { delete_lock(); die "Could not close $filename: $!\n"; @@ -806,46 +840,18 @@ sub send_data { return; } - my $http_headers = HTTP::Headers->new; - - # Do not attempt to get access token if running in test environment - if ($base_url !~ /^http:\/\/[^\/]+:9050/) { - my $access_token = _get_access_token(); - $http_headers->header(Authorization => 'Bearer ' . $access_token); - } - - my $full_path = sprintf 'projects/%s/datasets/%s/tables/%s/insertAll', + my $path = sprintf 'projects/%s/datasets/%s/tables/%s/insertAll', $project_id, $dataset_id, $table; - logger("Sending to $base_url/$full_path", DEBUG_OUTPUT); - - my $request = HTTP::Request->new('POST', "$base_url/$full_path", $http_headers); - $request->header('Content-Type' => 'application/json'); - - logger('Encoding content into JSON.', DEBUG_OUTPUT); - - $request->content(encode_json($big_query)); - - logger('Sending request', DEBUG_OUTPUT); - - my $response = $ua->request($request); + my $result = call_big_query('POST', $path, $query); - logger($response->content, DEBUG_OUTPUT); - - my $result = decode_json($response->content); - - if (!$response->is_success - || (exists $result->{insertErrors} && @{$result->{insertErrors}})) - { + if (exists $result->{insertErrors} && @{$result->{insertErrors}}) { delete_lock(); - die "Google Big Query insert failure:\nRequest:\n" - . $request->content - . "\n\nResponse:\n" - . $response->content . "\n"; + die "Google Big Query insert failure: " . encode_json($result); } } -sub _get_access_token { +sub get_access_token { state $access_token; # We should only need to get this once state $token_expiry; @@ -908,7 +914,9 @@ sub check_and_set_lock { logger('Previous lock not found. Setting new one.', DEBUG_OUTPUT); - $dbh_main->do('INSERT INTO bmo_etl_locked (value, creation_ts) VALUES (?, NOW())', undef, 'locked'); + $dbh_main->do( + 'INSERT INTO bmo_etl_locked (value, creation_ts) VALUES (?, NOW())', + undef, 'locked'); } # Delete lock from bmo_etl_locked @@ -917,52 +925,74 @@ sub delete_lock { Bugzilla->dbh_main->do('DELETE FROM bmo_etl_locked'); } -sub check_for_duplicates { - return if $test; # no need if just dumping test files +sub call_big_query { + my ($method, $path, $data) = @_; - logger("Checking for duplicate data for snapshot date $snapshot_date."); + logger("BigQuery request - method: $method, path: $path", DEBUG_OUTPUT); my $http_headers = HTTP::Headers->new; # Do not attempt to get access token if running in test environment if ($base_url !~ /^http:\/\/[^\/]+:9050/) { - my $access_token = _get_access_token(); + my $access_token = get_access_token(); $http_headers->header(Authorization => 'Bearer ' . $access_token); } - my $full_path = "projects/$project_id/queries"; - - logger("Querying $base_url/$full_path", DEBUG_OUTPUT); - - my $query = { - query => - "SELECT count(*) FROM ${project_id}.${dataset_id}.bugs WHERE snapshot_date = '$snapshot_date';", - useLegacySql => false, - }; - - my $request = HTTP::Request->new('POST', "$base_url/$full_path", $http_headers); + my $request = HTTP::Request->new($method, "$base_url/$path", $http_headers); $request->header('Content-Type' => 'application/json'); - $request->content(encode_json($query)); - logger(encode_json($query), DEBUG_OUTPUT); + logger('Encoding content into JSON.', DEBUG_OUTPUT); + logger(encode_json($data), DEBUG_OUTPUT); + $request->content(encode_json($data)); my $res = $ua->request($request); + logger($res->content, DEBUG_OUTPUT); + if (!$res->is_success) { delete_lock(); die 'Google Big Query query failure: ' . $res->content . "\n"; } - logger($res->content, DEBUG_OUTPUT); - my $result = decode_json($res->content); +} - my $row_count = $result->{rows}->[0]->{f}->[0]->{v}; +sub get_max_id { + my ($table) = @_; - # Do not export if we have any rows with this snapshot date. - if ($row_count) { - delete_lock(); - die "Duplicate data found for snapshot date $snapshot_date\n"; - } + return 0 if $test; # no need if just dumping test files + + logger("Retrieving max id for table $table for snapshot date $snapshot_date."); + + my $query = { + query => + "SELECT max(id) FROM ${project_id}.${dataset_id}.${table} WHERE snapshot_date = '$snapshot_date';", + useLegacySql => false + }; + + my $result = call_big_query('POST', "projects/$project_id/queries", $query); + + return $result->{rows}->[0]->{f}->[0]->{v} || 0; +} + +sub check_duplicate_data { + my ($table) = @_; + + return 0 if $test; # no need if just dumping test files + + logger("Checking duplicate data for table $table for snapshot date $snapshot_date."); + + my $query = { + query => + "SELECT count(*) FROM ${project_id}.${dataset_id}.${table} WHERE snapshot_date = '$snapshot_date';", + useLegacySql => false + }; + + my $result = call_big_query('POST', "projects/$project_id/queries", $query); + + use Mojo::Util qw(dumper); + print STDERR dumper $result; + + return $result->{rows}->[0]->{f}->[0]->{v} || 0; } sub get_multi_group_value { diff --git a/extensions/BMO/t/bmo/bmo_etl.t b/extensions/BMO/t/bmo/bmo_etl.t index 0a7a1234f8..04ee27bac5 100644 --- a/extensions/BMO/t/bmo/bmo_etl.t +++ b/extensions/BMO/t/bmo/bmo_etl.t @@ -199,14 +199,4 @@ $t->post_ok( 'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query) ->status_is(200)->json_is('/rows/0/f/0/v' => $bug_id_1); -### Section 7: Exporting again on the same day (with the same snapshot date) will cause the script to exit - -@cmd = ( - './extensions/BMO/bin/export_bmo_etl.pl', - '--debug', '--snapshot-date', $snapshot_date, -); - -($output, $error, $rv) = capture { system @cmd; }; -ok($rv, 'Duplicate data exported to BigQuery test instance should fail'); - done_testing; From 106fb6445983e64f6b08dc4b1b916d70ba4e7923 Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Tue, 13 May 2025 11:57:33 -0400 Subject: [PATCH 2/3] More fixes --- extensions/BMO/Extension.pm | 12 --- extensions/BMO/bin/export_bmo_etl.pl | 123 ++++++++++----------------- extensions/BMO/t/bmo/bmo_etl.t | 35 ++++++++ 3 files changed, 80 insertions(+), 90 deletions(-) diff --git a/extensions/BMO/Extension.pm b/extensions/BMO/Extension.pm index 2800825075..52f97a48d0 100755 --- a/extensions/BMO/Extension.pm +++ b/extensions/BMO/Extension.pm @@ -1396,12 +1396,6 @@ sub db_schema_abstract_schema { bmo_etl_cache_uniq_idx => {FIELDS => ['id', 'table_name'], TYPE => 'UNIQUE'} ], }; - $args->{schema}->{bmo_etl_locked} = { - FIELDS => [ - value => {TYPE => 'VARCHAR(20)', NOTNULL => 1,}, - creation_ts => {TYPE => 'DATETIME',}, - ], - }; } sub install_update_db { @@ -1589,12 +1583,6 @@ sub install_update_db { }); } - # Add bmo_etl_locked.creation_ts column - if (!$dbh->bz_column_info('bmo_etl_locked', 'creation_ts')) { - $dbh->bz_add_column('bmo_etl_locked', - 'creation_ts' => {TYPE => 'DATETIME'}); - } - # Add unique index for id and table name for bmo_etl_cache $dbh->bz_add_index('bmo_etl_cache', 'bmo_etl_cache_uniq_idx', {FIELDS => ['id', 'table_name'], TYPE => 'UNIQUE'}); diff --git a/extensions/BMO/bin/export_bmo_etl.pl b/extensions/BMO/bin/export_bmo_etl.pl index 4105ebde5a..bcb404ffc1 100644 --- a/extensions/BMO/bin/export_bmo_etl.pl +++ b/extensions/BMO/bin/export_bmo_etl.pl @@ -19,6 +19,7 @@ use Bugzilla::Group; use Bugzilla::Logging; use Bugzilla::User; +use Bugzilla::Util qw(with_writable_database); use Bugzilla::Extension::Review::FlagStateActivity; use HTTP::Headers; @@ -65,11 +66,9 @@ my $dataset_id = Bugzilla->params->{bmo_etl_dataset_id}; $dataset_id || die "Invalid BigQuery dataset ID.\n"; -# Check to make sure another instance is not currently running -check_and_set_lock(); - # Use replica if available my $dbh = Bugzilla->switch_to_shadow_db(); +$dbh->bz_start_transaction(); my $ua = LWP::UserAgent::Determined->new( agent => 'Bugzilla', @@ -125,8 +124,7 @@ ['bug_id', 'duplicate_of_id'] ); -# If we are done, remove the lock -delete_lock(); +$dbh->bz_commit_transaction(); ### Functions @@ -135,7 +133,11 @@ sub process_bugs { my $count = 0; my $last_offset = 0; - my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM bugs'); + # Retrieve the max ID from BQ in case we didn'complete last time + my $max_id = get_max_id($table_name); + + my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM bugs WHERE bug_id > ?', + undef, $max_id); logger("Processing $total $table_name"); my $sth @@ -143,9 +145,6 @@ sub process_bugs { 'SELECT bug_id AS id, delta_ts AS modification_time FROM bugs WHERE bug_id > ? ORDER BY bug_id LIMIT ? OFFSET ?' ); - # Retrieve the max ID from BQ in case we didn'complete last time - my $max_id = get_max_id($table_name); - while ($count < $total) { my @bugs = (); @@ -249,7 +248,13 @@ sub process_attachments { my $count = 0; my $last_offset = 0; - my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM attachments'); + # Retrieve the max ID from BQ in case we didn'complete last time + my $max_id = get_max_id($table_name); + + my $total + = $dbh->selectrow_array( + 'SELECT COUNT(*) FROM attachments WHERE attach_id > ?', + undef, $max_id); logger("Processing $total $table_name."); my $sth @@ -257,9 +262,6 @@ sub process_attachments { 'SELECT attach_id, modification_time FROM attachments WHERE attach_id > ? ORDER BY attach_id LIMIT ? OFFSET ?' ); - # Retrieve the max ID from BQ in case we didn'complete last time - my $max_id = get_max_id($table_name); - while ($count < $total) { my @results = (); @@ -319,11 +321,13 @@ sub process_flags { my $count = 0; my $last_offset = 0; - my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM flags'); - logger("Processing $total $table_name."); - + # Retrieve the max ID from BQ in case we didn'complete last time my $max_id = get_max_id($table_name); + my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM flags WHERE id > ?', + undef, $max_id); + logger("Processing $total $table_name."); + my $sth = $dbh->prepare( 'SELECT id, modification_date FROM flags WHERE id > ? ORDER BY id LIMIT ? OFFSET ?' @@ -454,16 +458,19 @@ sub process_tracking_flags { my $count = 0; my $last_offset = 0; + # Retrieve the max ID from BQ in case we didn'complete last time + my $max_id = get_max_id($table_name); + my $total = $dbh->selectrow_array( 'SELECT COUNT(*) FROM tracking_flags_bugs JOIN tracking_flags ON tracking_flags_bugs.tracking_flag_id = tracking_flags.id - ORDER BY tracking_flags_bugs.bug_id' + WHERE tracking_flags_bugs.id > ? + ORDER BY tracking_flags_bugs.bug_id', undef, $max_id ); - logger("Processing $total $table_name."); - my $max_id = get_max_id($table_name); + logger("Processing $total $table_name."); my $sth = $dbh->prepare( 'SELECT tracking_flags_bugs.id, tracking_flags.name, tracking_flags_bugs.bug_id, tracking_flags_bugs.value @@ -615,7 +622,12 @@ sub process_users { my $count = 0; my $last_offset = 0; - my $total = $dbh->selectrow_array('SELECT COUNT(*) FROM profiles'); + # Retrieve the max ID from BQ in case we didn'complete last time + my $max_id = get_max_id($table_name); + + my $total + = $dbh->selectrow_array('SELECT COUNT(*) FROM profiles WHERE userid > ?', + undef, $max_id); logger("Processing $total $table_name."); my $sth @@ -623,8 +635,6 @@ sub process_users { 'SELECT userid, modification_ts FROM profiles WHERE userid > ? ORDER BY userid LIMIT ? OFFSET ?' ); - my $max_id = get_max_id($table_name); - logger("max id: $max_id", DEBUG_OUTPUT); while ($count < $total) { @@ -751,7 +761,6 @@ sub get_cache { # First uncompress the JSON and then decode it back to Perl data my $data; unless (gunzip \$gzipped_data => \$data) { - delete_lock(); die "gunzip failed: $GunzipError\n"; } return decode_json($data); @@ -779,28 +788,21 @@ sub store_cache { # Compress the JSON to save space in the DB my $gzipped_data; unless (gzip \$data => \$gzipped_data) { - delete_lock(); die "gzip failed: $GzipError\n"; } # We need to use the main DB for write operations - my $main_dbh = Bugzilla->dbh_main; - - try { + with_writable_database { # Clean out outdated JSON - $main_dbh->do('DELETE FROM bmo_etl_cache WHERE id = ? AND table_name = ?', + Bugzilla->dbh->do('DELETE FROM bmo_etl_cache WHERE id = ? AND table_name = ?', undef, $id, $table); # Enter new cached JSON - $main_dbh->do( + Bugzilla->dbh->do( 'INSERT INTO bmo_etl_cache (id, table_name, snapshot_date, data) VALUES (?, ?, ?, ?)', undef, $id, $table, $timestamp, $gzipped_data ); } - catch { - # Log the failure - WARN("ERROR: Unable to store cache data in database: $_"); - } } sub send_data { @@ -833,20 +835,18 @@ sub send_data { my $fh = path($filename)->open('>>'); print $fh encode_json($query) . "\n"; unless (close $fh) { - delete_lock(); die "Could not close $filename: $!\n"; } return; } - my $path = sprintf 'projects/%s/datasets/%s/tables/%s/insertAll', - $project_id, $dataset_id, $table; + my $path = sprintf 'projects/%s/datasets/%s/tables/%s/insertAll', $project_id, + $dataset_id, $table; my $result = call_big_query('POST', $path, $query); if (exists $result->{insertErrors} && @{$result->{insertErrors}}) { - delete_lock(); die "Google Big Query insert failure: " . encode_json($result); } } @@ -863,10 +863,10 @@ sub get_access_token { return $access_token; } - # Google Kubernetes allows for the use of Workload Identity. This allows - # us to link two service accounts together and give special access for applications - # running under Kubernetes. We use the special access to get an OAuth2 access_token - # that can then be used for accessing the the Google API such as BigQuery. +# Google Kubernetes allows for the use of Workload Identity. This allows +# us to link two service accounts together and give special access for applications +# running under Kubernetes. We use the special access to get an OAuth2 access_token +# that can then be used for accessing the the Google API such as BigQuery. my $url = sprintf 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/%s/token', @@ -880,7 +880,6 @@ sub get_access_token { my $res = $ua->request($request); if (!$res->is_success) { - delete_lock(); die 'Google access token failure: ' . $res->content . "\n"; } @@ -893,38 +892,6 @@ sub get_access_token { return $access_token; } -# If a previous process is performing an export to BigQuery, then -# we must check the lock table and exit if true. -sub check_and_set_lock { - return if $test; # No need if just dumping test files - - logger('Checking for previous lock or setting new one', DEBUG_OUTPUT); - - my $dbh_main = Bugzilla->dbh_main; - - # Clear out any locks that are greater than 24h old - $dbh_main->do('DELETE FROM bmo_etl_locked WHERE creation_ts < ' - . $dbh_main->sql_date_math('NOW()', '-', 24, 'HOUR')); - - # Now check for any pre-existing locks and do not proceed if one found - my $locked = $dbh_main->selectrow_array('SELECT COUNT(*) FROM bmo_etl_locked'); - if ($locked) { - die "Another process has set a lock. Exiting\n"; - } - - logger('Previous lock not found. Setting new one.', DEBUG_OUTPUT); - - $dbh_main->do( - 'INSERT INTO bmo_etl_locked (value, creation_ts) VALUES (?, NOW())', - undef, 'locked'); -} - -# Delete lock from bmo_etl_locked -sub delete_lock { - logger("Deleting lock in database."); - Bugzilla->dbh_main->do('DELETE FROM bmo_etl_locked'); -} - sub call_big_query { my ($method, $path, $data) = @_; @@ -949,7 +916,6 @@ sub call_big_query { logger($res->content, DEBUG_OUTPUT); if (!$res->is_success) { - delete_lock(); die 'Google Big Query query failure: ' . $res->content . "\n"; } @@ -959,7 +925,7 @@ sub call_big_query { sub get_max_id { my ($table) = @_; - return 0 if $test; # no need if just dumping test files + return 0 if $test; # no need if just dumping test files logger("Retrieving max id for table $table for snapshot date $snapshot_date."); @@ -977,9 +943,10 @@ sub get_max_id { sub check_duplicate_data { my ($table) = @_; - return 0 if $test; # no need if just dumping test files + return 0 if $test; # no need if just dumping test files - logger("Checking duplicate data for table $table for snapshot date $snapshot_date."); + logger( + "Checking duplicate data for table $table for snapshot date $snapshot_date."); my $query = { query => diff --git a/extensions/BMO/t/bmo/bmo_etl.t b/extensions/BMO/t/bmo/bmo_etl.t index 04ee27bac5..b9159176c9 100644 --- a/extensions/BMO/t/bmo/bmo_etl.t +++ b/extensions/BMO/t/bmo/bmo_etl.t @@ -199,4 +199,39 @@ $t->post_ok( 'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query) ->status_is(200)->json_is('/rows/0/f/0/v' => $bug_id_1); +### Section: 7 - Add an additional bug and then run the export script again +# Run with the same timestamp to make sure it adds the bug. +# Simulate a script stop and resume condition +$t->post_ok($url + . 'rest/bug' => {'X-Bugzilla-API-Key' => $admin_api_key} => json => + $new_bug_1)->status_is(200)->json_has('/id'); + +my $bug_id_3 = $t->tx->res->json->{id}; + +$t->post_ok($url + . "rest/bug/$bug_id_3/attachment" => + {'X-Bugzilla-API-Key' => $admin_api_key} => json => $new_attach_1) + ->status_is(201)->json_has('/attachments'); + +($attach_id) = keys %{$t->tx->res->json->{attachments}}; + +@cmd = ( + './extensions/BMO/bin/export_bmo_etl.pl', + '--snapshot-date', $snapshot_date, +); + +($output, $error, $rv) = capture { system @cmd; }; +ok(!$rv, 'Data exported to BigQuery test instance without error'); + +$query = { + query => 'SELECT summary FROM test.bugzilla.bugs WHERE id = ' + . $bug_id_3 + . ' AND snapshot_date = \'' + . $snapshot_date . '\';', + useLegacySql => false +}; +$t->post_ok( + 'http://bq:9050/bigquery/v2/projects/test/queries' => json => $query) + ->status_is(200)->json_is('/rows/0/f/0/v' => $new_bug_1->{summary}); + done_testing; From e9d5cf0ac398831f5e332b0d7ae862a6cc1e372a Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Wed, 14 May 2025 17:20:23 -0400 Subject: [PATCH 3/3] Updated main dbh calls for writing to cache table --- extensions/BMO/bin/export_bmo_etl.pl | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/extensions/BMO/bin/export_bmo_etl.pl b/extensions/BMO/bin/export_bmo_etl.pl index bcb404ffc1..905d3a1eda 100644 --- a/extensions/BMO/bin/export_bmo_etl.pl +++ b/extensions/BMO/bin/export_bmo_etl.pl @@ -35,7 +35,7 @@ # BigQuery API cannot handle payloads larger than 10MB so # we will send data in blocks. -use constant API_BLOCK_COUNT => 1; +use constant API_BLOCK_COUNT => 1000; # Products which we should not send data to ETL such as Legal, etc. use constant EXCLUDE_PRODUCTS => ('Legal',); @@ -792,17 +792,29 @@ sub store_cache { } # We need to use the main DB for write operations - with_writable_database { + my $dbh_main = Bugzilla->dbh_main; + try { + $dbh_main->bz_start_transaction; + # Clean out outdated JSON - Bugzilla->dbh->do('DELETE FROM bmo_etl_cache WHERE id = ? AND table_name = ?', + $dbh_main->do('DELETE FROM bmo_etl_cache WHERE id = ? AND table_name = ?', undef, $id, $table); # Enter new cached JSON - Bugzilla->dbh->do( + $dbh_main->do( 'INSERT INTO bmo_etl_cache (id, table_name, snapshot_date, data) VALUES (?, ?, ?, ?)', undef, $id, $table, $timestamp, $gzipped_data ); + + $dbh_main->bz_commit_transaction; } + catch { + $dbh_main->bz_rollback_transaction; + + # Log the failure and return undef + WARN("ERROR: Unable to store cached data into database: $_"); + return undef; + }; } sub send_data { @@ -956,9 +968,6 @@ sub check_duplicate_data { my $result = call_big_query('POST', "projects/$project_id/queries", $query); - use Mojo::Util qw(dumper); - print STDERR dumper $result; - return $result->{rows}->[0]->{f}->[0]->{v} || 0; }