Skip to content

Commit

Permalink
Replace atomic ints with atomic.Int64 for Statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
tomekjarosik committed Sep 5, 2024
1 parent a763966 commit 379c055
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 49 deletions.
5 changes: 3 additions & 2 deletions pkg/dirimage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ func precompute(ctx context.Context, layers []*filesegment.Layer, workersCount i
jobs := make(chan *filesegment.Layer, workersCount)
g, ctx := errgroup.WithContext(ctx)

var aBytesReadCount atomic.Int64
for w := 0; w < workersCount; w++ {
g.Go(func() error {
for l := range jobs {
_, _ = l.DiffID()
_, _ = l.Digest()
atomic.AddInt64(&bytesReadCount, 2*l.Length())
aBytesReadCount.Add(2 * l.Length())
}
return nil
})
Expand All @@ -42,7 +43,7 @@ func precompute(ctx context.Context, layers []*filesegment.Layer, workersCount i
})

err = g.Wait()
return bytesReadCount, err
return aBytesReadCount.Load(), err
}

func Read(ctx context.Context, dir string, opt ...Option) (*DirImage, error) {
Expand Down
39 changes: 27 additions & 12 deletions pkg/layout/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/mobileinf/geranos/pkg/dirimage"
"github.com/mobileinf/geranos/pkg/duplicator"
"github.com/mobileinf/geranos/pkg/filesegment"
"runtime"

"github.com/mobileinf/geranos/pkg/duplicator"

"github.com/mobileinf/geranos/pkg/sketch"
"github.com/mobileinf/geranos/pkg/sparsefile"
"io"
Expand Down Expand Up @@ -99,15 +98,20 @@ func (lm *Mapper) Write(ctx context.Context, img v1.Image, ref name.Reference) e
return err
}
for _, layer := range manifest.Layers {
lm.stats.Add(&Statistics{SourceBytesCount: layer.Size})
st := Statistics{}
st.SourceBytesCount.Store(layer.Size)
lm.stats.Add(&st)
}

bytesClonedCount, matchedSegmentsCount, err := lm.sketcher.Sketch(destinationDir, *manifest)
if err != nil {
// TODO: ensure we don't delete anything useful _ = os.RemoveAll(destinationDir)
return err
}
lm.stats.Add(&Statistics{BytesClonedCount: bytesClonedCount, MatchedSegmentsCount: matchedSegmentsCount})
st := Statistics{}
st.BytesClonedCount.Store(bytesClonedCount)
st.MatchedSegmentsCount.Store(matchedSegmentsCount)
lm.stats.Add(&st)

convertedImage, err := dirimage.Convert(img)
if err != nil {
Expand All @@ -117,11 +121,12 @@ func (lm *Mapper) Write(ctx context.Context, img v1.Image, ref name.Reference) e
if err != nil {
return fmt.Errorf("unable to write dirimage to '%v': %w", destinationDir, err)
}
lm.stats.Add(&Statistics{
BytesWrittenCount: convertedImage.BytesWrittenCount,
BytesSkippedCount: convertedImage.BytesSkippedCount,
BytesReadCount: convertedImage.BytesReadCount,
})

st = Statistics{}
st.BytesWrittenCount.Store(convertedImage.BytesWrittenCount)
st.BytesSkippedCount.Store(convertedImage.BytesSkippedCount)
st.BytesReadCount.Store(convertedImage.BytesReadCount)
lm.stats.Add(&st)
return nil
}

Expand All @@ -131,7 +136,9 @@ func (lm *Mapper) Read(ctx context.Context, ref name.Reference) (v1.Image, error
if err != nil {
return nil, fmt.Errorf("unable to read dirimage: %w", err)
}
lm.stats.Add(&Statistics{BytesReadCount: img.BytesReadCount})
st := Statistics{}
st.BytesReadCount.Store(img.BytesReadCount)
lm.stats.Add(&st)
return img, err
}

Expand Down Expand Up @@ -258,6 +265,14 @@ func (lm *Mapper) Remove(src name.Reference) error {
return os.RemoveAll(filepath.Join(lm.rootDir, lm.refToDir(ref)))
}

