From d7ec7d17190e9dfa50c06938c3f93ba84714e7ad Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 11:40:07 -0300 Subject: [PATCH 01/13] feat(integrations): contact cron for push and pull --- .../reader-activation/class-integrations.php | 4 +- .../integrations/class-contact-cron.php | 474 ++++++++++++++++++ .../integrations/class-contact-pull.php | 325 ------------ .../integrations/class-test-integrations.php | 98 ++-- 4 files changed, 519 insertions(+), 382 deletions(-) create mode 100644 includes/reader-activation/integrations/class-contact-cron.php delete mode 100644 includes/reader-activation/integrations/class-contact-pull.php diff --git a/includes/reader-activation/class-integrations.php b/includes/reader-activation/class-integrations.php index ff2b89d609..611bfe8d28 100644 --- a/includes/reader-activation/class-integrations.php +++ b/includes/reader-activation/class-integrations.php @@ -68,7 +68,7 @@ class Integrations { public static function init() { // Include required files. require_once __DIR__ . '/integrations/class-integration.php'; - require_once __DIR__ . '/integrations/class-contact-pull.php'; + require_once __DIR__ . '/integrations/class-contact-cron.php'; add_action( 'init', [ __CLASS__, 'register_integrations' ], 5 ); add_action( 'init', [ __CLASS__, 'schedule_health_check' ] ); @@ -76,7 +76,7 @@ public static function init() { add_filter( 'newspack_data_events_handler_action_group', [ __CLASS__, 'filter_handler_action_group' ], 10, 3 ); add_filter( 'newspack_action_scheduler_group_labels', [ __CLASS__, 'register_group_labels' ] ); - Integrations\Contact_Pull::init(); + Integrations\Contact_Cron::init(); } /** diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php new file mode 100644 index 0000000000..6b8bea4ee8 --- /dev/null +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -0,0 +1,474 @@ + self::CRON_INTERVAL, + 'display' => __( 'Newspack Contact Cron Interval', 'newspack-plugin' ), + ]; + return $schedules; + } + + /** + * Get the timeout for loopback pull requests. + * + * @return int Timeout in seconds. + */ + private static function get_pull_request_timeout() { + /** + * Newspack Integrations: Filter the max amount of time (in seconds) to allow for a synchronous contact metadata pull request before falling back to async scheduling. + */ + return apply_filters( 'newspack_pull_integration_request_timeout', 1 ); + } + + /** + * Pull contact data from active integrations for the current logged-in user. + * + * If the last pull is older than PULL_SYNC_THRESHOLD (24 h), the pull runs + * synchronously with a time limit. Any integrations that don't finish in time + * are queued for the next cron run. + * + * If the last pull is newer than 24 h (but older than CRON_INTERVAL) the + * user is queued for batch pull. + */ + public static function maybe_pull_contact_data() { + if ( ! is_user_logged_in() ) { + return; + } + + $user = wp_get_current_user(); + $last_pull = (int) get_user_meta( $user->ID, self::LAST_PULL_META, true ); + $age = time() - $last_pull; + + if ( $age < self::CRON_INTERVAL ) { + return; + } + + // Set immediately to prevent concurrent pulls from overlapping page loads. + update_user_meta( $user->ID, self::LAST_PULL_META, time() ); + + // Always enqueue for push. + self::enqueue_for_push( $user->ID ); + + $active_integrations = Integrations::get_active_integrations(); + + // Data is stale (> 24 h) — pull synchronously, schedule leftovers. + if ( $age >= self::PULL_SYNC_THRESHOLD ) { + self::pull_sync( $user->ID, $active_integrations ); + return; + } + + // Data is relatively fresh — enqueue for batch pull. + self::enqueue_for_pull( $user->ID ); + } + + /** + * Run synchronous pull via per-integration loopback requests. + * + * Each integration is pulled via a blocking wp_remote_post to the AJAX + * endpoint with get_pull_request_timeout. If the request completes, the handler + * has already stored the data. If it times out or fails, the user is + * queued for the next cron run. + * + * @param int $user_id WordPress user ID. + * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. + */ + private static function pull_sync( $user_id, $integrations ) { + $failed = []; + + foreach ( $integrations as $id => $integration ) { + $selected_fields = $integration->get_enabled_incoming_fields(); + if ( empty( $selected_fields ) ) { + continue; + } + + $response = self::fire_pull_request( $id ); + + if ( is_wp_error( $response ) || 200 !== wp_remote_retrieve_response_code( $response ) ) { + $error_message = is_wp_error( $response ) ? $response->get_error_message() : 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ); + Logger::log( 'Loopback pull failed for ' . $id . '. Scheduling async. Error: ' . $error_message, self::LOGGER_HEADER ); + $failed[ $id ] = $integration; + } else { + Logger::log( 'Loopback pull succeeded for ' . $id . '.', self::LOGGER_HEADER ); + } + } + + if ( ! empty( $failed ) ) { + self::enqueue_for_pull( $user_id ); + } + } + + /** + * Fire a blocking loopback request to pull data for a single integration. + * + * @param string $integration_id The integration identifier. + * @return array|\WP_Error The response or WP_Error on failure. + */ + private static function fire_pull_request( $integration_id ) { + $url = add_query_arg( + [ + 'action' => self::AJAX_ACTION, + 'nonce' => wp_create_nonce( self::NONCE_ACTION ), + ], + admin_url( 'admin-ajax.php' ) + ); + + return wp_remote_post( + $url, + [ + 'timeout' => self::get_pull_request_timeout(), + 'blocking' => true, + 'body' => [ 'integration_id' => $integration_id ], + 'cookies' => $_COOKIE, // phpcs:ignore + 'sslverify' => apply_filters( 'https_local_ssl_verify', false ), + ] + ); + } + + /** + * Handle the AJAX loopback request for pulling a single integration. + * + * Verifies the nonce, looks up the integration, pulls and stores data, + * then returns a JSON response. + */ + public static function handle_ajax_pull() { + if ( ! isset( $_REQUEST['nonce'] ) || ! wp_verify_nonce( sanitize_text_field( $_REQUEST['nonce'] ), self::NONCE_ACTION ) ) { // phpcs:ignore + wp_send_json_error( 'Invalid nonce.', 403 ); + } + + $integration_id = isset( $_POST['integration_id'] ) ? sanitize_text_field( $_POST['integration_id'] ) : ''; // phpcs:ignore + if ( empty( $integration_id ) ) { + wp_send_json_error( 'Missing integration_id.', 400 ); + } + + $integration = Integrations::get_integration( $integration_id ); + if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) { + wp_send_json_error( 'Integration not found or not enabled.', 404 ); + } + + $user_id = get_current_user_id(); + if ( ! $user_id ) { + wp_send_json_error( 'No user context.', 403 ); + } + + $result = self::pull_single_integration( $user_id, $integration ); + + if ( is_wp_error( $result ) ) { + wp_send_json_error( $result->get_error_message(), 500 ); + } + + wp_send_json_success(); + } + + /** + * Pull data from a single integration and store selected fields. + * + * @param int $user_id WordPress user ID. + * @param \Newspack\Reader_Activation\Integration $integration The integration instance. + * @return true|\WP_Error True on success, WP_Error on failure. + */ + public static function pull_single_integration( $user_id, $integration ) { + $selected_fields = $integration->get_enabled_incoming_fields(); + if ( empty( $selected_fields ) ) { + return new \WP_Error( 'no_selected_incoming_fields', 'No selected incoming fields for ' . $integration->get_id() ); + } + + try { + $data = $integration->pull_contact_data( $user_id ); + + if ( is_wp_error( $data ) ) { + Logger::log( 'Pull error from ' . $integration->get_id() . ': ' . $data->get_error_message(), self::LOGGER_HEADER ); + return $data; + } + + $selected_fields = array_filter( $selected_fields, 'is_string' ); + $selected_keys = array_flip( $selected_fields ); + $data = array_intersect_key( $data, $selected_keys ); + Logger::log( 'Pulled data from ' . $integration->get_id() . ': ' . wp_json_encode( $data ), self::LOGGER_HEADER ); + + foreach ( $data as $key => $value ) { + \Newspack\Reader_Data::update_item( $user_id, $key, wp_json_encode( $value ) ); + } + + return true; + } catch ( \Throwable $e ) { + Logger::log( 'Pull exception from ' . $integration->get_id() . ': ' . $e->getMessage(), self::LOGGER_HEADER ); + return new \WP_Error( 'pull_exception', $e->getMessage() ); + } + } + + /** + * Add a user to the pull queue. + * + * @param int $user_id WordPress user ID. + */ + public static function enqueue_for_pull( $user_id ) { + self::enqueue( self::PULL_QUEUE_OPTION, $user_id ); + } + + /** + * Add a user to the push queue. + * + * @param int $user_id WordPress user ID. + */ + public static function enqueue_for_push( $user_id ) { + self::enqueue( self::PUSH_QUEUE_OPTION, $user_id ); + } + + /** + * Add a user ID to a queue option. + * + * @param string $option The option key. + * @param int $user_id WordPress user ID. + */ + private static function enqueue( $option, $user_id ) { + $queue = get_option( $option, [] ); + if ( ! in_array( $user_id, $queue, true ) ) { + $queue[] = $user_id; + update_option( $option, $queue, false ); + } + } + + /** + * Ensure the recurring cron event is scheduled. + */ + public static function schedule_cron() { + if ( ! wp_next_scheduled( self::CRON_HOOK ) ) { + wp_schedule_event( time(), self::CRON_SCHEDULE, self::CRON_HOOK ); + } + } + + /** + * Handle the recurring cron event. + * + * Processes both pull and push queues. + */ + public static function handle_batch() { + self::handle_batch_pull(); + self::handle_batch_push(); + } + + /** + * Process the pull queue. + * + * Reads the queue, pulls all active integrations for each user, + * and removes successfully processed users. + */ + private static function handle_batch_pull() { + $queue = get_option( self::PULL_QUEUE_OPTION, [] ); + if ( empty( $queue ) ) { + return; + } + + Logger::log( 'Batch pull started for ' . count( $queue ) . ' user(s).', self::LOGGER_HEADER ); + + $active_integrations = Integrations::get_active_integrations(); + if ( empty( $active_integrations ) ) { + Logger::log( 'Batch pull aborted: no active integrations.', self::LOGGER_HEADER ); + return; + } + + $failed_user_ids = []; + + foreach ( $queue as $user_id ) { + if ( ! get_userdata( $user_id ) ) { + Logger::log( 'Batch pull skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); + continue; + } + $user_failed = false; + foreach ( $active_integrations as $integration ) { + $selected_fields = $integration->get_enabled_incoming_fields(); + if ( empty( $selected_fields ) ) { + continue; + } + $result = self::pull_single_integration( $user_id, $integration ); + if ( is_wp_error( $result ) ) { + Logger::error( 'Batch pull failed for user ' . $user_id . ', integration ' . $integration->get_id() . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); + $user_failed = true; + } + } + if ( $user_failed ) { + $failed_user_ids[] = $user_id; + } + } + + self::update_queue_after_processing( self::PULL_QUEUE_OPTION, $queue, $failed_user_ids ); + + if ( ! empty( $failed_user_ids ) ) { + Logger::log( 'Batch pull completed with ' . count( $failed_user_ids ) . ' failed user(s) kept in queue.', self::LOGGER_HEADER ); + } else { + Logger::log( 'Batch pull completed.', self::LOGGER_HEADER ); + } + } + + /** + * Process the push queue. + * + * Reads the queue, fetches fresh contact data for each user, + * and pushes to all active integrations. + */ + private static function handle_batch_push() { + $queue = get_option( self::PUSH_QUEUE_OPTION, [] ); + if ( empty( $queue ) ) { + return; + } + + Logger::log( 'Batch push started for ' . count( $queue ) . ' user(s).', self::LOGGER_HEADER ); + + $failed_user_ids = []; + + foreach ( $queue as $user_id ) { + if ( ! get_userdata( $user_id ) ) { + Logger::log( 'Batch push skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); + continue; + } + $result = Contact_Sync::sync_contact( $user_id, 'Recurring sync routine' ); + if ( is_wp_error( $result ) ) { + Logger::error( 'Batch push failed for user ' . $user_id . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); + $failed_user_ids[] = $user_id; + } + } + + self::update_queue_after_processing( self::PUSH_QUEUE_OPTION, $queue, $failed_user_ids ); + + if ( ! empty( $failed_user_ids ) ) { + Logger::log( 'Batch push completed with ' . count( $failed_user_ids ) . ' failed user(s) kept in queue.', self::LOGGER_HEADER ); + } else { + Logger::log( 'Batch push completed.', self::LOGGER_HEADER ); + } + } + + /** + * Update a queue option after batch processing. + * + * Keeps failed user IDs and any new entries added during processing. + * Removes successfully processed entries. + * + * @param string $option The option key. + * @param array $processed_queue The queue snapshot that was processed. + * @param array $failed_user_ids User IDs that failed processing. + */ + private static function update_queue_after_processing( $option, $processed_queue, $failed_user_ids ) { + $current_queue = get_option( $option, [] ); + $new_entries = array_diff( $current_queue, $processed_queue ); + $remaining = array_unique( array_merge( $failed_user_ids, $new_entries ) ); + + if ( ! empty( $remaining ) ) { + update_option( $option, array_values( $remaining ), false ); + } else { + delete_option( $option ); + } + } +} diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php deleted file mode 100644 index 6015533cc5..0000000000 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ /dev/null @@ -1,325 +0,0 @@ -ID, self::LAST_PULL_META, true ); - $age = time() - $last_pull; - - if ( $age < self::PULL_INTERVAL ) { - return; - } - - // Set immediately to prevent concurrent pulls from overlapping page loads. - update_user_meta( $user->ID, self::LAST_PULL_META, time() ); - - $active_integrations = Integrations::get_active_integrations(); - - // Data is stale (> 24 h) — pull synchronously, schedule leftovers. - if ( $age >= self::PULL_SYNC_THRESHOLD ) { - self::pull_sync( $user->ID, $active_integrations ); - return; - } - - // Data is relatively fresh — schedule all integrations async. - self::schedule_async_pulls( $user->ID, $active_integrations ); - } - - /** - * Run synchronous pull via per-integration loopback requests. - * - * Each integration is pulled via a blocking wp_remote_post to the AJAX - * endpoint with get_pull_request_timeout. If the request completes, the handler - * has already stored the data. If it times out or fails, the integration is - * scheduled via Action Scheduler as a fallback. - * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. - */ - private static function pull_sync( $user_id, $integrations ) { - $failed = []; - - foreach ( $integrations as $id => $integration ) { - $selected_fields = $integration->get_enabled_incoming_fields(); - if ( empty( $selected_fields ) ) { - continue; - } - - $response = self::fire_pull_request( $id ); - - if ( is_wp_error( $response ) || 200 !== wp_remote_retrieve_response_code( $response ) ) { - $error_message = is_wp_error( $response ) ? $response->get_error_message() : 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ); - Logger::log( 'Loopback pull failed for ' . $id . '. Scheduling async. Error: ' . $error_message ); - $failed[ $id ] = $integration; - } else { - Logger::log( 'Loopback pull succeeded for ' . $id . '.' ); - } - } - - if ( ! empty( $failed ) ) { - self::schedule_async_pulls( $user_id, $failed ); - } - } - - /** - * Fire a blocking loopback request to pull data for a single integration. - * - * @param string $integration_id The integration identifier. - * @return array|\WP_Error The response or WP_Error on failure. - */ - private static function fire_pull_request( $integration_id ) { - $url = add_query_arg( - [ - 'action' => self::AJAX_ACTION, - 'nonce' => wp_create_nonce( self::NONCE_ACTION ), - ], - admin_url( 'admin-ajax.php' ) - ); - - return wp_remote_post( - $url, - [ - 'timeout' => self::get_pull_request_timeout(), - 'blocking' => true, - 'body' => [ 'integration_id' => $integration_id ], - 'cookies' => $_COOKIE, // phpcs:ignore - 'sslverify' => apply_filters( 'https_local_ssl_verify', false ), - ] - ); - } - - /** - * Handle the AJAX loopback request for pulling a single integration. - * - * Verifies the nonce, looks up the integration, pulls and stores data, - * then returns a JSON response. - */ - public static function handle_ajax_pull() { - if ( ! isset( $_REQUEST['nonce'] ) || ! wp_verify_nonce( sanitize_text_field( $_REQUEST['nonce'] ), self::NONCE_ACTION ) ) { // phpcs:ignore - wp_send_json_error( 'Invalid nonce.', 403 ); - } - - $integration_id = isset( $_POST['integration_id'] ) ? sanitize_text_field( $_POST['integration_id'] ) : ''; // phpcs:ignore - if ( empty( $integration_id ) ) { - wp_send_json_error( 'Missing integration_id.', 400 ); - } - - $integration = Integrations::get_integration( $integration_id ); - if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) { - wp_send_json_error( 'Integration not found or not enabled.', 404 ); - } - - $user_id = get_current_user_id(); - if ( ! $user_id ) { - wp_send_json_error( 'No user context.', 403 ); - } - - $result = self::pull_single_integration( $user_id, $integration ); - - if ( is_wp_error( $result ) ) { - wp_send_json_error( $result->get_error_message(), 500 ); - } - - wp_send_json_success(); - } - - /** - * Pull data from a single integration and store selected fields. - * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration $integration The integration instance. - * @return true|\WP_Error True on success, WP_Error on failure. - */ - public static function pull_single_integration( $user_id, $integration ) { - $selected_fields = $integration->get_enabled_incoming_fields(); - if ( empty( $selected_fields ) ) { - return new \WP_Error( 'no_selected_incoming_fields', 'No selected incoming fields for ' . $integration->get_id() ); - } - - try { - $data = $integration->pull_contact_data( $user_id ); - - if ( is_wp_error( $data ) ) { - Logger::log( 'Pull error from ' . $integration->get_id() . ': ' . $data->get_error_message() ); - return $data; - } - - $selected_keys = array_flip( $selected_fields ); - $data = array_intersect_key( $data, $selected_keys ); - Logger::log( 'Pulled data from ' . $integration->get_id() . ': ' . wp_json_encode( $data ) ); - - foreach ( $data as $key => $value ) { - \Newspack\Reader_Data::update_item( $user_id, $key, wp_json_encode( $value ) ); - } - - return true; - } catch ( \Throwable $e ) { - Logger::log( 'Pull exception from ' . $integration->get_id() . ': ' . $e->getMessage() ); - return new \WP_Error( 'pull_exception', $e->getMessage() ); - } - } - - /** - * Schedule async Action Scheduler events for pulling integration data. - * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration[] $integrations Integrations to schedule. - */ - private static function schedule_async_pulls( $user_id, $integrations ) { - if ( ! function_exists( 'as_enqueue_async_action' ) ) { - return; - } - - foreach ( $integrations as $integration ) { - $selected_fields = $integration->get_enabled_incoming_fields(); - if ( empty( $selected_fields ) ) { - continue; - } - - $args = [ - [ - 'user_id' => $user_id, - 'integration_id' => $integration->get_id(), - ], - ]; - - $group = Integrations::get_action_group( $integration->get_id() ); - - if ( function_exists( 'as_has_scheduled_action' ) && \as_has_scheduled_action( self::ASYNC_PULL_HOOK, $args, $group ) ) { - continue; - } - - \as_enqueue_async_action( - self::ASYNC_PULL_HOOK, - $args, - $group - ); - } - } - - /** - * Handle an async pull Action Scheduler event. - * - * @param array $args { user_id, integration_id }. - */ - public static function handle_async_pull( $args ) { - $user_id = $args['user_id'] ?? 0; - $integration_id = $args['integration_id'] ?? ''; - - if ( ! $user_id || ! $integration_id ) { - return; - } - - $integration = Integrations::get_integration( $integration_id ); - - if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) { - return; - } - - self::pull_single_integration( $user_id, $integration ); - } -} diff --git a/tests/unit-tests/integrations/class-test-integrations.php b/tests/unit-tests/integrations/class-test-integrations.php index 3e0f92c9bd..dafc98308c 100644 --- a/tests/unit-tests/integrations/class-test-integrations.php +++ b/tests/unit-tests/integrations/class-test-integrations.php @@ -10,7 +10,7 @@ use Newspack\Data_Events; use Newspack\Reader_Activation\Integration; use Newspack\Reader_Activation\Integrations; -use Newspack\Reader_Activation\Integrations\Contact_Pull; +use Newspack\Reader_Activation\Integrations\Contact_Cron; use Sample_Integration; /** @@ -78,7 +78,7 @@ private function reset_handler_map() { */ private function mock_pull_loopback( $user_id ) { $this->loopback_filter = function ( $preempt, $parsed_args, $url ) use ( $user_id ) { - if ( false === strpos( $url, 'action=' . Contact_Pull::AJAX_ACTION ) ) { + if ( false === strpos( $url, 'action=' . Contact_Cron::AJAX_ACTION ) ) { return $preempt; } @@ -89,7 +89,7 @@ private function mock_pull_loopback( $user_id ) { $integration = Integrations::get_integration( $integration_id ); if ( $integration ) { - Contact_Pull::pull_single_integration( $user_id, $integration ); + Contact_Cron::pull_single_integration( $user_id, $integration ); } return [ @@ -356,10 +356,10 @@ public function test_update_incoming_fields_stores_any_keys() { public function test_pull_skipped_when_not_logged_in() { wp_set_current_user( 0 ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // No user meta should be written since no one is logged in. - $users = get_users( [ 'meta_key' => Contact_Pull::LAST_PULL_META ] ); + $users = get_users( [ 'meta_key' => Contact_Cron::LAST_PULL_META ] ); $this->assertEmpty( $users ); } @@ -371,12 +371,12 @@ public function test_pull_throttled_by_interval() { wp_set_current_user( $user_id ); $now = time(); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, $now ); + update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, $now ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // The meta should remain unchanged (not updated to a newer timestamp). - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); + $last_pull = (int) get_user_meta( $user_id, Contact_Cron::LAST_PULL_META, true ); $this->assertSame( $now, $last_pull ); } @@ -388,7 +388,7 @@ public function test_sync_pull_when_data_stale() { wp_set_current_user( $user_id ); // Set last pull to beyond the 24h threshold. - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); // Create an integration that returns data from pull. $integration = new class( 'pull-test', 'Pull Test' ) extends Sample_Integration { @@ -408,14 +408,14 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'pull-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // Verify the data was stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_favorite_color', true ); $this->assertSame( wp_json_encode( 'blue' ), $stored ); // Verify last pull meta was updated. - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); + $last_pull = (int) get_user_meta( $user_id, Contact_Cron::LAST_PULL_META, true ); $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); } @@ -426,7 +426,7 @@ public function test_sync_pull_filters_by_incoming_fields() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'filter-test', 'Filter Test' ) extends Sample_Integration { /** @@ -450,7 +450,7 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'filter-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // a and c should be stored. $this->assertSame( wp_json_encode( 'value_a' ), get_user_meta( $user_id, 'newspack_reader_data_item_field_a', true ) ); @@ -467,7 +467,7 @@ public function test_sync_pull_catches_throwable() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'throw-test', 'Throw Test' ) extends Sample_Integration { /** @@ -487,10 +487,10 @@ public function pull_contact_data( $user_id ) { // Should not throw — the routine catches Throwable. $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // Last pull meta should still have been set. - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); + $last_pull = (int) get_user_meta( $user_id, Contact_Cron::LAST_PULL_META, true ); $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); } @@ -502,7 +502,7 @@ public function test_async_pull_scheduled_when_fresh() { wp_set_current_user( $user_id ); // Last pull 10 minutes ago — past interval but within 24h. - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - 600 ); + update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - 600 ); $integration = new class( 'async-test', 'Async Test' ) extends Sample_Integration { /** @@ -520,26 +520,21 @@ public function pull_contact_data( $user_id ) { Integrations::register( $integration ); Integrations::enable( 'async-test' ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // Data should NOT have been stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_city', true ); $this->assertEmpty( $stored ); - // Verify an AS action was scheduled. - $actions = as_get_scheduled_actions( - [ - 'hook' => Contact_Pull::ASYNC_PULL_HOOK, - 'status' => \ActionScheduler_Store::STATUS_PENDING, - ] - ); - $this->assertNotEmpty( $actions ); + // Verify user was added to the pull queue. + $queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $queue ); } /** - * Test handle_async_pull processes data for a single integration. + * Test handle_batch_pull processes data for queued users. */ - public function test_handle_async_pull() { + public function test_handle_batch_pull() { $user_id = $this->factory()->user->create(); $integration = new class( 'handle-test', 'Handle Test' ) extends Sample_Integration { @@ -558,21 +553,22 @@ public function pull_contact_data( $user_id ) { Integrations::register( $integration ); Integrations::enable( 'handle-test' ); - Contact_Pull::handle_async_pull( - [ - 'user_id' => $user_id, - 'integration_id' => 'handle-test', - ] - ); + // Simulate a queued user. + update_option( Contact_Cron::PULL_QUEUE_OPTION, [ $user_id ] ); + + Contact_Cron::handle_batch(); $stored = get_user_meta( $user_id, 'newspack_reader_data_item_language', true ); $this->assertSame( wp_json_encode( 'PHP' ), $stored ); + + // Queue should be cleared after processing. + $this->assertEmpty( get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ) ); } /** - * Test handle_async_pull skips disabled integration. + * Test handle_batch_pull skips disabled integration. */ - public function test_handle_async_pull_skips_disabled() { + public function test_handle_batch_pull_skips_disabled() { $user_id = $this->factory()->user->create(); $integration = new class( 'disabled-test', 'Disabled Test' ) extends Sample_Integration { @@ -591,12 +587,9 @@ public function pull_contact_data( $user_id ) { Integrations::register( $integration ); // Not enabled. - Contact_Pull::handle_async_pull( - [ - 'user_id' => $user_id, - 'integration_id' => 'disabled-test', - ] - ); + update_option( Contact_Cron::PULL_QUEUE_OPTION, [ $user_id ] ); + + Contact_Cron::handle_batch(); $stored = get_user_meta( $user_id, 'newspack_reader_data_item_pet', true ); $this->assertEmpty( $stored ); @@ -628,7 +621,7 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'first-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // Should have run synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_first_field', true ); @@ -642,7 +635,7 @@ public function test_sync_pull_timeout_schedules_async() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'timeout-test', 'Timeout Test' ) extends Sample_Integration { /** @@ -662,27 +655,22 @@ public function pull_contact_data( $user_id ) { // Simulate a timeout by returning WP_Error from the loopback request. $this->loopback_filter = function ( $preempt, $parsed_args, $url ) { - if ( false === strpos( $url, 'action=' . Contact_Pull::AJAX_ACTION ) ) { + if ( false === strpos( $url, 'action=' . Contact_Cron::AJAX_ACTION ) ) { return $preempt; } return new \WP_Error( 'http_request_failed', 'Connection timed out' ); }; add_filter( 'pre_http_request', $this->loopback_filter, 10, 3 ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_pull_contact_data(); // Data should NOT have been stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_timeout_field', true ); $this->assertEmpty( $stored ); - // Verify an AS action was scheduled as fallback. - $actions = as_get_scheduled_actions( - [ - 'hook' => Contact_Pull::ASYNC_PULL_HOOK, - 'status' => \ActionScheduler_Store::STATUS_PENDING, - ] - ); - $this->assertNotEmpty( $actions ); + // Verify user was added to the pull queue as fallback. + $queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $queue ); } /** @@ -941,7 +929,7 @@ public function pull_contact_data( $user_id ) { // Call pull_single_integration directly — the AJAX handler is thin glue // (nonce + lookup + this call + wp_send_json) and calling it in tests // produces unavoidable output from wp_send_json. - $result = Contact_Pull::pull_single_integration( $user_id, $integration ); + $result = Contact_Cron::pull_single_integration( $user_id, $integration ); $this->assertTrue( $result ); $stored = get_user_meta( $user_id, 'newspack_reader_data_item_ajax_field', true ); From 23fc5dab11ae134c04143478cd2f9d627a8f7291 Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 13:00:17 -0300 Subject: [PATCH 02/13] feat: introduce contact pull retries --- .../reader-activation/class-integrations.php | 1 + .../integrations/class-contact-cron.php | 237 +-------- .../integrations/class-contact-pull.php | 451 ++++++++++++++++++ .../sync/class-contact-sync.php | 26 + .../integrations/class-test-integrations.php | 40 +- 5 files changed, 523 insertions(+), 232 deletions(-) create mode 100644 includes/reader-activation/integrations/class-contact-pull.php diff --git a/includes/reader-activation/class-integrations.php b/includes/reader-activation/class-integrations.php index 611bfe8d28..1ce86b97ad 100644 --- a/includes/reader-activation/class-integrations.php +++ b/includes/reader-activation/class-integrations.php @@ -68,6 +68,7 @@ class Integrations { public static function init() { // Include required files. require_once __DIR__ . '/integrations/class-integration.php'; + require_once __DIR__ . '/integrations/class-contact-pull.php'; require_once __DIR__ . '/integrations/class-contact-cron.php'; add_action( 'init', [ __CLASS__, 'register_integrations' ], 5 ); diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php index 6b8bea4ee8..9948c83f37 100644 --- a/includes/reader-activation/integrations/class-contact-cron.php +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -30,37 +30,6 @@ class Contact_Cron { */ const CRON_INTERVAL = 300; - /** - * Threshold in seconds (24 hours) for synchronous vs async pull. - * - * If the last pull is older than this, the pull runs synchronously. - * Otherwise it is queued for the next cron run. - * - * @var int - */ - const PULL_SYNC_THRESHOLD = 86400; - - /** - * AJAX action name for the loopback pull endpoint. - * - * @var string - */ - const AJAX_ACTION = 'newspack_pull_integration'; - - /** - * Nonce action name for the loopback pull endpoint. - * - * @var string - */ - const NONCE_ACTION = 'newspack_pull_integration_nonce'; - - /** - * User meta key for last pull timestamp. - * - * @var string - */ - const LAST_PULL_META = 'newspack_integrations_last_pull'; - /** * WP-Cron hook for batch processing. * @@ -104,7 +73,6 @@ public static function init() { add_action( 'init', [ __CLASS__, 'maybe_pull_contact_data' ], 20 ); add_action( 'init', [ __CLASS__, 'schedule_cron' ] ); add_action( self::CRON_HOOK, [ __CLASS__, 'handle_batch' ] ); - add_action( 'wp_ajax_' . self::AJAX_ACTION, [ __CLASS__, 'handle_ajax_pull' ] ); } /** @@ -121,192 +89,39 @@ public static function add_cron_schedule( $schedules ) { return $schedules; } - /** - * Get the timeout for loopback pull requests. - * - * @return int Timeout in seconds. - */ - private static function get_pull_request_timeout() { - /** - * Newspack Integrations: Filter the max amount of time (in seconds) to allow for a synchronous contact metadata pull request before falling back to async scheduling. - */ - return apply_filters( 'newspack_pull_integration_request_timeout', 1 ); - } - /** * Pull contact data from active integrations for the current logged-in user. * - * If the last pull is older than PULL_SYNC_THRESHOLD (24 h), the pull runs - * synchronously with a time limit. Any integrations that don't finish in time - * are queued for the next cron run. - * - * If the last pull is newer than 24 h (but older than CRON_INTERVAL) the - * user is queued for batch pull. + * If the last pull is stale (> 24 h), the pull runs synchronously. + * Otherwise the user is queued for both pull and push on the next cron run. */ public static function maybe_pull_contact_data() { if ( ! is_user_logged_in() ) { return; } - $user = wp_get_current_user(); - $last_pull = (int) get_user_meta( $user->ID, self::LAST_PULL_META, true ); - $age = time() - $last_pull; + $user_id = get_current_user_id(); - if ( $age < self::CRON_INTERVAL ) { + if ( ! Contact_Pull::needs_pull( $user_id, self::CRON_INTERVAL ) ) { return; } + $is_stale = Contact_Pull::is_stale( $user_id ); + // Set immediately to prevent concurrent pulls from overlapping page loads. - update_user_meta( $user->ID, self::LAST_PULL_META, time() ); + Contact_Pull::mark_pulled( $user_id ); // Always enqueue for push. - self::enqueue_for_push( $user->ID ); - - $active_integrations = Integrations::get_active_integrations(); + self::enqueue_for_push( $user_id ); - // Data is stale (> 24 h) — pull synchronously, schedule leftovers. - if ( $age >= self::PULL_SYNC_THRESHOLD ) { - self::pull_sync( $user->ID, $active_integrations ); + // Data is stale (> 24 h) — pull synchronously, retries handle leftovers. + if ( $is_stale ) { + Contact_Pull::pull_sync( $user_id, Integrations::get_active_integrations() ); return; } // Data is relatively fresh — enqueue for batch pull. - self::enqueue_for_pull( $user->ID ); - } - - /** - * Run synchronous pull via per-integration loopback requests. - * - * Each integration is pulled via a blocking wp_remote_post to the AJAX - * endpoint with get_pull_request_timeout. If the request completes, the handler - * has already stored the data. If it times out or fails, the user is - * queued for the next cron run. - * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. - */ - private static function pull_sync( $user_id, $integrations ) { - $failed = []; - - foreach ( $integrations as $id => $integration ) { - $selected_fields = $integration->get_enabled_incoming_fields(); - if ( empty( $selected_fields ) ) { - continue; - } - - $response = self::fire_pull_request( $id ); - - if ( is_wp_error( $response ) || 200 !== wp_remote_retrieve_response_code( $response ) ) { - $error_message = is_wp_error( $response ) ? $response->get_error_message() : 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ); - Logger::log( 'Loopback pull failed for ' . $id . '. Scheduling async. Error: ' . $error_message, self::LOGGER_HEADER ); - $failed[ $id ] = $integration; - } else { - Logger::log( 'Loopback pull succeeded for ' . $id . '.', self::LOGGER_HEADER ); - } - } - - if ( ! empty( $failed ) ) { - self::enqueue_for_pull( $user_id ); - } - } - - /** - * Fire a blocking loopback request to pull data for a single integration. - * - * @param string $integration_id The integration identifier. - * @return array|\WP_Error The response or WP_Error on failure. - */ - private static function fire_pull_request( $integration_id ) { - $url = add_query_arg( - [ - 'action' => self::AJAX_ACTION, - 'nonce' => wp_create_nonce( self::NONCE_ACTION ), - ], - admin_url( 'admin-ajax.php' ) - ); - - return wp_remote_post( - $url, - [ - 'timeout' => self::get_pull_request_timeout(), - 'blocking' => true, - 'body' => [ 'integration_id' => $integration_id ], - 'cookies' => $_COOKIE, // phpcs:ignore - 'sslverify' => apply_filters( 'https_local_ssl_verify', false ), - ] - ); - } - - /** - * Handle the AJAX loopback request for pulling a single integration. - * - * Verifies the nonce, looks up the integration, pulls and stores data, - * then returns a JSON response. - */ - public static function handle_ajax_pull() { - if ( ! isset( $_REQUEST['nonce'] ) || ! wp_verify_nonce( sanitize_text_field( $_REQUEST['nonce'] ), self::NONCE_ACTION ) ) { // phpcs:ignore - wp_send_json_error( 'Invalid nonce.', 403 ); - } - - $integration_id = isset( $_POST['integration_id'] ) ? sanitize_text_field( $_POST['integration_id'] ) : ''; // phpcs:ignore - if ( empty( $integration_id ) ) { - wp_send_json_error( 'Missing integration_id.', 400 ); - } - - $integration = Integrations::get_integration( $integration_id ); - if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) { - wp_send_json_error( 'Integration not found or not enabled.', 404 ); - } - - $user_id = get_current_user_id(); - if ( ! $user_id ) { - wp_send_json_error( 'No user context.', 403 ); - } - - $result = self::pull_single_integration( $user_id, $integration ); - - if ( is_wp_error( $result ) ) { - wp_send_json_error( $result->get_error_message(), 500 ); - } - - wp_send_json_success(); - } - - /** - * Pull data from a single integration and store selected fields. - * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration $integration The integration instance. - * @return true|\WP_Error True on success, WP_Error on failure. - */ - public static function pull_single_integration( $user_id, $integration ) { - $selected_fields = $integration->get_enabled_incoming_fields(); - if ( empty( $selected_fields ) ) { - return new \WP_Error( 'no_selected_incoming_fields', 'No selected incoming fields for ' . $integration->get_id() ); - } - - try { - $data = $integration->pull_contact_data( $user_id ); - - if ( is_wp_error( $data ) ) { - Logger::log( 'Pull error from ' . $integration->get_id() . ': ' . $data->get_error_message(), self::LOGGER_HEADER ); - return $data; - } - - $selected_fields = array_filter( $selected_fields, 'is_string' ); - $selected_keys = array_flip( $selected_fields ); - $data = array_intersect_key( $data, $selected_keys ); - Logger::log( 'Pulled data from ' . $integration->get_id() . ': ' . wp_json_encode( $data ), self::LOGGER_HEADER ); - - foreach ( $data as $key => $value ) { - \Newspack\Reader_Data::update_item( $user_id, $key, wp_json_encode( $value ) ); - } - - return true; - } catch ( \Throwable $e ) { - Logger::log( 'Pull exception from ' . $integration->get_id() . ': ' . $e->getMessage(), self::LOGGER_HEADER ); - return new \WP_Error( 'pull_exception', $e->getMessage() ); - } + self::enqueue_for_pull( $user_id ); } /** @@ -374,12 +189,6 @@ private static function handle_batch_pull() { Logger::log( 'Batch pull started for ' . count( $queue ) . ' user(s).', self::LOGGER_HEADER ); - $active_integrations = Integrations::get_active_integrations(); - if ( empty( $active_integrations ) ) { - Logger::log( 'Batch pull aborted: no active integrations.', self::LOGGER_HEADER ); - return; - } - $failed_user_ids = []; foreach ( $queue as $user_id ) { @@ -387,19 +196,13 @@ private static function handle_batch_pull() { Logger::log( 'Batch pull skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); continue; } - $user_failed = false; - foreach ( $active_integrations as $integration ) { - $selected_fields = $integration->get_enabled_incoming_fields(); - if ( empty( $selected_fields ) ) { - continue; - } - $result = self::pull_single_integration( $user_id, $integration ); - if ( is_wp_error( $result ) ) { - Logger::error( 'Batch pull failed for user ' . $user_id . ', integration ' . $integration->get_id() . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); - $user_failed = true; - } + if ( Contact_Pull::has_pending_retries( $user_id ) ) { + Logger::log( 'Batch pull skipping user ' . $user_id . ': pending pull retries.', self::LOGGER_HEADER ); + continue; } - if ( $user_failed ) { + $result = Contact_Pull::pull_all( $user_id ); + if ( is_wp_error( $result ) ) { + Logger::error( 'Batch pull failed for user ' . $user_id . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); $failed_user_ids[] = $user_id; } } @@ -434,6 +237,10 @@ private static function handle_batch_push() { Logger::log( 'Batch push skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); continue; } + if ( Contact_Sync::has_pending_retries( $user_id ) ) { + Logger::log( 'Batch push skipping user ' . $user_id . ': pending sync retries.', self::LOGGER_HEADER ); + continue; + } $result = Contact_Sync::sync_contact( $user_id, 'Recurring sync routine' ); if ( is_wp_error( $result ) ) { Logger::error( 'Batch push failed for user ' . $user_id . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php new file mode 100644 index 0000000000..a2f30a89c7 --- /dev/null +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -0,0 +1,451 @@ += $interval; + } + + /** + * Whether the last pull is stale (older than PULL_SYNC_THRESHOLD). + * + * @param int $user_id WordPress user ID. + * @return bool + */ + public static function is_stale( $user_id ) { + $last_pull = (int) get_user_meta( $user_id, self::LAST_PULL_META, true ); + return ( time() - $last_pull ) >= self::PULL_SYNC_THRESHOLD; + } + + /** + * Mark the user's pull timestamp as now. + * + * @param int $user_id WordPress user ID. + */ + public static function mark_pulled( $user_id ) { + update_user_meta( $user_id, self::LAST_PULL_META, time() ); + } + + /** + * Run synchronous pull via per-integration loopback requests. + * + * Each integration is pulled via a blocking wp_remote_post to the AJAX + * endpoint. If a request fails, the integration is scheduled for retry + * via ActionScheduler. + * + * @param int $user_id WordPress user ID. + * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. + */ + public static function pull_sync( $user_id, $integrations ) { + foreach ( $integrations as $id => $integration ) { + $selected_fields = $integration->get_enabled_incoming_fields(); + if ( empty( $selected_fields ) ) { + continue; + } + + $response = self::fire_pull_request( $id ); + + if ( is_wp_error( $response ) || 200 !== wp_remote_retrieve_response_code( $response ) ) { + $error = is_wp_error( $response ) + ? $response + : new \WP_Error( 'unexpected_response', 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ) ); + Logger::log( 'Loopback pull failed for ' . $id . '. Scheduling retry. Error: ' . $error->get_error_message(), self::LOGGER_HEADER ); + self::schedule_integration_retry( $id, $user_id, 0, $error ); + } else { + Logger::log( 'Loopback pull succeeded for ' . $id . '.', self::LOGGER_HEADER ); + } + } + } + + /** + * Pull all active integrations for a user. + * + * @param int $user_id WordPress user ID. + * @return true|\WP_Error True if all succeeded, WP_Error with combined messages. + */ + public static function pull_all( $user_id ) { + $active_integrations = Integrations::get_active_integrations(); + $errors = []; + + foreach ( $active_integrations as $integration ) { + $result = self::pull_single_integration( $user_id, $integration ); + if ( is_wp_error( $result ) ) { + self::schedule_integration_retry( $integration->get_id(), $user_id, 0, $result ); + $errors[] = sprintf( '[%s] %s', $integration->get_id(), $result->get_error_message() ); + } + } + + if ( ! empty( $errors ) ) { + return new \WP_Error( 'newspack_contact_pull_failed', implode( '; ', $errors ) ); + } + + return true; + } + + /** + * Fire a blocking loopback request to pull data for a single integration. + * + * @param string $integration_id The integration identifier. + * @return array|\WP_Error The response or WP_Error on failure. + */ + private static function fire_pull_request( $integration_id ) { + $url = add_query_arg( + [ + 'action' => self::AJAX_ACTION, + 'nonce' => wp_create_nonce( self::NONCE_ACTION ), + ], + admin_url( 'admin-ajax.php' ) + ); + + return wp_remote_post( + $url, + [ + 'timeout' => self::get_pull_request_timeout(), + 'blocking' => true, + 'body' => [ 'integration_id' => $integration_id ], + 'cookies' => $_COOKIE, // phpcs:ignore + 'sslverify' => apply_filters( 'https_local_ssl_verify', false ), + ] + ); + } + + /** + * Handle the AJAX loopback request for pulling a single integration. + */ + public static function handle_ajax_pull() { + if ( ! isset( $_REQUEST['nonce'] ) || ! wp_verify_nonce( sanitize_text_field( $_REQUEST['nonce'] ), self::NONCE_ACTION ) ) { // phpcs:ignore + wp_send_json_error( 'Invalid nonce.', 403 ); + } + + $integration_id = isset( $_POST['integration_id'] ) ? sanitize_text_field( $_POST['integration_id'] ) : ''; // phpcs:ignore + if ( empty( $integration_id ) ) { + wp_send_json_error( 'Missing integration_id.', 400 ); + } + + $integration = Integrations::get_integration( $integration_id ); + if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) { + wp_send_json_error( 'Integration not found or not enabled.', 404 ); + } + + $user_id = get_current_user_id(); + if ( ! $user_id ) { + wp_send_json_error( 'No user context.', 403 ); + } + + $result = self::pull_single_integration( $user_id, $integration ); + + if ( is_wp_error( $result ) ) { + wp_send_json_error( $result->get_error_message(), 500 ); + } + + wp_send_json_success(); + } + + /** + * Pull data from a single integration and store selected fields. + * + * @param int $user_id WordPress user ID. + * @param \Newspack\Reader_Activation\Integration $integration The integration instance. + * @return true|\WP_Error True on success, WP_Error on failure. + */ + public static function pull_single_integration( $user_id, $integration ) { + $selected_fields = $integration->get_enabled_incoming_fields(); + if ( empty( $selected_fields ) ) { + return new \WP_Error( 'no_selected_incoming_fields', 'No selected incoming fields for ' . $integration->get_id() ); + } + + try { + $data = $integration->pull_contact_data( $user_id ); + + if ( is_wp_error( $data ) ) { + Logger::log( 'Pull error from ' . $integration->get_id() . ': ' . $data->get_error_message(), self::LOGGER_HEADER ); + return $data; + } + + $selected_fields = array_filter( $selected_fields, 'is_string' ); + $selected_keys = array_flip( $selected_fields ); + $data = array_intersect_key( $data, $selected_keys ); + Logger::log( 'Pulled data from ' . $integration->get_id() . ': ' . wp_json_encode( $data ), self::LOGGER_HEADER ); + + foreach ( $data as $key => $value ) { + \Newspack\Reader_Data::update_item( $user_id, $key, wp_json_encode( $value ) ); + } + + return true; + } catch ( \Throwable $e ) { + Logger::log( 'Pull exception from ' . $integration->get_id() . ': ' . $e->getMessage(), self::LOGGER_HEADER ); + return new \WP_Error( 'pull_exception', $e->getMessage() ); + } + } + + /** + * Check if a user has any pending pull retries in ActionScheduler. + * + * @param int $user_id WordPress user ID. + * @return bool True if there are pending retries. + */ + public static function has_pending_retries( $user_id ) { + if ( ! function_exists( 'as_get_scheduled_actions' ) ) { + return false; + } + $actions = \as_get_scheduled_actions( + [ + 'hook' => self::RETRY_HOOK, + 'status' => \ActionScheduler_Store::STATUS_PENDING, + 'per_page' => 1, + ] + ); + foreach ( $actions as $action ) { + $args = $action->get_args(); + if ( ! empty( $args[0]['user_id'] ) && (int) $args[0]['user_id'] === $user_id ) { + return true; + } + } + return false; + } + + /** + * Schedule a retry for a failed integration pull via ActionScheduler. + * + * @param string $integration_id The integration ID. + * @param int $user_id The WordPress user ID. + * @param int $retry_count Current retry count (0 = first failure). + * @param string|\WP_Error $error The error from the failure. + */ + private static function schedule_integration_retry( $integration_id, $user_id, $retry_count, $error ) { + if ( ! function_exists( 'as_schedule_single_action' ) ) { + return; + } + + $user = ! empty( $user_id ) ? get_userdata( $user_id ) : false; + if ( ! $user ) { + static::log( sprintf( 'Cannot schedule pull retry for integration "%s": user %d not found.', $integration_id, $user_id ) ); + return; + } + + $error_message = $error instanceof \WP_Error ? $error->get_error_message() : (string) $error; + + $next_retry = $retry_count + 1; + if ( $next_retry > self::MAX_RETRIES ) { + static::log( + sprintf( + 'Max pull retries (%d) reached for integration "%s" of user %d. Giving up. Last error: %s', + self::MAX_RETRIES, + $integration_id, + $user_id, + $error_message + ) + ); + return; + } + + $backoff_index = min( $retry_count, count( self::RETRY_BACKOFF ) - 1 ); + $backoff_seconds = self::RETRY_BACKOFF[ $backoff_index ]; + + $retry_data = [ + 'integration_id' => $integration_id, + 'user_id' => $user_id, + 'retry_count' => $next_retry, + 'max_retries' => self::MAX_RETRIES, + 'reason' => $error_message, + ]; + + \as_schedule_single_action( + time() + $backoff_seconds, + self::RETRY_HOOK, + [ $retry_data ], + Integrations::get_action_group( $integration_id ) + ); + + static::log( + sprintf( + 'Scheduled pull retry %d/%d for integration "%s" of user %d in %ds. Error: %s', + $next_retry, + self::MAX_RETRIES, + $integration_id, + $user_id, + $backoff_seconds, + $error_message + ) + ); + } + + /** + * Execute an integration pull retry from ActionScheduler. + * + * @param array $retry_data The retry data. + * + * @throws \Exception When the final retry fails, so ActionScheduler marks the action as "failed". + */ + public static function execute_integration_retry( $retry_data ) { + if ( ! is_array( $retry_data ) || empty( $retry_data['integration_id'] ) || empty( $retry_data['user_id'] ) ) { + Logger::log( 'Invalid pull retry data received from Action Scheduler.', self::LOGGER_HEADER, 'error' ); + return; + } + + $integration_id = $retry_data['integration_id']; + $user_id = $retry_data['user_id']; + $retry_count = $retry_data['retry_count'] ?? 1; + + $user = \get_userdata( $user_id ); + if ( ! $user ) { + Logger::log( sprintf( 'User %d not found on pull retry %d.', $user_id, $retry_count ), self::LOGGER_HEADER, 'error' ); + return; + } + + $integration = Integrations::get_integration( $integration_id ); + if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) { + Logger::log( sprintf( 'Integration "%s" not found or not enabled on pull retry %d.', $integration_id, $retry_count ), self::LOGGER_HEADER, 'error' ); + return; + } + + static::log( sprintf( 'Executing pull retry %d/%d for integration "%s" of user %d.', $retry_count, self::MAX_RETRIES, $integration_id, $user_id ) ); + + $result = self::pull_single_integration( $user_id, $integration ); + if ( is_wp_error( $result ) ) { + $error_message = sprintf( + 'Pull retry %d/%d failed for integration "%s" of user %d: %s', + $retry_count, + self::MAX_RETRIES, + $integration_id, + $user_id, + $result->get_error_message() + ); + static::log( $error_message ); + self::schedule_integration_retry( $integration_id, $user_id, $retry_count, $result ); + + if ( $retry_count >= self::MAX_RETRIES ) { + throw new \Exception( esc_html( $error_message ) ); + } + } else { + static::log( + sprintf( + 'Pull retry %d/%d succeeded for integration "%s" of user %d.', + $retry_count, + self::MAX_RETRIES, + $integration_id, + $user_id + ) + ); + } + } +} +Contact_Pull::init_hooks(); diff --git a/includes/reader-activation/sync/class-contact-sync.php b/includes/reader-activation/sync/class-contact-sync.php index 10d2365766..b8b353d289 100644 --- a/includes/reader-activation/sync/class-contact-sync.php +++ b/includes/reader-activation/sync/class-contact-sync.php @@ -429,6 +429,32 @@ public static function execute_integration_retry( $retry_data ) { } } + /** + * Check if a user has any pending sync retries in ActionScheduler. + * + * @param int $user_id WordPress user ID. + * @return bool True if there are pending retries. + */ + public static function has_pending_retries( $user_id ) { + if ( ! function_exists( 'as_get_scheduled_actions' ) ) { + return false; + } + $actions = \as_get_scheduled_actions( + [ + 'hook' => self::RETRY_HOOK, + 'status' => \ActionScheduler_Store::STATUS_PENDING, + 'per_page' => 1, + ] + ); + foreach ( $actions as $action ) { + $args = $action->get_args(); + if ( ! empty( $args[0]['user_id'] ) && (int) $args[0]['user_id'] === $user_id ) { + return true; + } + } + return false; + } + /** * Schedule a future sync. * diff --git a/tests/unit-tests/integrations/class-test-integrations.php b/tests/unit-tests/integrations/class-test-integrations.php index dafc98308c..e474a072c6 100644 --- a/tests/unit-tests/integrations/class-test-integrations.php +++ b/tests/unit-tests/integrations/class-test-integrations.php @@ -11,6 +11,7 @@ use Newspack\Reader_Activation\Integration; use Newspack\Reader_Activation\Integrations; use Newspack\Reader_Activation\Integrations\Contact_Cron; +use Newspack\Reader_Activation\Integrations\Contact_Pull; use Sample_Integration; /** @@ -78,7 +79,7 @@ private function reset_handler_map() { */ private function mock_pull_loopback( $user_id ) { $this->loopback_filter = function ( $preempt, $parsed_args, $url ) use ( $user_id ) { - if ( false === strpos( $url, 'action=' . Contact_Cron::AJAX_ACTION ) ) { + if ( false === strpos( $url, 'action=' . Contact_Pull::AJAX_ACTION ) ) { return $preempt; } @@ -89,7 +90,7 @@ private function mock_pull_loopback( $user_id ) { $integration = Integrations::get_integration( $integration_id ); if ( $integration ) { - Contact_Cron::pull_single_integration( $user_id, $integration ); + Contact_Pull::pull_single_integration( $user_id, $integration ); } return [ @@ -359,7 +360,7 @@ public function test_pull_skipped_when_not_logged_in() { Contact_Cron::maybe_pull_contact_data(); // No user meta should be written since no one is logged in. - $users = get_users( [ 'meta_key' => Contact_Cron::LAST_PULL_META ] ); + $users = get_users( [ 'meta_key' => Contact_Pull::LAST_PULL_META ] ); $this->assertEmpty( $users ); } @@ -371,12 +372,12 @@ public function test_pull_throttled_by_interval() { wp_set_current_user( $user_id ); $now = time(); - update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, $now ); + update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, $now ); Contact_Cron::maybe_pull_contact_data(); // The meta should remain unchanged (not updated to a newer timestamp). - $last_pull = (int) get_user_meta( $user_id, Contact_Cron::LAST_PULL_META, true ); + $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); $this->assertSame( $now, $last_pull ); } @@ -388,7 +389,7 @@ public function test_sync_pull_when_data_stale() { wp_set_current_user( $user_id ); // Set last pull to beyond the 24h threshold. - update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); // Create an integration that returns data from pull. $integration = new class( 'pull-test', 'Pull Test' ) extends Sample_Integration { @@ -415,7 +416,7 @@ public function pull_contact_data( $user_id ) { $this->assertSame( wp_json_encode( 'blue' ), $stored ); // Verify last pull meta was updated. - $last_pull = (int) get_user_meta( $user_id, Contact_Cron::LAST_PULL_META, true ); + $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); } @@ -426,7 +427,7 @@ public function test_sync_pull_filters_by_incoming_fields() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'filter-test', 'Filter Test' ) extends Sample_Integration { /** @@ -467,7 +468,7 @@ public function test_sync_pull_catches_throwable() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'throw-test', 'Throw Test' ) extends Sample_Integration { /** @@ -490,7 +491,7 @@ public function pull_contact_data( $user_id ) { Contact_Cron::maybe_pull_contact_data(); // Last pull meta should still have been set. - $last_pull = (int) get_user_meta( $user_id, Contact_Cron::LAST_PULL_META, true ); + $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); } @@ -502,7 +503,7 @@ public function test_async_pull_scheduled_when_fresh() { wp_set_current_user( $user_id ); // Last pull 10 minutes ago — past interval but within 24h. - update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - 600 ); + update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - 600 ); $integration = new class( 'async-test', 'Async Test' ) extends Sample_Integration { /** @@ -635,7 +636,7 @@ public function test_sync_pull_timeout_schedules_async() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Cron::LAST_PULL_META, time() - Contact_Cron::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'timeout-test', 'Timeout Test' ) extends Sample_Integration { /** @@ -655,7 +656,7 @@ public function pull_contact_data( $user_id ) { // Simulate a timeout by returning WP_Error from the loopback request. $this->loopback_filter = function ( $preempt, $parsed_args, $url ) { - if ( false === strpos( $url, 'action=' . Contact_Cron::AJAX_ACTION ) ) { + if ( false === strpos( $url, 'action=' . Contact_Pull::AJAX_ACTION ) ) { return $preempt; } return new \WP_Error( 'http_request_failed', 'Connection timed out' ); @@ -668,9 +669,14 @@ public function pull_contact_data( $user_id ) { $stored = get_user_meta( $user_id, 'newspack_reader_data_item_timeout_field', true ); $this->assertEmpty( $stored ); - // Verify user was added to the pull queue as fallback. - $queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); - $this->assertContains( $user_id, $queue ); + // Verify a pull retry was scheduled via ActionScheduler. + $actions = as_get_scheduled_actions( + [ + 'hook' => Contact_Pull::RETRY_HOOK, + 'status' => \ActionScheduler_Store::STATUS_PENDING, + ] + ); + $this->assertNotEmpty( $actions ); } /** @@ -929,7 +935,7 @@ public function pull_contact_data( $user_id ) { // Call pull_single_integration directly — the AJAX handler is thin glue // (nonce + lookup + this call + wp_send_json) and calling it in tests // produces unavoidable output from wp_send_json. - $result = Contact_Cron::pull_single_integration( $user_id, $integration ); + $result = Contact_Pull::pull_single_integration( $user_id, $integration ); $this->assertTrue( $result ); $stored = get_user_meta( $user_id, 'newspack_reader_data_item_ajax_field', true ); From ab689862be220a0546ebe821c5cef4d95b3fe578 Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 13:03:25 -0300 Subject: [PATCH 03/13] chore: remove Sync extension --- .../integrations/class-contact-pull.php | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index a2f30a89c7..ed168dc7b9 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -18,13 +18,7 @@ /** * Contact Pull Class. */ -class Contact_Pull extends \Newspack\Reader_Activation\Sync { - /** - * Context of the pull. - * - * @var string - */ - protected static $context = 'Contact Pull'; +class Contact_Pull { /** * Threshold in seconds (24 hours) for synchronous vs async pull. @@ -337,7 +331,7 @@ private static function schedule_integration_retry( $integration_id, $user_id, $ $user = ! empty( $user_id ) ? get_userdata( $user_id ) : false; if ( ! $user ) { - static::log( sprintf( 'Cannot schedule pull retry for integration "%s": user %d not found.', $integration_id, $user_id ) ); + Logger::log( sprintf( 'Cannot schedule pull retry for integration "%s": user %d not found.', $integration_id, $user_id ), self::LOGGER_HEADER ); return; } @@ -345,14 +339,15 @@ private static function schedule_integration_retry( $integration_id, $user_id, $ $next_retry = $retry_count + 1; if ( $next_retry > self::MAX_RETRIES ) { - static::log( + Logger::log( sprintf( 'Max pull retries (%d) reached for integration "%s" of user %d. Giving up. Last error: %s', self::MAX_RETRIES, $integration_id, $user_id, $error_message - ) + ), + self::LOGGER_HEADER ); return; } @@ -375,7 +370,7 @@ private static function schedule_integration_retry( $integration_id, $user_id, $ Integrations::get_action_group( $integration_id ) ); - static::log( + Logger::log( sprintf( 'Scheduled pull retry %d/%d for integration "%s" of user %d in %ds. Error: %s', $next_retry, @@ -384,7 +379,8 @@ private static function schedule_integration_retry( $integration_id, $user_id, $ $user_id, $backoff_seconds, $error_message - ) + ), + self::LOGGER_HEADER ); } @@ -417,7 +413,7 @@ public static function execute_integration_retry( $retry_data ) { return; } - static::log( sprintf( 'Executing pull retry %d/%d for integration "%s" of user %d.', $retry_count, self::MAX_RETRIES, $integration_id, $user_id ) ); + Logger::log( sprintf( 'Executing pull retry %d/%d for integration "%s" of user %d.', $retry_count, self::MAX_RETRIES, $integration_id, $user_id ), self::LOGGER_HEADER ); $result = self::pull_single_integration( $user_id, $integration ); if ( is_wp_error( $result ) ) { @@ -429,21 +425,22 @@ public static function execute_integration_retry( $retry_data ) { $user_id, $result->get_error_message() ); - static::log( $error_message ); + Logger::log( $error_message, self::LOGGER_HEADER ); self::schedule_integration_retry( $integration_id, $user_id, $retry_count, $result ); if ( $retry_count >= self::MAX_RETRIES ) { throw new \Exception( esc_html( $error_message ) ); } } else { - static::log( + Logger::log( sprintf( 'Pull retry %d/%d succeeded for integration "%s" of user %d.', $retry_count, self::MAX_RETRIES, $integration_id, $user_id - ) + ), + self::LOGGER_HEADER ); } } From 5d03ba9ffcb05b0980fe0f62111300d3b0d4eab7 Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 13:58:51 -0300 Subject: [PATCH 04/13] fix: simplify cron run control --- .../integrations/class-contact-cron.php | 32 +++++++++---------- .../integrations/class-contact-pull.php | 32 +++---------------- 2 files changed, 20 insertions(+), 44 deletions(-) diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php index 9948c83f37..b6d13715f7 100644 --- a/includes/reader-activation/integrations/class-contact-cron.php +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -30,6 +30,13 @@ class Contact_Cron { */ const CRON_INTERVAL = 300; + /** + * Last cron run timestamp. + * + * @var int + */ + const LAST_CRON_RUN_META = 'newspack_contact_cron_last_run'; + /** * WP-Cron hook for batch processing. * @@ -70,7 +77,7 @@ class Contact_Cron { */ public static function init() { add_filter( 'cron_schedules', [ __CLASS__, 'add_cron_schedule' ] ); // phpcs:ignore WordPress.WP.CronInterval.ChangeDetected - add_action( 'init', [ __CLASS__, 'maybe_pull_contact_data' ], 20 ); + add_action( 'init', [ __CLASS__, 'maybe_enqueue_contact' ], 20 ); add_action( 'init', [ __CLASS__, 'schedule_cron' ] ); add_action( self::CRON_HOOK, [ __CLASS__, 'handle_batch' ] ); } @@ -90,38 +97,29 @@ public static function add_cron_schedule( $schedules ) { } /** - * Pull contact data from active integrations for the current logged-in user. + * Enqueue contact data for pull and push for the current logged-in user. * * If the last pull is stale (> 24 h), the pull runs synchronously. - * Otherwise the user is queued for both pull and push on the next cron run. */ - public static function maybe_pull_contact_data() { + public static function maybe_enqueue_contact() { if ( ! is_user_logged_in() ) { return; } $user_id = get_current_user_id(); - if ( ! Contact_Pull::needs_pull( $user_id, self::CRON_INTERVAL ) ) { + $last_cron_run = get_option( self::LAST_CRON_RUN_META, 0 ); + if ( time() - $last_cron_run < self::CRON_INTERVAL ) { return; } + update_option( self::LAST_CRON_RUN_META, time() ); - $is_stale = Contact_Pull::is_stale( $user_id ); - - // Set immediately to prevent concurrent pulls from overlapping page loads. - Contact_Pull::mark_pulled( $user_id ); - - // Always enqueue for push. self::enqueue_for_push( $user_id ); + self::enqueue_for_pull( $user_id ); - // Data is stale (> 24 h) — pull synchronously, retries handle leftovers. - if ( $is_stale ) { + if ( Contact_Pull::is_stale( $last_cron_run ) ) { Contact_Pull::pull_sync( $user_id, Integrations::get_active_integrations() ); - return; } - - // Data is relatively fresh — enqueue for batch pull. - self::enqueue_for_pull( $user_id ); } /** diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index ed168dc7b9..27398b91dc 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -107,35 +107,13 @@ private static function get_pull_request_timeout() { } /** - * Determine if a user needs a pull based on the last pull timestamp. + * Whether the timestamp is stale (older than PULL_SYNC_THRESHOLD). * - * @param int $user_id WordPress user ID. - * @param int $interval The minimum interval in seconds between pulls. - * @return bool True if the user needs a pull. - */ - public static function needs_pull( $user_id, $interval ) { - $last_pull = (int) get_user_meta( $user_id, self::LAST_PULL_META, true ); - return ( time() - $last_pull ) >= $interval; - } - - /** - * Whether the last pull is stale (older than PULL_SYNC_THRESHOLD). - * - * @param int $user_id WordPress user ID. - * @return bool - */ - public static function is_stale( $user_id ) { - $last_pull = (int) get_user_meta( $user_id, self::LAST_PULL_META, true ); - return ( time() - $last_pull ) >= self::PULL_SYNC_THRESHOLD; - } - - /** - * Mark the user's pull timestamp as now. - * - * @param int $user_id WordPress user ID. + * @param int $timestamp Timestamp. + * @return bool True if the timestamp is stale. */ - public static function mark_pulled( $user_id ) { - update_user_meta( $user_id, self::LAST_PULL_META, time() ); + public static function is_stale( $timestamp ) { + return ( time() - $timestamp ) >= self::PULL_SYNC_THRESHOLD; } /** From 068d4c63543c4794433ceb2c8117444b3d5923bf Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 14:06:21 -0300 Subject: [PATCH 05/13] fix: per-user throttling --- .../integrations/class-contact-cron.php | 16 ++--- .../integrations/class-test-integrations.php | 70 +++++++++---------- 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php index b6d13715f7..71a57ca9ae 100644 --- a/includes/reader-activation/integrations/class-contact-cron.php +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -31,11 +31,11 @@ class Contact_Cron { const CRON_INTERVAL = 300; /** - * Last cron run timestamp. + * User meta key for last enqueue timestamp. * - * @var int + * @var string */ - const LAST_CRON_RUN_META = 'newspack_contact_cron_last_run'; + const LAST_ENQUEUE_META = 'newspack_contact_cron_last_enqueue'; /** * WP-Cron hook for batch processing. @@ -106,18 +106,18 @@ public static function maybe_enqueue_contact() { return; } - $user_id = get_current_user_id(); + $user_id = get_current_user_id(); + $last_enqueue = (int) get_user_meta( $user_id, self::LAST_ENQUEUE_META, true ); - $last_cron_run = get_option( self::LAST_CRON_RUN_META, 0 ); - if ( time() - $last_cron_run < self::CRON_INTERVAL ) { + if ( ( time() - $last_enqueue ) < self::CRON_INTERVAL ) { return; } - update_option( self::LAST_CRON_RUN_META, time() ); + update_user_meta( $user_id, self::LAST_ENQUEUE_META, time() ); self::enqueue_for_push( $user_id ); self::enqueue_for_pull( $user_id ); - if ( Contact_Pull::is_stale( $last_cron_run ) ) { + if ( Contact_Pull::is_stale( $last_enqueue ) ) { Contact_Pull::pull_sync( $user_id, Integrations::get_active_integrations() ); } } diff --git a/tests/unit-tests/integrations/class-test-integrations.php b/tests/unit-tests/integrations/class-test-integrations.php index e474a072c6..ca7e519553 100644 --- a/tests/unit-tests/integrations/class-test-integrations.php +++ b/tests/unit-tests/integrations/class-test-integrations.php @@ -352,44 +352,44 @@ public function test_update_incoming_fields_stores_any_keys() { } /** - * Test pull is skipped when no user is logged in. + * Test enqueue is skipped when no user is logged in. */ - public function test_pull_skipped_when_not_logged_in() { + public function test_enqueue_skipped_when_not_logged_in() { wp_set_current_user( 0 ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); - // No user meta should be written since no one is logged in. - $users = get_users( [ 'meta_key' => Contact_Pull::LAST_PULL_META ] ); - $this->assertEmpty( $users ); + // No queues should have entries since no one is logged in. + $this->assertEmpty( get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ) ); + $this->assertEmpty( get_option( Contact_Cron::PUSH_QUEUE_OPTION, [] ) ); } /** - * Test pull is throttled by the interval. + * Test enqueue is throttled by the cron interval. */ - public function test_pull_throttled_by_interval() { + public function test_enqueue_throttled_by_interval() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - $now = time(); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, $now ); + // Simulate a recent enqueue for this user. + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); - // The meta should remain unchanged (not updated to a newer timestamp). - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); - $this->assertSame( $now, $last_pull ); + // Queues should remain empty because the interval hasn't elapsed. + $this->assertEmpty( get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ) ); + $this->assertEmpty( get_option( Contact_Cron::PUSH_QUEUE_OPTION, [] ) ); } /** - * Test sync pull runs when data is older than 24 hours. + * Test sync pull runs when last cron run is older than 24 hours. */ public function test_sync_pull_when_data_stale() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - // Set last pull to beyond the 24h threshold. - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + // Set last enqueue to beyond the 24h threshold. + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); // Create an integration that returns data from pull. $integration = new class( 'pull-test', 'Pull Test' ) extends Sample_Integration { @@ -409,15 +409,15 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'pull-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // Verify the data was stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_favorite_color', true ); $this->assertSame( wp_json_encode( 'blue' ), $stored ); - // Verify last pull meta was updated. - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); - $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); + // Verify enqueue timestamp was updated. + $last_enqueue = (int) get_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, true ); + $this->assertGreaterThanOrEqual( time() - 2, $last_enqueue ); } /** @@ -427,7 +427,7 @@ public function test_sync_pull_filters_by_incoming_fields() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'filter-test', 'Filter Test' ) extends Sample_Integration { /** @@ -451,7 +451,7 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'filter-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // a and c should be stored. $this->assertSame( wp_json_encode( 'value_a' ), get_user_meta( $user_id, 'newspack_reader_data_item_field_a', true ) ); @@ -468,7 +468,7 @@ public function test_sync_pull_catches_throwable() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'throw-test', 'Throw Test' ) extends Sample_Integration { /** @@ -488,11 +488,11 @@ public function pull_contact_data( $user_id ) { // Should not throw — the routine catches Throwable. $this->mock_pull_loopback( $user_id ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); - // Last pull meta should still have been set. - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); - $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); + // Enqueue meta should still have been set. + $last_enqueue = (int) get_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, true ); + $this->assertGreaterThanOrEqual( time() - 2, $last_enqueue ); } /** @@ -502,8 +502,8 @@ public function test_async_pull_scheduled_when_fresh() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - // Last pull 10 minutes ago — past interval but within 24h. - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - 600 ); + // Last enqueue 10 minutes ago — past interval but within 24h. + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - 600 ); $integration = new class( 'async-test', 'Async Test' ) extends Sample_Integration { /** @@ -521,7 +521,7 @@ public function pull_contact_data( $user_id ) { Integrations::register( $integration ); Integrations::enable( 'async-test' ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // Data should NOT have been stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_city', true ); @@ -603,7 +603,7 @@ public function test_first_pull_runs_sync() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - // No LAST_PULL_META set — age will be time() - 0, which is > 24h. + // No LAST_ENQUEUE_META set — age will be time() - 0, which is > 24h. $integration = new class( 'first-test', 'First Test' ) extends Sample_Integration { /** @@ -622,7 +622,7 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'first-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // Should have run synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_first_field', true ); @@ -636,7 +636,7 @@ public function test_sync_pull_timeout_schedules_async() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'timeout-test', 'Timeout Test' ) extends Sample_Integration { /** @@ -663,7 +663,7 @@ public function pull_contact_data( $user_id ) { }; add_filter( 'pre_http_request', $this->loopback_filter, 10, 3 ); - Contact_Cron::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // Data should NOT have been stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_timeout_field', true ); From 9bb150bace063aefdca3648d36bb3ba6b1e52c6c Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 14:15:45 -0300 Subject: [PATCH 06/13] fix: simplify sync pull method and remove retry --- .../integrations/class-contact-cron.php | 2 +- .../integrations/class-contact-pull.php | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php index 71a57ca9ae..0f1edf7a23 100644 --- a/includes/reader-activation/integrations/class-contact-cron.php +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -118,7 +118,7 @@ public static function maybe_enqueue_contact() { self::enqueue_for_pull( $user_id ); if ( Contact_Pull::is_stale( $last_enqueue ) ) { - Contact_Pull::pull_sync( $user_id, Integrations::get_active_integrations() ); + Contact_Pull::pull_sync(); } } diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index 27398b91dc..24e35abe1d 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -123,10 +123,13 @@ public static function is_stale( $timestamp ) { * endpoint. If a request fails, the integration is scheduled for retry * via ActionScheduler. * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. + * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. Defaults to all active integrations. */ - public static function pull_sync( $user_id, $integrations ) { + public static function pull_sync( $integrations = [] ) { + if ( empty( $integrations ) ) { + $integrations = Integrations::get_active_integrations(); + } + foreach ( $integrations as $id => $integration ) { $selected_fields = $integration->get_enabled_incoming_fields(); if ( empty( $selected_fields ) ) { @@ -136,11 +139,8 @@ public static function pull_sync( $user_id, $integrations ) { $response = self::fire_pull_request( $id ); if ( is_wp_error( $response ) || 200 !== wp_remote_retrieve_response_code( $response ) ) { - $error = is_wp_error( $response ) - ? $response - : new \WP_Error( 'unexpected_response', 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ) ); - Logger::log( 'Loopback pull failed for ' . $id . '. Scheduling retry. Error: ' . $error->get_error_message(), self::LOGGER_HEADER ); - self::schedule_integration_retry( $id, $user_id, 0, $error ); + $error_message = is_wp_error( $response ) ? $response->get_error_message() : 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ); + Logger::log( 'Loopback pull failed for ' . $id . '. Error: ' . $error_message, self::LOGGER_HEADER ); } else { Logger::log( 'Loopback pull succeeded for ' . $id . '.', self::LOGGER_HEADER ); } From a0bf7bc4d48fc891604b05ea7420f3ccb86f7e4b Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 14:24:48 -0300 Subject: [PATCH 07/13] chore: remove deprecated const --- .../reader-activation/integrations/class-contact-pull.php | 7 ------- 1 file changed, 7 deletions(-) diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index 24e35abe1d..180ab3e4f3 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -44,13 +44,6 @@ class Contact_Pull { */ const NONCE_ACTION = 'newspack_pull_integration_nonce'; - /** - * User meta key for last pull timestamp. - * - * @var string - */ - const LAST_PULL_META = 'newspack_integrations_last_pull'; - /** * ActionScheduler hook for retrying a failed integration pull. */ From 7fb56e0c6c0625226f638fce7b9e5cc126c2c531 Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 14:25:02 -0300 Subject: [PATCH 08/13] chore: update tests --- .../integrations/class-test-integrations.php | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/tests/unit-tests/integrations/class-test-integrations.php b/tests/unit-tests/integrations/class-test-integrations.php index ca7e519553..b233a24069 100644 --- a/tests/unit-tests/integrations/class-test-integrations.php +++ b/tests/unit-tests/integrations/class-test-integrations.php @@ -630,9 +630,9 @@ public function pull_contact_data( $user_id ) { } /** - * Test sync pull schedules async when loopback request fails (simulated timeout). + * Test sync pull failure keeps user in pull queue for batch processing. */ - public function test_sync_pull_timeout_schedules_async() { + public function test_sync_pull_timeout_keeps_user_in_queue() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); @@ -669,14 +669,9 @@ public function pull_contact_data( $user_id ) { $stored = get_user_meta( $user_id, 'newspack_reader_data_item_timeout_field', true ); $this->assertEmpty( $stored ); - // Verify a pull retry was scheduled via ActionScheduler. - $actions = as_get_scheduled_actions( - [ - 'hook' => Contact_Pull::RETRY_HOOK, - 'status' => \ActionScheduler_Store::STATUS_PENDING, - ] - ); - $this->assertNotEmpty( $actions ); + // User should still be in the pull queue for the batch handler to process. + $queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $queue ); } /** From 6630a11f1d76b0a8197fe55e3471fbacd350aa9b Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 15:39:21 -0300 Subject: [PATCH 09/13] feat: simplify queue handling to rely on retries --- .../integrations/class-contact-cron.php | 46 ++----------------- 1 file changed, 4 insertions(+), 42 deletions(-) diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php index 0f1edf7a23..e6ff3f3b3e 100644 --- a/includes/reader-activation/integrations/class-contact-cron.php +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -184,11 +184,10 @@ private static function handle_batch_pull() { if ( empty( $queue ) ) { return; } + delete_option( self::PULL_QUEUE_OPTION ); Logger::log( 'Batch pull started for ' . count( $queue ) . ' user(s).', self::LOGGER_HEADER ); - $failed_user_ids = []; - foreach ( $queue as $user_id ) { if ( ! get_userdata( $user_id ) ) { Logger::log( 'Batch pull skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); @@ -201,17 +200,10 @@ private static function handle_batch_pull() { $result = Contact_Pull::pull_all( $user_id ); if ( is_wp_error( $result ) ) { Logger::error( 'Batch pull failed for user ' . $user_id . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); - $failed_user_ids[] = $user_id; } } - self::update_queue_after_processing( self::PULL_QUEUE_OPTION, $queue, $failed_user_ids ); - - if ( ! empty( $failed_user_ids ) ) { - Logger::log( 'Batch pull completed with ' . count( $failed_user_ids ) . ' failed user(s) kept in queue.', self::LOGGER_HEADER ); - } else { - Logger::log( 'Batch pull completed.', self::LOGGER_HEADER ); - } + Logger::log( 'Batch pull completed.', self::LOGGER_HEADER ); } /** @@ -225,11 +217,10 @@ private static function handle_batch_push() { if ( empty( $queue ) ) { return; } + delete_option( self::PUSH_QUEUE_OPTION ); Logger::log( 'Batch push started for ' . count( $queue ) . ' user(s).', self::LOGGER_HEADER ); - $failed_user_ids = []; - foreach ( $queue as $user_id ) { if ( ! get_userdata( $user_id ) ) { Logger::log( 'Batch push skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); @@ -242,38 +233,9 @@ private static function handle_batch_push() { $result = Contact_Sync::sync_contact( $user_id, 'Recurring sync routine' ); if ( is_wp_error( $result ) ) { Logger::error( 'Batch push failed for user ' . $user_id . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); - $failed_user_ids[] = $user_id; } } - self::update_queue_after_processing( self::PUSH_QUEUE_OPTION, $queue, $failed_user_ids ); - - if ( ! empty( $failed_user_ids ) ) { - Logger::log( 'Batch push completed with ' . count( $failed_user_ids ) . ' failed user(s) kept in queue.', self::LOGGER_HEADER ); - } else { - Logger::log( 'Batch push completed.', self::LOGGER_HEADER ); - } - } - - /** - * Update a queue option after batch processing. - * - * Keeps failed user IDs and any new entries added during processing. - * Removes successfully processed entries. - * - * @param string $option The option key. - * @param array $processed_queue The queue snapshot that was processed. - * @param array $failed_user_ids User IDs that failed processing. - */ - private static function update_queue_after_processing( $option, $processed_queue, $failed_user_ids ) { - $current_queue = get_option( $option, [] ); - $new_entries = array_diff( $current_queue, $processed_queue ); - $remaining = array_unique( array_merge( $failed_user_ids, $new_entries ) ); - - if ( ! empty( $remaining ) ) { - update_option( $option, array_values( $remaining ), false ); - } else { - delete_option( $option ); - } + Logger::log( 'Batch push completed.', self::LOGGER_HEADER ); } } From a7761911757fa4e6bddeeb62dc1db17ed89b8f8c Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 15:40:11 -0300 Subject: [PATCH 10/13] chore: remove unused import --- includes/reader-activation/integrations/class-contact-cron.php | 1 - 1 file changed, 1 deletion(-) diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php index e6ff3f3b3e..fdb577c62f 100644 --- a/includes/reader-activation/integrations/class-contact-cron.php +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -9,7 +9,6 @@ namespace Newspack\Reader_Activation\Integrations; -use Newspack\Reader_Activation\Integrations; use Newspack\Reader_Activation\Contact_Sync; use Newspack\Logger; From 578167d3e8acbda8ee36204403a8757bef7abbee Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 16:25:04 -0300 Subject: [PATCH 11/13] fix: clarify synchronous pull logging for current user --- includes/reader-activation/integrations/class-contact-pull.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index 180ab3e4f3..a4d86dd85b 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -110,7 +110,7 @@ public static function is_stale( $timestamp ) { } /** - * Run synchronous pull via per-integration loopback requests. + * Run synchronous pull for the current user via per-integration loopback requests. * * Each integration is pulled via a blocking wp_remote_post to the AJAX * endpoint. If a request fails, the integration is scheduled for retry @@ -123,6 +123,7 @@ public static function pull_sync( $integrations = [] ) { $integrations = Integrations::get_active_integrations(); } + Logger::log( 'Synchronous pull started for user "' . get_current_user_id() . '".', self::LOGGER_HEADER ); foreach ( $integrations as $id => $integration ) { $selected_fields = $integration->get_enabled_incoming_fields(); if ( empty( $selected_fields ) ) { From 8ce69784ac4f786fd078707775a5698c8aabe23f Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 16:54:21 -0300 Subject: [PATCH 12/13] fix: address PR review feedback - Fix has_pending_retries() to fetch all pending actions (per_page -1) - Fix stale pull not enqueueing for batch on failure - Add pull_sync() error return for cron fallback - Update pull_sync docblock to match behavior - Add deactivation hook and NEWSPACK_CRON_DISABLE support - Only enqueue for pull when not stale; stale failures fall back to queue --- .../integrations/class-contact-cron.php | 23 ++++++++++++++++--- .../integrations/class-contact-pull.php | 16 ++++++++++--- .../sync/class-contact-sync.php | 2 +- .../integrations/class-test-integrations.php | 16 ++++++------- 4 files changed, 42 insertions(+), 15 deletions(-) diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php index fdb577c62f..b41a6071fb 100644 --- a/includes/reader-activation/integrations/class-contact-cron.php +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -114,10 +114,14 @@ public static function maybe_enqueue_contact() { update_user_meta( $user_id, self::LAST_ENQUEUE_META, time() ); self::enqueue_for_push( $user_id ); - self::enqueue_for_pull( $user_id ); if ( Contact_Pull::is_stale( $last_enqueue ) ) { - Contact_Pull::pull_sync(); + $result = Contact_Pull::pull_sync(); + if ( is_wp_error( $result ) ) { + self::enqueue_for_pull( $user_id ); + } + } else { + self::enqueue_for_pull( $user_id ); } } @@ -155,13 +159,26 @@ private static function enqueue( $option, $user_id ) { /** * Ensure the recurring cron event is scheduled. + * + * Respects NEWSPACK_CRON_DISABLE to allow selective disabling. */ public static function schedule_cron() { - if ( ! wp_next_scheduled( self::CRON_HOOK ) ) { + register_deactivation_hook( NEWSPACK_PLUGIN_FILE, [ __CLASS__, 'deactivate_cron' ] ); + + if ( defined( 'NEWSPACK_CRON_DISABLE' ) && is_array( NEWSPACK_CRON_DISABLE ) && in_array( self::CRON_HOOK, NEWSPACK_CRON_DISABLE, true ) ) { + self::deactivate_cron(); + } elseif ( ! wp_next_scheduled( self::CRON_HOOK ) ) { wp_schedule_event( time(), self::CRON_SCHEDULE, self::CRON_HOOK ); } } + /** + * Deactivate the cron event. + */ + public static function deactivate_cron() { + wp_clear_scheduled_hook( self::CRON_HOOK ); + } + /** * Handle the recurring cron event. * diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index a4d86dd85b..4f9acf7c40 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -113,10 +113,11 @@ public static function is_stale( $timestamp ) { * Run synchronous pull for the current user via per-integration loopback requests. * * Each integration is pulled via a blocking wp_remote_post to the AJAX - * endpoint. If a request fails, the integration is scheduled for retry - * via ActionScheduler. + * endpoint. Returns WP_Error if any integration fails, so the caller + * can enqueue the user for the next cron batch. * * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. Defaults to all active integrations. + * @return true|\WP_Error True if all succeeded, WP_Error with combined messages. */ public static function pull_sync( $integrations = [] ) { if ( empty( $integrations ) ) { @@ -124,6 +125,8 @@ public static function pull_sync( $integrations = [] ) { } Logger::log( 'Synchronous pull started for user "' . get_current_user_id() . '".', self::LOGGER_HEADER ); + $errors = []; + foreach ( $integrations as $id => $integration ) { $selected_fields = $integration->get_enabled_incoming_fields(); if ( empty( $selected_fields ) ) { @@ -135,10 +138,17 @@ public static function pull_sync( $integrations = [] ) { if ( is_wp_error( $response ) || 200 !== wp_remote_retrieve_response_code( $response ) ) { $error_message = is_wp_error( $response ) ? $response->get_error_message() : 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ); Logger::log( 'Loopback pull failed for ' . $id . '. Error: ' . $error_message, self::LOGGER_HEADER ); + $errors[] = sprintf( '[%s] %s', $id, $error_message ); } else { Logger::log( 'Loopback pull succeeded for ' . $id . '.', self::LOGGER_HEADER ); } } + + if ( ! empty( $errors ) ) { + return new \WP_Error( 'newspack_sync_pull_failed', implode( '; ', $errors ) ); + } + + return true; } /** @@ -276,7 +286,7 @@ public static function has_pending_retries( $user_id ) { [ 'hook' => self::RETRY_HOOK, 'status' => \ActionScheduler_Store::STATUS_PENDING, - 'per_page' => 1, + 'per_page' => -1, ] ); foreach ( $actions as $action ) { diff --git a/includes/reader-activation/sync/class-contact-sync.php b/includes/reader-activation/sync/class-contact-sync.php index b8b353d289..eacb9c5060 100644 --- a/includes/reader-activation/sync/class-contact-sync.php +++ b/includes/reader-activation/sync/class-contact-sync.php @@ -443,7 +443,7 @@ public static function has_pending_retries( $user_id ) { [ 'hook' => self::RETRY_HOOK, 'status' => \ActionScheduler_Store::STATUS_PENDING, - 'per_page' => 1, + 'per_page' => -1, ] ); foreach ( $actions as $action ) { diff --git a/tests/unit-tests/integrations/class-test-integrations.php b/tests/unit-tests/integrations/class-test-integrations.php index b233a24069..fe0c0b2f5a 100644 --- a/tests/unit-tests/integrations/class-test-integrations.php +++ b/tests/unit-tests/integrations/class-test-integrations.php @@ -630,9 +630,9 @@ public function pull_contact_data( $user_id ) { } /** - * Test sync pull failure keeps user in pull queue for batch processing. + * Test stale sync pull failure enqueues user for batch pull. */ - public function test_sync_pull_timeout_keeps_user_in_queue() { + public function test_stale_sync_pull_failure_enqueues_for_batch() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); @@ -665,13 +665,13 @@ public function pull_contact_data( $user_id ) { Contact_Cron::maybe_enqueue_contact(); - // Data should NOT have been stored synchronously. - $stored = get_user_meta( $user_id, 'newspack_reader_data_item_timeout_field', true ); - $this->assertEmpty( $stored ); + // Stale sync pull failed, user should be enqueued for batch pull. + $pull_queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $pull_queue ); - // User should still be in the pull queue for the batch handler to process. - $queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); - $this->assertContains( $user_id, $queue ); + // User should still be enqueued for push. + $push_queue = get_option( Contact_Cron::PUSH_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $push_queue ); } /** From d80df92b983c898360426d3b0ce8ff12517892c1 Mon Sep 17 00:00:00 2001 From: Miguel Peixe Date: Mon, 30 Mar 2026 17:05:04 -0300 Subject: [PATCH 13/13] fix: skip no-field integrations in pull_all, clear queues in test setUp --- .../reader-activation/integrations/class-contact-pull.php | 4 ++++ tests/unit-tests/integrations/class-test-integrations.php | 2 ++ 2 files changed, 6 insertions(+) diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index 4f9acf7c40..0c4312f4d8 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -162,6 +162,10 @@ public static function pull_all( $user_id ) { $errors = []; foreach ( $active_integrations as $integration ) { + $selected_fields = $integration->get_enabled_incoming_fields(); + if ( empty( $selected_fields ) ) { + continue; + } $result = self::pull_single_integration( $user_id, $integration ); if ( is_wp_error( $result ) ) { self::schedule_integration_retry( $integration->get_id(), $user_id, 0, $result ); diff --git a/tests/unit-tests/integrations/class-test-integrations.php b/tests/unit-tests/integrations/class-test-integrations.php index fe0c0b2f5a..0ed37c9a88 100644 --- a/tests/unit-tests/integrations/class-test-integrations.php +++ b/tests/unit-tests/integrations/class-test-integrations.php @@ -32,6 +32,8 @@ class Test_Integrations extends \WP_UnitTestCase { public function set_up() { parent::set_up(); delete_option( Integrations::OPTION_NAME ); + delete_option( Contact_Cron::PULL_QUEUE_OPTION ); + delete_option( Contact_Cron::PUSH_QUEUE_OPTION ); $this->reset_integrations(); $this->reset_handler_map(); Sample_Integration::reset();