Skip to content

Commit

Permalink
Add actual bulk insertion. Hook it up to PutGrants(). (#269)
Browse files Browse the repository at this point in the history
* Add actual bulk insertion. Hook it up to PutGrants().

* Remove page size from store requests. These were added when our default page size was 100.

* Chunk rows in bulk inserts.

* Use bulkPutConnectorObject for all Put operations.
  • Loading branch information
ggreer authored Dec 18, 2024
1 parent ef1106d commit 3d1cef3
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 85 deletions.
25 changes: 9 additions & 16 deletions pkg/dotc1z/entitlements.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,15 @@ func (c *C1File) GetEntitlement(ctx context.Context, request *reader_v2.Entitlem
}

func (c *C1File) PutEntitlements(ctx context.Context, entitlementObjs ...*v2.Entitlement) error {
err := c.db.WithTx(func(tx *goqu.TxDatabase) error {
err := bulkPutConnectorObjectTx(ctx, c, tx, entitlements.Name(),
func(entitlement *v2.Entitlement) (goqu.Record, error) {
return goqu.Record{
"resource_id": entitlement.Resource.Id.Resource,
"resource_type_id": entitlement.Resource.Id.ResourceType,
}, nil
},
entitlementObjs...,
)
if err != nil {
return err
}

return nil
})
err := bulkPutConnectorObject(ctx, c, entitlements.Name(),
func(entitlement *v2.Entitlement) (goqu.Record, error) {
return goqu.Record{
"resource_id": entitlement.Resource.Id.Resource,
"resource_type_id": entitlement.Resource.Id.ResourceType,
}, nil
},
entitlementObjs...,
)
if err != nil {
return err
}
Expand Down
30 changes: 12 additions & 18 deletions pkg/dotc1z/grants.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,24 +168,18 @@ func (c *C1File) ListGrantsForResourceType(
}

func (c *C1File) PutGrants(ctx context.Context, bulkGrants ...*v2.Grant) error {
err := c.db.WithTx(func(tx *goqu.TxDatabase) error {
err := bulkPutConnectorObjectTx(ctx, c, tx, grants.Name(),
func(grant *v2.Grant) (goqu.Record, error) {
return goqu.Record{
"resource_type_id": grant.Entitlement.Resource.Id.ResourceType,
"resource_id": grant.Entitlement.Resource.Id.Resource,
"entitlement_id": grant.Entitlement.Id,
"principal_resource_type_id": grant.Principal.Id.ResourceType,
"principal_resource_id": grant.Principal.Id.Resource,
}, nil
},
bulkGrants...,
)
if err != nil {
return err
}
return nil
})
err := bulkPutConnectorObject(ctx, c, grants.Name(),
func(grant *v2.Grant) (goqu.Record, error) {
return goqu.Record{
"resource_type_id": grant.Entitlement.Resource.Id.ResourceType,
"resource_id": grant.Entitlement.Resource.Id.Resource,
"entitlement_id": grant.Entitlement.Id,
"principal_resource_type_id": grant.Principal.Id.ResourceType,
"principal_resource_id": grant.Principal.Id.Resource,
}, nil
},
bulkGrants...,
)
if err != nil {
return err
}
Expand Down
18 changes: 6 additions & 12 deletions pkg/dotc1z/resouce_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,12 @@ func (c *C1File) GetResourceType(ctx context.Context, request *reader_v2.Resourc
}

func (c *C1File) PutResourceTypes(ctx context.Context, resourceTypesObjs ...*v2.ResourceType) error {
err := c.db.WithTx(func(tx *goqu.TxDatabase) error {
err := bulkPutConnectorObjectTx(ctx, c, tx, resourceTypes.Name(),
func(resource *v2.ResourceType) (goqu.Record, error) {
return nil, nil
},
resourceTypesObjs...,
)
if err != nil {
return err
}
return nil
})
err := bulkPutConnectorObject(ctx, c, resourceTypes.Name(),
func(resource *v2.ResourceType) (goqu.Record, error) {
return nil, nil
},
resourceTypesObjs...,
)
if err != nil {
return err
}
Expand Down
36 changes: 15 additions & 21 deletions pkg/dotc1z/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,21 @@ func (c *C1File) GetResource(ctx context.Context, request *reader_v2.ResourcesRe
}

func (c *C1File) PutResources(ctx context.Context, resourceObjs ...*v2.Resource) error {
err := c.db.WithTx(func(tx *goqu.TxDatabase) error {
err := bulkPutConnectorObjectTx(ctx, c, tx, resources.Name(),
func(resource *v2.Resource) (goqu.Record, error) {
fields := goqu.Record{
"resource_type_id": resource.Id.ResourceType,
"external_id": fmt.Sprintf("%s:%s", resource.Id.ResourceType, resource.Id.Resource),
}

if resource.ParentResourceId != nil {
fields["parent_resource_type_id"] = resource.ParentResourceId.ResourceType
fields["parent_resource_id"] = resource.ParentResourceId.Resource
}
return fields, nil
},
resourceObjs...,
)
if err != nil {
return err
}
return nil
})
err := bulkPutConnectorObject(ctx, c, resources.Name(),
func(resource *v2.Resource) (goqu.Record, error) {
fields := goqu.Record{
"resource_type_id": resource.Id.ResourceType,
"external_id": fmt.Sprintf("%s:%s", resource.Id.ResourceType, resource.Id.Resource),
}

if resource.ParentResourceId != nil {
fields["parent_resource_type_id"] = resource.ParentResourceId.ResourceType
fields["parent_resource_id"] = resource.ParentResourceId.Resource
}
return fields, nil
},
resourceObjs...,
)
if err != nil {
return err
}
Expand Down
38 changes: 29 additions & 9 deletions pkg/dotc1z/sql_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,21 @@ func (c *C1File) listConnectorObjects(ctx context.Context, tableName string, req

var protoMarshaler = proto.MarshalOptions{Deterministic: true}

func bulkPutConnectorObjectTx[T proto.Message](ctx context.Context, c *C1File,
tx *goqu.TxDatabase,
func bulkPutConnectorObject[T proto.Message](ctx context.Context, c *C1File,
tableName string,
extractFields func(m T) (goqu.Record, error),
msgs ...T) error {
if len(msgs) == 0 {
return nil
}

err := c.validateSyncDb(ctx)
if err != nil {
return err
}

baseQ := tx.Insert(tableName).Prepared(true)
baseQ = baseQ.OnConflict(goqu.DoUpdate("external_id, sync_id", goqu.C("data").Set(goqu.I("EXCLUDED.data"))))

for _, m := range msgs {
rows := make([]*goqu.Record, len(msgs))
for i, m := range msgs {
messageBlob, err := protoMarshaler.Marshal(m)
if err != nil {
return err
Expand All @@ -270,16 +271,35 @@ func bulkPutConnectorObjectTx[T proto.Message](ctx context.Context, c *C1File,
fields["data"] = messageBlob
fields["sync_id"] = c.currentSyncID
fields["discovered_at"] = time.Now().Format("2006-01-02 15:04:05.999999999")
q := baseQ.Rows(fields)
query, args, err := q.ToSQL()
rows[i] = &fields
}
chunkSize := 100
chunks := len(rows) / chunkSize
if len(rows)%chunkSize != 0 {
chunks++
}

for i := 0; i < chunks; i++ {
start := i * chunkSize
end := (i + 1) * chunkSize
if end > len(rows) {
end = len(rows)
}
chunkedRows := rows[start:end]
query, args, err := c.db.Insert(tableName).
OnConflict(goqu.DoUpdate("external_id, sync_id", goqu.C("data").Set(goqu.I("EXCLUDED.data")))).
Rows(chunkedRows).
Prepared(true).
ToSQL()
if err != nil {
return err
}
_, err = tx.Exec(query, args...)
_, err = c.db.Exec(query, args...)
if err != nil {
return err
}
}

return nil
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,6 @@ func (s *syncer) fetchEtaggedGrantsForResource(
Resource: resource,
Annotations: storeAnnos,
PageToken: npt,
PageSize: 1000,
})
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -1273,14 +1272,14 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) {
// Fetch a page of source grants
sourceGrants, err := s.store.ListGrantsForEntitlement(ctx, &reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest{
Entitlement: sourceEntitlement.GetEntitlement(),
PageSize: 1000,
PageToken: action.PageToken,
})
if err != nil {
l.Error("runGrantExpandActions: error fetching source grants", zap.Error(err))
return false, fmt.Errorf("runGrantExpandActions: error fetching source grants: %w", err)
}

var newGrants []*v2.Grant = make([]*v2.Grant, 0)
for _, sourceGrant := range sourceGrants.List {
// Skip this grant if it is not for a resource type we care about
if len(action.ResourceTypeIDs) > 0 {
Expand Down Expand Up @@ -1322,7 +1321,6 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) {
req := &reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest{
Entitlement: descendantEntitlement.GetEntitlement(),
PrincipalId: sourceGrant.GetPrincipal().GetId(),
PageSize: 1000,
PageToken: pageToken,
Annotations: nil,
}
Expand Down Expand Up @@ -1383,13 +1381,14 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) {
zap.String("grant_id", descendantGrant.GetId()),
zap.Any("sources", sources),
)

err = s.store.PutGrants(ctx, descendantGrant)
if err != nil {
l.Error("runGrantExpandActions: error updating descendant grant", zap.Error(err))
return false, fmt.Errorf("runGrantExpandActions: error updating descendant grant: %w", err)
}
}
newGrants = append(newGrants, descendantGrants...)
}

err = s.store.PutGrants(ctx, newGrants...)
if err != nil {
l.Error("runGrantExpandActions: error updating descendant grants", zap.Error(err))
return false, fmt.Errorf("runGrantExpandActions: error updating descendant grants: %w", err)
}

// If we have no more pages of work, pop the action off the stack and mark this edge in the graph as done
Expand Down

0 comments on commit 3d1cef3

Please sign in to comment.