Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic write-path logging #1858

Merged
merged 3 commits into from
Mar 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Features
- [#1755](https://github.com/influxdb/influxdb/pull/1848): Support JSON data ingest over UDP
- [#1857](https://github.com/influxdb/influxdb/pull/1857): Support retention policies with infinite duration
- [#1858](https://github.com/influxdb/influxdb/pull/1858): Enable detailed tracing of write path

## v0.9.0-rc7 [2015-03-02]

Expand Down
3 changes: 2 additions & 1 deletion cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ type Config struct {
} `toml:"cluster"`

Logging struct {
File string `toml:"file"`
File string `toml:"file"`
WriteTraceEnabled bool `toml:"write-tracing"`
} `toml:"logging"`

ContinuousQuery struct {
Expand Down
1 change: 1 addition & 0 deletions cmd/influxd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ enabled = true

[logging]
file = "influxdb.log"
write-tracing = true

# Configure the admin server
[admin]
Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
// Start the server handler. Attach to broker if listening on the same port.
if s != nil {
sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
sh.SetLogOutput(logWriter)
sh.WriteTrace = config.Logging.WriteTraceEnabled

if h != nil && config.BrokerAddr() == config.DataAddr() {
h.serverHandler = sh
} else {
Expand Down Expand Up @@ -262,6 +265,7 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)
s.WriteTrace = config.Logging.WriteTraceEnabled
s.RecomputePreviousN = config.ContinuousQuery.RecomputePreviousN
s.RecomputeNoOlderThan = time.Duration(config.ContinuousQuery.RecomputeNoOlderThan)
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval
Expand Down
1 change: 1 addition & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ dir = "/tmp/influxdb/development/state"

[logging]
file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr.
write-tracing = false # If true, enables detailed logging of the write system.
31 changes: 25 additions & 6 deletions httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"net/http"
Expand Down Expand Up @@ -43,6 +44,9 @@ type Handler struct {
routes []route
mux *pat.PatternServeMux
requireAuthentication bool

Logger *log.Logger
WriteTrace bool // Detailed logging of write path
}

// NewHandler returns a new instance of Handler.
Expand All @@ -51,10 +55,9 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string)
server: s,
mux: pat.New(),
requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
}

weblog := log.New(os.Stderr, `[http] `, 0)

h.routes = append(h.routes,
route{
"query", // Query serving route.
Expand Down Expand Up @@ -129,17 +132,22 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string)
handler = cors(handler)
handler = requestID(handler)
if r.log {
handler = logging(handler, r.name, weblog)
handler = logging(handler, r.name, h.Logger)
}
handler = recovery(handler, r.name, weblog) // make sure recovery is always last
handler = recovery(handler, r.name, h.Logger) // make sure recovery is always last

h.mux.Add(r.method, r.pattern, handler)
}

return h
}

//ServeHTTP responds to HTTP request to the handler.
// SetLogOutput sets writer for all handler log output.
func (h *Handler) SetLogOutput(w io.Writer) {
h.Logger = log.New(w, "[http] ", log.LstdFlags)
}

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mux.ServeHTTP(w, r)
}
Expand Down Expand Up @@ -168,8 +176,19 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
// serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
var bp influxdb.BatchPoints
var dec *json.Decoder

dec := json.NewDecoder(r.Body)
if h.WriteTrace {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
h.Logger.Print("write handler failed to read bytes from request body")
} else {
h.Logger.Printf("write body received by handler: %s", string(b))
}
dec = json.NewDecoder(strings.NewReader(string(b)))
} else {
dec = json.NewDecoder(r.Body)
}

var writeError = func(result influxdb.Result, statusCode int) {
w.WriteHeader(statusCode)
Expand Down
32 changes: 31 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type Server struct {

shards map[uint64]*Shard // shards by shard id

Logger *log.Logger
Logger *log.Logger
WriteTrace bool // Detailed logging of write path

authenticationEnabled bool

Expand Down Expand Up @@ -1369,6 +1370,11 @@ type Point struct {
// WriteSeries writes series data to the database.
// Returns the messaging index the data was written to.
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {
if s.WriteTrace {
log.Printf("received write for database '%s', retention policy '%s', with %d points",
database, retentionPolicy, len(points))
}

// Make sure every point has at least one field.
for _, p := range points {
if len(p.Fields) == 0 {
Expand All @@ -1391,11 +1397,17 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
if err := s.createMeasurementsIfNotExists(database, retentionPolicy, points); err != nil {
return 0, err
}
if s.WriteTrace {
log.Printf("measurements and series created on database '%s'", database)
}

// Ensure all the required shard groups exist. TODO: this should be done async.
if err := s.createShardGroupsIfNotExists(database, retentionPolicy, points); err != nil {
return 0, err
}
if s.WriteTrace {
log.Printf("shard groups created for database '%s'", database)
}

// Build writeRawSeriesMessageType publish commands.
shardData := make(map[uint64][]byte, 0)
Expand All @@ -1420,9 +1432,15 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
if err != nil {
return err
}
if s.WriteTrace {
log.Printf("shard group located: %v", g)
}

// Find appropriate shard within the shard group.
sh := g.ShardBySeriesID(series.ID)
if s.WriteTrace {
log.Printf("shard located: %v", sh)
}

// Many points are likely to have the same Measurement name. Re-use codecs if possible.
var codec *FieldCodec
Expand All @@ -1445,6 +1463,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
shardData[sh.ID] = make([]byte, 0)
}
shardData[sh.ID] = append(shardData[sh.ID], data...)
if s.WriteTrace {
log.Printf("data appended to buffer for shard %d", sh.ID)
}
}

return nil
Expand All @@ -1467,6 +1488,9 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
if index > maxIndex {
maxIndex = index
}
if s.WriteTrace {
log.Printf("write series message published successfully for topic %d", i)
}
}

return maxIndex, err
Expand All @@ -1481,10 +1505,16 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
if sh == nil {
return ErrShardNotFound
}
if s.WriteTrace {
log.Printf("received write message for application, shard %d", sh.ID)
}

if err := sh.writeSeries(m.Data); err != nil {
return err
}
if s.WriteTrace {
log.Printf("write message successfully applied to shard %d", sh.ID)
}

return nil
}
Expand Down