Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] status dereferencing improvements #3255

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 81 additions & 61 deletions internal/federation/dereferencing/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package dereferencing
import (
"context"
"errors"
"net/http"
"net/url"
"slices"
"time"
Expand Down Expand Up @@ -75,7 +76,7 @@ func statusFresh(

// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model whose 'last_fetched' date
// is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching,
// e.g. dereferencing the status thread. Param 'syncParent' = true indicates to fetch status ancestors synchronously. An ActivityPub object indicates the status was dereferenced.
// e.g. dereferencing the status thread. An ActivityPub object indicates the status was dereferenced.
func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) {

// Fetch and dereference / update status if necessary.
Expand Down Expand Up @@ -164,22 +165,13 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
return status, nil, false, nil
}

// Try to deref and update existing status model.
latest, statusable, isNew, err := d.enrichStatusSafely(ctx,
// Try to deref and update existing.
return d.enrichStatusSafely(ctx,
requestUser,
uri,
status,
nil,
)

if err != nil {
NyaaaWhatsUpDoc marked this conversation as resolved.
Show resolved Hide resolved
// fallback to the
// existing status.
latest = status
statusable = nil
}

return latest, statusable, isNew, err
}

// RefreshStatus is functionally equivalent to GetStatusByURI(), except that it requires a pre
Expand Down Expand Up @@ -211,9 +203,6 @@ func (d *Dereferencer) RefreshStatus(
status,
statusable,
)
if err != nil {
return nil, nil, err
}

if statusable != nil {
// Deref parents + children.
Expand All @@ -226,7 +215,7 @@ func (d *Dereferencer) RefreshStatus(
)
}

return latest, statusable, nil
return latest, statusable, err
}

// RefreshStatusAsync is functionally equivalent to RefreshStatus(), except that ALL
Expand Down Expand Up @@ -275,9 +264,10 @@ func (d *Dereferencer) RefreshStatusAsync(
})
}