func (lm *Mapper) Stats() Statistics {
return lm.stats
func (lm *Mapper) Stats() ImmutableStatistics {
return ImmutableStatistics{
SourceBytesCount: lm.stats.SourceBytesCount.Load(),
BytesWrittenCount: lm.stats.BytesWrittenCount.Load(),
BytesSkippedCount: lm.stats.BytesSkippedCount.Load(),
BytesReadCount: lm.stats.BytesReadCount.Load(),
BytesClonedCount: lm.stats.BytesClonedCount.Load(),
CompressedBytesCount: lm.stats.CompressedBytesCount.Load(),
MatchedSegmentsCount: lm.stats.MatchedSegmentsCount.Load(),
}
}
30 changes: 15 additions & 15 deletions pkg/layout/layout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestLayoutMapper_Read_VariousChunkSizes(t *testing.T) {
}
}
st := lmDst.Stats()
fmt.Printf("%#v\n", st)
fmt.Printf("%+v\n", st)
if st.BytesWrittenCount != 102 {
t.Fatalf("unexpected number of bytes written: expected %d got %d", 918, st.BytesWrittenCount)
}
Expand Down Expand Up @@ -178,8 +178,8 @@ func TestLayoutMapper_Write_MustAvoidWritingSameContent(t *testing.T) {
img1, err := lm.Read(ctx, srcRef)
require.NoErrorf(t, err, "unable to read disk image: %v", err)

if lm.stats.BytesReadCount != 2000 { // we read each byte twice to calculate diffID and digest
t.Fatalf("unexpected number of bytes read: expected %v, got %v", 2000, lm.stats.BytesReadCount)
if lm.stats.BytesReadCount.Load() != 2000 { // we read each byte twice to calculate diffID and digest
t.Fatalf("unexpected number of bytes read: expected %v, got %v", 2000, lm.stats.BytesReadCount.Load())
}

destRef, err := name.ParseReference("oci.jarosik.online/testrepo/a:v2")
Expand All @@ -188,18 +188,18 @@ func TestLayoutMapper_Write_MustAvoidWritingSameContent(t *testing.T) {
err = lm.Write(ctx, img1, destRef)
require.NoErrorf(t, err, "unable to write image %v: %v", destRef, err)

assert.Equal(t, int64(1000), lm.stats.BytesWrittenCount)
assert.Equal(t, int64(1000), lm.stats.BytesWrittenCount.Load())
lm.stats.Clear()

destRef3, err := name.ParseReference("oci.jarosik.online/testrepo/a:v3")
require.NoErrorf(t, err, "unable to parse reference %v: %v", destRef3, err)

err = lm.Write(ctx, img1, destRef3)
require.NoErrorf(t, err, "unable to write image %v: %v", destRef, err)
assert.Equal(t, int64(0), lm.stats.BytesWrittenCount)
assert.Equal(t, int64(1000), lm.stats.BytesReadCount)
assert.Equal(t, int64(1000), lm.stats.BytesClonedCount)
assert.Equal(t, int64(100), lm.stats.MatchedSegmentsCount)
assert.Equal(t, int64(0), lm.stats.BytesWrittenCount.Load())
assert.Equal(t, int64(1000), lm.stats.BytesReadCount.Load())
assert.Equal(t, int64(1000), lm.stats.BytesClonedCount.Load())
assert.Equal(t, int64(100), lm.stats.MatchedSegmentsCount.Load())

afterHash := hashFromFile(t, path.Join(tempDir, "oci.jarosik.online/testrepo/a:v3/disk.img"))
assert.Equal(t, beforeHash, afterHash)
Expand All @@ -224,8 +224,8 @@ func TestLayoutMapper_Write_MustOnlyWriteContentThatDiffersFromAlreadyWritten(t
img1, err := lm.Read(ctx, srcRef)
require.NoErrorf(t, err, "unable to read disk image: %v", err)

if lm.stats.BytesReadCount != 2000 { // we read each byte twice to calculate diffID and digest
t.Fatalf("unexpected number of bytes read: expected %v, got %v", 2000, lm.stats.BytesReadCount)
if lm.stats.BytesReadCount.Load() != 2000 { // we read each byte twice to calculate diffID and digest
t.Fatalf("unexpected number of bytes read: expected %v, got %v", 2000, lm.stats.BytesReadCount.Load())
}

destRef, err := name.ParseReference("oci.jarosik.online/testrepo/a:v2")
Expand All @@ -234,7 +234,7 @@ func TestLayoutMapper_Write_MustOnlyWriteContentThatDiffersFromAlreadyWritten(t
err = lm.Write(ctx, img1, destRef)
require.NoErrorf(t, err, "unable to write image %v: %v", destRef, err)

assert.Equal(t, int64(1000), lm.stats.BytesWrittenCount)
assert.Equal(t, int64(1000), lm.stats.BytesWrittenCount.Load())
lm.stats.Clear()

// Here "testrepo/a:v2" contains .oci.manifest.json, and is the same as generated file
Expand All @@ -260,10 +260,10 @@ func TestLayoutMapper_Write_MustOnlyWriteContentThatDiffersFromAlreadyWritten(t
require.NoErrorf(t, err, "unable to parse reference %v: %v", destRef, err)
err = lm.Write(ctx, img3, destRef)
require.NoErrorf(t, err, "unable to write image %v: %v", destRef, err)
assert.Equal(t, int64(20), lm.stats.BytesWrittenCount)
assert.Equal(t, int64(1020), lm.stats.BytesReadCount)
assert.Equal(t, int64(1020), lm.stats.BytesClonedCount)
assert.Equal(t, int64(100), lm.stats.MatchedSegmentsCount)
assert.Equal(t, int64(20), lm.stats.BytesWrittenCount.Load())
assert.Equal(t, int64(1020), lm.stats.BytesReadCount.Load())
assert.Equal(t, int64(1020), lm.stats.BytesClonedCount.Load())
assert.Equal(t, int64(100), lm.stats.MatchedSegmentsCount.Load())
}

func TestLayoutMapper_Write_MultipleConcurrentWorkers(t *testing.T) {
Expand Down
73 changes: 53 additions & 20 deletions pkg/layout/statistics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,60 @@
package layout

import "sync/atomic"
import (
"fmt"
"sync/atomic"
)

type Statistics struct {
SourceBytesCount atomic.Int64
BytesWrittenCount atomic.Int64
BytesSkippedCount atomic.Int64
BytesReadCount atomic.Int64
BytesClonedCount atomic.Int64
CompressedBytesCount atomic.Int64
MatchedSegmentsCount atomic.Int64
}

func (s *Statistics) Add(other *Statistics) {
s.BytesWrittenCount.Add(other.BytesWrittenCount.Load())
s.BytesSkippedCount.Add(other.BytesSkippedCount.Load())
s.BytesReadCount.Add(other.BytesReadCount.Load())
s.BytesClonedCount.Add(other.BytesClonedCount.Load())
s.CompressedBytesCount.Add(other.CompressedBytesCount.Load())
s.MatchedSegmentsCount.Add(other.MatchedSegmentsCount.Load())
s.SourceBytesCount.Add(other.SourceBytesCount.Load())
}

func (s *Statistics) Clear() {
s.BytesWrittenCount.Store(0)
s.BytesSkippedCount.Store(0)
s.BytesReadCount.Store(0)
s.BytesClonedCount.Store(0)
s.CompressedBytesCount.Store(0)
s.MatchedSegmentsCount.Store(0)
}

// String formats the Statistics struct for human-readable output
func (s *Statistics) String() string {
return fmt.Sprintf("Statistics: \n"+
"SourceBytesCount: %d\n"+
"BytesWrittenCount: %d\n"+
"BytesSkippedCount: %d\n"+
"BytesReadCount: %d\n"+
"BytesClonedCount: %d\n"+
"CompressedBytesCount: %d\n"+
"MatchedSegmentsCount: %d\n",
s.SourceBytesCount.Load(),
s.BytesWrittenCount.Load(),
s.BytesSkippedCount.Load(),
s.BytesReadCount.Load(),
s.BytesClonedCount.Load(),
s.CompressedBytesCount.Load(),
s.MatchedSegmentsCount.Load())
}

// ImmutableStatistics holds the immutable copy of statistics
type ImmutableStatistics struct {
SourceBytesCount int64
BytesWrittenCount int64
BytesSkippedCount int64
Expand All @@ -11,22 +63,3 @@ type Statistics struct {
CompressedBytesCount int64
MatchedSegmentsCount int64
}

func (s *Statistics) Add(other *Statistics) {
atomic.AddInt64(&s.BytesWrittenCount, other.BytesWrittenCount)
atomic.AddInt64(&s.BytesSkippedCount, other.BytesSkippedCount)
atomic.AddInt64(&s.BytesReadCount, other.BytesReadCount)
atomic.AddInt64(&s.BytesClonedCount, other.BytesClonedCount)
atomic.AddInt64(&s.CompressedBytesCount, other.CompressedBytesCount)
atomic.AddInt64(&s.MatchedSegmentsCount, other.MatchedSegmentsCount)
atomic.AddInt64(&s.SourceBytesCount, other.SourceBytesCount)
}

func (s *Statistics) Clear() {
atomic.StoreInt64(&s.BytesWrittenCount, 0)
atomic.StoreInt64(&s.BytesSkippedCount, 0)
atomic.StoreInt64(&s.BytesReadCount, 0)
atomic.StoreInt64(&s.BytesClonedCount, 0)
atomic.StoreInt64(&s.CompressedBytesCount, 0)
atomic.StoreInt64(&s.MatchedSegmentsCount, 0)
}

0 comments on commit 379c055

Please sign in to comment.