Skip to content

Commit

Permalink
Allow unknown remote object size (#431)
Browse files Browse the repository at this point in the history
This PR handles the edge case in which a remote data source reports the object size
as unknown (-1)
* The size will not be part of the condition to determine if this file
already exists in the database
* Once the file is prepared, its recorded size will be updated to the correct
value


Caveats
* Since the size is unknown up front, the files within a job may overflow the
sector size
* A warning message is logged so that the user is aware of this
  • Loading branch information
xinaxu authored Mar 7, 2024
1 parent 03cd9d9 commit 9c76b91
Show file tree
Hide file tree
Showing 23 changed files with 165 additions and 82 deletions.
8 changes: 7 additions & 1 deletion .github/actions/go-check-setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ runs:
using: "composite"
steps:
- name: Setup Golang caches
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ matrix.os }}-golang-${{ matrix.go }}-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ matrix.os }}-golang-${{ matrix.go }}-
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.55.2
args: --timeout=10m
2 changes: 1 addition & 1 deletion .github/actions/go-test-setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ runs:
using: "composite"
steps:
- name: Setup Golang caches
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: |
~/.cache/go-build
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/container-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- name: Get Ref from releaser
id: releaser
if: github.event_name == 'workflow_run'
uses: pl-strflt/uci/.github/actions/[email protected]
uses: ipdxco/unified-github-workflows/.github/actions/[email protected]
with:
artifacts-url: ${{ github.event.workflow_run.artifacts_url }}
publish:
Expand Down Expand Up @@ -64,4 +64,4 @@ jobs:
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/amd64,linux/arm64
platforms: linux/amd64,linux/arm64
2 changes: 1 addition & 1 deletion .github/workflows/go-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ concurrency:

jobs:
go-check:
uses: pl-strflt/uci/.github/workflows/[email protected]
uses: ipdxco/unified-github-workflows/.github/workflows/[email protected]
2 changes: 1 addition & 1 deletion .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ concurrency:

jobs:
go-test:
uses: pl-strflt/uci/.github/workflows/[email protected]
uses: ipdxco/unified-github-workflows/.github/workflows/[email protected]
28 changes: 0 additions & 28 deletions .github/workflows/go.yml

This file was deleted.

2 changes: 0 additions & 2 deletions .github/workflows/release-binaries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ on:
workflow_run:
workflows: [Releaser]
types: [completed]
pull_request:
branches: [ "main" ]
jobs:
bin-releaser:
name: Release Binaries
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ concurrency:

jobs:
release-check:
uses: pl-strflt/uci/.github/workflows/[email protected]
uses: ipdxco/unified-github-workflows/.github/workflows/[email protected]
2 changes: 1 addition & 1 deletion .github/workflows/releaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ concurrency:

jobs:
releaser:
uses: pl-strflt/uci/.github/workflows/[email protected]
uses: ipdxco/unified-github-workflows/.github/workflows/[email protected]
2 changes: 1 addition & 1 deletion .github/workflows/tagpush.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ concurrency:

jobs:
releaser:
uses: pl-strflt/uci/.github/workflows/[email protected]
uses: ipdxco/unified-github-workflows/.github/workflows/[email protected]
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ run:
- docs
- dashboard/model2ts
- handler/datasource/generate
- handler/storage/gen
skip-files:
- cmd/testutil.go

Expand Down Expand Up @@ -62,6 +63,7 @@ linters:
- lll
- dupword
- interfacebloat
- goconst

linters-settings:
errcheck:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ generate: check-go

lint: check-go install-lint-deps
gofmt -s -w .
golangci-lint run --no-config --fix --disable-all -E tagalign
golangci-lint run --fix
golangci-lint run --no-config --fix --disable-all -E tagalign --timeout 10m
golangci-lint run --fix --timeout 10m
staticcheck ./...

