Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove queue based on the condition #76

Merged
merged 38 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
eb5395a
remove queue only if there is no error
ramyakrishnai Jun 7, 2024
037af8e
linting fixes
ramyakrishnai Jun 7, 2024
5ded7d6
Added retry limit to 3
ramyakrishnai Jun 10, 2024
8ae7ca8
changes
ramyakrishnai Jun 10, 2024
414f918
changes
ramyakrishnai Jun 10, 2024
9ef95fc
linting fixes
ramyakrishnai Jun 12, 2024
3731711
added delay of 15 sec to push to the queue again
ramyakrishnai Jun 13, 2024
0143f62
added 5 sec
ramyakrishnai Jun 13, 2024
63e5ac0
added retry
ramyakrishnai Jun 13, 2024
8d080fe
linting issues
ramyakrishnai Jun 17, 2024
7e861c8
Merge branch 'main' into removeQueue
BrianHenryIE Jun 19, 2024
74a4130
Add tests for `EventMananager::init()`
BrianHenryIE Jun 19, 2024
c78de68
Add tests for `EventManager::init()`
BrianHenryIE Jun 19, 2024
7b4562d
Delete rough-work test
BrianHenryIE Jun 19, 2024
9849c34
Do not remove events from BatchQueue if sending them to Hiive fails
BrianHenryIE Jun 19, 2024
c786076
`EventManager::shutdown()` should `BatchQueue::push()` when fails to …
BrianHenryIE Jun 19, 2024
a420021
Correctly return `WP_Error` in test
BrianHenryIE Jun 19, 2024
cc90c94
lint / comment / typehint
BrianHenryIE Jun 19, 2024
c2f719c
Merge branch 'main' into removeQueue
BrianHenryIE Jun 19, 2024
b97add5
Use `assertConditionsMet` not `expectNotToPerformAssertions`
BrianHenryIE Jun 19, 2024
4a23a7a
lint / use Mockery to skip some paths
BrianHenryIE Jun 19, 2024
b4d9523
Add `@hooked nfd_data_sync_cron` comment to `EventManager::send_batch()`
BrianHenryIE Jun 19, 2024
ccd5f1a
lint
BrianHenryIE Jun 19, 2024
eb13b46
Fix: `HiiveConnection::notify()` when `WP_Error` returned in request
BrianHenryIE Jun 20, 2024
f36013f
Do not preserve events after non-blocking requests
BrianHenryIE Jun 20, 2024
dd5fdaf
Add test for `HiiveConnection::notify()` non-blocking behavior
BrianHenryIE Jun 20, 2024
1f037a3
Merge branch 'main' into removeQueue
BrianHenryIE Jun 21, 2024
c1071a5
Extract and test `Data::delete_token_on_401_response()`, hooked `http…
BrianHenryIE Jul 8, 2024
fb72ba7
Add `forceWpMockStrictModeOn()`/`forceWpMockStrictModeOff()`
BrianHenryIE Jul 8, 2024
b43d7e4
Saved queued events on failure
BrianHenryIE Jul 9, 2024
af2865b
Add `Hiive_Connection::hiive_request`, use `::notify` for v2 events, …
BrianHenryIE Jul 9, 2024
233e71f
Update `newfold-data/v1/events` to expect notifications from `Hiive_C…
BrianHenryIE Jul 9, 2024
5f79f55
Correct order of error message / code
BrianHenryIE Jul 9, 2024
f56b6f1
Remove duplicate line
BrianHenryIE Jul 9, 2024
f6e5274
Allow 200, 201 status codes
BrianHenryIE Jul 9, 2024
a193a43
Assert WP_Error is returned
BrianHenryIE Jul 9, 2024
1f17404
lint / rename / type annotate
BrianHenryIE Jul 9, 2024
e180489
Use explicit HTTP 500 on error
BrianHenryIE Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions includes/API/Events.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,14 @@ public function register_routes() {
/**
* Dispatches a new event.
*
* `wp-json/newfold-data/v1/events`
*
* @param \WP_REST_Request $request Full details about the request.
*
* @used-by newfold-notifications/v1/notifications/
* @used-by NotificationsApi::registerRoutes() (in callback)
* @used-by wp-module-notifications/assets/js/realtime-notices.js:189
*
* @return \WP_REST_Response|\WP_Error Response object on success, or WP_Error object on failure.
*/
public function create_item( $request ) {
Expand All @@ -120,25 +126,13 @@ public function create_item( $request ) {

// If request isn't to be queued, we want the realtime response.
if ( ! $request['queue'] ) {
$notifications = array();
$hiive_response = $this->hiive->notify( array( $event ), true );

if ( is_wp_error( $hiive_response ) ) {
return new \WP_REST_Response( $hiive_response->get_error_message(), 401 );
}

$status_code = wp_remote_retrieve_response_code( $hiive_response );

if ( 200 !== $status_code ) {
return new \WP_REST_Response( wp_remote_retrieve_response_message( $hiive_response ), $status_code );
}
$hiive_response_notifications = $this->hiive->send_event( $event );

$payload = json_decode( wp_remote_retrieve_body( $hiive_response ) );
if ( $payload && is_array( $payload->data ) ) {
$notifications = $payload;
if ( is_wp_error( $hiive_response_notifications ) ) {
return new \WP_REST_Response( $hiive_response_notifications->get_error_message(), $hiive_response_notifications->get_error_code() );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BrianHenryIE We'll want to return an explicit HTTP status code here. The WP_Error get_error_code() method isn't a status code. It would be something like invalid_rest_request or some other string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be appropriate to return 500 between the browser and the WP plugin. The browser doesn't really need to know about 401 etc.

}

return new \WP_REST_Response( $notifications, 201 );
return new \WP_REST_Response( array( 'data' => $hiive_response_notifications ), 201 );
}

// Otherwise, queue the event.
Expand All @@ -151,6 +145,7 @@ public function create_item( $request ) {
'data' => $data,
)
);
// 202 – "The request has been accepted for processing, but the processing has not been completed.".
$response->set_status( 202 );

return $response;
Expand Down
40 changes: 23 additions & 17 deletions includes/Data.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,7 @@ public function start(): void {
add_filter( 'rest_authentication_errors', array( $this, 'authenticate' ) );

// If we ever get a 401 response from the Hiive API, delete the token.
add_filter(
'http_response',
function ( $response, $args, $url ) {

if ( strpos( $url, NFD_HIIVE_URL ) === 0 && absint( wp_remote_retrieve_response_code( $response ) ) === 401 ) {
delete_option( 'nfd_data_token' );
}

return $response;
},
10,
3
);
add_filter( 'http_response', array( $this, 'delete_token_on_401_response' ), 10, 3 );
}

/**
Expand All @@ -81,9 +69,7 @@ public function init(): void {
if ( ! $this->hiive::is_connected() ) {

// Attempt to connect
if ( ! $this->hiive->is_throttled() ) {
$this->hiive->connect();
}
$this->hiive->connect();

return;
}
Expand All @@ -98,6 +84,27 @@ public function init(): void {
}
}

/**
* Check HTTP responses for 401 authentication errors from Hiive, delete the invalid token.
*
* @hooked http_response
* @see WP_Http::request()
*
* @param array $response The successful HTTP response.
* @param array $args HTTP request arguments.
* @param string $url The request URL.
*
* @return array
*/
public function delete_token_on_401_response( array $response, array $args, string $url ): array {

if ( strpos( $url, constant( 'NFD_HIIVE_URL' ) ) === 0 && absint( wp_remote_retrieve_response_code( $response ) ) === 401 ) {
delete_option( 'nfd_data_token' );
}

return $response;
}

/**
* Authenticate incoming REST API requests.
*
Expand All @@ -109,7 +116,6 @@ public function init(): void {
* @see WP_REST_Server::check_authentication()
*
* @used-by ConnectSite::verifyToken() in Hiive.
*
*/
public function authenticate( $errors ) {

Expand Down
120 changes: 79 additions & 41 deletions includes/EventManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

namespace NewfoldLabs\WP\Module\Data;

use Exception;
use NewfoldLabs\WP\Module\Data\EventQueue\EventQueue;
use NewfoldLabs\WP\Module\Data\Listeners\Listener;
use WP_Error;

/**
* Class to manage event subscriptions
Expand All @@ -12,7 +15,7 @@ class EventManager {
/**
* List of default listener category classes
*
* @var array
* @var Listener[]
*/
const LISTENERS = array(
'\\NewfoldLabs\\WP\\Module\\Data\\Listeners\\Admin',
Expand All @@ -39,16 +42,14 @@ class EventManager {
/**
* The queue of events logged in the current request
*
* @var array
* @var Event[]
*/
private $queue = array();

/**
* Initialize the Event Manager
*
* @return void
*/
public function init() {
public function init(): void {
$this->initialize_listeners();
$this->initialize_cron();

Expand All @@ -58,6 +59,8 @@ public function init() {

/**
* Initialize the REST API endpoint.
*
* @see Data::init()
*/
public function initialize_rest_endpoint() {
// Register REST endpoint.
Expand All @@ -66,19 +69,18 @@ public function initialize_rest_endpoint() {

/**
* Handle setting up the scheduled job for sending updates
*
* @return void
*/
public function initialize_cron() {
protected function initialize_cron(): void {
// Ensure there is a minutely option in the cron schedules
// phpcs:disable WordPress.WP.CronInterval.CronSchedulesInterval
add_filter( 'cron_schedules', array( $this, 'add_minutely_schedule' ) );

// Minutely cron hook
add_action( 'nfd_data_sync_cron', array( $this, 'send_batch' ) );
add_action( 'nfd_data_sync_cron', array( $this, 'send_saved_events_batch' ) );

// Register the cron task
if ( ! wp_next_scheduled( 'nfd_data_sync_cron' ) ) {
wp_schedule_event( time() + MINUTE_IN_SECONDS, 'minutely', 'nfd_data_sync_cron' );
wp_schedule_event( time() + constant( 'MINUTE_IN_SECONDS' ), 'minutely', 'nfd_data_sync_cron' );
}
}

Expand All @@ -93,9 +95,11 @@ public function rest_api_init() {
/**
* Add the weekly option to cron schedules if it doesn't exist
*
* @param array $schedules List of cron schedule options
* @hooked cron_schedules
*
* @param array<string, array{interval:int, display:string}> $schedules List of defined cron schedule options.
*
* @return array
* @return array<string, array{interval:int, display:string}>
*/
public function add_minutely_schedule( $schedules ) {
if ( ! array_key_exists( 'minutely', $schedules ) ||
Expand All @@ -113,9 +117,9 @@ public function add_minutely_schedule( $schedules ) {
/**
* Sends or saves all queued events at the end of the request
*
* @return void
* @hooked shutdown
*/
public function shutdown() {
public function shutdown(): void {

// Separate out the async events
$async = array();
Expand All @@ -133,18 +137,16 @@ public function shutdown() {

// Any remaining items in the queue should be sent now
if ( ! empty( $this->queue ) ) {
$this->send( $this->queue );
$this->send_request_events( $this->queue );
}
}

/**
* Register a new event subscriber
*
* @param SubscriberInterface $subscriber Class subscribing to event updates
*
* @return void
*/
public function add_subscriber( SubscriberInterface $subscriber ) {
public function add_subscriber( SubscriberInterface $subscriber ): void {
$this->subscribers[] = $subscriber;
}

Expand All @@ -158,21 +160,19 @@ public function get_subscribers() {
}

/**
* Return an array of registered listener classes
* Return an array of listener classes
*
* @return array List of listener classes
* @return Listener[] List of listener classes
*/
public function get_listeners() {
return apply_filters( 'newfold_data_listeners', $this::LISTENERS );
}

/**
* Initialize event listener classes
*
* @return void
*/
public function initialize_listeners() {
if ( defined( 'BURST_SAFETY_MODE' ) && BURST_SAFETY_MODE ) {
protected function initialize_listeners(): void {
if ( defined( 'BURST_SAFETY_MODE' ) && constant( 'BURST_SAFETY_MODE' ) ) {
// Disable listeners when site is under heavy load
return;
}
Expand All @@ -185,52 +185,90 @@ public function initialize_listeners() {
/**
* Push event data onto the queue
*
* @see wp-module-notifications/notifications.php
*
* @param Event $event Details about the action taken
*
* @return void
*/
public function push( Event $event ) {
public function push( Event $event ): void {
/**
* The `nfd_event_log` action is handled in the notification module.
*
* @see wp-module-notifications/notifications.php
*/
do_action( 'nfd_event_log', $event->key, $event );
$this->queue[] = $event;
}

/**
* Send queued events to all subscribers
* Send queued events to all subscribers; store them if they fail
*
* @param array $events A list of events
* @used-by EventManager::shutdown()
*
* @return void
* @param Event[] $events A list of events
*/
public function send( $events ) {
protected function send_request_events( array $events ): void {
foreach ( $this->get_subscribers() as $subscriber ) {
$subscriber->notify( $events );
/**
* @var array{succeededEvents:array,failedEvents:array}|WP_Error $response
*/
$response = $subscriber->notify( $events );

if ( ! ( $subscriber instanceof HiiveConnection ) ) {
continue;
}

$queue = EventQueue::getInstance()->queue();

if ( is_wp_error( $response ) ) {
EventQueue::getInstance()->queue()->push( $events );
continue;
}

EventQueue::getInstance()->queue()->push( $response['failedEvents'] );
}
}

/**
* Send queued events to all subscribers
* Send stored events to all subscribers; remove/release them from the store aftewards.
*
* @return void
* @hooked nfd_data_sync_cron
*/
public function send_batch() {
public function send_saved_events_batch(): void {

$queue = EventQueue::getInstance()->queue();

/**
* Array indexed by the table row id.
*
* @var array<int,Event> $events
*/
$events = $queue->pull( 100 );

// If queue is empty, do nothing.
if ( empty( $events ) ) {
return;
}

$ids = array_keys( $events );
$queue->reserve( array_keys( $events ) );

$queue->reserve( $ids );
foreach ( $this->get_subscribers() as $subscriber ) {
/**
* @var array{succeededEvents:array,failedEvents:array}|WP_Error $response
*/
$response = $subscriber->notify( $events );

$this->send( $events );
if ( ! ( $subscriber instanceof HiiveConnection ) ) {
continue;
}

if ( is_wp_error( $response ) ) {
$queue->release( array_keys( $events ) );
continue;
}

$queue->remove( $ids );
// Remove from the queue.
$queue->remove( array_keys( $response['succeededEvents'] ) );

// Release the 'reserve' we placed on the entry, so it will be tried again later.
$queue->release( array_keys( $response['failedEvents'] ) );
}
}
}
Loading
Loading