Skip to content

Commit

Permalink
internal/ctlog: two-step commits for idempotent uploads
Browse files Browse the repository at this point in the history
Based on a design by @jellevandenhooff at jellevandenhooff#1.

Fixes #11

Co-authored-by: Jelle van den Hooff <[email protected]>
  • Loading branch information
FiloSottile and jellevandenhooff committed Aug 7, 2024
1 parent e5214a4 commit a481639
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 62 deletions.
159 changes: 132 additions & 27 deletions internal/ctlog/ctlog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ctlog

import (
"archive/tar"
"bytes"
"context"
"crypto"
Expand All @@ -11,12 +12,16 @@ import (
"crypto/x509"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"maps"
mathrand "math/rand/v2"
"sync"
"testing"
"time"

"crawshaw.io/sqlite"
Expand Down Expand Up @@ -200,11 +205,17 @@ func LoadLog(ctx context.Context, config *Config) (*Log, error) {
case c1.N < c.N:
// It's possible that we crashed between committing a new checkpoint to
// the lock backend and uploading it to the object storage backend.
// Or maybe the object storage backend GETs are cached.
// That's ok, as long as the rest of the tree load correctly against the
// lock checkpoint.
// Apply the staged tiles before continuing.
config.Log.WarnContext(ctx, "checkpoint in object storage is older than lock checkpoint",
"old_size", c1.N, "size", c.N)
stagingPath := fmt.Sprintf("staging/%d-%s", c.N, hex.EncodeToString(c.Hash[:]))
stagedUploads, err := config.Backend.Fetch(ctx, stagingPath)
if err != nil {
return nil, fmt.Errorf("couldn't fetch staged uploads: %w", err)
}
if err := applyStagedUploads(ctx, config, stagedUploads); err != nil {
return nil, fmt.Errorf("couldn't apply staged uploads: %w", err)
}
}

cacheRead, cacheWrite, err := initCache(config.Cache)
Expand Down Expand Up @@ -376,6 +387,7 @@ type UploadOptions struct {

var optsHashTile = &UploadOptions{Immutable: true}
var optsDataTile = &UploadOptions{Compress: true, Immutable: true}
var optsStaging = &UploadOptions{Compress: true, Immutable: true}
var optsIssuer = &UploadOptions{ContentType: "application/pkix-cert", Immutable: true}
var optsCheckpoint = &UploadOptions{ContentType: "text/plain; charset=utf-8"}

Expand Down Expand Up @@ -621,7 +633,9 @@ func (l *Log) RunSequencer(ctx context.Context, period time.Duration) (err error
}()

// Randomly stagger the sequencers to avoid conflicting for resources.
time.Sleep(time.Duration(mathrand.Int64N(int64(period))))
if !testing.Testing() {
time.Sleep(time.Duration(mathrand.Int64N(int64(period))))
}

t := time.NewTicker(period)
defer t.Stop()
Expand Down Expand Up @@ -676,18 +690,16 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
close(p.done)
}()

var tileCount int
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, sequenceTimeout)
defer cancel()
g, gctx := errgroup.WithContext(ctx)
defer g.Wait()

timestamp := timeNowUnixMilli()
if timestamp <= l.tree.Time {
return fmt.Errorf("%w: time did not progress! %d -> %d", errFatal, l.tree.Time, timestamp)
}

var tileUploads []*uploadAction
edgeTiles := maps.Clone(l.edgeTiles)
var dataTile []byte
// Load the current partial data tile, if any.
Expand Down Expand Up @@ -719,37 +731,35 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {

n++

// If the data tile is full, upload it.
// If the data tile is full, stage it.
if n%sunlight.TileWidth == 0 {
tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1))
tile.L = -1
edgeTiles[-1] = tileWithBytes{tile, dataTile}
l.c.Log.DebugContext(ctx, "uploading full data tile",
l.c.Log.DebugContext(ctx, "staging full data tile",
"tree_size", n, "tile", tile, "size", len(dataTile))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
data := dataTile // data is captured by the g.Go function.
g.Go(func() error { return l.c.Backend.Upload(gctx, sunlight.TilePath(tile), data, optsDataTile) })
tileUploads = append(tileUploads, &uploadAction{
sunlight.TilePath(tile), dataTile, optsDataTile})
dataTile = nil
}
}