// enrichStatusSafely wraps enrichStatus() to perform
// it within the State{}.FedLocks mutexmap, which protects
// dereferencing actions with per-URI mutex locks.
// enrichStatusSafely wraps enrichStatus(), performing it while
// holding a per-URI mutex lock from the State{}.FedLocks mutexmap.
// It also handles deleting existing statuses that are now gone,
// and updating fetched_at when an HTTP error code is returned.
func (d *Dereferencer) enrichStatusSafely(
ctx context.Context,
requestUser string,
Expand Down Expand Up @@ -307,44 +297,54 @@ func (d *Dereferencer) enrichStatusSafely(
defer unlock()

// Perform status enrichment with passed vars.
latest, apubStatus, err := d.enrichStatus(ctx,
latest, statusable, err := d.enrichStatus(ctx,
requestUser,
uri,
status,
statusable,
)

if gtserror.StatusCode(err) >= 400 {
if isNew {
// This was a new status enrich
// attempt which failed before we
// got to store it, so we can't
// return anything useful.
return nil, nil, isNew, err
// Check for a returned HTTP code via error.
switch code := gtserror.StatusCode(err); {

// Gone (410) definitely indicates deletion.
// Remove status if it was an existing one.
case code == http.StatusGone && !isNew:
if err := d.state.DB.DeleteStatusByID(ctx, status.ID); err != nil {
log.Error(ctx, "error deleting gone status %s: %v", uriStr, err)
}

// We had this status stored already
// before this enrichment attempt.
//
// Don't return any status.
return nil, nil, false, err

// Any other HTTP error status
// code, with existing status.
case code >= 400 && !isNew:

// Update fetched_at to slow re-attempts
// but don't return early. We can still
// return the model we had stored already.
status.FetchedAt = time.Now()
if err := d.state.DB.UpdateStatus(ctx, status, "fetched_at"); err != nil {
log.Error(ctx, "error updating %s fetched_at: %v", uriStr, err)
}

// See below.
fallthrough

// In case of error with an existing
// status in the database, return error
// but still return existing status.
case err != nil && !isNew:
latest = status
statusable = nil
}

// Unlock now
// we're done.
unlock()

if errors.Is(err, db.ErrAlreadyExists) {
// Ensure AP model isn't set,
// otherwise this indicates WE
// enriched the status.
apubStatus = nil
tsmethurst marked this conversation as resolved.
Show resolved Hide resolved

// We leave 'isNew' set so that caller
// still dereferences parents, otherwise
// the version we pass back may not have
Expand All @@ -362,7 +362,7 @@ func (d *Dereferencer) enrichStatusSafely(
}
}

return latest, apubStatus, isNew, err
return latest, statusable, isNew, err
}

// enrichStatus will enrich the given status, whether a new
Expand All @@ -374,7 +374,11 @@ func (d *Dereferencer) enrichStatus(
uri *url.URL,
status *gtsmodel.Status,
apubStatus ap.Statusable,
) (*gtsmodel.Status, ap.Statusable, error) {
) (
*gtsmodel.Status,
ap.Statusable,
error,
) {
// Pre-fetch a transport for requesting username, used by later dereferencing.
tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil {
Expand All @@ -385,7 +389,7 @@ func (d *Dereferencer) enrichStatus(
if blocked, err := d.state.DB.IsDomainBlocked(ctx, uri.Host); err != nil {
return nil, nil, gtserror.Newf("error checking blocked domain: %w", err)
} else if blocked {
err = gtserror.Newf("%s is blocked", uri.Host)
err := gtserror.Newf("%s is blocked", uri.Host)
return nil, nil, gtserror.SetUnretrievable(err)
}

Expand All @@ -406,7 +410,7 @@ func (d *Dereferencer) enrichStatus(
if err != nil {
// ResolveStatusable will set gtserror.WrongType
// on the returned error, so we don't need to do it here.
err = gtserror.Newf("error resolving statusable %s: %w", uri, err)
err := gtserror.Newf("error resolving statusable %s: %w", uri, err)
return nil, nil, err
}

Expand Down Expand Up @@ -448,11 +452,14 @@ func (d *Dereferencer) enrichStatus(
// Ensure we have the author account of the status dereferenced (+ up-to-date). If this is a new status
// (i.e. status.AccountID == "") then any error here is irrecoverable. status.AccountID must ALWAYS be set.
if _, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil && status.AccountID == "" {
return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err)

// Note that we specifically DO NOT wrap the error, instead collapsing it as string.
// Errors fetching an account do not necessarily relate to dereferencing the status.
return nil, nil, gtserror.Newf("failed to dereference status author %s: %v", uri, err)
NyaaaWhatsUpDoc marked this conversation as resolved.
Show resolved Hide resolved
}

// ActivityPub model was recently dereferenced, so assume that passed status
// may contain out-of-date information, convert AP model to our GTS model.
// ActivityPub model was recently dereferenced, so assume passed status
// may contain out-of-date information. Convert AP model to our GTS model.
latestStatus, err := d.converter.ASStatusToStatus(ctx, apubStatus)
if err != nil {
return nil, nil, gtserror.Newf("error converting statusable to gts model for status %s: %w", uri, err)
Expand Down Expand Up @@ -603,11 +610,11 @@ func (d *Dereferencer) fetchStatusMentions(
err error
)

mention, alreadyExists, err = d.populateMentionTarget(
ctx,
// Search existing status for a mention already stored,
// else ensure new mention's target account is populated.
mention, alreadyExists, err = d.getPopulatedMention(ctx,
requestUser,
existing,
status,
mention,
)
if err != nil {
Expand Down Expand Up @@ -984,7 +991,7 @@ func (d *Dereferencer) fetchStatusEmojis(
return nil
}

// populateMentionTarget tries to populate the given
// getPopulatedMention tries to populate the given
// mention with the correct TargetAccount and (if not
// yet set) TargetAccountURI, returning the populated
// mention.
Expand All @@ -996,11 +1003,10 @@ func (d *Dereferencer) fetchStatusEmojis(
// Otherwise, this function will try to parse first
// the Href of the mention, and then the namestring,
// to see who it targets, and go fetch that account.
func (d *Dereferencer) populateMentionTarget(
func (d *Dereferencer) getPopulatedMention(
ctx context.Context,
requestUser string,
existing *gtsmodel.Status,
status *gtsmodel.Status,
mention *gtsmodel.Mention,
) (
*gtsmodel.Mention,
Expand All @@ -1010,8 +1016,8 @@ func (d *Dereferencer) populateMentionTarget(
// Mentions can be created using Name or Href.
// Prefer Href (TargetAccountURI), fall back to Name.
if mention.TargetAccountURI != "" {
// Look for existing mention with this URI.
// If we already have it we can return early.

// Look for existing mention with target account's URI, if so use this.
existingMention, ok := existing.GetMentionByTargetURI(mention.TargetAccountURI)
if ok && existingMention.ID != "" {
return existingMention, true, nil
Expand All @@ -1020,33 +1026,47 @@ func (d *Dereferencer) populateMentionTarget(
// Ensure that mention account URI is parseable.
accountURI, err := url.Parse(mention.TargetAccountURI)
if err != nil {
err = gtserror.Newf("invalid account uri %q: %w", mention.TargetAccountURI, err)
err := gtserror.Newf("invalid account uri %q: %w", mention.TargetAccountURI, err)
return nil, false, err
}

// Ensure we have the account of the mention target dereferenced.
mention.TargetAccount, _, err = d.getAccountByURI(ctx, requestUser, accountURI)
// Ensure we have account of the mention target dereferenced.
mention.TargetAccount, _, err = d.getAccountByURI(ctx,
requestUser,
accountURI,
)
if err != nil {
err = gtserror.Newf("failed to dereference account %s: %w", accountURI, err)
err := gtserror.Newf("failed to dereference account %s: %w", accountURI, err)
return nil, false, err
}
} else {
// Href wasn't set. Find the target account using namestring.

// Href wasn't set, extract the username and domain parts from namestring.
username, domain, err := util.ExtractNamestringParts(mention.NameString)
if err != nil {
err = gtserror.Newf("failed to parse namestring %s: %w", mention.NameString, err)
err := gtserror.Newf("failed to parse namestring %s: %w", mention.NameString, err)
return nil, false, err
}

mention.TargetAccount, _, err = d.getAccountByUsernameDomain(ctx, requestUser, username, domain)
// Look for existing mention with username domain target, if so use this.
existingMention, ok := existing.GetMentionByUsernameDomain(username, domain)
if ok && existingMention.ID != "" {
return existingMention, true, nil
}

// Ensure we have the account of the mention target dereferenced.
mention.TargetAccount, _, err = d.getAccountByUsernameDomain(ctx,
requestUser,
username,
domain,
)
if err != nil {
err = gtserror.Newf("failed to dereference account %s: %w", mention.NameString, err)
err := gtserror.Newf("failed to dereference account %s: %w", mention.NameString, err)
return nil, false, err
}

// Look for existing mention with this URI.
mention.TargetAccountURI = mention.TargetAccount.URI
existingMention, ok := existing.GetMentionByTargetURI(mention.TargetAccountURI)
// Look for existing mention with target account's URI, if so use this.
existingMention, ok = existing.GetMentionByTargetURI(mention.TargetAccountURI)
if ok && existingMention.ID != "" {
return existingMention, true, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/federation/dereferencing/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (d *Dereferencer) DereferenceStatusDescendants(ctx context.Context, usernam

// Keep track of already dereferenced collection
// pages for this thread to prevent recursion.
derefdPages := make(map[string]struct{}, 10)
derefdPages := make(map[string]struct{}, 16)

// frame represents a single stack frame when
// iteratively dereferencing status descendants.
Expand Down
3 changes: 2 additions & 1 deletion internal/gtsmodel/mention.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ type Mention struct {
//
// This will not be put in the database, it's just for convenience.
NameString string `bun:"-"`

// TargetAccountURI is the AP ID (uri) of the user mentioned.
//
// This will not be put in the database, it's just for convenience.
TargetAccountURI string `bun:"-"`

// TargetAccountURL is the web url of the user mentioned.
//
// This will not be put in the database, it's just for convenience.
TargetAccountURL string `bun:"-"`
// A pointer to the gtsmodel account of the mentioned account.
}

// ParseMentionFunc describes a function that takes a lowercase account namestring
Expand Down
29 changes: 29 additions & 0 deletions internal/gtsmodel/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,35 @@ func (s *Status) GetMentionByTargetURI(uri string) (*Mention, bool) {
return nil, false
}

// GetMentionByUsernameDomain searches the status's mentions for one whose
// target account matches the given username and domain (typically the two
// parts extracted from a mention namestring). It returns the first matching
// mention and true, or nil and false if no mention matches.
func (s *Status) GetMentionByUsernameDomain(username, domain string) (*Mention, bool) {
	for _, m := range s.Mentions {
		target := m.TargetAccount

		// A mention can only be compared when its target
		// account is populated, and the username is a
		// hard requirement for any kind of match.
		if target == nil || target.Username != username {
			continue
		}

		switch {
		// Domains are identical.
		case target.Domain == domain:
			return m, true

		// No domain was given, which is accepted
		// when the target account reports as local.
		case domain == "" && target.IsLocal():
			return m, true
		}
	}

	// Nothing matched.
	return nil, false
}

// GetTagByName searches status for Tag{} with name.
func (s *Status) GetTagByName(name string) (*Tag, bool) {
for _, tag := range s.Tags {
Expand Down