test: check-go install-test-deps
Expand Down
19 changes: 10 additions & 9 deletions model/basetypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -205,10 +206,10 @@ func (c ClientConfig) String() string {
values = append(values, "expectContinueTimeout:"+c.ExpectContinueTimeout.String())
}
if c.InsecureSkipVerify != nil {
values = append(values, "insecureSkipVerify:"+fmt.Sprint(*c.InsecureSkipVerify))
values = append(values, "insecureSkipVerify:"+strconv.FormatBool(*c.InsecureSkipVerify))
}
if c.NoGzip != nil {
values = append(values, "noGzip:"+fmt.Sprint(*c.NoGzip))
values = append(values, "noGzip:"+strconv.FormatBool(*c.NoGzip))
}
if c.UserAgent != nil {
values = append(values, "userAgent:"+*c.UserAgent)
Expand All @@ -226,13 +227,13 @@ func (c ClientConfig) String() string {
values = append(values, "headers:<hidden>")
}
if c.DisableHTTP2 != nil {
values = append(values, "disableHTTP2"+fmt.Sprint(*c.DisableHTTP2))
values = append(values, "disableHTTP2"+strconv.FormatBool(*c.DisableHTTP2))
}
if c.DisableHTTPKeepAlives != nil {
values = append(values, "disableHTTPKeepAlives:"+fmt.Sprint(*c.DisableHTTPKeepAlives))
values = append(values, "disableHTTPKeepAlives:"+strconv.FormatBool(*c.DisableHTTPKeepAlives))
}
if c.RetryMaxCount != nil {
values = append(values, "retryMaxCount:"+fmt.Sprint(*c.RetryMaxCount))
values = append(values, "retryMaxCount:"+strconv.Itoa(*c.RetryMaxCount))
}
if c.RetryDelay != nil {
values = append(values, "retryDelay:"+c.RetryDelay.String())
Expand All @@ -244,16 +245,16 @@ func (c ClientConfig) String() string {
values = append(values, "retryBackoffExponential:"+fmt.Sprint(*c.RetryBackoffExponential))
}
if c.SkipInaccessibleFile != nil {
values = append(values, "skipInaccessibleFile:"+fmt.Sprint(*c.SkipInaccessibleFile))
values = append(values, "skipInaccessibleFile:"+strconv.FormatBool(*c.SkipInaccessibleFile))
}
if c.UseServerModTime != nil {
values = append(values, "useServerModTime:"+fmt.Sprint(*c.UseServerModTime))
values = append(values, "useServerModTime:"+strconv.FormatBool(*c.UseServerModTime))
}
if c.LowLevelRetries != nil {
values = append(values, "lowLevelRetries:"+fmt.Sprint(*c.LowLevelRetries))
values = append(values, "lowLevelRetries:"+strconv.Itoa(*c.LowLevelRetries))
}
if c.ScanConcurrency != nil {
values = append(values, "scanConcurrency:"+fmt.Sprint(*c.ScanConcurrency))
values = append(values, "scanConcurrency:"+strconv.Itoa(*c.ScanConcurrency))
}
return strings.Join(values, " ")
}
Expand Down
8 changes: 8 additions & 0 deletions pack/assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Assembler struct {
assembleLinkFor *int
noInline bool
skipInaccessibleFiles bool
fileLengthCorrection map[model.FileID]int64
}

// Close closes the assembler and all of its underlying readers
Expand All @@ -78,6 +79,7 @@ func NewAssembler(ctx context.Context, reader storagesystem.Reader,
objects: make(map[model.FileID]fs.Object),
noInline: noInline,
skipInaccessibleFiles: skipInaccessibleFiles,
fileLengthCorrection: make(map[model.FileID]int64),
}
}