// Upload leftover partial data tile, if any.
// Stage leftover partial data tile, if any.
if n != l.tree.N && n%sunlight.TileWidth != 0 {
tile := tlog.TileForIndex(sunlight.TileHeight, tlog.StoredHashIndex(0, n-1))
tile.L = -1
edgeTiles[-1] = tileWithBytes{tile, dataTile}
l.c.Log.DebugContext(ctx, "uploading partial data tile",
l.c.Log.DebugContext(ctx, "staging partial data tile",
"tree_size", n, "tile", tile, "size", len(dataTile))
l.m.SeqDataTileSize.Observe(float64(len(dataTile)))
tileCount++
g.Go(func() error { return l.c.Backend.Upload(gctx, sunlight.TilePath(tile), dataTile, optsDataTile) })
tileUploads = append(tileUploads, &uploadAction{
sunlight.TilePath(tile), dataTile, optsDataTile})
}

// Produce and upload new tree tiles.
// Produce and stage new tree tiles.
tiles := tlog.NewTiles(sunlight.TileHeight, l.tree.N, n)
for _, tile := range tiles {
tile := tile // tile is captured by the g.Go function.
data, err := tlog.ReadTileData(tile, hashReader)
if err != nil {
return fmtErrorf("couldn't generate tile %v: %w", tile, err)
Expand All @@ -759,14 +769,10 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
if t0, ok := edgeTiles[tile.L]; !ok || t0.N < tile.N || (t0.N == tile.N && t0.W < tile.W) {
edgeTiles[tile.L] = tileWithBytes{tile, data}
}
l.c.Log.DebugContext(ctx, "uploading tree tile", "old_tree_size", oldSize,
l.c.Log.DebugContext(ctx, "staging tree tile", "old_tree_size", oldSize,
"tree_size", n, "tile", tile, "size", len(data))
tileCount++
g.Go(func() error { return l.c.Backend.Upload(gctx, sunlight.TilePath(tile), data, optsHashTile) })
}

if err := g.Wait(); err != nil {
return fmtErrorf("couldn't upload a tile: %w", err)
tileUploads = append(tileUploads, &uploadAction{
sunlight.TilePath(tile), data, optsHashTile})
}

if testingOnlyPauseSequencing != nil {
Expand All @@ -778,6 +784,20 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
return fmtErrorf("couldn't compute tree head: %w", err)
}

// Upload tiles to staging, where they can be recovered by LoadLog if we
// crash right after updating the lock database. See also
// https://github.com/FiloSottile/sunlight/issues/11.
stagedUploads, err := marshalStagedUploads(tileUploads)
if err != nil {
return fmtErrorf("couldn't marshal staged uploads: %w", err)
}
stagingPath := fmt.Sprintf("staging/%d-%s", tree.N, hex.EncodeToString(tree.Hash[:]))
l.c.Log.DebugContext(ctx, "uploading staged tiles", "old_tree_size", oldSize,
"tree_size", n, "path", stagingPath, "size", len(stagedUploads))
if err := l.c.Backend.Upload(ctx, stagingPath, stagedUploads, optsStaging); err != nil {
return fmtErrorf("couldn't upload staged tiles: %w", err)
}

checkpoint, err := signTreeHead(l.c, tree)
if err != nil {
return fmtErrorf("couldn't sign checkpoint: %w", err)
Expand All @@ -802,6 +822,14 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
l.lockCheckpoint = newLock
l.edgeTiles = edgeTiles

// Use applyStagedUploads instead of going over tileUploads directly, to
// exercise the same code path as LoadLog.
if err := applyStagedUploads(ctx, l.c, stagedUploads); err != nil {
// This is also fatal, since we can't continue leaving behind missing
// tiles. LoadLog will retry uploading them from the staging bundle.
return fmtErrorf("%w: couldn't upload a tile: %w", errFatal, err)
}

if err := l.c.Backend.Upload(ctx, "checkpoint", checkpoint, optsCheckpoint); err != nil {
// Return an error so we don't produce SCTs that, although safely
// serialized, wouldn't be part of a publicly visible tree.
Expand All @@ -823,9 +851,9 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {
}
l.c.Log.Info("sequenced pool",
"tree_size", tree.N, "entries", n-oldSize,
"tiles", tileCount, "timestamp", timestamp,
"tiles", len(tileUploads), "timestamp", timestamp,
"elapsed", time.Since(start))
l.m.SeqTiles.Add(float64(tileCount))
l.m.SeqTiles.Add(float64(len(tileUploads)))
l.m.TreeSize.Set(float64(tree.N))
l.m.TreeTime.Set(float64(timestamp) / 1000)

Expand All @@ -834,6 +862,83 @@ func (l *Log) sequencePool(ctx context.Context, p *pool) (err error) {

var testingOnlyPauseSequencing func()

type uploadAction struct {
key string
data []byte
opts *UploadOptions
}

func marshalStagedUploads(uploads []*uploadAction) ([]byte, error) {
var buffer bytes.Buffer
writer := tar.NewWriter(&buffer)
for _, u := range uploads {
opts, err := json.Marshal(u.opts)
if err != nil {
return nil, fmtErrorf("couldn't marshal upload options: %w", err)
}
if err := writer.WriteHeader(&tar.Header{
Name: u.key,
Size: int64(len(u.data)),
PAXRecords: map[string]string{"SUNLIGHT.opts": string(opts)},
}); err != nil {
return nil, fmtErrorf("error writing tar header: %w", err)
}
if _, err := writer.Write(u.data); err != nil {
return nil, fmtErrorf("error writing tar data: %w", err)
}
}
if err := writer.Close(); err != nil {
return nil, fmtErrorf("error closing tar writer: %w", err)
}
return buffer.Bytes(), nil
}

func applyStagedUploads(ctx context.Context, config *Config, stagedUploads []byte) error {
g, gctx := errgroup.WithContext(ctx)
reader := tar.NewReader(bytes.NewReader(stagedUploads))
for {
header, err := reader.Next()
if err == io.EOF {
break
}
if err != nil {
return fmtErrorf("error reading tar header: %w", err)
}
key := header.Name
optsBytes, ok := header.PAXRecords["SUNLIGHT.opts"]
if !ok {
return fmtErrorf("missing SUNLIGHT.opts in tar header")
}
opts := &UploadOptions{}
if err := json.Unmarshal([]byte(optsBytes), opts); err != nil {
return fmtErrorf("couldn't unmarshal upload options: %w", err)
}
data, err := io.ReadAll(reader)
if err != nil {
return fmtErrorf("error reading tar data: %w", err)
}

g.Go(func() error {
// Since errors are fatal, and uploads are idempotent, retry to
// avoid having to reload the whole process.
attempt := 1
for {
err := config.Backend.Upload(gctx, key, data, opts)
if err == nil || attempt == 5 {
return err
}
config.Log.WarnContext(gctx, "tile upload failed",
"key", key, "err", err, "attempt", attempt)
if !testing.Testing() {
time.Sleep(time.Duration(attempt) * 25 * time.Millisecond)
}
attempt++
}
})
}
return g.Wait()
}

// signTreeHead signs the tree and returns a c2sp.org/checkpoint.
func signTreeHead(c *Config, tree treeWithTimestamp) (checkpoint []byte, err error) {
sthBytes, err := ct.SerializeSTHSignatureInput(ct.SignedTreeHead{
Expand Down
Loading

0 comments on commit a481639

Please sign in to comment.