From 94e355975399414b9171f6e37de8b2ca729778a4 Mon Sep 17 00:00:00 2001 From: Dan Moran Date: Mon, 4 Jan 2021 13:43:16 -0800 Subject: [PATCH 1/4] fix(http): don't return 500 errors for partial writes --- http/legacy/write_handler.go | 8 ++++++++ http/write_handler.go | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/http/legacy/write_handler.go b/http/legacy/write_handler.go index 7951c96e02e..d1a790e2bee 100644 --- a/http/legacy/write_handler.go +++ b/http/legacy/write_handler.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/influxdb/v2/kit/tracing" kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" "github.com/influxdata/influxdb/v2/storage" + "github.com/influxdata/influxdb/v2/tsdb" "go.uber.org/zap" ) @@ -141,6 +142,13 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { } if err := h.PointsWriter.WritePoints(ctx, auth.OrgID, bucket.ID, parsed.Points); err != nil { + // N.B. PartialWriteError is logged instead of returned to the user to match cloud behavior. + if partialErr, ok := err.(tsdb.PartialWriteError); ok { + h.logger.Warn("partial write failure", zap.Error(partialErr)) + sw.WriteHeader(http.StatusNoContent) + return + } + h.HandleHTTPError(ctx, &influxdb.Error{ Code: influxdb.EInternal, Op: opWriteHandler, diff --git a/http/write_handler.go b/http/write_handler.go index 966cc2c1dc5..e8f6f5c3621 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -16,6 +16,7 @@ import ( kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/storage" + "github.com/influxdata/influxdb/v2/tsdb" "go.uber.org/zap" ) @@ -187,6 +188,13 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { requestBytes = parsed.RawSize if err := h.PointsWriter.WritePoints(ctx, org.ID, bucket.ID, parsed.Points); err != nil { + // N.B. PartialWriteError is logged instead of returned to the user to match cloud behavior. + if partialErr, ok := err.(tsdb.PartialWriteError); ok { + h.log.Warn("partial write failure", zap.Error(partialErr)) + sw.WriteHeader(http.StatusNoContent) + return + } + h.HandleHTTPError(ctx, &influxdb.Error{ Code: influxdb.EInternal, Op: opWriteHandler, From b93e4c1440dfcf8518af90d8ad407ca55e9bb287 Mon Sep 17 00:00:00 2001 From: Dan Moran Date: Mon, 4 Jan 2021 13:57:02 -0800 Subject: [PATCH 2/4] test: add cases for partial failure to API tests --- http/legacy/write_handler_test.go | 83 +++++++++++++++++++++++++++++++ http/write_handler_test.go | 18 +++++++ 2 files changed, 101 insertions(+) diff --git a/http/legacy/write_handler_test.go b/http/legacy/write_handler_test.go index 51c73d89aa1..24837a53dc4 100644 --- a/http/legacy/write_handler_test.go +++ b/http/legacy/write_handler_test.go @@ -19,6 +19,7 @@ import ( kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/snowflake" + "github.com/influxdata/influxdb/v2/tsdb" "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" ) @@ -187,6 +188,88 @@ func TestWriteHandler_BucketAndMappingExistsSpecificRP(t *testing.T) { assert.Equal(t, "", w.Body.String()) } +func TestWriteHandler_PartialWrite(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + // Mocked Services + eventRecorder = mocks.NewMockEventRecorder(ctrl) + dbrpMappingSvc = mocks.NewMockDBRPMappingServiceV2(ctrl) + bucketService = mocks.NewMockBucketService(ctrl) + pointsWriter = mocks.NewMockPointsWriter(ctrl) + + // Found Resources + orgID = generator.ID() + bucket = &influxdb.Bucket{ + ID: generator.ID(), + OrgID: orgID, + Name: "mydb/autogen", + RetentionPolicyName: "autogen", + RetentionPeriod: 72 * time.Hour, + } + mapping = &influxdb.DBRPMappingV2{ + OrganizationID: orgID, + BucketID: bucket.ID, + Database: "mydb", + RetentionPolicy: "autogen", + Default: true, + } + + lineProtocolBody = "m,t1=v1 f1=2 100" + ) + + findAutogenMapping := dbrpMappingSvc. + EXPECT(). + FindMany(gomock.Any(), influxdb.DBRPMappingFilterV2{ + OrgID: &mapping.OrganizationID, + Database: &mapping.Database, + RetentionPolicy: &mapping.RetentionPolicy, + }).Return([]*influxdb.DBRPMappingV2{mapping}, 1, nil) + + findBucketByID := bucketService. + EXPECT(). + FindBucketByID(gomock.Any(), bucket.ID).Return(bucket, nil) + + points := parseLineProtocol(t, lineProtocolBody) + writePoints := pointsWriter. + EXPECT(). + WritePoints(gomock.Any(), orgID, bucket.ID, pointsMatcher{points}). + Return(tsdb.PartialWriteError{Dropped: 1}) + + recordWriteEvent := eventRecorder.EXPECT(). + Record(gomock.Any(), gomock.Any()) + + gomock.InOrder( + findAutogenMapping, + findBucketByID, + writePoints, + recordWriteEvent, + ) + + perms := newPermissions(influxdb.WriteAction, influxdb.BucketsResourceType, &orgID, nil) + auth := newAuthorization(orgID, perms...) + ctx := pcontext.SetAuthorizer(context.Background(), auth) + r := newWriteRequest(ctx, lineProtocolBody) + params := r.URL.Query() + params.Set("db", "mydb") + params.Set("rp", "autogen") + r.URL.RawQuery = params.Encode() + + handler := NewWriterHandler(&PointsWriterBackend{ + HTTPErrorHandler: DefaultErrorHandler, + Logger: zaptest.NewLogger(t), + BucketService: bucketService, + DBRPMappingService: dbrp.NewAuthorizedService(dbrpMappingSvc), + PointsWriter: pointsWriter, + EventRecorder: eventRecorder, + }) + w := httptest.NewRecorder() + handler.ServeHTTP(w, r) + assert.Equal(t, http.StatusNoContent, w.Code) + assert.Equal(t, "", w.Body.String()) +} + func TestWriteHandler_BucketAndMappingExistsNoPermissions(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/http/write_handler_test.go b/http/write_handler_test.go index de65cace14d..ebb29a27967 100644 --- a/http/write_handler_test.go +++ b/http/write_handler_test.go @@ -17,6 +17,7 @@ import ( kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" "github.com/influxdata/influxdb/v2/mock" influxtesting "github.com/influxdata/influxdb/v2/testing" + "github.com/influxdata/influxdb/v2/tsdb" "go.uber.org/zap/zaptest" ) @@ -124,6 +125,23 @@ func TestWriteHandler_handleWrite(t *testing.T) { code: 204, }, }, + { + name: "partial write error is accepted", + request: request{ + org: "043e0780ee2b1000", + bucket: "04504b356e23b000", + body: "m1,t1=v1 f1=1", + auth: bucketWritePermission("043e0780ee2b1000", "04504b356e23b000"), + }, + state: state{ + org: testOrg("043e0780ee2b1000"), + bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), + writeErr: tsdb.PartialWriteError{Dropped: 1}, + }, + wants: wants{ + code: 204, + }, + }, { name: "points writer error is an internal error", request: request{ From 33955f9240bda3f20815d6df1ff1c3e9c8b444cf Mon Sep 17 00:00:00 2001 From: Dan Moran Date: Mon, 4 Jan 2021 13:58:10 -0800 Subject: [PATCH 3/4] chore: update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ef04642f7b0..b624dd1b426 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards 1. [20380](https://github.com/influxdata/influxdb/pull/20380): Remove duplication from task error messages. 1. [20313](https://github.com/influxdata/influxdb/pull/20313): Automatically build `tsi1` indexes for shards that need it instead of falling back to `inmem`. 1. [20313](https://github.com/influxdata/influxdb/pull/20313): Fix logging initialization for storage engine. +1. [20442](https://github.com/influxdata/influxdb/pull/20442): Don't return 500 codes for partial write failures. ## v2.0.3 [2020-12-14] From edcf8b8b68e1f65bc60f17120db9808435aa6550 Mon Sep 17 00:00:00 2001 From: Dan Moran Date: Mon, 4 Jan 2021 14:46:25 -0800 Subject: [PATCH 4/4] fix: return a 4xx instead of a 204 code --- http/legacy/write_handler.go | 9 ++++++--- http/legacy/write_handler_test.go | 6 +++--- http/write_handler.go | 9 ++++++--- http/write_handler_test.go | 7 ++++--- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/http/legacy/write_handler.go b/http/legacy/write_handler.go index d1a790e2bee..fd3381ffd29 100644 --- a/http/legacy/write_handler.go +++ b/http/legacy/write_handler.go @@ -142,10 +142,13 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { } if err := h.PointsWriter.WritePoints(ctx, auth.OrgID, bucket.ID, parsed.Points); err != nil { - // N.B. PartialWriteError is logged instead of returned to the user to match cloud behavior. if partialErr, ok := err.(tsdb.PartialWriteError); ok { - h.logger.Warn("partial write failure", zap.Error(partialErr)) - sw.WriteHeader(http.StatusNoContent) + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EUnprocessableEntity, + Op: opWriteHandler, + Msg: "failure writing points to database", + Err: partialErr, + }, sw) return } diff --git a/http/legacy/write_handler_test.go b/http/legacy/write_handler_test.go index 24837a53dc4..e6c76682683 100644 --- a/http/legacy/write_handler_test.go +++ b/http/legacy/write_handler_test.go @@ -235,7 +235,7 @@ func TestWriteHandler_PartialWrite(t *testing.T) { writePoints := pointsWriter. EXPECT(). WritePoints(gomock.Any(), orgID, bucket.ID, pointsMatcher{points}). - Return(tsdb.PartialWriteError{Dropped: 1}) + Return(tsdb.PartialWriteError{Reason: "bad points", Dropped: 1}) recordWriteEvent := eventRecorder.EXPECT(). Record(gomock.Any(), gomock.Any()) @@ -266,8 +266,8 @@ func TestWriteHandler_PartialWrite(t *testing.T) { }) w := httptest.NewRecorder() handler.ServeHTTP(w, r) - assert.Equal(t, http.StatusNoContent, w.Code) - assert.Equal(t, "", w.Body.String()) + assert.Equal(t, http.StatusUnprocessableEntity, w.Code) + assert.Equal(t, `{"code":"unprocessable entity","message":"failure writing points to database: partial write: bad points dropped=1"}`, w.Body.String()) } func TestWriteHandler_BucketAndMappingExistsNoPermissions(t *testing.T) { diff --git a/http/write_handler.go b/http/write_handler.go index e8f6f5c3621..eff6ed334c6 100644 --- a/http/write_handler.go +++ b/http/write_handler.go @@ -188,10 +188,13 @@ func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { requestBytes = parsed.RawSize if err := h.PointsWriter.WritePoints(ctx, org.ID, bucket.ID, parsed.Points); err != nil { - // N.B. PartialWriteError is logged instead of returned to the user to match cloud behavior. if partialErr, ok := err.(tsdb.PartialWriteError); ok { - h.log.Warn("partial write failure", zap.Error(partialErr)) - sw.WriteHeader(http.StatusNoContent) + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EUnprocessableEntity, + Op: opWriteHandler, + Msg: "failure writing points to database", + Err: partialErr, + }, sw) return } diff --git a/http/write_handler_test.go b/http/write_handler_test.go index ebb29a27967..5cd84f93df5 100644 --- a/http/write_handler_test.go +++ b/http/write_handler_test.go @@ -126,7 +126,7 @@ func TestWriteHandler_handleWrite(t *testing.T) { }, }, { - name: "partial write error is accepted", + name: "partial write error is unprocessable", request: request{ org: "043e0780ee2b1000", bucket: "04504b356e23b000", @@ -136,10 +136,11 @@ func TestWriteHandler_handleWrite(t *testing.T) { state: state{ org: testOrg("043e0780ee2b1000"), bucket: testBucket("043e0780ee2b1000", "04504b356e23b000"), - writeErr: tsdb.PartialWriteError{Dropped: 1}, + writeErr: tsdb.PartialWriteError{Reason: "bad points", Dropped: 1}, }, wants: wants{ - code: 204, + code: 422, + body: `{"code":"unprocessable entity","message":"failure writing points to database: partial write: bad points dropped=1"}`, }, }, {