Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 234 additions & 15 deletions includes/reader-activation/integrations/class-contact-pull.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class Contact_Pull {
*/
const RETRY_HOOK = 'newspack_contact_pull_retry';

/**
* ActionScheduler hook for retrying a failed bulk integration pull.
*/
const BULK_RETRY_HOOK = 'newspack_bulk_contact_pull_retry';

/**
* Maximum number of retries for a failed integration pull.
*/
Expand All @@ -73,6 +78,7 @@ class Contact_Pull {
public static function init_hooks() {
add_action( 'wp_ajax_' . self::AJAX_ACTION, [ __CLASS__, 'handle_ajax_pull' ] );
add_action( self::RETRY_HOOK, [ __CLASS__, 'execute_integration_retry' ] );
add_action( self::BULK_RETRY_HOOK, [ __CLASS__, 'execute_bulk_retry' ] );
add_filter( 'newspack_action_scheduler_hook_labels', [ __CLASS__, 'register_hook_labels' ] );
}

Expand All @@ -83,7 +89,8 @@ public static function init_hooks() {
* @return array
*/
public static function register_hook_labels( $labels ) {
$labels[ self::RETRY_HOOK ] = __( 'Contact Pull Retry', 'newspack-plugin' );
$labels[ self::RETRY_HOOK ] = __( 'Contact Pull Retry', 'newspack-plugin' );
$labels[ self::BULK_RETRY_HOOK ] = __( 'Bulk Contact Pull Retry', 'newspack-plugin' );
return $labels;
}

Expand Down Expand Up @@ -180,6 +187,55 @@ public static function pull_all( $user_id ) {
return true;
}

/**
* Pull contact data for multiple users from all active integrations in bulk.
*
* Each integration receives all user IDs at once via pull_contacts_data(),
* allowing integrations with native batch read APIs to handle them efficiently.
* Pulled data is filtered by enabled incoming fields and stored via Reader_Data.
*
* @param int[] $user_ids Array of WordPress user IDs.
*
* @return true|\WP_Error True if all succeeded, or WP_Error with combined messages.
*/
public static function bulk_pull_from_integrations( $user_ids ) {
$integrations = Integrations::get_active_integrations();
$errors = [];

foreach ( $integrations as $integration_id => $integration ) {
$selected_fields = $integration->get_enabled_incoming_fields();
if ( empty( $selected_fields ) ) {
continue;
}

Logger::log( sprintf( 'Bulk pulling %d user(s) from integration "%s".', count( $user_ids ), $integration_id ), self::LOGGER_HEADER );

$results = $integration->pull_contacts_data( $user_ids );

if ( \is_wp_error( $results ) ) {
Logger::log( sprintf( 'Bulk pull failed for integration "%s": %s', $integration_id, $results->get_error_message() ), self::LOGGER_HEADER );
self::schedule_bulk_retry( $integration_id, $user_ids, 0, $results );
$errors[] = sprintf( '[%s] Batch failed: %s', $integration_id, $results->get_error_message() );
} else {
foreach ( $results as $user_id => $result ) {
if ( \is_wp_error( $result ) ) {
Logger::log( sprintf( 'Pull failed for user %d from integration "%s": %s', $user_id, $integration_id, $result->get_error_message() ), self::LOGGER_HEADER );
self::schedule_integration_retry( $integration_id, $user_id, 0, $result );
$errors[] = sprintf( '[%s] User %d: %s', $integration_id, $user_id, $result->get_error_message() );
} elseif ( is_array( $result ) ) {
self::store_pulled_data( $user_id, $result, $integration );
}
}
}
}

if ( ! empty( $errors ) ) {
return new \WP_Error( 'newspack_bulk_pull_failed', implode( '; ', $errors ) );
}

return true;
}

/**
* Fire a blocking loopback request to pull data for a single integration.
*
Expand Down Expand Up @@ -239,6 +295,31 @@ public static function handle_ajax_pull() {
wp_send_json_success();
}

/**
* Filter pulled data by enabled incoming fields and store via Reader_Data.
*
* @param int $user_id WordPress user ID.
* @param array $data Raw pulled data (field_key => value).
* @param \Newspack\Reader_Activation\Integration $integration The integration instance.
*/
private static function store_pulled_data( $user_id, $data, $integration ) {
$selected_fields = $integration->get_enabled_incoming_fields();
$selected_keys = array_flip(
array_map(
function( $field ) {
return $field->get_key();
},
$selected_fields
)
);
$data = array_intersect_key( $data, $selected_keys );
Logger::log( 'Pulled data from ' . $integration->get_id() . ' for user ' . $user_id . ': ' . wp_json_encode( $data ) );

foreach ( $data as $key => $value ) {
\Newspack\Reader_Data::update_item( $user_id, $key, wp_json_encode( $value ) );
}
}

/**
* Pull data from a single integration and store selected fields.
*
Expand All @@ -260,20 +341,7 @@ public static function pull_single_integration( $user_id, $integration ) {
return $data;
}

$selected_keys = array_flip(
array_map(
function( $field ) {
return $field->get_key();
},
$selected_fields
)
);
$data = array_intersect_key( $data, $selected_keys );
Logger::log( 'Pulled data from ' . $integration->get_id() . ': ' . wp_json_encode( $data ) );

foreach ( $data as $key => $value ) {
\Newspack\Reader_Data::update_item( $user_id, $key, wp_json_encode( $value ) );
}
self::store_pulled_data( $user_id, $data, $integration );

return true;
} catch ( \Throwable $e ) {
Expand Down Expand Up @@ -376,6 +444,157 @@ private static function schedule_integration_retry( $integration_id, $user_id, $
);
}

/**
* Schedule a retry for a failed bulk integration pull via ActionScheduler.
*
* @param string $integration_id The integration ID.
* @param int[] $user_ids The WordPress user IDs.
* @param int $retry_count Current retry count (0 = first failure).
* @param \WP_Error $error The error from the failure.
*/
private static function schedule_bulk_retry( $integration_id, $user_ids, $retry_count, $error ) {
if ( ! function_exists( 'as_schedule_single_action' ) ) {
return;
}

$error_message = $error->get_error_message();
$next_retry = $retry_count + 1;

if ( $next_retry > self::MAX_RETRIES ) {
Logger::log(
sprintf(
'Max retries (%d) reached for bulk pull of integration "%s" (%d users). Giving up. Last error: %s',
self::MAX_RETRIES,
$integration_id,
count( $user_ids ),
$error_message
),
self::LOGGER_HEADER
);
do_action(
'newspack_bulk_pull_retry_exhausted',
[
'integration_id' => $integration_id,
'user_ids' => $user_ids,
'reason' => $error_message,
]
);
return;
}

$backoff_index = min( $retry_count, count( self::RETRY_BACKOFF ) - 1 );
$backoff_seconds = self::RETRY_BACKOFF[ $backoff_index ];

$retry_data = [
'integration_id' => $integration_id,
'user_ids' => $user_ids,
'retry_count' => $next_retry,
'reason' => $error_message,
];

\as_schedule_single_action(
time() + $backoff_seconds,
self::BULK_RETRY_HOOK,
[ $retry_data ],
Integrations::get_action_group( $integration_id )
);

Logger::log(
sprintf(
'Scheduled bulk pull retry %d/%d for integration "%s" (%d users) in %ds. Error: %s',
$next_retry,
self::MAX_RETRIES,
$integration_id,
count( $user_ids ),
$backoff_seconds,
$error_message
),
self::LOGGER_HEADER
);
}

/**
* Execute a bulk integration pull retry from ActionScheduler.
*
* @param array $retry_data The retry data containing integration_id, user_ids, and retry_count.
*
* @throws \Exception When the final retry fails, so ActionScheduler marks the action as "failed".
*/
public static function execute_bulk_retry( $retry_data ) {
if ( ! is_array( $retry_data ) || empty( $retry_data['integration_id'] ) || empty( $retry_data['user_ids'] ) ) {
Logger::log( 'Invalid bulk pull retry data received from Action Scheduler.', self::LOGGER_HEADER, 'error' );
return;
}

$integration_id = $retry_data['integration_id'];
$user_ids = $retry_data['user_ids'];
$retry_count = $retry_data['retry_count'] ?? 1;

$integration = Integrations::get_integration( $integration_id );
if ( ! $integration || ! Integrations::is_enabled( $integration_id ) ) {
Logger::log( sprintf( 'Integration "%s" not found or not enabled on bulk pull retry %d.', $integration_id, $retry_count ), self::LOGGER_HEADER, 'error' );
return;
}

$selected_fields = $integration->get_enabled_incoming_fields();
if ( empty( $selected_fields ) ) {
Logger::log( sprintf( 'No incoming fields enabled for integration "%s" on bulk pull retry %d.', $integration_id, $retry_count ), self::LOGGER_HEADER );
return;
}

// Filter out stale user IDs.
$valid_user_ids = array_filter(
$user_ids,
function ( $user_id ) {
return (bool) \get_userdata( $user_id );
}
);

if ( empty( $valid_user_ids ) ) {
Logger::log( sprintf( 'Bulk pull retry %d for integration "%s": no valid users remaining.', $retry_count, $integration_id ), self::LOGGER_HEADER );
return;
}

Logger::log( sprintf( 'Executing bulk pull retry %d/%d for integration "%s" (%d users).', $retry_count, self::MAX_RETRIES, $integration_id, count( $valid_user_ids ) ), self::LOGGER_HEADER );

$results = $integration->pull_contacts_data( $valid_user_ids );

if ( \is_wp_error( $results ) ) {
Logger::log( sprintf( 'Bulk pull retry %d failed for integration "%s": %s', $retry_count, $integration_id, $results->get_error_message() ), self::LOGGER_HEADER );
self::schedule_bulk_retry( $integration_id, $valid_user_ids, $retry_count, $results );

if ( $retry_count >= self::MAX_RETRIES ) {
throw new \Exception(
esc_html(
sprintf(
'Bulk pull retry %d/%d failed for integration "%s" (%d users): %s',
$retry_count,
self::MAX_RETRIES,
$integration_id,
count( $valid_user_ids ),
$results->get_error_message()
)
)
);
}
} else {
$failures = 0;
foreach ( $results as $user_id => $result ) {
if ( \is_wp_error( $result ) ) {
self::schedule_integration_retry( $integration_id, $user_id, 0, $result );
$failures++;
} elseif ( is_array( $result ) ) {
self::store_pulled_data( $user_id, $result, $integration );
}
}
if ( $failures > 0 ) {
Logger::log( sprintf( 'Bulk pull retry %d for integration "%s": %d/%d users failed, scheduled individual retries.', $retry_count, $integration_id, $failures, count( $valid_user_ids ) ), self::LOGGER_HEADER );
} else {
Logger::log( sprintf( 'Bulk pull retry %d for integration "%s": all %d users succeeded.', $retry_count, $integration_id, count( $valid_user_ids ) ), self::LOGGER_HEADER );
}
}
}

/**
* Execute an integration pull retry from ActionScheduler.
*
Expand Down
42 changes: 42 additions & 0 deletions includes/reader-activation/integrations/class-integration.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,30 @@ abstract public function can_sync( $return_errors = false );
*/
abstract public function push_contact_data( $contact, $context = '', $existing_contact = null );

/**
* Push multiple contacts to the integration destination.
*
* Default implementation iterates push_contact_data() for each contact.
* Integrations with native batch APIs should override this method.
*
* @param array $contacts Array of contact entries, each with 'contact' and optional 'existing_contact' keys.
* @param string $context Optional. The context of the sync.
*
* @return array|\WP_Error Per-contact results keyed by email (each true|\WP_Error), or WP_Error for total batch failure.
*/
public function push_contacts_data( $contacts, $context = '' ) {
$results = [];
foreach ( $contacts as $contact_data ) {
$email = $contact_data['contact']['email'] ?? '';
$results[ $email ] = $this->push_contact_data(
$contact_data['contact'],
$context,
$contact_data['existing_contact'] ?? null
);
}
return $results;
}

/**
* Register data event handlers for this integration.
*
Expand Down Expand Up @@ -216,6 +240,24 @@ public function pull_contact_data( $user_id ) {
return [];
}

/**
* Pull contact data from the integration for multiple users.
*
* Default implementation iterates pull_contact_data() for each user.
* Integrations with native batch read APIs should override this method.
*
* @param int[] $user_ids Array of WordPress user IDs.
*
* @return array|\WP_Error Per-user results keyed by user ID (each array|\WP_Error), or WP_Error for total batch failure.
*/
public function pull_contacts_data( $user_ids ) {
$results = [];
foreach ( $user_ids as $user_id ) {
$results[ $user_id ] = $this->pull_contact_data( $user_id );
}
return $results;
}

/**
* Get incoming available contact fields from the integration.
*
Expand Down
Loading
Loading