Chunk store #848

Merged · 13 commits · Nov 5, 2024
111 changes: 110 additions & 1 deletion encoding/rs/frame.go
@@ -2,8 +2,10 @@ package rs

import (
"bytes"
"encoding/binary"
"encoding/gob"

"fmt"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254/fr"
)

@@ -13,6 +15,7 @@ type Frame struct {
Coeffs []fr.Element
}

// Encode serializes the frame into a byte slice.
func (f *Frame) Encode() ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
@@ -23,6 +26,7 @@ func (f *Frame) Encode() ([]byte, error) {
return buf.Bytes(), nil
}

// Decode deserializes a byte slice into a frame.
func Decode(b []byte) (Frame, error) {
var f Frame
buf := bytes.NewBuffer(b)
@@ -33,3 +37,108 @@ func Decode(b []byte) (Frame, error) {
}
return f, nil
}

// GnarkEncodeFrames serializes a slice of frames into a byte slice.
//
// Serialization format:
// [number of frames: 4 byte uint32]
// [size of frame 1: 4 byte uint32][frame 1]
// [size of frame 2: 4 byte uint32][frame 2]
// ...
// [size of frame n: 4 byte uint32][frame n]
//
// Where relevant, big endian encoding is used.
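//
// For example, since encoding.BYTES_PER_SYMBOL is 32, two frames with 2 and 3
// coefficients respectively serialize to 4 + (4 + 2*32) + (4 + 3*32) = 172 bytes.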
func GnarkEncodeFrames(frames []*Frame) ([]byte, error) {

// Count the number of bytes.
encodedSize := uint32(4) // stores the number of frames
for _, frame := range frames {
encodedSize += 4 // stores the size of the frame
encodedSize += GnarkFrameSize(frame) // size of the frame
}

serializedBytes := make([]byte, encodedSize)
binary.BigEndian.PutUint32(serializedBytes, uint32(len(frames)))
index := uint32(4)

for _, frame := range frames {
index += GnarkEncodeFrame(frame, serializedBytes[index:])
}

if index != encodedSize {
// Sanity check, this should never happen.
return nil, fmt.Errorf("encoded size mismatch: expected %d, got %d", encodedSize, index)
}

return serializedBytes, nil
}

// GnarkEncodeFrame serializes a frame into a target byte slice. Returns the number of bytes written.
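// The caller must size target to hold at least 4+GnarkFrameSize(frame) bytes;
// no bounds checking is performed here.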
func GnarkEncodeFrame(frame *Frame, target []byte) uint32 {
binary.BigEndian.PutUint32(target, uint32(len(frame.Coeffs)))
index := uint32(4)

for _, coeff := range frame.Coeffs {
serializedCoeff := coeff.Marshal()
copy(target[index:], serializedCoeff)
index += uint32(len(serializedCoeff))
}

return index
}

// GnarkFrameSize returns the size of a frame in bytes.
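// With the bn254 scalar field used here, encoding.BYTES_PER_SYMBOL is 32, so this
// is 32*len(frame.Coeffs); the 4 byte length prefix is not included.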
func GnarkFrameSize(frame *Frame) uint32 {
return uint32(encoding.BYTES_PER_SYMBOL * len(frame.Coeffs))
}

// GnarkDecodeFrames deserializes a byte slice into a slice of frames.
func GnarkDecodeFrames(serializedFrames []byte) ([]*Frame, error) {
frameCount := binary.BigEndian.Uint32(serializedFrames)
index := uint32(4)

frames := make([]*Frame, frameCount)

for i := 0; i < int(frameCount); i++ {
frame, bytesRead, err := GnarkDecodeFrame(serializedFrames[index:])

if err != nil {
return nil, fmt.Errorf("failed to decode frame %d: %w", i, err)
}

frames[i] = frame
index += bytesRead
}

if index != uint32(len(serializedFrames)) {
return nil, fmt.Errorf("decoded size mismatch: expected %d, got %d", len(serializedFrames), index)
}

return frames, nil
}

// GnarkDecodeFrame deserializes a byte slice into a frame. Returns the frame and the number of bytes read.
func GnarkDecodeFrame(serializedFrame []byte) (*Frame, uint32, error) {
if len(serializedFrame) < 4 {
return nil, 0, fmt.Errorf("invalid frame size: %d", len(serializedFrame))
}

coeffCount := binary.BigEndian.Uint32(serializedFrame)
index := uint32(4)

// Use 64 bit arithmetic so a huge coefficient count cannot overflow the check.
if uint64(len(serializedFrame)) < uint64(index)+uint64(coeffCount)*encoding.BYTES_PER_SYMBOL {
return nil, 0, fmt.Errorf("serialized frame too short for %d coefficients: %d bytes", coeffCount, len(serializedFrame))
}

coeffs := make([]fr.Element, coeffCount)
for i := 0; i < int(coeffCount); i++ {
coeff := fr.Element{}
coeff.Unmarshal(serializedFrame[index : index+encoding.BYTES_PER_SYMBOL])
coeffs[i] = coeff
index += uint32(encoding.BYTES_PER_SYMBOL)
}

frame := &Frame{Coeffs: coeffs}

return frame, index, nil
}
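A minimal round-trip sketch of the serialization API added above (the frame contents here are illustrative and not taken from this PR):

	package main

	import (
		"fmt"

		"github.com/Layr-Labs/eigenda/encoding/rs"
		"github.com/consensys/gnark-crypto/ecc/bn254/fr"
	)

	func main() {
		// Build two small frames with hand-picked coefficients.
		var a, b fr.Element
		a.SetUint64(1)
		b.SetUint64(2)
		frames := []*rs.Frame{
			{Coeffs: []fr.Element{a}},
			{Coeffs: []fr.Element{a, b}},
		}

		// Serialize all frames into one length-prefixed byte slice.
		encoded, err := rs.GnarkEncodeFrames(frames)
		if err != nil {
			panic(err)
		}

		// Decode and confirm the round trip preserves the frame count.
		decoded, err := rs.GnarkDecodeFrames(encoded)
		if err != nil {
			panic(err)
		}
		fmt.Println(len(decoded)) // 2
	}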
79 changes: 79 additions & 0 deletions encoding/rs/frame_test.go
@@ -1,6 +1,7 @@
package rs_test

import (
"fmt"
"math"
"testing"

@@ -47,3 +48,81 @@ func TestEncodeDecodeFrame_AreInverses(t *testing.T) {

assert.Equal(t, frame, frames[0])
}

func TestGnarkEncodeDecodeFrame_AreInverses(t *testing.T) {
teardownSuite := setupSuite(t)
defer teardownSuite(t)

params := encoding.ParamsFromSysPar(numSys, numPar, uint64(len(GETTYSBURG_ADDRESS_BYTES)))
enc, _ := rs.NewEncoder(params, true)

n := uint8(math.Log2(float64(enc.NumEvaluations())))
if enc.ChunkLength == 1 {
n = uint8(math.Log2(float64(2 * enc.NumChunks)))
}
fs := fft.NewFFTSettings(n)

RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
Fs: fs,
EncodingParams: params,
}

enc.Computer = RsComputeDevice
require.NotNil(t, enc)

frames, _, err := enc.EncodeBytes(GETTYSBURG_ADDRESS_BYTES)
require.NoError(t, err)
require.NotNil(t, frames)

serializedSize := rs.GnarkFrameSize(&frames[0]) + 4
bytes := make([]byte, serializedSize)
rs.GnarkEncodeFrame(&frames[0], bytes)

deserializedFrame, bytesRead, err := rs.GnarkDecodeFrame(bytes)
assert.NoError(t, err)
assert.Equal(t, serializedSize, bytesRead)
assert.Equal(t, &frames[0], deserializedFrame)
}

func TestGnarkEncodeDecodeFrames_AreInverses(t *testing.T) {
teardownSuite := setupSuite(t)
defer teardownSuite(t)

params := encoding.ParamsFromSysPar(numSys, numPar, uint64(len(GETTYSBURG_ADDRESS_BYTES)))
enc, _ := rs.NewEncoder(params, true)

n := uint8(math.Log2(float64(enc.NumEvaluations())))
if enc.ChunkLength == 1 {
n = uint8(math.Log2(float64(2 * enc.NumChunks)))
}
fs := fft.NewFFTSettings(n)

RsComputeDevice := &rs_cpu.RsCpuComputeDevice{
Fs: fs,
EncodingParams: params,
}

enc.Computer = RsComputeDevice
require.NotNil(t, enc)

frames, _, err := enc.EncodeBytes(GETTYSBURG_ADDRESS_BYTES)
assert.NoError(t, err)

framesPointers := make([]*rs.Frame, len(frames))
for i := range frames {
framesPointers[i] = &frames[i]
}

encodedFrames, err := rs.GnarkEncodeFrames(framesPointers)
assert.NoError(t, err)

decodedFrames, err := rs.GnarkDecodeFrames(encodedFrames)
assert.NoError(t, err)

assert.Equal(t, len(framesPointers), len(decodedFrames))
for i := range framesPointers {
assert.Equal(t, *framesPointers[i], *decodedFrames[i])
}
}
110 changes: 110 additions & 0 deletions relay/chunkstore/chunk_reader.go
@@ -0,0 +1,110 @@
package chunkstore

import (
"context"
"fmt"
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/rs"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
)

// ChunkReader reads chunks written by ChunkWriter.
type ChunkReader interface {
// GetChunkProofs reads a slice of proofs from the chunk store.
GetChunkProofs(ctx context.Context, blobKey disperser.BlobKey) ([]*encoding.Proof, error)
// GetChunkCoefficients reads a slice of frames from the chunk store.
GetChunkCoefficients(
ctx context.Context,
blobKey disperser.BlobKey) ([]*rs.Frame, error)
}

var _ ChunkReader = (*chunkReader)(nil)

type chunkReader struct {
logger logging.Logger
metadataStore *blobstore.BlobMetadataStore
client s3.Client
bucket string
shards []uint32
}

// NewChunkReader creates a new ChunkReader.
//
// This chunk reader will only return data for the shards specified in the shards parameter.
// If empty, it will return data for all shards. (Note: shard feature is not yet implemented.)
func NewChunkReader(
logger logging.Logger,
metadataStore *blobstore.BlobMetadataStore,
s3Client s3.Client,
bucketName string,
shards []uint32) ChunkReader {

return &chunkReader{
logger: logger,
metadataStore: metadataStore,
client: s3Client,
bucket: bucketName,
shards: shards,
}
}

func (r *chunkReader) GetChunkProofs(
ctx context.Context,
blobKey disperser.BlobKey) ([]*encoding.Proof, error) {
Contributor:

Seems like this blobKey references the v1 blob key. In StoreBlob for V2 we use blobKey.Hex() = string:

func (b *BlobStore) StoreBlob(ctx context.Context, blobKey string, data []byte) error

V2 blob key:

type BlobKey [32]byte

func (b BlobKey) Hex() string {
	return hex.EncodeToString(b[:])
}

Contributor:

Also, when we fetch proofs vs. coefficients, don't we need a different S3 key to differentiate them?

Contributor Author (@cody-littley, Nov 5, 2024):

I've been assuming we'd use different buckets. Started a Slack conversation to discuss. Will circle back on this prior to merging once we decide how we want to handle buckets and namespacing.

Contributor Author (@cody-littley):

I've switched over to using v2.BlobKey as recommended by @ian-shim.

s3Key := blobKey.String()

bytes, err := r.client.DownloadObject(ctx, r.bucket, s3Key)
if err != nil {
r.logger.Error("Failed to download chunks from S3: %v", err)
return nil, fmt.Errorf("failed to download chunks from S3: %w", err)
}

if len(bytes)%bn254.SizeOfG1AffineCompressed != 0 {
r.logger.Error("Invalid proof size")
return nil, fmt.Errorf("invalid proof size: %w", err)
}

proofCount := len(bytes) / bn254.SizeOfG1AffineCompressed
proofs := make([]*encoding.Proof, proofCount)

for i := 0; i < proofCount; i++ {
proof := encoding.Proof{}
err := proof.Unmarshal(bytes[i*bn254.SizeOfG1AffineCompressed:])
if err != nil {
r.logger.Error("Failed to unmarshal proof: %v", err)
return nil, fmt.Errorf("failed to unmarshal proof: %w", err)
}
proofs[i] = &proof
}

return proofs, nil
}

func (r *chunkReader) GetChunkCoefficients(
ctx context.Context,
blobKey disperser.BlobKey) ([]*rs.Frame, error) {

s3Key := blobKey.String()

bytes, err := r.client.DownloadObject(ctx, r.bucket, s3Key)
// TODO: Implement fragmented download
//bytes, err := r.client.FragmentedDownloadObject(ctx, r.bucket, s3Key, metadata.DataSize, metadata.FragmentSize)
if err != nil {
r.logger.Error("Failed to download chunks from S3: %v", err)
return nil, fmt.Errorf("failed to download chunks from S3: %w", err)
}

frames, err := rs.GnarkDecodeFrames(bytes)
if err != nil {
r.logger.Error("Failed to decode frames: %v", err)
return nil, fmt.Errorf("failed to decode frames: %w", err)
}

return frames, nil
}
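A hedged usage sketch of the new reader, matching the signatures shown in this diff (the bucket name is a placeholder, and logger, metadataStore, s3Client, and blobKey are assumed to come from existing application setup; the review thread above also notes a later switch to v2.BlobKey):

	package main

	import (
		"context"
		"fmt"

		"github.com/Layr-Labs/eigenda/common/aws/s3"
		"github.com/Layr-Labs/eigenda/disperser"
		"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
		"github.com/Layr-Labs/eigenda/relay/chunkstore"
		"github.com/Layr-Labs/eigensdk-go/logging"
	)

	// readChunks demonstrates the intended call pattern; all dependencies are
	// assumed to be configured elsewhere in the application.
	func readChunks(
		ctx context.Context,
		logger logging.Logger,
		metadataStore *blobstore.BlobMetadataStore,
		s3Client s3.Client,
		blobKey disperser.BlobKey) error {

		// nil shards: return data for all shards (sharding is not yet implemented).
		reader := chunkstore.NewChunkReader(logger, metadataStore, s3Client, "chunk-bucket", nil)

		proofs, err := reader.GetChunkProofs(ctx, blobKey)
		if err != nil {
			return fmt.Errorf("get proofs: %w", err)
		}

		frames, err := reader.GetChunkCoefficients(ctx, blobKey)
		if err != nil {
			return fmt.Errorf("get coefficients: %w", err)
		}

		fmt.Printf("read %d proofs and %d coefficient frames\n", len(proofs), len(frames))
		return nil
	}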