diff --git a/pkg/dotc1z/entitlements.go b/pkg/dotc1z/entitlements.go index 7191d217..2ceebc6a 100644 --- a/pkg/dotc1z/entitlements.go +++ b/pkg/dotc1z/entitlements.go @@ -84,22 +84,15 @@ func (c *C1File) GetEntitlement(ctx context.Context, request *reader_v2.Entitlem } func (c *C1File) PutEntitlements(ctx context.Context, entitlementObjs ...*v2.Entitlement) error { - err := c.db.WithTx(func(tx *goqu.TxDatabase) error { - err := bulkPutConnectorObjectTx(ctx, c, tx, entitlements.Name(), - func(entitlement *v2.Entitlement) (goqu.Record, error) { - return goqu.Record{ - "resource_id": entitlement.Resource.Id.Resource, - "resource_type_id": entitlement.Resource.Id.ResourceType, - }, nil - }, - entitlementObjs..., - ) - if err != nil { - return err - } - - return nil - }) + err := bulkPutConnectorObject(ctx, c, entitlements.Name(), + func(entitlement *v2.Entitlement) (goqu.Record, error) { + return goqu.Record{ + "resource_id": entitlement.Resource.Id.Resource, + "resource_type_id": entitlement.Resource.Id.ResourceType, + }, nil + }, + entitlementObjs..., + ) if err != nil { return err } diff --git a/pkg/dotc1z/grants.go b/pkg/dotc1z/grants.go index daa1a86f..432f9093 100644 --- a/pkg/dotc1z/grants.go +++ b/pkg/dotc1z/grants.go @@ -168,24 +168,18 @@ func (c *C1File) ListGrantsForResourceType( } func (c *C1File) PutGrants(ctx context.Context, bulkGrants ...*v2.Grant) error { - err := c.db.WithTx(func(tx *goqu.TxDatabase) error { - err := bulkPutConnectorObjectTx(ctx, c, tx, grants.Name(), - func(grant *v2.Grant) (goqu.Record, error) { - return goqu.Record{ - "resource_type_id": grant.Entitlement.Resource.Id.ResourceType, - "resource_id": grant.Entitlement.Resource.Id.Resource, - "entitlement_id": grant.Entitlement.Id, - "principal_resource_type_id": grant.Principal.Id.ResourceType, - "principal_resource_id": grant.Principal.Id.Resource, - }, nil - }, - bulkGrants..., - ) - if err != nil { - return err - } - return nil - }) + err := bulkPutConnectorObject(ctx, c, grants.Name(), + func(grant *v2.Grant) (goqu.Record, error) { + return goqu.Record{ + "resource_type_id": grant.Entitlement.Resource.Id.ResourceType, + "resource_id": grant.Entitlement.Resource.Id.Resource, + "entitlement_id": grant.Entitlement.Id, + "principal_resource_type_id": grant.Principal.Id.ResourceType, + "principal_resource_id": grant.Principal.Id.Resource, + }, nil + }, + bulkGrants..., + ) if err != nil { return err } diff --git a/pkg/dotc1z/resouce_types.go b/pkg/dotc1z/resouce_types.go index a0bb4d71..4b12f40e 100644 --- a/pkg/dotc1z/resouce_types.go +++ b/pkg/dotc1z/resouce_types.go @@ -79,18 +79,12 @@ func (c *C1File) GetResourceType(ctx context.Context, request *reader_v2.Resourc } func (c *C1File) PutResourceTypes(ctx context.Context, resourceTypesObjs ...*v2.ResourceType) error { - err := c.db.WithTx(func(tx *goqu.TxDatabase) error { - err := bulkPutConnectorObjectTx(ctx, c, tx, resourceTypes.Name(), - func(resource *v2.ResourceType) (goqu.Record, error) { - return nil, nil - }, - resourceTypesObjs..., - ) - if err != nil { - return err - } - return nil - }) + err := bulkPutConnectorObject(ctx, c, resourceTypes.Name(), + func(resource *v2.ResourceType) (goqu.Record, error) { + return nil, nil + }, + resourceTypesObjs..., + ) if err != nil { return err } diff --git a/pkg/dotc1z/resources.go b/pkg/dotc1z/resources.go index dc235699..40e0542d 100644 --- a/pkg/dotc1z/resources.go +++ b/pkg/dotc1z/resources.go @@ -98,27 +98,21 @@ func (c *C1File) GetResource(ctx context.Context, request *reader_v2.ResourcesRe } func (c *C1File) PutResources(ctx context.Context, resourceObjs ...*v2.Resource) error { - err := c.db.WithTx(func(tx *goqu.TxDatabase) error { - err := bulkPutConnectorObjectTx(ctx, c, tx, resources.Name(), - func(resource *v2.Resource) (goqu.Record, error) { - fields := goqu.Record{ - "resource_type_id": resource.Id.ResourceType, - "external_id": fmt.Sprintf("%s:%s", resource.Id.ResourceType, resource.Id.Resource), - } - - if resource.ParentResourceId != nil { - fields["parent_resource_type_id"] = resource.ParentResourceId.ResourceType - fields["parent_resource_id"] = resource.ParentResourceId.Resource - } - return fields, nil - }, - resourceObjs..., - ) - if err != nil { - return err - } - return nil - }) + err := bulkPutConnectorObject(ctx, c, resources.Name(), + func(resource *v2.Resource) (goqu.Record, error) { + fields := goqu.Record{ + "resource_type_id": resource.Id.ResourceType, + "external_id": fmt.Sprintf("%s:%s", resource.Id.ResourceType, resource.Id.Resource), + } + + if resource.ParentResourceId != nil { + fields["parent_resource_type_id"] = resource.ParentResourceId.ResourceType + fields["parent_resource_id"] = resource.ParentResourceId.Resource + } + return fields, nil + }, + resourceObjs..., + ) if err != nil { return err } diff --git a/pkg/dotc1z/sql_helpers.go b/pkg/dotc1z/sql_helpers.go index 968e9277..b1f8ff8e 100644 --- a/pkg/dotc1z/sql_helpers.go +++ b/pkg/dotc1z/sql_helpers.go @@ -233,20 +233,21 @@ func (c *C1File) listConnectorObjects(ctx context.Context, tableName string, req var protoMarshaler = proto.MarshalOptions{Deterministic: true} -func bulkPutConnectorObjectTx[T proto.Message](ctx context.Context, c *C1File, - tx *goqu.TxDatabase, +func bulkPutConnectorObject[T proto.Message](ctx context.Context, c *C1File, tableName string, extractFields func(m T) (goqu.Record, error), msgs ...T) error { + if len(msgs) == 0 { + return nil + } + err := c.validateSyncDb(ctx) if err != nil { return err } - baseQ := tx.Insert(tableName).Prepared(true) - baseQ = baseQ.OnConflict(goqu.DoUpdate("external_id, sync_id", goqu.C("data").Set(goqu.I("EXCLUDED.data")))) - - for _, m := range msgs { + rows := make([]*goqu.Record, len(msgs)) + for i, m := range msgs { messageBlob, err := protoMarshaler.Marshal(m) if err != nil { return err @@ -270,16 +271,35 @@ func bulkPutConnectorObjectTx[T proto.Message](ctx context.Context, c *C1File, fields["data"] = messageBlob fields["sync_id"] = c.currentSyncID fields["discovered_at"] = time.Now().Format("2006-01-02 15:04:05.999999999") - q := baseQ.Rows(fields) - query, args, err := q.ToSQL() + rows[i] = &fields + } + chunkSize := 100 + chunks := len(rows) / chunkSize + if len(rows)%chunkSize != 0 { + chunks++ + } + + for i := 0; i < chunks; i++ { + start := i * chunkSize + end := (i + 1) * chunkSize + if end > len(rows) { + end = len(rows) + } + chunkedRows := rows[start:end] + query, args, err := c.db.Insert(tableName). + OnConflict(goqu.DoUpdate("external_id, sync_id", goqu.C("data").Set(goqu.I("EXCLUDED.data")))). + Rows(chunkedRows). + Prepared(true). + ToSQL() if err != nil { return err } - _, err = tx.Exec(query, args...) + _, err = c.db.Exec(query, args...) if err != nil { return err } } + return nil } diff --git a/pkg/sync/syncer.go b/pkg/sync/syncer.go index 7bb13297..5a63406b 100644 --- a/pkg/sync/syncer.go +++ b/pkg/sync/syncer.go @@ -1120,7 +1120,6 @@ func (s *syncer) fetchEtaggedGrantsForResource( Resource: resource, Annotations: storeAnnos, PageToken: npt, - PageSize: 1000, }) if err != nil { return nil, false, err @@ -1273,7 +1272,6 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) { // Fetch a page of source grants sourceGrants, err := s.store.ListGrantsForEntitlement(ctx, &reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest{ Entitlement: sourceEntitlement.GetEntitlement(), - PageSize: 1000, PageToken: action.PageToken, }) if err != nil { @@ -1281,6 +1279,7 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) { return false, fmt.Errorf("runGrantExpandActions: error fetching source grants: %w", err) } + var newGrants []*v2.Grant = make([]*v2.Grant, 0) for _, sourceGrant := range sourceGrants.List { // Skip this grant if it is not for a resource type we care about if len(action.ResourceTypeIDs) > 0 { @@ -1322,7 +1321,6 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) { req := &reader_v2.GrantsReaderServiceListGrantsForEntitlementRequest{ Entitlement: descendantEntitlement.GetEntitlement(), PrincipalId: sourceGrant.GetPrincipal().GetId(), - PageSize: 1000, PageToken: pageToken, Annotations: nil, } @@ -1383,13 +1381,14 @@ func (s *syncer) runGrantExpandActions(ctx context.Context) (bool, error) { zap.String("grant_id", descendantGrant.GetId()), zap.Any("sources", sources), ) - - err = s.store.PutGrants(ctx, descendantGrant) - if err != nil { - l.Error("runGrantExpandActions: error updating descendant grant", zap.Error(err)) - return false, fmt.Errorf("runGrantExpandActions: error updating descendant grant: %w", err) - } } + newGrants = append(newGrants, descendantGrants...) + } + + err = s.store.PutGrants(ctx, newGrants...) + if err != nil { + l.Error("runGrantExpandActions: error updating descendant grants", zap.Error(err)) + return false, fmt.Errorf("runGrantExpandActions: error updating descendant grants: %w", err) } // If we have no more pages of work, pop the action off the stack and mark this edge in the graph as done