Skip to content

Commit

Permalink
*(ticdc): do not print password in cdc log (#9691)
Browse files Browse the repository at this point in the history
close #9690
  • Loading branch information
sdojjy authored Sep 11, 2023
1 parent c410cff commit 6ea9a41
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 39 deletions.
11 changes: 3 additions & 8 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

infoStr, err := info.Marshal()
if err != nil {
_ = c.Error(err)
return
}

upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
Expand All @@ -331,7 +324,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr))
log.Info("Create changefeed successfully!",
zap.String("id", changefeedConfig.ID),
zap.String("changefeed", info.String()))
c.Status(http.StatusAccepted)
}

Expand Down
8 changes: 1 addition & 7 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CAPath: cfg.CAPath,
CertAllowedCN: cfg.CertAllowedCN,
}
infoStr, err := info.Marshal()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
Expand Down Expand Up @@ -176,7 +170,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil, true))
Expand Down
12 changes: 6 additions & 6 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI)
if err != nil {
log.Error("failed to marshal changefeed info", zap.Error(err))
clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI)
if clone.Config != nil {
clone.Config.MaskSensitiveData()
}

str, err = clone.Marshal()
Expand Down Expand Up @@ -543,11 +543,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
}

func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) {
oldRawQuery := uri.RawQuery
newRawQuery := newQuery.Encode()
maskedURI, _ := util.MaskSinkURI(uri.String())
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newQuery.Encode()))
zap.String("oldURI", maskedURI),
zap.String("newProtocol", newProtocol))

uri.RawQuery = newRawQuery
fixedSinkURI := uri.String()
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ LOOP2:
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))
zap.String("info", c.state.Info.String()))

return nil
}
Expand Down Expand Up @@ -726,7 +726,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Any("status", c.state.Status),
zap.Stringer("info", c.state.Info),
zap.String("info", c.state.Info.String()),
zap.Bool("isRemoved", c.isRemoved))
}

Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
Expand All @@ -35,9 +34,6 @@ type PulsarMockProducers struct {
func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string,
totalPartitionsNum int32, message *common.Message,
) error {
// call SyncSendMessage

log.Info("pulsarProducers SyncBroadcastMessage in")
return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message)
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/sink/ddlsink/mq/kafka_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func NewKafkaDDLSink(
}

start := time.Now()
log.Info("Try to create a DDL sink producer", zap.Any("options", options))
log.Info("Try to create a DDL sink producer",
zap.String("changefeed", changefeedID.String()))
syncProducer, err := factory.SyncProducer(ctx)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func NewPulsarDDLSink(
return nil, errors.Trace(err)
}

log.Info("Try to create a DDL sink producer", zap.Any("pulsarConfig", pConfig))
log.Info("Try to create a DDL sink producer",
zap.String("changefeed", changefeedID.String()))

// NewEventRouter
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme)
Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func NewPulsarDMLProducer(

var pulsarConfig *config.PulsarConfig
if sinkConfig.PulsarConfig == nil {
log.Error("new pulsar DML producer fail",
zap.Any("sink:pulsar config is empty", sinkConfig.PulsarConfig))
log.Error("new pulsar DML producer fail,sink:pulsar config is empty")
return nil, cerror.ErrPulsarInvalidConfig.
GenWithStackByArgs("pulsar config is empty")
}
Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ func NewKafkaDMLSink(
eventRouter, encoderGroup, protocol, errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID),
zap.Any("options", options))
zap.String("changefeedID", changefeedID.ID))

return s, nil
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/mq/pulsar_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ func NewPulsarDMLSink(
}

failpointCh := make(chan error, 1)
log.Info("Try to create a DML sink producer", zap.Any("pulsar", pConfig))
log.Info("Try to create a DML sink producer", zap.String("changefeed", changefeedID.String()))
start := time.Now()
p, err := producerCreator(ctx, changefeedID, client, replicaConfig.Sink, errCh, failpointCh)
log.Info("DML sink producer created",
zap.String("changefeed", changefeedID.String()),
zap.Duration("duration", time.Since(start)))
if err != nil {
defer func() {
Expand Down
4 changes: 2 additions & 2 deletions cdc/syncpointstore/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newMySQLSyncPointStore(
return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
}

log.Info("Start mysql syncpoint sink")
log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String()))

