diff --git a/config/config.go b/config/config.go index d68bbce0..605853d4 100644 --- a/config/config.go +++ b/config/config.go @@ -126,6 +126,7 @@ const ( // Olric project. DefaultStorageEngine = "kvstore" + // DefaultRoutingTablePushInterval is interval between routing table push events. DefaultRoutingTablePushInterval = time.Minute ) @@ -168,6 +169,7 @@ type Config struct { // bootstrapping status without blocking indefinitely. BootstrapTimeout time.Duration + // RoutingTablePushInterval is interval between routing table push events. RoutingTablePushInterval time.Duration // The list of host:port which are used by memberlist for discovery. diff --git a/internal/kvstore/entry.go b/internal/kvstore/entry.go index 511caed1..04e99e84 100644 --- a/internal/kvstore/entry.go +++ b/internal/kvstore/entry.go @@ -108,11 +108,11 @@ func (e *Entry) Encode() []byte { func (e *Entry) Decode(buf []byte) { var offset int - klen := int(uint8(buf[offset])) + keyLength := int(buf[offset]) offset++ - e.key = string(buf[offset : offset+klen]) - offset += klen + e.key = string(buf[offset : offset+keyLength]) + offset += keyLength e.ttl = int64(binary.BigEndian.Uint64(buf[offset : offset+8])) offset += 8 diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index 9a516a42..98d11007 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -74,7 +74,7 @@ type EncodeDecoder interface { Response(*bytes.Buffer) EncodeDecoder } -const headerSize int64 = 6 +const HeaderLength int64 = 6 // Header is a shared message header for all the message types in Olric Binary Protocol. type Header struct { @@ -98,7 +98,7 @@ func readHeader(conn io.ReadWriteCloser) (*Header, error) { // Read the header section. The first 6 bytes. var header Header - _, err := io.CopyN(buf, conn, headerSize) + _, err := io.CopyN(buf, conn, HeaderLength) if err != nil { return nil, filterNetworkErrors(err) } diff --git a/internal/transport/client.go b/internal/transport/client.go index 21b329c5..dce4f25b 100644 --- a/internal/transport/client.go +++ b/internal/transport/client.go @@ -68,7 +68,7 @@ func (c *Client) Close() { } // ClosePool closes the underlying connections in a pool, -// deletes from Olric's pools map and frees resources. +// deletes from the pools map and frees resources. func (c *Client) ClosePool(addr string) { c.mu.Lock() defer c.mu.Unlock() @@ -93,7 +93,14 @@ func (c *Client) pool(addr string) (connpool.Pool, error) { } factory := func() (net.Conn, error) { - return c.dialer.Dial("tcp", addr) + conn, err := c.dialer.Dial("tcp", addr) + if err != nil { + return nil, err + } + + ConnectionsTotal.Increase(1) + CurrentConnections.Increase(1) + return conn, nil } p, err := connpool.NewChannelPool(c.config.MinConn, c.config.MaxConn, factory) @@ -148,6 +155,7 @@ func (c *Client) teardownConn(rawConn net.Conn, dead bool) { pc, _ := rawConn.(*connpool.PoolConn) if dead { + CurrentConnections.Decrease(1) pc.MarkUnusable() } err := pc.Close() @@ -176,21 +184,25 @@ func (c *Client) RequestTo(addr string, req protocol.EncodeDecoder) (protocol.En if err != nil { return nil, err } - _, err = req.Buffer().WriteTo(conn) + + nr, err := req.Buffer().WriteTo(conn) if err != nil { dead = true return nil, err } + WrittenBytesTotal.Increase(nr) // Await for the response buf.Reset() - _, err = protocol.ReadMessage(conn, buf) + h, err := protocol.ReadMessage(conn, buf) if err != nil { // Failed to read message from the TCP socket. Close it. dead = true return nil, err } + ReadBytesTotal.Increase(protocol.HeaderLength + int64(h.MessageLength)) + // Response is a shortcut to create a response message for the request. resp := req.Response(buf) err = resp.Decode() diff --git a/stats/stats.go b/stats/stats.go index 288ca444..a24d6460 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -25,15 +25,15 @@ type ( MemberID uint64 ) -// SlabInfo denotes memory usage of the storage engine(a hash indexed append only log file). +// SlabInfo denotes memory usage of the storage engine(a hash indexed, append only byte slice). type SlabInfo struct { - // Total allocated space by the append-only log files. + // Total allocated space by the append-only byte slice. Allocated int - // Total inuse memory space in the append-only log files. + // Total inuse memory space in the append-only byte slice. Inuse int - // Total garbage(deleted key/value pairs) space in the append-only log files. + // Total garbage(deleted key/value pairs) space in the append-only byte slice. Garbage int }