diff --git a/includes/reader-activation/class-integrations.php b/includes/reader-activation/class-integrations.php index ff2b89d609..1ce86b97ad 100644 --- a/includes/reader-activation/class-integrations.php +++ b/includes/reader-activation/class-integrations.php @@ -69,6 +69,7 @@ public static function init() { // Include required files. require_once __DIR__ . '/integrations/class-integration.php'; require_once __DIR__ . '/integrations/class-contact-pull.php'; + require_once __DIR__ . '/integrations/class-contact-cron.php'; add_action( 'init', [ __CLASS__, 'register_integrations' ], 5 ); add_action( 'init', [ __CLASS__, 'schedule_health_check' ] ); @@ -76,7 +77,7 @@ public static function init() { add_filter( 'newspack_data_events_handler_action_group', [ __CLASS__, 'filter_handler_action_group' ], 10, 3 ); add_filter( 'newspack_action_scheduler_group_labels', [ __CLASS__, 'register_group_labels' ] ); - Integrations\Contact_Pull::init(); + Integrations\Contact_Cron::init(); } /** diff --git a/includes/reader-activation/integrations/class-contact-cron.php b/includes/reader-activation/integrations/class-contact-cron.php new file mode 100644 index 0000000000..b41a6071fb --- /dev/null +++ b/includes/reader-activation/integrations/class-contact-cron.php @@ -0,0 +1,257 @@ + self::CRON_INTERVAL, + 'display' => __( 'Newspack Contact Cron Interval', 'newspack-plugin' ), + ]; + return $schedules; + } + + /** + * Enqueue contact data for pull and push for the current logged-in user. + * + * If the last pull is stale (> 24 h), the pull runs synchronously. + */ + public static function maybe_enqueue_contact() { + if ( ! is_user_logged_in() ) { + return; + } + + $user_id = get_current_user_id(); + $last_enqueue = (int) get_user_meta( $user_id, self::LAST_ENQUEUE_META, true ); + + if ( ( time() - $last_enqueue ) < self::CRON_INTERVAL ) { + return; + } + update_user_meta( $user_id, self::LAST_ENQUEUE_META, time() ); + + self::enqueue_for_push( $user_id ); + + if ( Contact_Pull::is_stale( $last_enqueue ) ) { + $result = Contact_Pull::pull_sync(); + if ( is_wp_error( $result ) ) { + self::enqueue_for_pull( $user_id ); + } + } else { + self::enqueue_for_pull( $user_id ); + } + } + + /** + * Add a user to the pull queue. + * + * @param int $user_id WordPress user ID. + */ + public static function enqueue_for_pull( $user_id ) { + self::enqueue( self::PULL_QUEUE_OPTION, $user_id ); + } + + /** + * Add a user to the push queue. + * + * @param int $user_id WordPress user ID. + */ + public static function enqueue_for_push( $user_id ) { + self::enqueue( self::PUSH_QUEUE_OPTION, $user_id ); + } + + /** + * Add a user ID to a queue option. + * + * @param string $option The option key. + * @param int $user_id WordPress user ID. + */ + private static function enqueue( $option, $user_id ) { + $queue = get_option( $option, [] ); + if ( ! in_array( $user_id, $queue, true ) ) { + $queue[] = $user_id; + update_option( $option, $queue, false ); + } + } + + /** + * Ensure the recurring cron event is scheduled. + * + * Respects NEWSPACK_CRON_DISABLE to allow selective disabling. + */ + public static function schedule_cron() { + register_deactivation_hook( NEWSPACK_PLUGIN_FILE, [ __CLASS__, 'deactivate_cron' ] ); + + if ( defined( 'NEWSPACK_CRON_DISABLE' ) && is_array( NEWSPACK_CRON_DISABLE ) && in_array( self::CRON_HOOK, NEWSPACK_CRON_DISABLE, true ) ) { + self::deactivate_cron(); + } elseif ( ! wp_next_scheduled( self::CRON_HOOK ) ) { + wp_schedule_event( time(), self::CRON_SCHEDULE, self::CRON_HOOK ); + } + } + + /** + * Deactivate the cron event. + */ + public static function deactivate_cron() { + wp_clear_scheduled_hook( self::CRON_HOOK ); + } + + /** + * Handle the recurring cron event. + * + * Processes both pull and push queues. + */ + public static function handle_batch() { + self::handle_batch_pull(); + self::handle_batch_push(); + } + + /** + * Process the pull queue. + * + * Reads the queue, pulls all active integrations for each user, + * and removes successfully processed users. + */ + private static function handle_batch_pull() { + $queue = get_option( self::PULL_QUEUE_OPTION, [] ); + if ( empty( $queue ) ) { + return; + } + delete_option( self::PULL_QUEUE_OPTION ); + + Logger::log( 'Batch pull started for ' . count( $queue ) . ' user(s).', self::LOGGER_HEADER ); + + foreach ( $queue as $user_id ) { + if ( ! get_userdata( $user_id ) ) { + Logger::log( 'Batch pull skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); + continue; + } + if ( Contact_Pull::has_pending_retries( $user_id ) ) { + Logger::log( 'Batch pull skipping user ' . $user_id . ': pending pull retries.', self::LOGGER_HEADER ); + continue; + } + $result = Contact_Pull::pull_all( $user_id ); + if ( is_wp_error( $result ) ) { + Logger::error( 'Batch pull failed for user ' . $user_id . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); + } + } + + Logger::log( 'Batch pull completed.', self::LOGGER_HEADER ); + } + + /** + * Process the push queue. + * + * Reads the queue, fetches fresh contact data for each user, + * and pushes to all active integrations. + */ + private static function handle_batch_push() { + $queue = get_option( self::PUSH_QUEUE_OPTION, [] ); + if ( empty( $queue ) ) { + return; + } + delete_option( self::PUSH_QUEUE_OPTION ); + + Logger::log( 'Batch push started for ' . count( $queue ) . ' user(s).', self::LOGGER_HEADER ); + + foreach ( $queue as $user_id ) { + if ( ! get_userdata( $user_id ) ) { + Logger::log( 'Batch push skipping non-existent user ' . $user_id . '.', self::LOGGER_HEADER ); + continue; + } + if ( Contact_Sync::has_pending_retries( $user_id ) ) { + Logger::log( 'Batch push skipping user ' . $user_id . ': pending sync retries.', self::LOGGER_HEADER ); + continue; + } + $result = Contact_Sync::sync_contact( $user_id, 'Recurring sync routine' ); + if ( is_wp_error( $result ) ) { + Logger::error( 'Batch push failed for user ' . $user_id . ': ' . $result->get_error_message(), self::LOGGER_HEADER ); + } + } + + Logger::log( 'Batch push completed.', self::LOGGER_HEADER ); + } +} diff --git a/includes/reader-activation/integrations/class-contact-pull.php b/includes/reader-activation/integrations/class-contact-pull.php index 1c0654faab..acc690b942 100644 --- a/includes/reader-activation/integrations/class-contact-pull.php +++ b/includes/reader-activation/integrations/class-contact-pull.php @@ -1,9 +1,9 @@ ID, self::LAST_PULL_META, true ); - $age = time() - $last_pull; - - if ( $age < self::PULL_INTERVAL ) { - return; - } - - // Set immediately to prevent concurrent pulls from overlapping page loads. - update_user_meta( $user->ID, self::LAST_PULL_META, time() ); - - $active_integrations = Integrations::get_active_integrations(); - - // Data is stale (> 24 h) — pull synchronously, schedule leftovers. - if ( $age >= self::PULL_SYNC_THRESHOLD ) { - self::pull_sync( $user->ID, $active_integrations ); - return; - } - - // Data is relatively fresh — schedule all integrations async. - self::schedule_async_pulls( $user->ID, $active_integrations ); + public static function is_stale( $timestamp ) { + return ( time() - $timestamp ) >= self::PULL_SYNC_THRESHOLD; } /** - * Run synchronous pull via per-integration loopback requests. + * Run synchronous pull for the current user via per-integration loopback requests. * * Each integration is pulled via a blocking wp_remote_post to the AJAX - * endpoint with get_pull_request_timeout. If the request completes, the handler - * has already stored the data. If it times out or fails, the integration is - * scheduled via Action Scheduler as a fallback. + * endpoint. Returns WP_Error if any integration fails, so the caller + * can enqueue the user for the next cron batch. * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. + * @param \Newspack\Reader_Activation\Integration[] $integrations Active integrations to pull from. Defaults to all active integrations. + * @return true|\WP_Error True if all succeeded, WP_Error with combined messages. */ - private static function pull_sync( $user_id, $integrations ) { - $failed = []; + public static function pull_sync( $integrations = [] ) { + if ( empty( $integrations ) ) { + $integrations = Integrations::get_active_integrations(); + } + + Logger::log( 'Synchronous pull started for user "' . get_current_user_id() . '".', self::LOGGER_HEADER ); + $errors = []; foreach ( $integrations as $id => $integration ) { $selected_fields = $integration->get_enabled_incoming_fields(); @@ -153,16 +137,47 @@ private static function pull_sync( $user_id, $integrations ) { if ( is_wp_error( $response ) || 200 !== wp_remote_retrieve_response_code( $response ) ) { $error_message = is_wp_error( $response ) ? $response->get_error_message() : 'Unexpected response code: ' . wp_remote_retrieve_response_code( $response ); - Logger::log( 'Loopback pull failed for ' . $id . '. Scheduling async. Error: ' . $error_message ); - $failed[ $id ] = $integration; + Logger::log( 'Loopback pull failed for ' . $id . '. Error: ' . $error_message, self::LOGGER_HEADER ); + $errors[] = sprintf( '[%s] %s', $id, $error_message ); } else { - Logger::log( 'Loopback pull succeeded for ' . $id . '.' ); + Logger::log( 'Loopback pull succeeded for ' . $id . '.', self::LOGGER_HEADER ); } } - if ( ! empty( $failed ) ) { - self::schedule_async_pulls( $user_id, $failed ); + if ( ! empty( $errors ) ) { + return new \WP_Error( 'newspack_sync_pull_failed', implode( '; ', $errors ) ); } + + return true; + } + + /** + * Pull all active integrations for a user. + * + * @param int $user_id WordPress user ID. + * @return true|\WP_Error True if all succeeded, WP_Error with combined messages. + */ + public static function pull_all( $user_id ) { + $active_integrations = Integrations::get_active_integrations(); + $errors = []; + + foreach ( $active_integrations as $integration ) { + $selected_fields = $integration->get_enabled_incoming_fields(); + if ( empty( $selected_fields ) ) { + continue; + } + $result = self::pull_single_integration( $user_id, $integration ); + if ( is_wp_error( $result ) ) { + self::schedule_integration_retry( $integration->get_id(), $user_id, 0, $result ); + $errors[] = sprintf( '[%s] %s', $integration->get_id(), $result->get_error_message() ); + } + } + + if ( ! empty( $errors ) ) { + return new \WP_Error( 'newspack_contact_pull_failed', implode( '; ', $errors ) ); + } + + return true; } /** @@ -194,9 +209,6 @@ private static function fire_pull_request( $integration_id ) { /** * Handle the AJAX loopback request for pulling a single integration. - * - * Verifies the nonce, looks up the integration, pulls and stores data, - * then returns a JSON response. */ public static function handle_ajax_pull() { if ( ! isset( $_REQUEST['nonce'] ) || ! wp_verify_nonce( sanitize_text_field( $_REQUEST['nonce'] ), self::NONCE_ACTION ) ) { // phpcs:ignore @@ -244,7 +256,7 @@ public static function pull_single_integration( $user_id, $integration ) { $data = $integration->pull_contact_data( $user_id ); if ( is_wp_error( $data ) ) { - Logger::log( 'Pull error from ' . $integration->get_id() . ': ' . $data->get_error_message() ); + Logger::log( 'Pull error from ' . $integration->get_id() . ': ' . $data->get_error_message(), self::LOGGER_HEADER ); return $data; } @@ -265,68 +277,164 @@ function( $field ) { return true; } catch ( \Throwable $e ) { - Logger::log( 'Pull exception from ' . $integration->get_id() . ': ' . $e->getMessage() ); + Logger::log( 'Pull exception from ' . $integration->get_id() . ': ' . $e->getMessage(), self::LOGGER_HEADER ); return new \WP_Error( 'pull_exception', $e->getMessage() ); } } /** - * Schedule async Action Scheduler events for pulling integration data. + * Check if a user has any pending pull retries in ActionScheduler. * - * @param int $user_id WordPress user ID. - * @param \Newspack\Reader_Activation\Integration[] $integrations Integrations to schedule. + * @param int $user_id WordPress user ID. + * @return bool True if there are pending retries. */ - private static function schedule_async_pulls( $user_id, $integrations ) { - if ( ! function_exists( 'as_enqueue_async_action' ) ) { - return; + public static function has_pending_retries( $user_id ) { + if ( ! function_exists( 'as_get_scheduled_actions' ) ) { + return false; } - - foreach ( $integrations as $integration ) { - $selected_fields = $integration->get_enabled_incoming_fields(); - if ( empty( $selected_fields ) ) { - continue; + $actions = \as_get_scheduled_actions( + [ + 'hook' => self::RETRY_HOOK, + 'status' => \ActionScheduler_Store::STATUS_PENDING, + 'per_page' => -1, + ] + ); + foreach ( $actions as $action ) { + $args = $action->get_args(); + if ( ! empty( $args[0]['user_id'] ) && (int) $args[0]['user_id'] === $user_id ) { + return true; } + } + return false; + } - $args = [ - [ - 'user_id' => $user_id, - 'integration_id' => $integration->get_id(), - ], - ]; - - $group = Integrations::get_action_group( $integration->get_id() ); + /** + * Schedule a retry for a failed integration pull via ActionScheduler. + * + * @param string $integration_id The integration ID. + * @param int $user_id The WordPress user ID. + * @param int $retry_count Current retry count (0 = first failure). + * @param string|\WP_Error $error The error from the failure. + */ + private static function schedule_integration_retry( $integration_id, $user_id, $retry_count, $error ) { + if ( ! function_exists( 'as_schedule_single_action' ) ) { + return; + } - if ( function_exists( 'as_has_scheduled_action' ) && \as_has_scheduled_action( self::ASYNC_PULL_HOOK, $args, $group ) ) { - continue; - } + $user = ! empty( $user_id ) ? get_userdata( $user_id ) : false; + if ( ! $user ) { + Logger::log( sprintf( 'Cannot schedule pull retry for integration "%s": user %d not found.', $integration_id, $user_id ), self::LOGGER_HEADER ); + return; + } - \as_enqueue_async_action( - self::ASYNC_PULL_HOOK, - $args, - $group + $error_message = $error instanceof \WP_Error ? $error->get_error_message() : (string) $error; + + $next_retry = $retry_count + 1; + if ( $next_retry > self::MAX_RETRIES ) { + Logger::log( + sprintf( + 'Max pull retries (%d) reached for integration "%s" of user %d. Giving up. Last error: %s', + self::MAX_RETRIES, + $integration_id, + $user_id, + $error_message + ), + self::LOGGER_HEADER ); + return; } + + $backoff_index = min( $retry_count, count( self::RETRY_BACKOFF ) - 1 ); + $backoff_seconds = self::RETRY_BACKOFF[ $backoff_index ]; + + $retry_data = [ + 'integration_id' => $integration_id, + 'user_id' => $user_id, + 'retry_count' => $next_retry, + 'max_retries' => self::MAX_RETRIES, + 'reason' => $error_message, + ]; + + \as_schedule_single_action( + time() + $backoff_seconds, + self::RETRY_HOOK, + [ $retry_data ], + Integrations::get_action_group( $integration_id ) + ); + + Logger::log( + sprintf( + 'Scheduled pull retry %d/%d for integration "%s" of user %d in %ds. Error: %s', + $next_retry, + self::MAX_RETRIES, + $integration_id, + $user_id, + $backoff_seconds, + $error_message + ), + self::LOGGER_HEADER + ); } /** - * Handle an async pull Action Scheduler event. + * Execute an integration pull retry from ActionScheduler. * - * @param array $args { user_id, integration_id }. + * @param array $retry_data The retry data. + * + * @throws \Exception When the final retry fails, so ActionScheduler marks the action as "failed". */ - public static function handle_async_pull( $args ) { - $user_id = $args['user_id'] ?? 0; - $integration_id = $args['integration_id'] ?? ''; + public static function execute_integration_retry( $retry_data ) { + if ( ! is_array( $retry_data ) || empty( $retry_data['integration_id'] ) || empty( $retry_data['user_id'] ) ) { + Logger::log( 'Invalid pull retry data received from Action Scheduler.', self::LOGGER_HEADER, 'error' ); + return; + } + + $integration_id = $retry_data['integration_id']; + $user_id = $retry_data['user_id']; + $retry_count = $retry_data['retry_count'] ?? 1; - if ( ! $user_id || ! $integration_id ) { + $user = \get_userdata( $user_id ); + if ( ! $user ) { + Logger::log( sprintf( 'User %d not found on pull retry %d.', $user_id, $retry_count ), self::LOGGER_HEADER, 'error' ); return; } $integration = Integrations::get_integration( $integration_id ); - if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) { + Logger::log( sprintf( 'Integration "%s" not found or not enabled on pull retry %d.', $integration_id, $retry_count ), self::LOGGER_HEADER, 'error' ); return; } - self::pull_single_integration( $user_id, $integration ); + Logger::log( sprintf( 'Executing pull retry %d/%d for integration "%s" of user %d.', $retry_count, self::MAX_RETRIES, $integration_id, $user_id ), self::LOGGER_HEADER ); + + $result = self::pull_single_integration( $user_id, $integration ); + if ( is_wp_error( $result ) ) { + $error_message = sprintf( + 'Pull retry %d/%d failed for integration "%s" of user %d: %s', + $retry_count, + self::MAX_RETRIES, + $integration_id, + $user_id, + $result->get_error_message() + ); + Logger::log( $error_message, self::LOGGER_HEADER ); + self::schedule_integration_retry( $integration_id, $user_id, $retry_count, $result ); + + if ( $retry_count >= self::MAX_RETRIES ) { + throw new \Exception( esc_html( $error_message ) ); + } + } else { + Logger::log( + sprintf( + 'Pull retry %d/%d succeeded for integration "%s" of user %d.', + $retry_count, + self::MAX_RETRIES, + $integration_id, + $user_id + ), + self::LOGGER_HEADER + ); + } } } +Contact_Pull::init_hooks(); diff --git a/includes/reader-activation/sync/class-contact-sync.php b/includes/reader-activation/sync/class-contact-sync.php index 10d2365766..eacb9c5060 100644 --- a/includes/reader-activation/sync/class-contact-sync.php +++ b/includes/reader-activation/sync/class-contact-sync.php @@ -429,6 +429,32 @@ public static function execute_integration_retry( $retry_data ) { } } + /** + * Check if a user has any pending sync retries in ActionScheduler. + * + * @param int $user_id WordPress user ID. + * @return bool True if there are pending retries. + */ + public static function has_pending_retries( $user_id ) { + if ( ! function_exists( 'as_get_scheduled_actions' ) ) { + return false; + } + $actions = \as_get_scheduled_actions( + [ + 'hook' => self::RETRY_HOOK, + 'status' => \ActionScheduler_Store::STATUS_PENDING, + 'per_page' => -1, + ] + ); + foreach ( $actions as $action ) { + $args = $action->get_args(); + if ( ! empty( $args[0]['user_id'] ) && (int) $args[0]['user_id'] === $user_id ) { + return true; + } + } + return false; + } + /** * Schedule a future sync. * diff --git a/tests/unit-tests/integrations/class-test-integrations.php b/tests/unit-tests/integrations/class-test-integrations.php index 12a48a004b..a61a010c29 100644 --- a/tests/unit-tests/integrations/class-test-integrations.php +++ b/tests/unit-tests/integrations/class-test-integrations.php @@ -10,6 +10,7 @@ use Newspack\Data_Events; use Newspack\Reader_Activation\Integration; use Newspack\Reader_Activation\Integrations; +use Newspack\Reader_Activation\Integrations\Contact_Cron; use Newspack\Reader_Activation\Integrations\Contact_Pull; use Sample_Integration; @@ -31,6 +32,8 @@ class Test_Integrations extends \WP_UnitTestCase { public function set_up() { parent::set_up(); delete_option( Integrations::OPTION_NAME ); + delete_option( Contact_Cron::PULL_QUEUE_OPTION ); + delete_option( Contact_Cron::PUSH_QUEUE_OPTION ); $this->reset_integrations(); $this->reset_handler_map(); Sample_Integration::reset(); @@ -355,44 +358,44 @@ public function test_update_incoming_fields_stores_any_keys() { } /** - * Test pull is skipped when no user is logged in. + * Test enqueue is skipped when no user is logged in. */ - public function test_pull_skipped_when_not_logged_in() { + public function test_enqueue_skipped_when_not_logged_in() { wp_set_current_user( 0 ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); - // No user meta should be written since no one is logged in. - $users = get_users( [ 'meta_key' => Contact_Pull::LAST_PULL_META ] ); - $this->assertEmpty( $users ); + // No queues should have entries since no one is logged in. + $this->assertEmpty( get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ) ); + $this->assertEmpty( get_option( Contact_Cron::PUSH_QUEUE_OPTION, [] ) ); } /** - * Test pull is throttled by the interval. + * Test enqueue is throttled by the cron interval. */ - public function test_pull_throttled_by_interval() { + public function test_enqueue_throttled_by_interval() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - $now = time(); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, $now ); + // Simulate a recent enqueue for this user. + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); - // The meta should remain unchanged (not updated to a newer timestamp). - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); - $this->assertSame( $now, $last_pull ); + // Queues should remain empty because the interval hasn't elapsed. + $this->assertEmpty( get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ) ); + $this->assertEmpty( get_option( Contact_Cron::PUSH_QUEUE_OPTION, [] ) ); } /** - * Test sync pull runs when data is older than 24 hours. + * Test sync pull runs when last cron run is older than 24 hours. */ public function test_sync_pull_when_data_stale() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - // Set last pull to beyond the 24h threshold. - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + // Set last enqueue to beyond the 24h threshold. + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); // Create an integration that returns data from pull. $integration = new class( 'pull-test', 'Pull Test' ) extends Sample_Integration { @@ -412,15 +415,15 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'pull-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // Verify the data was stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_favorite_color', true ); $this->assertSame( wp_json_encode( 'blue' ), $stored ); - // Verify last pull meta was updated. - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); - $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); + // Verify enqueue timestamp was updated. + $last_enqueue = (int) get_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, true ); + $this->assertGreaterThanOrEqual( time() - 2, $last_enqueue ); } /** @@ -430,7 +433,7 @@ public function test_sync_pull_filters_by_incoming_fields() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'filter-test', 'Filter Test' ) extends Sample_Integration { /** @@ -454,7 +457,7 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'filter-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // a and c should be stored. $this->assertSame( wp_json_encode( 'value_a' ), get_user_meta( $user_id, 'newspack_reader_data_item_field_a', true ) ); @@ -471,7 +474,7 @@ public function test_sync_pull_catches_throwable() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'throw-test', 'Throw Test' ) extends Sample_Integration { /** @@ -491,11 +494,11 @@ public function pull_contact_data( $user_id ) { // Should not throw — the routine catches Throwable. $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); - // Last pull meta should still have been set. - $last_pull = (int) get_user_meta( $user_id, Contact_Pull::LAST_PULL_META, true ); - $this->assertGreaterThanOrEqual( time() - 2, $last_pull ); + // Enqueue meta should still have been set. + $last_enqueue = (int) get_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, true ); + $this->assertGreaterThanOrEqual( time() - 2, $last_enqueue ); } /** @@ -505,8 +508,8 @@ public function test_async_pull_scheduled_when_fresh() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - // Last pull 10 minutes ago — past interval but within 24h. - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - 600 ); + // Last enqueue 10 minutes ago — past interval but within 24h. + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - 600 ); $integration = new class( 'async-test', 'Async Test' ) extends Sample_Integration { /** @@ -524,26 +527,21 @@ public function pull_contact_data( $user_id ) { Integrations::register( $integration ); Integrations::enable( 'async-test' ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // Data should NOT have been stored synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_city', true ); $this->assertEmpty( $stored ); - // Verify an AS action was scheduled. - $actions = as_get_scheduled_actions( - [ - 'hook' => Contact_Pull::ASYNC_PULL_HOOK, - 'status' => \ActionScheduler_Store::STATUS_PENDING, - ] - ); - $this->assertNotEmpty( $actions ); + // Verify user was added to the pull queue. + $queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $queue ); } /** - * Test handle_async_pull processes data for a single integration. + * Test handle_batch_pull processes data for queued users. */ - public function test_handle_async_pull() { + public function test_handle_batch_pull() { $user_id = $this->factory()->user->create(); $integration = new class( 'handle-test', 'Handle Test' ) extends Sample_Integration { @@ -562,21 +560,22 @@ public function pull_contact_data( $user_id ) { Integrations::register( $integration ); Integrations::enable( 'handle-test' ); - Contact_Pull::handle_async_pull( - [ - 'user_id' => $user_id, - 'integration_id' => 'handle-test', - ] - ); + // Simulate a queued user. + update_option( Contact_Cron::PULL_QUEUE_OPTION, [ $user_id ] ); + + Contact_Cron::handle_batch(); $stored = get_user_meta( $user_id, 'newspack_reader_data_item_language', true ); $this->assertSame( wp_json_encode( 'PHP' ), $stored ); + + // Queue should be cleared after processing. + $this->assertEmpty( get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ) ); } /** - * Test handle_async_pull skips disabled integration. + * Test handle_batch_pull skips disabled integration. */ - public function test_handle_async_pull_skips_disabled() { + public function test_handle_batch_pull_skips_disabled() { $user_id = $this->factory()->user->create(); $integration = new class( 'disabled-test', 'Disabled Test' ) extends Sample_Integration { @@ -595,12 +594,9 @@ public function pull_contact_data( $user_id ) { Integrations::register( $integration ); // Not enabled. - Contact_Pull::handle_async_pull( - [ - 'user_id' => $user_id, - 'integration_id' => 'disabled-test', - ] - ); + update_option( Contact_Cron::PULL_QUEUE_OPTION, [ $user_id ] ); + + Contact_Cron::handle_batch(); $stored = get_user_meta( $user_id, 'newspack_reader_data_item_pet', true ); $this->assertEmpty( $stored ); @@ -613,7 +609,7 @@ public function test_first_pull_runs_sync() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - // No LAST_PULL_META set — age will be time() - 0, which is > 24h. + // No LAST_ENQUEUE_META set — age will be time() - 0, which is > 24h. $integration = new class( 'first-test', 'First Test' ) extends Sample_Integration { /** @@ -632,7 +628,7 @@ public function pull_contact_data( $user_id ) { Integrations::enable( 'first-test' ); $this->mock_pull_loopback( $user_id ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); // Should have run synchronously. $stored = get_user_meta( $user_id, 'newspack_reader_data_item_first_field', true ); @@ -640,13 +636,13 @@ public function pull_contact_data( $user_id ) { } /** - * Test sync pull schedules async when loopback request fails (simulated timeout). + * Test stale sync pull failure enqueues user for batch pull. */ - public function test_sync_pull_timeout_schedules_async() { + public function test_stale_sync_pull_failure_enqueues_for_batch() { $user_id = $this->factory()->user->create(); wp_set_current_user( $user_id ); - update_user_meta( $user_id, Contact_Pull::LAST_PULL_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); + update_user_meta( $user_id, Contact_Cron::LAST_ENQUEUE_META, time() - Contact_Pull::PULL_SYNC_THRESHOLD - 1 ); $integration = new class( 'timeout-test', 'Timeout Test' ) extends Sample_Integration { /** @@ -673,20 +669,15 @@ public function pull_contact_data( $user_id ) { }; add_filter( 'pre_http_request', $this->loopback_filter, 10, 3 ); - Contact_Pull::maybe_pull_contact_data(); + Contact_Cron::maybe_enqueue_contact(); - // Data should NOT have been stored synchronously. - $stored = get_user_meta( $user_id, 'newspack_reader_data_item_timeout_field', true ); - $this->assertEmpty( $stored ); + // Stale sync pull failed, user should be enqueued for batch pull. + $pull_queue = get_option( Contact_Cron::PULL_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $pull_queue ); - // Verify an AS action was scheduled as fallback. - $actions = as_get_scheduled_actions( - [ - 'hook' => Contact_Pull::ASYNC_PULL_HOOK, - 'status' => \ActionScheduler_Store::STATUS_PENDING, - ] - ); - $this->assertNotEmpty( $actions ); + // User should still be enqueued for push. + $push_queue = get_option( Contact_Cron::PUSH_QUEUE_OPTION, [] ); + $this->assertContains( $user_id, $push_queue ); } /**