return &mysqlSyncPointStore{
db: syncDB,
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
var secondaryTs string
err = row.Scan(&secondaryTs)
if err != nil {
log.Info("sync table: get tidb_current_ts err")
log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String()))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti
func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("open db failed", zap.Error(err))
return nil, errors.Trace(err)
}

Expand All @@ -957,7 +957,7 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err = db.PingContext(ctx); err != nil {
log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("ping db failed", zap.Error(err))
return nil, errors.Trace(err)
}
log.Info("open db success", zap.String("dsn", dsn))
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
)

// ConsistentConfig represents replication consistency config for a changefeed.
Expand Down Expand Up @@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
}
return redo.ValidateStorage(uri)
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
func (c *ConsistentConfig) MaskSensitiveData() {
c.Storage = util.MaskSensitiveDataInURI(c.Storage)
}
10 changes: 10 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,13 @@ func isSinkCompatibleWithSpanReplication(u *url.URL) bool {
return u != nil &&
(strings.Contains(u.Scheme, "kafka") || strings.Contains(u.Scheme, "blackhole"))
}

// MaskSensitiveData masks sensitive data in ReplicaConfig
func (c *ReplicaConfig) MaskSensitiveData() {
if c.Sink != nil {
c.Sink.MaskSensitiveData()
}
if c.Consistent != nil {
c.Consistent.MaskSensitiveData()
}
}
40 changes: 40 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -160,6 +161,19 @@ type SinkConfig struct {
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}

// MaskSensitiveData masks sensitive data in SinkConfig
func (s *SinkConfig) MaskSensitiveData() {
if s.SchemaRegistry != nil {
s.SchemaRegistry = aws.String(util.MaskSensitiveDataInURI(*s.SchemaRegistry))
}
if s.KafkaConfig != nil {
s.KafkaConfig.MaskSensitiveData()
}
if s.PulsarConfig != nil {
s.PulsarConfig.MaskSensitiveData()
}
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields
Expand Down Expand Up @@ -328,6 +342,19 @@ type KafkaConfig struct {
GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"`
}

// MaskSensitiveData masks sensitive data in KafkaConfig
func (k *KafkaConfig) MaskSensitiveData() {
k.SASLPassword = aws.String("********")
k.SASLGssAPIPassword = aws.String("********")
k.SASLOAuthClientSecret = aws.String("********")
k.Key = aws.String("********")
if k.GlueSchemaRegistryConfig != nil {
k.GlueSchemaRegistryConfig.AccessKey = "********"
k.GlueSchemaRegistryConfig.Token = "********"
k.GlueSchemaRegistryConfig.SecretAccessKey = "********"
}
}

// PulsarCompressionType is the compression type for pulsar
type PulsarCompressionType string

Expand Down Expand Up @@ -471,6 +498,19 @@ type PulsarConfig struct {
SinkURI *url.URL `toml:"-" json:"-"`
}

// MaskSensitiveData masks sensitive data in PulsarConfig
func (c *PulsarConfig) MaskSensitiveData() {
if c.AuthenticationToken != nil {
c.AuthenticationToken = aws.String("******")
}
if c.BasicPassword != nil {
c.BasicPassword = aws.String("******")
}
if c.OAuth2 != nil {
c.OAuth2.OAuth2PrivateKey = "******"
}
}

// Check get broker url
func (c *PulsarConfig) validate() (err error) {
if c.OAuth2 != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/avro/confluent_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (m *confluentSchemaManager) ClearRegistry(ctx context.Context, schemaSubjec
uri := m.registryURL + "/subjects/" + url.QueryEscape(schemaSubject)
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
if err != nil {
log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
log.Error("Could not construct request for clearRegistry", zap.Error(err))
return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
}
req.Header.Add(
Expand Down
29 changes: 29 additions & 0 deletions pkg/util/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) {
}
return uriParsed.Redacted(), nil
}

var name = []string{
"password",
"sasl-password",
"access-key",
"secret-access-key",
"access_token",
"token",
"secret",
"passwd",
"pwd",
}

// MaskSensitiveDataInURI returns an uri that sensitive infos has been masked.
func MaskSensitiveDataInURI(uri string) string {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return ""
}
queries := uriParsed.Query()
for _, secretKey := range name {
if queries.Has(secretKey) {
queries.Set(secretKey, "******")
}
}
uriParsed.RawQuery = queries.Encode()
return uriParsed.Redacted()
}

0 comments on commit 6ea9a41

Please sign in to comment.