Expand Down Expand Up @@ -204,6 +206,9 @@ func (a *Assembler) prefetch() error {
a.assembleLinkFor = ptr.Of(a.index)
a.fileReadCloser = nil
a.Close()
if a.fileRanges[a.index].Length < 0 {
a.fileLengthCorrection[a.fileRanges[a.index].FileID] = a.fileOffset
}
a.index++
return nil
}
Expand Down Expand Up @@ -252,6 +257,9 @@ func (a *Assembler) prefetch() error {

a.assembleLinkFor = ptr.Of(a.index)
a.Close()
if a.fileRanges[a.index].Length < 0 {
a.fileLengthCorrection[a.fileRanges[a.index].FileID] = a.fileOffset + int64(n)
}
a.fileReadCloser = nil
a.index++

Expand Down
46 changes: 29 additions & 17 deletions pack/assembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ import (
func TestAssembler(t *testing.T) {
tmp := t.TempDir()
sizes := map[int]struct {
size int
encSize int
size int
encSize int
sizeUnknown bool
}{
0: {96, 297},
1: {97, 298},
1024: {1121, 1321},
1024 * 1024: {1048674, 1049296},
1024 * 1024 * 2: {2097436, 2098218},
1024*1024*2 + 1: {2097520, 2098235},
1024*1024*2 - 1: {2097434, 2098217},
1024 * 1024 * 10: {10486756, 10489586},
1024*1024*10 - 1: {10486755, 10489585},
1024*1024*10 + 1: {10486840, 10489603},
0: {96, 297, false},
1: {97, 298, false},
1024: {1121, 1321, false},
1024 * 1024: {1048674, 1049296, false},
1024 * 1024 * 2: {2097436, 2098218, false},
1024*1024*2 + 1: {2097520, 2098235, false},
1024*1024*2 - 1: {2097434, 2098217, false},
1024 * 1024 * 10: {10486756, 10489586, false},
1024*1024*10 - 1: {10486755, 10489585, false},
1024*1024*10 + 1: {10486840, 10489603, false},
2048: {2145, 2345, true},
}

ctx := context.Background()
Expand All @@ -45,21 +47,25 @@ func TestAssembler(t *testing.T) {
require.NoError(t, err)

var allFileRanges []model.FileRange
for size := range sizes {
for size, val := range sizes {
filename := fmt.Sprintf("%d.bin", size)
err := os.WriteFile(filepath.Join(tmp, filename), testutil.GenerateRandomBytes(size), 0644)
require.NoError(t, err)
stat, err := os.Stat(filepath.Join(tmp, filename))
require.NoError(t, err)
length := int64(size)
if val.sizeUnknown {
length = -1
}
allFileRanges = append(allFileRanges, model.FileRange{
ID: model.FileRangeID(size),
Offset: 0,
Length: int64(size),
Length: length,
FileID: model.FileID(size),
File: &model.File{
ID: model.FileID(size),
Path: filename,
Size: int64(size),
Size: length,
LastModifiedNano: stat.ModTime().UnixNano(),
}})
}
Expand All @@ -77,6 +83,12 @@ func TestAssembler(t *testing.T) {
require.Equal(t, expected.size, len(content))
validateCarContent(t, content)
validateAssembler(t, assembler)
if expected.sizeUnknown {
require.Greater(t, len(assembler.carBlocks), 0)
for _, corrected := range assembler.fileLengthCorrection {
require.Equal(t, corrected, int64(size))
}
}
})
}

Expand All @@ -88,7 +100,7 @@ func TestAssembler(t *testing.T) {
defer assembler.Close()
content, err := io.ReadAll(assembler)
require.NoError(t, err)
require.Equal(t, 38802198, len(content))
require.Equal(t, 38804284, len(content))
validateCarContent(t, content)
validateAssembler(t, assembler)
require.Greater(t, len(assembler.carBlocks), 0)
Expand All @@ -98,7 +110,7 @@ func TestAssembler(t *testing.T) {
defer assembler.Close()
content, err := io.ReadAll(assembler)
require.NoError(t, err)
require.Equal(t, 38802198, len(content))
require.Equal(t, 38804284, len(content))
validateCarContent(t, content)
validateAssembler(t, assembler)
require.Len(t, assembler.carBlocks, 0)
Expand Down
23 changes: 23 additions & 0 deletions pack/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,29 @@ func Pack(
JobID: &job.ID,
}

// Update all Files and FileRanges that have size == -1
for fileID, length := range assembler.fileLengthCorrection {
err := database.DoRetry(ctx, func() error {
return db.Model(&model.File{}).Where("id = ?", fileID).Update("size", length).Error
})
if err != nil {
return nil, errors.WithStack(err)
}
err = database.DoRetry(ctx, func() error {
return db.Model(&model.FileRange{}).Where("file_id = ?", fileID).Update("length", length).Error
})
if err != nil {
return nil, errors.WithStack(err)
}
}
for i := range job.FileRanges {
if job.FileRanges[i].Length == -1 {
job.FileRanges[i].Length = assembler.fileLengthCorrection[job.FileRanges[i].FileID]
job.FileRanges[i].File.Size = assembler.fileLengthCorrection[job.FileRanges[i].FileID]
logger.Warnw("correcting unknown file size", "path", job.FileRanges[i].File.Path, "length", job.FileRanges[i].Length)
}
}

// Update all FileRange and file CID that are not split
splitFileIDs := make(map[model.FileID]model.File)
var updatedFiles []model.File
Expand Down
Loading

0 comments on commit 9c76b91

Please sign in to comment.