diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 0afda6948fd4..a6d8a044f4aa 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -359,7 +359,28 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p } t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration case "schema_settings": - t.proto.SchemaSettings = req.Topic.SchemaSettings + // Clear this field. + t.proto.SchemaSettings = &pb.SchemaSettings{} + case "schema_settings.schema": + if t.proto.SchemaSettings == nil { + t.proto.SchemaSettings = &pb.SchemaSettings{} + } + t.proto.SchemaSettings.Schema = req.Topic.SchemaSettings.Schema + case "schema_settings.encoding": + if t.proto.SchemaSettings == nil { + t.proto.SchemaSettings = &pb.SchemaSettings{} + } + t.proto.SchemaSettings.Encoding = req.Topic.SchemaSettings.Encoding + case "schema_settings.first_revision_id": + if t.proto.SchemaSettings == nil { + t.proto.SchemaSettings = &pb.SchemaSettings{} + } + t.proto.SchemaSettings.FirstRevisionId = req.Topic.SchemaSettings.FirstRevisionId + case "schema_settings.last_revision_id": + if t.proto.SchemaSettings == nil { + t.proto.SchemaSettings = &pb.SchemaSettings{} + } + t.proto.SchemaSettings.LastRevisionId = req.Topic.SchemaSettings.LastRevisionId default: return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path) } diff --git a/pubsub/topic.go b/pubsub/topic.go index 22a1d70d54ec..36af7d582aa4 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -293,6 +293,8 @@ type TopicConfigToUpdate struct { RetentionDuration optional.Duration // Schema defines the schema settings upon topic creation. + // + // Use the zero value &SchemaSettings{} to remove the schema from the topic. SchemaSettings *SchemaSettings } @@ -407,7 +409,27 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { } if cfg.SchemaSettings != nil { pt.SchemaSettings = schemaSettingsToProto(cfg.SchemaSettings) - paths = append(paths, "schema_settings") + clearSchema := true + if pt.SchemaSettings.Schema != "" { + paths = append(paths, "schema_settings.schema") + clearSchema = false + } + if pt.SchemaSettings.Encoding != pb.Encoding_ENCODING_UNSPECIFIED { + paths = append(paths, "schema_settings.encoding") + clearSchema = false + } + if pt.SchemaSettings.FirstRevisionId != "" { + paths = append(paths, "schema_settings.first_revision_id") + clearSchema = false + } + if pt.SchemaSettings.LastRevisionId != "" { + paths = append(paths, "schema_settings.last_revision_id") + clearSchema = false + } + // Clear the schema if none of it's value changes. + if clearSchema { + paths = append(paths, "schema_settings") + } } return &pb.UpdateTopicRequest{ Topic: pt, diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 92e46400bdd9..13ccb0098091 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -331,8 +331,18 @@ func TestUpdateTopic_SchemaSettings(t *testing.T) { if err != nil { t.Fatal(err) } - if !testutil.Equal(config2.SchemaSettings, settings) { - t.Errorf("\ngot %+v\nwant %+v", config2, settings) + if !testutil.Equal(config2.SchemaSettings, settings, opt) { + t.Errorf("\ngot %+v\nwant %+v", config2.SchemaSettings, settings) + } + + // Clear schema settings. + settings = &SchemaSettings{} + config3, err := topic.Update(ctx, TopicConfigToUpdate{SchemaSettings: settings}) + if err != nil { + t.Fatal(err) + } + if !testutil.Equal(config3.SchemaSettings, settings, opt) { + t.Errorf("\ngot %+v\nwant %+v", config3.SchemaSettings, settings) } }