diff --git a/CHANGELOG.md b/CHANGELOG.md index d8645f3944f..b48aec90ed3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ### Features - [#1755](https://github.com/influxdb/influxdb/pull/1848): Support JSON data ingest over UDP - [#1857](https://github.com/influxdb/influxdb/pull/1857): Support retention policies with infinite duration +- [#1858](https://github.com/influxdb/influxdb/pull/1858): Enable detailed tracing of write path ## v0.9.0-rc7 [2015-03-02] diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 82547886463..7485dda0294 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -96,7 +96,8 @@ type Config struct { } `toml:"cluster"` Logging struct { - File string `toml:"file"` + File string `toml:"file"` + WriteTraceEnabled bool `toml:"write-tracing"` } `toml:"logging"` ContinuousQuery struct { diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go index 10a9bed00aa..4e6b4ac88e2 100644 --- a/cmd/influxd/config_test.go +++ b/cmd/influxd/config_test.go @@ -174,6 +174,7 @@ enabled = true [logging] file = "influxdb.log" +write-tracing = true # Configure the admin server [admin] diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 447008d3cae..712177c20d5 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -80,6 +80,9 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B // Start the server handler. Attach to broker if listening on the same port. if s != nil { sh := httpd.NewHandler(s, config.Authentication.Enabled, version) + sh.SetLogOutput(logWriter) + sh.WriteTrace = config.Logging.WriteTraceEnabled + if h != nil && config.BrokerAddr() == config.DataAddr() { h.serverHandler = sh } else { @@ -262,6 +265,7 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b // Create and open the server. s := influxdb.NewServer() s.SetLogOutput(w) + s.WriteTrace = config.Logging.WriteTraceEnabled s.RecomputePreviousN = config.ContinuousQuery.RecomputePreviousN s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan) s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 8448404bc3d..28444afdc32 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -83,3 +83,4 @@ dir = "/tmp/influxdb/development/state" [logging] file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr. +write-tracing = false # If true, enables detailed logging of the write system. diff --git a/httpd/handler.go b/httpd/handler.go index 94749b7a18d..2cb601e2bb7 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "math" "net/http" @@ -43,6 +44,9 @@ type Handler struct { routes []route mux *pat.PatternServeMux requireAuthentication bool + + Logger *log.Logger + WriteTrace bool // Detailed logging of write path } // NewHandler returns a new instance of Handler. @@ -51,10 +55,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) server: s, mux: pat.New(), requireAuthentication: requireAuthentication, + Logger: log.New(os.Stderr, "[http] ", log.LstdFlags), } - weblog := log.New(os.Stderr, `[http] `, 0) - h.routes = append(h.routes, route{ "query", // Query serving route. @@ -129,9 +132,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) handler = cors(handler) handler = requestID(handler) if r.log { - handler = logging(handler, r.name, weblog) + handler = logging(handler, r.name, h.Logger) } - handler = recovery(handler, r.name, weblog) // make sure recovery is always last + handler = recovery(handler, r.name, h.Logger) // make sure recovery is always last h.mux.Add(r.method, r.pattern, handler) } @@ -139,7 +142,12 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) return h } -//ServeHTTP responds to HTTP request to the handler. +// SetLogOutput sets writer for all handler log output. +func (h *Handler) SetLogOutput(w io.Writer) { + h.Logger = log.New(w, "[http] ", log.LstdFlags) +} + +// ServeHTTP responds to HTTP request to the handler. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.mux.ServeHTTP(w, r) } @@ -168,8 +176,19 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { var bp influxdb.BatchPoints + var dec *json.Decoder - dec := json.NewDecoder(r.Body) + if h.WriteTrace { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + h.Logger.Print("write handler failed to read bytes from request body") + } else { + h.Logger.Printf("write body received by handler: %s", string(b)) + } + dec = json.NewDecoder(strings.NewReader(string(b))) + } else { + dec = json.NewDecoder(r.Body) + } var writeError = func(result influxdb.Result, statusCode int) { w.WriteHeader(statusCode) diff --git a/server.go b/server.go index 4abf9c67287..66c49cdf8e6 100644 --- a/server.go +++ b/server.go @@ -65,7 +65,8 @@ type Server struct { shards map[uint64]*Shard // shards by shard id - Logger *log.Logger + Logger *log.Logger + WriteTrace bool // Detailed logging of write path authenticationEnabled bool @@ -1369,6 +1370,11 @@ type Point struct { // WriteSeries writes series data to the database. // Returns the messaging index the data was written to. func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) { + if s.WriteTrace { + log.Printf("received write for database '%s', retention policy '%s', with %d points", + database, retentionPolicy, len(points)) + } + // Make sure every point has at least one field. for _, p := range points { if len(p.Fields) == 0 { @@ -1391,11 +1397,17 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil { return 0, err } + if s.WriteTrace { + log.Printf("measurements and series created on database '%s'", database) + } // Ensure all the required shard groups exist. TODO: this should be done async. if err := s.createShardGroupsIfNotExists(database, retentionPolicy, points); err != nil { return 0, err } + if s.WriteTrace { + log.Printf("shard groups created for database '%s'", database) + } // Build writeRawSeriesMessageType publish commands. shardData := make(map[uint64][]byte, 0) @@ -1420,9 +1432,15 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if err != nil { return err } + if s.WriteTrace { + log.Printf("shard group located: %v", g) + } // Find appropriate shard within the shard group. sh := g.ShardBySeriesID(series.ID) + if s.WriteTrace { + log.Printf("shard located: %v", sh) + } // Many points are likely to have the same Measurement name. Re-use codecs if possible. var codec *FieldCodec @@ -1445,6 +1463,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( shardData[sh.ID] = make([]byte, 0) } shardData[sh.ID] = append(shardData[sh.ID], data...) + if s.WriteTrace { + log.Printf("data appended to buffer for shard %d", sh.ID) + } } return nil @@ -1467,6 +1488,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) ( if index > maxIndex { maxIndex = index } + if s.WriteTrace { + log.Printf("write series message published successfully for topic %d", i) + } } return maxIndex, err @@ -1481,10 +1505,16 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error { if sh == nil { return ErrShardNotFound } + if s.WriteTrace { + log.Printf("received write message for application, shard %d", sh.ID) + } if err := sh.writeSeries(m.Data); err != nil { return err } + if s.WriteTrace { + log.Printf("write message successfully applied to shard %d", sh.ID) + } return nil }