diff --git a/indexers/base.go b/indexers/base.go index 59305a9..244b9bb 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -267,20 +267,23 @@ type indexResponse struct { } // indexes the batch of contacts -func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) { +func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) { response := indexResponse{} indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index) _, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response) if err != nil { - return 0, 0, err + return 0, 0, 0, err } - createdCount, deletedCount, conflictedCount := 0, 0, 0 + createdCount, updatedCount, deletedCount, conflictedCount := 0, 0, 0, 0 + for _, item := range response.Items { if item.Index.ID != "" { slog.Debug("index response", "id", item.Index.ID, "status", item.Index.Status) - if item.Index.Status == 200 || item.Index.Status == 201 { + if item.Index.Status == 200 { + updatedCount++ + } else if item.Index.Status == 201 { createdCount++ } else if item.Index.Status == 409 { conflictedCount++ @@ -298,8 +301,10 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) { slog.Error("unparsed item in response") } } - slog.Debug("indexed batch", "created", createdCount, "deleted", deletedCount, "conflicted", conflictedCount) - return createdCount, deletedCount, nil + + slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "deleted", deletedCount, "conflicted", conflictedCount) + + return createdCount, updatedCount, deletedCount, nil } // our response for finding the last modified document diff --git a/indexers/contacts.go b/indexers/contacts.go index 8a9aef7..32486e5 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -64,12 +64,12 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error // now index our docs start := time.Now() - indexed, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild) + created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild) if err != nil { return "", fmt.Errorf("error indexing documents: %w", err) } - i.recordComplete(indexed, deleted, time.Since(start)) + i.recordComplete(created+updated, deleted, time.Since(start)) // if the index didn't previously exist or we are rebuilding, remap to our alias if remapAlias { @@ -153,8 +153,8 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( ` // IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time -func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, error) { - totalFetched, totalCreated, totalDeleted := 0, 0, 0 +func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) { + totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0 var modifiedOn time.Time var contactJSON string @@ -168,18 +168,20 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st batchStart := time.Now() // start time for this batch batchFetched := 0 // contacts fetched in this batch batchCreated := 0 // contacts created in ES + batchUpdated := 0 // contacts updated in ES batchDeleted := 0 // contacts deleted in ES batchESTime := time.Duration(0) // time spent indexing for this batch indexSubBatch := func(b *bytes.Buffer) error { t := time.Now() - created, deleted, err := i.indexBatch(index, b.Bytes()) + created, updated, deleted, err := i.indexBatch(index, b.Bytes()) if err != nil { return err } batchESTime += time.Since(t) batchCreated += created + batchUpdated += updated batchDeleted += deleted b.Reset() return nil @@ -191,17 +193,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st // no more rows? return if err == sql.ErrNoRows { - return 0, 0, nil + return 0, 0, 0, nil } if err != nil { - return 0, 0, err + return 0, 0, 0, err } defer rows.Close() for rows.Next() { err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON) if err != nil { - return 0, 0, err + return 0, 0, 0, err } batchFetched++ @@ -224,14 +226,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st // write to elastic search in batches if batchFetched%i.batchSize == 0 { if err := indexSubBatch(subBatch); err != nil { - return 0, 0, err + return 0, 0, 0, err } } } if subBatch.Len() > 0 { if err := indexSubBatch(subBatch); err != nil { - return 0, 0, err + return 0, 0, 0, err } } @@ -239,6 +241,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st totalFetched += batchFetched totalCreated += batchCreated + totalUpdated += batchUpdated totalDeleted += batchDeleted totalTime := time.Since(start) @@ -249,10 +252,12 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st "rate", batchRate, "batch_fetched", batchFetched, "batch_created", batchCreated, + "batch_updated", batchUpdated, "batch_elapsed", batchTime, "batch_elapsed_es", batchESTime, "total_fetched", totalFetched, "total_created", totalCreated, + "total_updated", totalUpdated, "total_elapsed", totalTime, ) @@ -269,7 +274,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st } } - return totalCreated, totalDeleted, nil + return totalCreated, totalUpdated, totalDeleted, nil } func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {