Skip to content

Commit

Permalink
Fix pending tips processing for verified Rewards profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
zenparsing committed Jun 8, 2022
1 parent bbb3d8a commit bcfcc43
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,6 @@ void ContributionExternalWallet::OnServerPublisherInfo(

BLOG(1, "Publisher not verified");

// TODO(zenparsing): Adding a record to the pending contribution table at
// this point can lead to an (async) infinite loop if pending contributions
// are currently being flushed. In `unverified.cc`, the pending contribution
// processor processes the first available pending record and then sets a
// timer to process the next one. If another record is added before that
// timer expires, it can cause the flushing operation to continue
// indefinitely.

/*
auto save_callback =
std::bind(&ContributionExternalWallet::OnSavePendingContribution,
this,
Expand All @@ -176,10 +167,8 @@ void ContributionExternalWallet::OnServerPublisherInfo(
type::PendingContributionList list;
list.push_back(std::move(contribution));

ledger_->database()->SavePendingContribution(
std::move(list),
save_callback);
*/
ledger_->database()->SavePendingContribution(std::move(list),
save_callback);

callback(type::Result::LEDGER_ERROR);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <utility>

#include "base/guid.h"
#include "base/numerics/safe_conversions.h"
#include "bat/ledger/internal/common/time_util.h"
#include "bat/ledger/internal/contribution/unverified.h"
#include "bat/ledger/internal/ledger_impl.h"
Expand All @@ -23,6 +24,12 @@ Unverified::Unverified(LedgerImpl* ledger) :
Unverified::~Unverified() = default;

void Unverified::Contribute() {
if (processing_start_time_) {
BLOG(1, "Pending tips already processing");
return;
}
BLOG(1, "Pending tips processing starting");
processing_start_time_ = base::Time::Now();
ledger_->database()->GetUnverifiedPublishersForPendingContributions(
std::bind(&Unverified::FetchInfoForUnverifiedPublishers, this, _1));
}
Expand All @@ -38,17 +45,22 @@ void Unverified::FetchInfoForUnverifiedPublishers(
FetchInfoForUnverifiedPublishers(std::move(publisher_keys));
});
} else {
ledger_->wallet()->FetchBalance(
std::bind(&Unverified::OnContributeUnverifiedBalance, this, _1, _2));
ProcessNext();
}
}

void Unverified::ProcessNext() {
DCHECK(processing_start_time_);
ledger_->wallet()->FetchBalance(
std::bind(&Unverified::OnContributeUnverifiedBalance, this, _1, _2));
}

void Unverified::OnContributeUnverifiedBalance(
type::Result result,
type::BalancePtr properties) {
if (result != type::Result::LEDGER_OK || !properties) {
BLOG(0, "Balance is null");
return;
return ProcessingCompleted();
}

ledger_->database()->GetPendingContributions(
Expand All @@ -63,7 +75,7 @@ void Unverified::OnContributeUnverifiedPublishers(
const type::PendingContributionInfoList& list) {
if (list.empty()) {
BLOG(1, "List is empty");
return;
return ProcessingCompleted();
}

if (balance == 0) {
Expand All @@ -72,9 +84,16 @@ void Unverified::OnContributeUnverifiedPublishers(
type::Result::PENDING_NOT_ENOUGH_FUNDS,
"",
"");
return;
return ProcessingCompleted();
}

// NOTE: `PendingContribution::added_date` is stored as a uint64_t of ms from
// Unix epoch, with the subsecond interval truncated. For correct comparison
// with that value, convert the processing start time to uin64_t and truncate.
DCHECK(processing_start_time_);
uint64_t processing_cutoff =
base::ClampFloor<uint64_t>(processing_start_time_->ToDoubleT());

const auto now = util::GetCurrentTimeStamp();

type::PendingContributionInfoPtr current;
Expand All @@ -90,6 +109,11 @@ void Unverified::OnContributeUnverifiedPublishers(
continue;
}

// If the pending entry was added after we started processing, then skip it.
if (item->added_date >= processing_cutoff) {
continue;
}

// If the publisher is still not verified,
// leave the contribution in the pending table.
if (item->status == type::PublisherStatus::NOT_VERIFIED ||
Expand All @@ -104,7 +128,7 @@ void Unverified::OnContributeUnverifiedPublishers(

if (!current) {
BLOG(1, "Nothing to process");
return;
return ProcessingCompleted();
}

auto get_callback = std::bind(&Unverified::WasPublisherProcessed,
Expand All @@ -123,7 +147,7 @@ void Unverified::OnContributeUnverifiedPublishers(
type::Result::PENDING_NOT_ENOUGH_FUNDS,
"",
"");
return;
return ProcessingCompleted();
}

type::ContributionQueuePublisherList queue_list;
Expand Down Expand Up @@ -168,8 +192,9 @@ void Unverified::QueueSaved(

BLOG(1, "Unverified contribution timer set for " << delay);

unverified_publishers_timer_.Start(FROM_HERE, delay,
base::BindOnce(&Unverified::Contribute, base::Unretained(this)));
unverified_publishers_timer_.Start(
FROM_HERE, delay,
base::BindOnce(&Unverified::ProcessNext, base::Unretained(this)));
}

void Unverified::WasPublisherProcessed(
Expand Down Expand Up @@ -211,7 +236,7 @@ void Unverified::OnRemovePendingContribution(
type::Result result) {
if (result != type::Result::LEDGER_OK) {
BLOG(0, "Problem removing pending contribution");
return;
return ProcessingCompleted();
}

ledger_->ledger_client()->OnContributeUnverifiedPublishers(
Expand All @@ -220,5 +245,10 @@ void Unverified::OnRemovePendingContribution(
"");
}

void Unverified::ProcessingCompleted() {
BLOG(1, "Pending tips processing completed");
processing_start_time_ = {};
}

} // namespace contribution
} // namespace ledger
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
#include <string>
#include <vector>

#include "base/time/time.h"
#include "base/timer/timer.h"
#include "bat/ledger/ledger.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

namespace ledger {
class LedgerImpl;
Expand Down Expand Up @@ -45,6 +47,8 @@ class Unverified {

void OnRemovePendingContribution(type::Result result);

void ProcessNext();

void OnContributeUnverifiedBalance(
type::Result result,
type::BalancePtr properties);
Expand All @@ -57,8 +61,11 @@ class Unverified {
const type::Result result,
const uint64_t pending_contribution_id);

void ProcessingCompleted();

LedgerImpl* ledger_; // NOT OWNED
base::OneShotTimer unverified_publishers_timer_;
absl::optional<base::Time> processing_start_time_;
};

} // namespace contribution
Expand Down

0 comments on commit bcfcc43

Please sign in to comment.