Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checksum op #7

Merged
merged 6 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion disk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func (d *DiskStore) Get(key string) (string, error) {
return "", ErrDecodingFailed
}

// Verify the checksum; a mismatch means the stored record is corrupted.
if !result.VerifyCheckSum(data) {
return "", ErrChecksumMismatch
}

return result.Value, nil
}

Expand All @@ -147,9 +152,15 @@ func (d *DiskStore) Set(key string, value string) error {
// 1. Encode the KV into bytes
// 2. Write the bytes to disk by appending to the file
// 3. Update KeyDir with the KeyEntry of this key

if err := validateKV(key, []byte(value)); err != nil {
return err
}

timestamp := uint32(time.Now().Unix())
h := Header{TimeStamp: timestamp, KeySize: uint32(len(key)), ValueSize: uint32(len(value))}
r := Record{Header: h, Key: key, Value: value, RecordSize: headerSize + h.KeySize + h.ValueSize}
r.Header.CheckSum = r.CalculateCheckSum()

//encode the record
buf := new(bytes.Buffer)
Expand All @@ -168,10 +179,13 @@ func (d *DiskStore) Set(key string, value string) error {

func (d *DiskStore) Delete(key string) error {
timestamp := uint32(time.Now().Unix())
h := Header{TimeStamp: timestamp, KeySize: uint32(len(key)), ValueSize: uint32(len(""))}
value := ""
h := Header{TimeStamp: timestamp, KeySize: uint32(len(key)), ValueSize: uint32(len(value))}

// mark as tombstone
h.MarkTombStone()
r := Record{Header: h, Key: key, Value: "", RecordSize: headerSize + h.KeySize + h.ValueSize}
r.Header.CheckSum = r.CalculateCheckSum()

buf := new(bytes.Buffer)
err := r.EncodeKV(buf)
Expand Down Expand Up @@ -258,3 +272,14 @@ func (d *DiskStore) initKeyDir(existingFile string) error {
}
return nil
}

// ListKeys returns every key currently present in d.keyDir.
// The order of the returned keys is unspecified, since it follows map
// iteration order. The result is always a non-nil slice.
func (d *DiskStore) ListKeys() []string {
	keys := make([]string, len(d.keyDir))

	next := 0
	for name := range d.keyDir {
		keys[next] = name
		next++
	}

	return keys
}
14 changes: 11 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ package caskdb
import "errors"

// Sentinel errors used by this package. Callers should compare against them
// with errors.Is rather than matching on the message text.
var (
	// Key errors.
	ErrEmptyKey    = errors.New("invalid key: empty key not allowed")
	ErrLargeKey    = errors.New("invalid key: size cant be greater than 4.2GB")
	ErrKeyNotFound = errors.New("invalid key: key either deleted or expired")

	// Value errors.
	ErrLargeValue = errors.New("invalid value: size cant be greater than 4.2GB")

	// Disk access errors.
	ErrSeekFailed = errors.New("seek fail: failed to seek to the correct offset")
	ErrReadFailed = errors.New("read fail: failed to read data from disk")

	// Record serialisation errors.
	ErrEncodingFailed = errors.New("encoding fail: failed to encode kv record")
	ErrDecodingFailed = errors.New("decoding fail: failed to decode kv record")

	// Data integrity errors.
	ErrChecksumMismatch = errors.New("invalid data: checksum does not match")
)
11 changes: 9 additions & 2 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

func main() {
store, err := caskDB.NewDiskStore("test.db")
dataFileName := "test.db"
store, err := caskDB.NewDiskStore(dataFileName)
if err != nil {
log.Fatalf("failed to create disk store: %v", err)
os.Exit(-1)
Expand All @@ -31,5 +32,11 @@ func main() {
}

rbDriver, _ := store.Get("redbull")
fmt.Printf("%s drives for redbull racing!", rbDriver)
fmt.Printf("%s drives for redbull racing!\n", rbDriver)

fmt.Println("Current keys:")
for _, key := range store.ListKeys() {
fmt.Printf("key: %s\n", key)
}

}
66 changes: 47 additions & 19 deletions format.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package caskdb
import (
"bytes"
"encoding/binary"
"hash/crc32"
)

// format file provides encode/decode functions for serialisation and deserialisation
Expand Down Expand Up @@ -44,28 +45,33 @@ import (
// headerSize specifies the total header size. Our key value pair, when stored on disk
// looks like this:
//
// ┌───────────┬───────────┬────────────┬──────────────┬───────┐
// │ timestamp │ key_size | value_size meta │ key │ value │
// └───────────┴───────────┴────────────┴──────────────┴───────┘
// ┌─────┬──────┬───────────┬──────────┬────────────┬─────┬───────┐
// │ crc │ meta │ timestamp │ key_size │ value_size │ key │ value │
// └─────┴──────┴───────────┴──────────┴────────────┴─────┴───────┘
//
// This is analogous to a typical database's row (or a record). The total length of
// the row is variable, depending on the contents of the key and value.
//
// The first four fields form the header:
// The first five fields form the header:
//
// ┌───────────────┬───────────────────────────────────────────┐
// │ timestamp(4B) │ key_size(4B) | value_size(4B) │ meta(1B) │
// └───────────────┴───────────────────────────────────────────┘
// ┌─────────┬──────────┬───────────────┬──────────────┬────────────────┐
// │ crc(4B) │ meta(1B) │ timestamp(4B) │ key_size(4B) │ value_size(4B) │
// └─────────┴──────────┴───────────────┴──────────────┴────────────────┘
//
// The first three fields store unsigned integers of size 4 bytes and last field stores 1 byte.
// giving our header a fixed length of 14 bytes.
// The first field (4 bytes) stores the CRC32 checksum of the rest of the record: the remaining header fields plus the key and value.
// The second field (1 byte) stores metadata about the kv record.
// We can use it for marking a record as a tombstone by setting its MSB to 1.
// The remaining three fields store unsigned integers of 4 bytes each, giving our header a fixed length of 17 bytes.
// The timestamp field stores the time the record was inserted, in unix epoch seconds.
// The key size and value size fields store the number of bytes occupied by the key and the value.
// meta stores all the metadata about a kv record.
// We can use it for marking a record as tombstone by setting its MSB to 1.
const headerSize = 17

// The maximum integer stored by 4 bytes is 4,294,967,295 (2 ** 32 - 1), i.e. just under 4 GiB (~4.29 GB).
// So, the size of each key or value cannot exceed this. Theoretically, a single row can be as large as ~8.6 GB.
const headerSize = 13
const (
	// MaxKeySize is the largest key length, in bytes, that validateKV accepts.
	// It equals 2^32 - 1, the largest value a uint32 size field can hold.
	MaxKeySize = 1<<32 - 1

	// MaxValueSize is the largest value length, in bytes, that validateKV accepts.
	// It equals 2^32 - 1, the largest value a uint32 size field can hold.
	MaxValueSize = 1<<32 - 1
)

// KeyEntry keeps the metadata about the KV, specially the position of
// the byte offset in the file. Whenever we insert/update a key, we create a new
Expand All @@ -83,10 +89,11 @@ type KeyEntry struct {
}

// Header is the fixed-size prefix stored in front of every key/value pair.
// Fields are declared in on-disk order: checksum, meta, timestamp, key size,
// value size (4 + 1 + 4 + 4 + 4 = 17 bytes).
type Header struct {
	CheckSum  uint32 // checksum stored with the record
	Meta      uint8  // metadata byte for the record
	TimeStamp uint32 // write time in unix epoch seconds
	KeySize   uint32 // length of the key in bytes
	ValueSize uint32 // length of the value in bytes
}

type Record struct {
Expand All @@ -101,18 +108,20 @@ func NewKeyEntry(timestamp uint32, position uint32, totalSize uint32) KeyEntry {
}

// EncodeHeader appends the header to buf in little-endian byte order.
//
// Fields are written as CheckSum, Meta, TimeStamp, KeySize, ValueSize, for a
// total of 17 bytes. Encoding stops at the first failing write and that error
// is returned; nil is returned when every field was written.
func (h *Header) EncodeHeader(buf *bytes.Buffer) error {
	fields := []interface{}{h.CheckSum, h.Meta, h.TimeStamp, h.KeySize, h.ValueSize}
	for _, field := range fields {
		if err := binary.Write(buf, binary.LittleEndian, field); err != nil {
			return err
		}
	}
	return nil
}

// DecodeHeader fills h from the first 17 bytes of buf, which must hold the
// little-endian fields CheckSum (4B), Meta (1B), TimeStamp (4B), KeySize (4B)
// and ValueSize (4B) in that order. Bytes after the header are ignored.
//
// Decoding stops at the first failing read and returns that error. A buffer
// shorter than the header therefore yields the error reported by
// encoding/binary (io.EOF or io.ErrUnexpectedEOF) instead of a slice-bounds
// panic.
func (h *Header) DecodeHeader(buf []byte) error {
	// A single reader consumes the fields sequentially, so offsets never need
	// to be spelled out and short input is detected by the reader itself.
	src := bytes.NewReader(buf)
	fields := []interface{}{&h.CheckSum, &h.Meta, &h.TimeStamp, &h.KeySize, &h.ValueSize}
	for _, field := range fields {
		if err := binary.Read(src, binary.LittleEndian, field); err != nil {
			return err
		}
	}
	return nil
}

Expand Down Expand Up @@ -153,3 +162,22 @@ func (r *Record) DecodeKV(buf []byte) error {
// Size returns the record's RecordSize field. Set and Delete populate it as
// headerSize + KeySize + ValueSize.
func (r *Record) Size() uint32 {
	return r.RecordSize
}

// CalculateCheckSum computes the CRC-32 (IEEE) checksum of the record.
//
// The checksummed bytes are, in order: Meta (1 byte), then TimeStamp, KeySize
// and ValueSize (4 bytes each, little-endian), followed by the raw key and
// value bytes. Header.CheckSum itself is not part of the input.
func (r *Record) CalculateCheckSum() uint32 {
	// Fixed-width part of the header, minus the checksum field.
	var hdr [13]byte
	hdr[0] = r.Header.Meta
	binary.LittleEndian.PutUint32(hdr[1:5], r.Header.TimeStamp)
	binary.LittleEndian.PutUint32(hdr[5:9], r.Header.KeySize)
	binary.LittleEndian.PutUint32(hdr[9:13], r.Header.ValueSize)

	// Header bytes first, then key, then value.
	payload := bytes.NewBuffer(make([]byte, 0, len(hdr)+len(r.Key)+len(r.Value)))
	payload.Write(hdr[:])
	payload.WriteString(r.Key)
	payload.WriteString(r.Value)

	return crc32.ChecksumIEEE(payload.Bytes())
}

// VerifyCheckSum reports whether data, the encoded bytes of a record, matches
// the checksum held in r.Header.CheckSum.
//
// The leading 4 bytes of data are the stored checksum field and are skipped;
// the CRC-32 (IEEE) of everything after them is compared with
// r.Header.CheckSum. Data too short to contain the checksum field is reported
// as a mismatch instead of panicking on the slice expression.
func (r *Record) VerifyCheckSum(data []byte) bool {
	const checksumLen = 4 // width of the checksum field at the start of a record
	if len(data) < checksumLen {
		return false
	}
	return crc32.ChecksumIEEE(data[checksumLen:]) == r.Header.CheckSum
}
6 changes: 3 additions & 3 deletions format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

func Test_encodeHeader(t *testing.T) {
tests := []*Header{
{10, 10, 10, 1},
{0, 0, 0, 0},
{10000, 10000, 10000, 1},
{10, 1, 10, 10, 10},
{0, 0, 0, 0, 0},
{10000, 1, 10000, 10000, 1000},
}
for _, tt := range tests {
newBuf := new(bytes.Buffer)
Expand Down
3 changes: 3 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ type Store interface {
Get(key string) (string, error)
Set(key string, value string) error
Delete(key string) error
ListKeys() []string
Close() bool
}

// Compile-time assertion that *DiskStore satisfies the Store interface.
var _ Store = (*DiskStore)(nil)
17 changes: 17 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package caskdb

// validateKV checks that key and value are acceptable for storage.
//
// It returns ErrEmptyKey for an empty key, ErrLargeKey when the key is longer
// than MaxKeySize bytes, ErrLargeValue when the value is longer than
// MaxValueSize bytes, and nil otherwise.
func validateKV(key string, value []byte) error {
	// Lengths are widened to uint64 before comparing: the limits are untyped
	// constants that do not fit in an int on 32-bit platforms, where a direct
	// comparison against len() fails to compile.
	keyLen, valueLen := uint64(len(key)), uint64(len(value))

	switch {
	case keyLen == 0:
		return ErrEmptyKey
	case keyLen > MaxKeySize:
		return ErrLargeKey
	case valueLen > MaxValueSize:
		return ErrLargeValue
	}

	return nil
}