-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
metadata_updater.go
248 lines (233 loc) · 8.09 KB
/
metadata_updater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package descmetadata
import (
"context"
"fmt"
"strings"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)
// metadataUpdater implements scexec.DescriptorMetadataUpdater. It is used to
// update metadata attached to schema objects: comments, database role
// settings, scheduled jobs and zone configurations.
type metadataUpdater struct {
// ctx is the context captured when the updater was built. It is handed to
// ieFactory every time a method needs an internal executor.
ctx context.Context
// txn is the KV transaction passed along with every statement the updater
// executes and with descriptor reads and writes.
txn *kv.Txn
// ieFactory builds the internal executor used to run SQL statements.
ieFactory sqlutil.SessionBoundInternalExecutorFactory
// sessionData is the session data handed to ieFactory; the constructor
// stores a modified clone of the caller's session data here.
sessionData *sessiondata.SessionData
// descriptors is the descriptor collection used to fetch and write the
// role settings table descriptor when its version has to be bumped.
descriptors *descs.Collection
// cacheEnabled holds the value of sessioninit.CacheEnabled read at
// construction time; it gates the role settings table version bump.
cacheEnabled bool
// codec is the SQL codec supplied to the constructor.
// NOTE(review): no method visible in this file reads it — confirm it is
// still needed.
codec keys.SQLCodec
}
// NewMetadataUpdater builds the scexec.DescriptorMetadataUpdater used to
// create and destroy metadata (for example comments) associated with
// different schema objects. The returned updater keeps ctx, txn, codec,
// ieFactory and descriptors as given, and snapshots the value of
// sessioninit.CacheEnabled from settings.
func NewMetadataUpdater(
	ctx context.Context,
	ieFactory sqlutil.SessionBoundInternalExecutorFactory,
	descriptors *descs.Collection,
	codec keys.SQLCodec,
	settings *settings.Values,
	txn *kv.Txn,
	sessionData *sessiondata.SessionData,
) scexec.DescriptorMetadataUpdater {
	// The caller's session data cannot be used as is: the code that used to
	// modify this metadata went through a circular executor that ignored any
	// settings set later on. Work on a private clone and deliberately
	// override the problematic setting there, leaving the caller's copy
	// untouched.
	sd := sessionData.Clone()
	sd.ExperimentalDistSQLPlanningMode = sessiondatapb.ExperimentalDistSQLPlanningOn

	updater := metadataUpdater{
		ctx:          ctx,
		txn:          txn,
		codec:        codec,
		ieFactory:    ieFactory,
		sessionData:  sd,
		descriptors:  descriptors,
		cacheEnabled: sessioninit.CacheEnabled.Get(settings),
	}
	return updater
}
// UpsertDescriptorComment implements scexec.DescriptorMetadataUpdater.
//
// It upserts one row into system.comments with the values
// (commentType, id, subID, comment), in that order. The statement is executed
// in mu.txn with the user overridden to root, and any execution error is
// returned unchanged.
func (mu metadataUpdater) UpsertDescriptorComment(
	id int64, subID int64, commentType keys.CommentType, comment string,
) error {
	ie := mu.ieFactory(mu.ctx, mu.sessionData)
	_, err := ie.ExecEx(
		// Run under the updater's own context rather than
		// context.Background(), so the statement observes the same
		// cancellation, deadline and tracing as the executor it runs on.
		mu.ctx,
		fmt.Sprintf("upsert-%s-comment", commentType),
		mu.txn,
		sessiondata.InternalExecutorOverride{User: username.RootUserName()},
		"UPSERT INTO system.comments VALUES ($1, $2, $3, $4)",
		commentType,
		id,
		subID,
		comment,
	)
	return err
}
// DeleteDescriptorComment implements scexec.DescriptorMetadataUpdater.
//
// It deletes the rows of system.comments whose object_id, sub_id and type
// match id, subID and commentType. The statement is executed in mu.txn with
// the user overridden to root, and any execution error is returned unchanged.
func (mu metadataUpdater) DeleteDescriptorComment(
	id int64, subID int64, commentType keys.CommentType,
) error {
	ie := mu.ieFactory(mu.ctx, mu.sessionData)
	_, err := ie.ExecEx(
		// Run under the updater's own context rather than
		// context.Background(), so the statement observes the same
		// cancellation, deadline and tracing as the executor it runs on.
		mu.ctx,
		fmt.Sprintf("delete-%s-comment", commentType),
		mu.txn,
		sessiondata.InternalExecutorOverride{User: username.RootUserName()},
		"DELETE FROM system.comments WHERE object_id = $1 AND sub_id = $2 AND "+
			"type = $3;",
		id,
		subID,
		commentType,
	)
	return err
}
// DeleteAllCommentsForTables implements scexec.DescriptorMetadataUpdater.
//
// It deletes, in a single statement, every table, column, constraint and
// index comment in system.comments whose object_id is in idSet. An empty set
// is a no-op that returns nil. The statement is executed in mu.txn with the
// user overridden to root, and any execution error is returned unchanged.
func (mu metadataUpdater) DeleteAllCommentsForTables(idSet catalog.DescriptorIDSet) error {
	if idSet.Empty() {
		return nil
	}
	var buf strings.Builder
	ids := idSet.Ordered()
	// The IDs are rendered with %d directly into the statement text. The
	// first ID is written together with the statement prefix so that the
	// remaining ones can each be preceded by a comma.
	_, _ = fmt.Fprintf(&buf, `
DELETE FROM system.comments
WHERE type IN (%d, %d, %d, %d)
AND object_id IN (%d`,
		keys.TableCommentType, keys.ColumnCommentType, keys.ConstraintCommentType,
		keys.IndexCommentType, ids[0],
	)
	for _, id := range ids[1:] {
		_, _ = fmt.Fprintf(&buf, ", %d", id)
	}
	buf.WriteString(")")
	ie := mu.ieFactory(mu.ctx, mu.sessionData)
	_, err := ie.ExecEx(
		// Run under the updater's own context rather than
		// context.Background(), so the statement observes the same
		// cancellation, deadline and tracing as the executor it runs on.
		mu.ctx,
		"delete-all-comments-for-tables",
		mu.txn,
		sessiondata.InternalExecutorOverride{User: username.RootUserName()},
		buf.String(),
	)
	return err
}
// UpsertConstraintComment implements scexec.DescriptorMetadataUpdater.
//
// It stores comment for the given constraint by delegating to
// UpsertDescriptorComment, with the table ID as the object ID, the
// constraint ID as the sub ID, and keys.ConstraintCommentType as the type.
func (mu metadataUpdater) UpsertConstraintComment(
	tableID descpb.ID, constraintID descpb.ConstraintID, comment string,
) error {
	objectID, subID := int64(tableID), int64(constraintID)
	return mu.UpsertDescriptorComment(objectID, subID, keys.ConstraintCommentType, comment)
}
// DeleteConstraintComment implements scexec.DescriptorMetadataUpdater.
//
// It removes the comment of the given constraint by delegating to
// DeleteDescriptorComment, with the table ID as the object ID, the
// constraint ID as the sub ID, and keys.ConstraintCommentType as the type.
func (mu metadataUpdater) DeleteConstraintComment(
	tableID descpb.ID, constraintID descpb.ConstraintID,
) error {
	objectID, subID := int64(tableID), int64(constraintID)
	return mu.DeleteDescriptorComment(objectID, subID, keys.ConstraintCommentType)
}
// DeleteDatabaseRoleSettings implements scexec.DescriptorMetadataUpdater.
//
// It deletes every row of the database role settings table whose database_id
// equals dbID. When the session-init cache is enabled and at least one row
// was deleted, it also bumps the version of the role settings table
// descriptor and writes the descriptor back.
func (mu metadataUpdater) DeleteDatabaseRoleSettings(ctx context.Context, dbID descpb.ID) error {
	executor := mu.ieFactory(mu.ctx, mu.sessionData)
	asRoot := sessiondata.InternalExecutorOverride{User: username.RootUserName()}
	stmt := fmt.Sprintf(
		`DELETE FROM %s WHERE database_id = $1`,
		sessioninit.DatabaseRoleSettingsTableName,
	)
	rowsDeleted, err := executor.ExecEx(ctx, "delete-db-role-setting", mu.txn, asRoot, stmt, dbID)
	if err != nil {
		return err
	}

	// The table version only needs to change when the cache is on and the
	// delete actually modified the table.
	needsVersionBump := mu.cacheEnabled && rowsDeleted != 0
	if !needsVersionBump {
		return nil
	}

	// Bump the version of the role settings table since it was modified.
	lookupFlags := tree.ObjectLookupFlags{
		CommonLookupFlags: tree.CommonLookupFlags{
			Required:       true,
			RequireMutable: true,
		},
	}
	tableDesc, err := mu.descriptors.GetMutableTableByID(
		ctx, mu.txn, keys.DatabaseRoleSettingsTableID, lookupFlags,
	)
	if err != nil {
		return err
	}
	tableDesc.MaybeIncrementVersion()
	return mu.descriptors.WriteDesc(ctx, false /*kvTrace*/, tableDesc, mu.txn)
}
// SwapDescriptorSubComment implements scexec.DescriptorMetadataUpdater.
//
// It re-keys existing comments: every row of system.comments matching
// (object_id = id, sub_id = oldSubID, type = commentType) has its sub_id set
// to newSubID. The statement is executed in mu.txn with the user overridden
// to root, and any execution error is returned unchanged.
func (mu metadataUpdater) SwapDescriptorSubComment(
	id int64, oldSubID int64, newSubID int64, commentType keys.CommentType,
) error {
	ie := mu.ieFactory(mu.ctx, mu.sessionData)
	_, err := ie.ExecEx(
		// Run under the updater's own context rather than
		// context.Background(), so the statement observes the same
		// cancellation, deadline and tracing as the executor it runs on.
		mu.ctx,
		// The operation name describes the swap; it previously reused the
		// "upsert-%s-comment" label of UpsertDescriptorComment, which made
		// the two statements indistinguishable by name.
		fmt.Sprintf("swap-%s-comment", commentType),
		mu.txn,
		sessiondata.InternalExecutorOverride{User: username.RootUserName()},
		"UPDATE system.comments SET sub_id= $1 WHERE "+
			"object_id = $2 AND sub_id = $3 AND type = $4",
		newSubID,
		id,
		oldSubID,
		commentType,
	)
	return err
}
// DeleteSchedule implements scexec.DescriptorMetadataUpdater.
//
// It deletes the rows of system.scheduled_jobs whose schedule_id equals
// scheduleID. The statement is executed under ctx in mu.txn with the user
// overridden to root, and any execution error is returned unchanged.
func (mu metadataUpdater) DeleteSchedule(ctx context.Context, scheduleID int64) error {
	const stmt = "DELETE FROM system.scheduled_jobs WHERE schedule_id = $1"

	executor := mu.ieFactory(mu.ctx, mu.sessionData)
	asRoot := sessiondata.InternalExecutorOverride{User: username.RootUserName()}
	if _, err := executor.ExecEx(ctx, "delete-schedule", mu.txn, asRoot, stmt, scheduleID); err != nil {
		return err
	}
	return nil
}
// DeleteZoneConfig implements scexec.DescriptorMetadataUpdater.
//
// It deletes the rows of system.zones whose id equals the given descriptor
// ID, running the statement under ctx in mu.txn. The count and error reported
// by the internal executor are returned as is.
func (mu metadataUpdater) DeleteZoneConfig(
	ctx context.Context, id descpb.ID,
) (numAffected int, err error) {
	const stmt = "DELETE FROM system.zones WHERE id = $1"

	executor := mu.ieFactory(mu.ctx, mu.sessionData)
	numAffected, err = executor.Exec(ctx, "delete-zone", mu.txn, stmt, id)
	return numAffected, err
}
// UpsertZoneConfig implements scexec.DescriptorMetadataUpdater.
//
// It marshals zone with protoutil.Marshal and upserts the result into
// system.zones as the config for id, running the statement under ctx in
// mu.txn. A marshalling failure is returned as (0, err), with the error
// wrapped using pgcode.CheckViolation; otherwise the count and error reported
// by the internal executor are returned as is.
func (mu metadataUpdater) UpsertZoneConfig(
	ctx context.Context, id descpb.ID, zone *zonepb.ZoneConfig,
) (numAffected int, err error) {
	const stmt = "UPSERT INTO system.zones (id, config) VALUES ($1, $2)"

	executor := mu.ieFactory(mu.ctx, mu.sessionData)
	encoded, err := protoutil.Marshal(zone)
	if err != nil {
		return 0, pgerror.Wrap(err, pgcode.CheckViolation, "could not marshal zone config")
	}
	numAffected, err = executor.Exec(ctx, "upsert-zone", mu.txn, stmt, id, encoded)
	return numAffected, err
}