From 4287ae07290af7b73bae9b494b2a6aa79c2474f9 Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Wed, 1 Jul 2020 17:35:40 -0700 Subject: [PATCH 1/2] publish-*artifacts: merge implementations of putRelease / putNonRelease The `putRelease` / `putNonRelease` functionality of `publish-artifacts` and `publish-provisional-artifacts` are almost exactly the same. I've merged this to one implementation in the `release` package. This change should have no functional difference to what is being published / archived today. I've also added the ability to put multiple files into a release archive / to be sent as part of a non release. This is relevant for a future change, but getting that work bootstrapped here. Release note: None --- pkg/cmd/publish-artifacts/main.go | 175 ++--------- pkg/cmd/publish-provisional-artifacts/main.go | 174 ++-------- pkg/release/release.go | 297 +++++++++++++++++- 3 files changed, 334 insertions(+), 312 deletions(-) diff --git a/pkg/cmd/publish-artifacts/main.go b/pkg/cmd/publish-artifacts/main.go index 0446593d4709..c3525885e6ac 100644 --- a/pkg/cmd/publish-artifacts/main.go +++ b/pkg/cmd/publish-artifacts/main.go @@ -11,20 +11,14 @@ package main import ( - "archive/tar" - "archive/zip" "bytes" - "compress/gzip" "flag" "fmt" "go/build" - "io" "log" - "mime" "os" "os/exec" "path/filepath" - "regexp" "strings" "github.com/aws/aws-sdk-go/aws" @@ -56,21 +50,16 @@ var testableS3 = func() (s3putter, error) { return s3.New(sess), nil } -var osVersionRe = regexp.MustCompile(`\d+(\.\d+)*-`) - var isRelease = flag.Bool("release", false, "build in release mode instead of bleeding-edge mode") var destBucket = flag.String("bucket", "", "override default bucket") var ( - noCache = "no-cache" // TODO(tamird,benesch,bdarnell): make "latest" a website-redirect // rather than a full key. This means that the actual artifact will no // longer be named "-latest". latestStr = "latest" ) -const dotExe = ".exe" - func main() { flag.Parse() @@ -220,7 +209,7 @@ func buildArchive(svc s3putter, o opts) { Body: f, } if releaseVersionStr == latestStr { - putObjectInput.CacheControl = &noCache + putObjectInput.CacheControl = &release.NoCache } if _, err := svc.PutObject(&putObjectInput); err != nil { log.Fatalf("s3 upload %s: %s", absoluteSrcArchivePath, err) @@ -259,23 +248,12 @@ func buildOneCockroach(svc s3putter, o opts) { o.Base = "cockroach" + o.Suffix o.AbsolutePath = filepath.Join(o.PkgDir, o.Base) - { - var err error - o.Binary, err = os.Open(o.AbsolutePath) - - if err != nil { - log.Fatalf("os.Open(%s): %s", o.AbsolutePath, err) - } - } if !*isRelease { putNonRelease(svc, o) } else { putRelease(svc, o) } - if err := o.Binary.Close(); err != nil { - log.Fatal(err) - } } func buildOneWorkload(svc s3putter, o opts) { @@ -293,18 +271,7 @@ func buildOneWorkload(svc s3putter, o opts) { o.Base = "workload" o.AbsolutePath = filepath.Join(o.PkgDir, "bin", o.Base) - { - var err error - o.Binary, err = os.Open(o.AbsolutePath) - - if err != nil { - log.Fatalf("os.Open(%s): %s", o.AbsolutePath, err) - } - } putNonRelease(svc, o) - if err := o.Binary.Close(); err != nil { - log.Fatal(err) - } } type opts struct { @@ -319,136 +286,36 @@ type opts struct { Base string BucketName string - Binary *os.File AbsolutePath string PkgDir string } -// TrimDotExe trims '.exe. from `name` and returns the result (and whether any -// trimming has occurred). 
-func TrimDotExe(name string) (string, bool) { - return strings.TrimSuffix(name, dotExe), strings.HasSuffix(name, dotExe) -} - func putNonRelease(svc s3putter, o opts) { - const repoName = "cockroach" - remoteName, hasExe := TrimDotExe(o.Base) - // TODO(tamird): do we want to keep doing this? No longer - // doing so requires updating cockroachlabs/production, and - // possibly cockroachdb/cockroach-go. - remoteName = osVersionRe.ReplaceAllLiteralString(remoteName, "") - - fileName := fmt.Sprintf("%s.%s", remoteName, o.VersionStr) - if hasExe { - fileName += ".exe" - } - disposition := mime.FormatMediaType("attachment", map[string]string{"filename": fileName}) - - // NB: The leading slash is required to make redirects work - // correctly since we reuse this key as the redirect location. - versionKey := fmt.Sprintf("/%s/%s", repoName, fileName) - if _, err := svc.PutObject(&s3.PutObjectInput{ - Bucket: &o.BucketName, - ContentDisposition: &disposition, - Key: &versionKey, - Body: o.Binary, - }); err != nil { - log.Fatalf("s3 upload %s: %s", o.AbsolutePath, err) - } - latestSuffix := o.Branch - if latestSuffix == "master" { - latestSuffix = "LATEST" - } - latestKey := fmt.Sprintf("%s/%s.%s", repoName, remoteName, latestSuffix) - if _, err := svc.PutObject(&s3.PutObjectInput{ - Bucket: &o.BucketName, - CacheControl: &noCache, - Key: &latestKey, - WebsiteRedirectLocation: &versionKey, - }); err != nil { - log.Fatalf("s3 redirect to %s: %s", versionKey, err) + files := []release.NonReleaseFile{ + release.MakeCRDBBinaryNonReleaseFile(o.Base, o.AbsolutePath, o.VersionStr), } + release.PutNonRelease( + svc, + release.PutNonReleaseOptions{ + Branch: o.Branch, + BucketName: o.BucketName, + Files: files, + }, + ) } func putRelease(svc s3putter, o opts) { - targetSuffix, hasExe := TrimDotExe(o.Suffix) - // TODO(tamird): remove this weirdness. Requires updating - // "users" e.g. docs, cockroachdb/cockroach-go, maybe others. - if strings.Contains(o.BuildType, "linux") { - targetSuffix = strings.Replace(targetSuffix, "gnu-", "", -1) - targetSuffix = osVersionRe.ReplaceAllLiteralString(targetSuffix, "") - } - - // Stat the binary. Info is needed for archive headers. - binaryInfo, err := o.Binary.Stat() - if err != nil { - log.Fatal(err) - } - for _, releaseVersionStr := range o.ReleaseVersionStrs { - archiveBase := fmt.Sprintf("cockroach-%s", releaseVersionStr) - targetArchiveBase := archiveBase + targetSuffix - var targetArchive string - var body bytes.Buffer - if hasExe { - targetArchive = targetArchiveBase + ".zip" - zw := zip.NewWriter(&body) - - // Set the zip header from the file info. Overwrite name. - zipHeader, err := zip.FileInfoHeader(binaryInfo) - if err != nil { - log.Fatal(err) - } - zipHeader.Name = filepath.Join(targetArchiveBase, "cockroach.exe") - - zfw, err := zw.CreateHeader(zipHeader) - if err != nil { - log.Fatal(err) - } - if _, err := io.Copy(zfw, o.Binary); err != nil { - log.Fatal(err) - } - if err := zw.Close(); err != nil { - log.Fatal(err) - } - } else { - targetArchive = targetArchiveBase + ".tgz" - gzw := gzip.NewWriter(&body) - tw := tar.NewWriter(gzw) - - // Set the tar header from the file info. Overwrite name. 
- tarHeader, err := tar.FileInfoHeader(binaryInfo, "") - if err != nil { - log.Fatal(err) - } - tarHeader.Name = filepath.Join(targetArchiveBase, "cockroach") - if err := tw.WriteHeader(tarHeader); err != nil { - log.Fatal(err) - } - - if _, err := io.Copy(tw, o.Binary); err != nil { - log.Fatal(err) - } - if err := tw.Close(); err != nil { - log.Fatal(err) - } - if err := gzw.Close(); err != nil { - log.Fatal(err) - } - } - if _, err := o.Binary.Seek(0, 0); err != nil { - log.Fatal(err) - } - putObjectInput := s3.PutObjectInput{ - Bucket: &o.BucketName, - Key: &targetArchive, - Body: bytes.NewReader(body.Bytes()), - } - if releaseVersionStr == latestStr { - putObjectInput.CacheControl = &noCache - } - if _, err := svc.PutObject(&putObjectInput); err != nil { - log.Fatalf("s3 upload %s: %s", targetArchive, err) + files := []release.ArchiveFile{ + release.MakeCRDBBinaryArchiveFile(o.Base, o.AbsolutePath), } + release.PutRelease(svc, release.PutReleaseOptions{ + BucketName: o.BucketName, + NoCache: releaseVersionStr == latestStr, + Suffix: o.Suffix, + BuildType: o.BuildType, + VersionStr: releaseVersionStr, + Files: files, + }) } } diff --git a/pkg/cmd/publish-provisional-artifacts/main.go b/pkg/cmd/publish-provisional-artifacts/main.go index 65dae031cb0c..0fc5f061de07 100644 --- a/pkg/cmd/publish-provisional-artifacts/main.go +++ b/pkg/cmd/publish-provisional-artifacts/main.go @@ -11,16 +11,12 @@ package main import ( - "archive/tar" - "archive/zip" "bytes" - "compress/gzip" "flag" "fmt" "go/build" "io" "log" - "mime" "os" "os/exec" "path/filepath" @@ -58,23 +54,18 @@ func makeS3() (s3I, error) { return s3.New(sess), nil } -var osVersionRe = regexp.MustCompile(`\d+(\.\d+)*-`) - var isReleaseF = flag.Bool("release", false, "build in release mode instead of bleeding-edge mode") var destBucket = flag.String("bucket", "", "override default bucket") var doProvisionalF = flag.Bool("provisional", false, "publish provisional binaries") var doBlessF = flag.Bool("bless", false, "bless provisional binaries") var ( - noCache = "no-cache" // TODO(tamird,benesch,bdarnell): make "latest" a website-redirect // rather than a full key. This means that the actual artifact will no // longer be named "-latest". latestStr = "latest" ) -const dotExe = ".exe" - func main() { flag.Parse() log.SetFlags(log.LstdFlags | log.Lshortfile) @@ -218,18 +209,10 @@ func run(svc s3I, execFn release.ExecFn, flags runFlags) { for _, o := range cockroachBuildOpts { buildCockroach(execFn, flags, o) - absolutePath := filepath.Join(o.PkgDir, o.Base) - binary, err := os.Open(absolutePath) - if err != nil { - log.Fatalf("os.Open(%s): %s", absolutePath, err) - } if !flags.isRelease { - putNonRelease(svc, o, binary) + putNonRelease(svc, o) } else { - putRelease(svc, o, binary) - } - if err := binary.Close(); err != nil { - log.Fatal(err) + putRelease(svc, o) } } if flags.isRelease { @@ -332,69 +315,23 @@ type opts struct { PkgDir string } -// TrimDotExe trims '.exe. from `name` and returns the result (and whether any -// trimming has occurred). -func TrimDotExe(name string) (string, bool) { - return strings.TrimSuffix(name, dotExe), strings.HasSuffix(name, dotExe) -} - -func putNonRelease(svc s3I, o opts, binary io.ReadSeeker) { - const repoName = "cockroach" - remoteName, hasExe := TrimDotExe(o.Base) - // TODO(tamird): do we want to keep doing this? No longer - // doing so requires updating cockroachlabs/production, and - // possibly cockroachdb/cockroach-go. 
- remoteName = osVersionRe.ReplaceAllLiteralString(remoteName, "") - - fileName := fmt.Sprintf("%s.%s", remoteName, o.VersionStr) - if hasExe { - fileName += ".exe" - } - disposition := mime.FormatMediaType("attachment", map[string]string{"filename": fileName}) - - // NB: The leading slash is required to make redirects work - // correctly since we reuse this key as the redirect location. - versionKey := fmt.Sprintf("/%s/%s", repoName, fileName) - log.Printf("Uploading to s3://%s%s", o.BucketName, versionKey) - if _, err := svc.PutObject(&s3.PutObjectInput{ - Bucket: &o.BucketName, - ContentDisposition: &disposition, - Key: &versionKey, - Body: binary, - }); err != nil { - log.Fatalf("s3 upload %s: %s", versionKey, err) - } - latestSuffix := o.Branch - if latestSuffix == "master" { - latestSuffix = "LATEST" - } - latestKey := fmt.Sprintf("%s/%s.%s", repoName, remoteName, latestSuffix) - log.Printf("Uploading to s3://%s/%s", o.BucketName, latestKey) - if _, err := svc.PutObject(&s3.PutObjectInput{ - Bucket: &o.BucketName, - CacheControl: &noCache, - Key: &latestKey, - WebsiteRedirectLocation: &versionKey, - }); err != nil { - log.Fatalf("s3 redirect to %s: %s", versionKey, err) - } +func putNonRelease(svc s3I, o opts) { + crdbPath := filepath.Join(o.PkgDir, o.Base) + files := []release.NonReleaseFile{ + release.MakeCRDBBinaryNonReleaseFile(o.Base, crdbPath, o.VersionStr), + } + release.PutNonRelease( + svc, + release.PutNonReleaseOptions{ + Branch: o.Branch, + BucketName: o.BucketName, + Files: files, + }, + ) } func s3KeyRelease(o opts) (string, string) { - targetSuffix, hasExe := TrimDotExe(o.Suffix) - // TODO(tamird): remove this weirdness. Requires updating - // "users" e.g. docs, cockroachdb/cockroach-go, maybe others. - if strings.Contains(o.BuildType, "linux") { - targetSuffix = strings.Replace(targetSuffix, "gnu-", "", -1) - targetSuffix = osVersionRe.ReplaceAllLiteralString(targetSuffix, "") - } - - archiveBase := fmt.Sprintf("cockroach-%s", o.VersionStr) - targetArchiveBase := archiveBase + targetSuffix - if hasExe { - return targetArchiveBase, targetArchiveBase + ".zip" - } - return targetArchiveBase, targetArchiveBase + ".tgz" + return release.S3KeyRelease(o.Suffix, o.BuildType, o.VersionStr) } func s3KeyArchive(o opts) (string, string) { @@ -403,68 +340,19 @@ func s3KeyArchive(o opts) (string, string) { return archiveBase, srcArchive } -func putRelease(svc s3I, o opts, binary *os.File) { - // Stat the binary. Info is needed for archive headers. - binaryInfo, err := binary.Stat() - if err != nil { - log.Fatal(err) - } - - targetArchiveBase, targetArchive := s3KeyRelease(o) - var body bytes.Buffer - if _, hasExe := TrimDotExe(o.Suffix); hasExe { - zw := zip.NewWriter(&body) - - // Set the zip header from the file info. Overwrite name. - zipHeader, err := zip.FileInfoHeader(binaryInfo) - if err != nil { - log.Fatal(err) - } - zipHeader.Name = filepath.Join(targetArchiveBase, "cockroach.exe") - - zfw, err := zw.CreateHeader(zipHeader) - if err != nil { - log.Fatal(err) - } - if _, err := io.Copy(zfw, binary); err != nil { - log.Fatal(err) - } - if err := zw.Close(); err != nil { - log.Fatal(err) - } - } else { - gzw := gzip.NewWriter(&body) - tw := tar.NewWriter(gzw) - - // Set the tar header from the file info. Overwrite name. 
- tarHeader, err := tar.FileInfoHeader(binaryInfo, "") - if err != nil { - log.Fatal(err) - } - tarHeader.Name = filepath.Join(targetArchiveBase, "cockroach") - if err := tw.WriteHeader(tarHeader); err != nil { - log.Fatal(err) - } - - if _, err := io.Copy(tw, binary); err != nil { - log.Fatal(err) - } - if err := tw.Close(); err != nil { - log.Fatal(err) - } - if err := gzw.Close(); err != nil { - log.Fatal(err) - } - } - log.Printf("Uploading to s3://%s/%s", o.BucketName, targetArchive) - putObjectInput := s3.PutObjectInput{ - Bucket: &o.BucketName, - Key: &targetArchive, - Body: bytes.NewReader(body.Bytes()), - } - if _, err := svc.PutObject(&putObjectInput); err != nil { - log.Fatalf("s3 upload %s: %s", targetArchive, err) - } +func putRelease(svc s3I, o opts) { + crdbPath := filepath.Join(o.PkgDir, o.Base) + files := []release.ArchiveFile{ + release.MakeCRDBBinaryArchiveFile(o.Base, crdbPath), + } + release.PutRelease(svc, release.PutReleaseOptions{ + BucketName: o.BucketName, + NoCache: false, + Suffix: o.Suffix, + BuildType: o.BuildType, + VersionStr: o.VersionStr, + Files: files, + }) } func markLatestRelease(svc s3I, o opts) { @@ -491,7 +379,7 @@ func markLatestRelease(svc s3I, o opts) { Bucket: &o.BucketName, Key: &keyLatest, Body: bytes.NewReader(buf.Bytes()), - CacheControl: &noCache, + CacheControl: &release.NoCache, } if _, err := svc.PutObject(&putObjectInput); err != nil { log.Fatalf("s3 upload %s: %s", keyLatest, err) @@ -523,7 +411,7 @@ func markLatestArchive(svc s3I, o opts) { Bucket: &o.BucketName, Key: &keyLatest, Body: bytes.NewReader(buf.Bytes()), - CacheControl: &noCache, + CacheControl: &release.NoCache, } if _, err := svc.PutObject(&putObjectInput); err != nil { log.Fatalf("s3 upload %s: %s", keyLatest, err) diff --git a/pkg/release/release.go b/pkg/release/release.go index e9d14e0f27e6..685e4a941a9c 100644 --- a/pkg/release/release.go +++ b/pkg/release/release.go @@ -13,33 +13,47 @@ package release import ( + "archive/tar" + "archive/zip" "bufio" "bytes" + "compress/gzip" "fmt" "io" "log" + "mime" "os" "os/exec" + "path/filepath" "regexp" "strings" + "github.com/aws/aws-sdk-go/service/s3" "github.com/cockroachdb/errors" ) -// linuxStaticLibsRe returns the regexp of all static libraries. -var linuxStaticLibsRe = func() *regexp.Regexp { - libs := strings.Join([]string{ - regexp.QuoteMeta("linux-vdso.so."), - regexp.QuoteMeta("librt.so."), - regexp.QuoteMeta("libpthread.so."), - regexp.QuoteMeta("libdl.so."), - regexp.QuoteMeta("libm.so."), - regexp.QuoteMeta("libc.so."), - regexp.QuoteMeta("libresolv.so."), - strings.Replace(regexp.QuoteMeta("ld-linux-ARCH.so."), "ARCH", ".*", -1), - }, "|") - return regexp.MustCompile(libs) -}() +var ( + // linuxStaticLibsRe returns the regexp of all static libraries. + linuxStaticLibsRe = func() *regexp.Regexp { + libs := strings.Join([]string{ + regexp.QuoteMeta("linux-vdso.so."), + regexp.QuoteMeta("librt.so."), + regexp.QuoteMeta("libpthread.so."), + regexp.QuoteMeta("libdl.so."), + regexp.QuoteMeta("libm.so."), + regexp.QuoteMeta("libc.so."), + regexp.QuoteMeta("libresolv.so."), + strings.Replace(regexp.QuoteMeta("ld-linux-ARCH.so."), "ARCH", ".*", -1), + }, "|") + return regexp.MustCompile(libs) + }() + osVersionRe = regexp.MustCompile(`\d+(\.\d+)*-`) +) + +var ( + // NoCache is a string constant to send no-cache to AWS. + NoCache = "no-cache" +) // SupportedTarget contains metadata about a supported target. 
type SupportedTarget struct { @@ -103,7 +117,7 @@ func MakeWorkload(pkgDir string) error { return cmd.Run() } -// MakeRelease makes the release binary. +// MakeRelease makes the release binary and associated files. func MakeRelease(b SupportedTarget, pkgDir string, opts ...MakeReleaseOption) error { params := makeReleaseAndVerifyOptions{ execFn: DefaultExecFn, @@ -156,3 +170,256 @@ func MakeRelease(b SupportedTarget, pkgDir string, opts ...MakeReleaseOption) er } return nil } + +// TrimDotExe trims '.exe. from `name` and returns the result (and whether any +// trimming has occurred). +func TrimDotExe(name string) (string, bool) { + const dotExe = ".exe" + return strings.TrimSuffix(name, dotExe), strings.HasSuffix(name, dotExe) +} + +// S3Putter is an interface allowing uploads to S3. +type S3Putter interface { + PutObject(*s3.PutObjectInput) (*s3.PutObjectOutput, error) +} + +// S3KeyRelease extracts the target archive base and archive +// name for the given parameters. +func S3KeyRelease(suffix string, buildType string, versionStr string) (string, string) { + targetSuffix, hasExe := TrimDotExe(suffix) + // TODO(tamird): remove this weirdness. Requires updating + // "users" e.g. docs, cockroachdb/cockroach-go, maybe others. + if strings.Contains(buildType, "linux") { + targetSuffix = strings.Replace(targetSuffix, "gnu-", "", -1) + targetSuffix = osVersionRe.ReplaceAllLiteralString(targetSuffix, "") + } + + archiveBase := fmt.Sprintf("cockroach-%s", versionStr) + targetArchiveBase := archiveBase + targetSuffix + if hasExe { + return targetArchiveBase, targetArchiveBase + ".zip" + } + return targetArchiveBase, targetArchiveBase + ".tgz" +} + +// NonReleaseFile is a file to upload when publishing a non-release. +type NonReleaseFile struct { + // S3FileName is the name of the file stored in S3. + S3FileName string + // S3FilePath is the path the file should be stored within the Cockroach bucket. + S3FilePath string + // S3RedirectPathPrefix is the prefix of the path that redirects to the S3FilePath. + // It is suffixed with .VersionStr or .LATEST, depending on whether the branch is + // the master branch. + S3RedirectPathPrefix string + + // LocalAbsolutePath is the location of the file to upload in the local OS. + LocalAbsolutePath string +} + +// MakeCRDBBinaryNonReleaseFile creates a NonRelease object for the +// CRDB binary. +func MakeCRDBBinaryNonReleaseFile( + base string, localAbsolutePath string, versionStr string, +) NonReleaseFile { + remoteName, hasExe := TrimDotExe(base) + // TODO(tamird): do we want to keep doing this? No longer + // doing so requires updating cockroachlabs/production, and + // possibly cockroachdb/cockroach-go. + remoteName = osVersionRe.ReplaceAllLiteralString(remoteName, "") + + fileName := fmt.Sprintf("%s.%s", remoteName, versionStr) + if hasExe { + fileName += ".exe" + } + + return NonReleaseFile{ + S3FileName: fileName, + S3FilePath: fileName, + S3RedirectPathPrefix: remoteName, + LocalAbsolutePath: localAbsolutePath, + } +} + +// PutNonReleaseOptions are options to pass into PutNonRelease. +type PutNonReleaseOptions struct { + // Branch is the branch from which the release is being uploaded from. + Branch string + // BucketName is the bucket to upload the files to. + BucketName string + + // Files are all the files to be uploaded into S3. + Files []NonReleaseFile +} + +// PutNonRelease uploads non-release related files to S3. 
+func PutNonRelease(svc S3Putter, o PutNonReleaseOptions) { + const repoName = "cockroach" + for _, f := range o.Files { + disposition := mime.FormatMediaType("attachment", map[string]string{ + "filename": f.S3FileName, + }) + + fileToUpload, err := os.Open(f.LocalAbsolutePath) + if err != nil { + log.Fatalf("failed to open %s: %s", f.LocalAbsolutePath, err) + } + defer func() { + _ = fileToUpload.Close() + }() + + // NB: The leading slash is required to make redirects work + // correctly since we reuse this key as the redirect location. + versionKey := fmt.Sprintf("/%s/%s", repoName, f.S3FilePath) + log.Printf("Uploading to s3://%s%s", o.BucketName, versionKey) + if _, err := svc.PutObject(&s3.PutObjectInput{ + Bucket: &o.BucketName, + ContentDisposition: &disposition, + Key: &versionKey, + Body: fileToUpload, + }); err != nil { + log.Fatalf("s3 upload %s: %s", versionKey, err) + } + + latestSuffix := o.Branch + if latestSuffix == "master" { + latestSuffix = "LATEST" + } + latestKey := fmt.Sprintf("%s/%s.%s", repoName, f.S3RedirectPathPrefix, latestSuffix) + if _, err := svc.PutObject(&s3.PutObjectInput{ + Bucket: &o.BucketName, + CacheControl: &NoCache, + Key: &latestKey, + WebsiteRedirectLocation: &versionKey, + }); err != nil { + log.Fatalf("s3 redirect to %s: %s", versionKey, err) + } + } +} + +// ArchiveFile is a file to store in the a archive for a release. +type ArchiveFile struct { + // LocalAbsolutePath is the location of the file to upload include in the archive on the local OS. + LocalAbsolutePath string + // ArchiveFilePath is the location of the file within the archive in which the file is to be stored. + ArchiveFilePath string +} + +// MakeCRDBBinaryArchiveFile generates the ArchiveFile object for a CRDB binary. +func MakeCRDBBinaryArchiveFile(base string, localAbsolutePath string) ArchiveFile { + _, hasExe := TrimDotExe(base) + path := "cockroach" + if hasExe { + path += ".exe" + } + return ArchiveFile{ + LocalAbsolutePath: localAbsolutePath, + ArchiveFilePath: path, + } +} + +// PutReleaseOptions are options to for the PutRelease function. +type PutReleaseOptions struct { + // BucketName is the bucket to upload the files to. + BucketName string + // NoCache is true if we should set the NoCache option to S3. + NoCache bool + // Suffix is the suffix of the main CRDB binary. + Suffix string + // BuildType is the build type of the release. + BuildType string + // VersionStr is the version (SHA/branch name) of the release. + VersionStr string + + // Files are all the files to be included in the archive. + Files []ArchiveFile +} + +// PutRelease uploads a compressed archive containing the release +// files to S3. 
+func PutRelease(svc S3Putter, o PutReleaseOptions) { + targetArchiveBase, targetArchive := S3KeyRelease(o.Suffix, o.BuildType, o.VersionStr) + var body bytes.Buffer + + if strings.HasSuffix(targetArchive, ".zip") { + zw := zip.NewWriter(&body) + + for _, f := range o.Files { + file, err := os.Open(f.LocalAbsolutePath) + if err != nil { + log.Fatalf("failed to open file: %s", f.LocalAbsolutePath) + } + defer func() { _ = file.Close() }() + + stat, err := file.Stat() + if err != nil { + log.Fatalf("failed to stat: %s", f.LocalAbsolutePath) + } + + zipHeader, err := zip.FileInfoHeader(stat) + if err != nil { + log.Fatal(err) + } + zipHeader.Name = filepath.Join(targetArchiveBase, f.ArchiveFilePath) + + zfw, err := zw.CreateHeader(zipHeader) + if err != nil { + log.Fatal(err) + } + if _, err := io.Copy(zfw, file); err != nil { + log.Fatal(err) + } + } + if err := zw.Close(); err != nil { + log.Fatal(err) + } + } else { + gzw := gzip.NewWriter(&body) + for _, f := range o.Files { + tw := tar.NewWriter(gzw) + + file, err := os.Open(f.LocalAbsolutePath) + if err != nil { + log.Fatalf("failed to open file: %s", f.LocalAbsolutePath) + } + defer func() { _ = file.Close() }() + + stat, err := file.Stat() + if err != nil { + log.Fatalf("failed to stat: %s", f.LocalAbsolutePath) + } + + // Set the tar header from the file info. Overwrite name. + tarHeader, err := tar.FileInfoHeader(stat, "") + if err != nil { + log.Fatal(err) + } + tarHeader.Name = filepath.Join(targetArchiveBase, f.ArchiveFilePath) + if err := tw.WriteHeader(tarHeader); err != nil { + log.Fatal(err) + } + + if _, err := io.Copy(tw, file); err != nil { + log.Fatal(err) + } + if err := tw.Close(); err != nil { + log.Fatal(err) + } + } + if err := gzw.Close(); err != nil { + log.Fatal(err) + } + } + + putObjectInput := s3.PutObjectInput{ + Bucket: &o.BucketName, + Key: &targetArchive, + Body: bytes.NewReader(body.Bytes()), + } + if o.NoCache { + putObjectInput.CacheControl = &NoCache + } + if _, err := svc.PutObject(&putObjectInput); err != nil { + log.Fatalf("s3 upload %s: %s", targetArchive, err) + } +} From b9fb65782da116c022717950a60d4d620a88eb34 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 17 Jun 2020 20:43:18 -0400 Subject: [PATCH 2/2] docs/RFCS: first draft of a sqlliveness RFC There's been a lot of back-and-forth over what we're trying to build and why. Hopefully an RFC can help us to consolidate on terminology, goals, and implementation details. Release note: None --- docs/RFCS/20200615_sql_liveness.md | 571 +++++++++++++++++++++++++++++ 1 file changed, 571 insertions(+) create mode 100644 docs/RFCS/20200615_sql_liveness.md diff --git a/docs/RFCS/20200615_sql_liveness.md b/docs/RFCS/20200615_sql_liveness.md new file mode 100644 index 000000000000..4fdae751e706 --- /dev/null +++ b/docs/RFCS/20200615_sql_liveness.md @@ -0,0 +1,571 @@ +- Feature Name: SQL Liveness +- Status: draft +- Start Date: 2020-06-15 +- Authors: Andrew Werner, Spas Bojanov +- RFC PR: [#50377](https://github.com/cockroachdb/cockroach/pull/50377) +- Cockroach Issue: [#47892](https://github.com/cockroachdb/cockroach/issues/47892) + +# Summary + +As Cockroach moves towards a multi-tenant world, we need a mechanism to +associate resources at the SQL level with tenant SQL processes. This +RFC introduces a registry of live SQL instances for the purposes of +tenant-scoped resource leasing and discovery. 
# Motivation

Today some resource leasing (jobs) is tied to node-liveness, while other
resource leasing (sql table leases) explicitly heartbeats KV rows with
references to node IDs. Both the node ID and node-liveness will be
unavailable to SQL tenant processes.

The driving motivation for this work is the usage of node-liveness
epochs as the basis for the best-effort locks in the jobs registry.
Jobs are a property of an individual SQL tenant rather than of a KV
deployment.

An additional goal of this effort is to strengthen the
semantics of job leases to ensure that only the most recent successful
session which has adopted a job can update that job's state; today
there is little that precludes concurrent resumptions of the same job
from overwriting each other's state.

# Guide-level explanation

The design of SQL liveness is inspired by cockroach's NodeLiveness layer,
though it differs in implementation due to its differing constraints and
environment. SQL Liveness does not need to worry about the
bootstrapping problems involved in creating range leases (see
[Range Leases RFC](../20160210_range_leases.md)).

The goal of SQL liveness is to build a causality-propagating notion of
instance liveness for the purpose of coordinating access to resources.
This use maps directly to the use of node liveness to drive epoch-based
range leases. We would like to utilize SQL liveness to create leases
which do not need to be independently heartbeated to remain active.

Today we heartbeat SQL table descriptor leases. In the upcoming release we will
be leasing database, schema, and type descriptors. A single table may, in fact,
require leasing hundreds of objects (if it utilizes hundreds of types). While
this example is certainly far-fetched, as the only user-defined types which
will be available in 20.2 are enums, it is believable that new user-defined
types and functions can lead to application schemas which involve on the order
of 1000 descriptors without getting too contrived. If these resources could be
leased without O(descriptors) periodic coordination, that could be a big win,
especially on rate-limited tenant pods.

The addition of this layer of indirection is primarily a scalability
optimization but additionally confers lifetime clarity on distributed resources
throughout the SQL layer of cockroach. Over time we anticipate needing more,
rather than less, distributed coordination, and expecting each such case to
implement its own leasing protocol is madness.

A non-goal of this proposal is to provide discovery of SQL instances and their
metadata. This use case is likely important as we discuss exposing
DistSQL to tenants. We highlight this non-goal as it has been discussed in the
context of this project. We will, however, at times discuss how such metadata
could be propagated, as it will help correlate the sessions presented here with
the lifecycle of tenant processes in the surrounding infrastructure.

## Key Terms

 * `SQL Instance` - An instance is a (unix) process running a SQL server. An
 instance is bound to a single SQL tenant. Instances have at various times been
 referred to as a SQL pod, or a SQL node. An instance will have a
 `SQLInstanceID` which is guaranteed to be unique within the tenant cluster
 while the `SQL Instance` exists.
 * `Session` - A session is defined by a unique, opaque `SessionID`.
A session
 will carry a `SQLInstanceID` in order to aid in understandability and
 debugging. It is conceivable that a later iteration of this subsystem may
 allow sessions with other metadata and no `SQLInstanceID`.
 * `SessionID` - A unique identifier for a `Session`. The ID is opaque,
 serialized bytes. `SessionID`s are client generated and should be generated in
 such a way that ensures their uniqueness. For the rest of this proposal, we'll
 assume that `SessionID`s are generated as random `UUID`s. An instance will have
 at most one live `Session` at a time.
 * `Record` - The durable state of a `Session`. A `Record` contains an
 expiration timestamp. The expiration timestamp is updated periodically by the
 `Session` to maintain its liveness.
 * `Claim` - A resource which refers to a session. A claim can be viewed
 as a lock whose lifetime is tied to the session. In order to ensure that a
 claim remains valid, the session must either be observed by transactions
 which utilize the claimed resource, or the transaction must commit before a
 known expiration timestamp for the session (perhaps by utilizing the
 `*kv.Txn`'s commit deadline functionality, which has serious room for
 improvement), or the utilizing subsystem must make sure to observe sessions
 when creating or deleting claims and must observe claims in their transaction
 (this is discussed at length below).

## Interface

```go
package sqlliveness

// SQLInstance represents a SQL tenant server instance.
type SQLInstance interface {
	InstanceID() base.SQLInstanceID
	Session(context.Context) (Session, error)
}

// SessionID represents an opaque identifier for a session. This ID is
// client generated but should be globally unique.
type SessionID []byte

// Session represents a session that was at some point believed to be alive.
type Session interface {
	ID() SessionID
	InstanceID() base.SQLInstanceID

	// Expiration is the current expiration value for this Session. After the
	// Session finishes, this function will return a zero-value timestamp.
	// Transactions run by this Instance which ensure that they commit before
	// this time will be assured that any resources claimed under this session
	// are known to be valid.
	//
	// See discussion in Open Questions.
	Expiration() hlc.Timestamp

	// Done returns a channel which is closed when this Session is known to no
	// longer be live.
	Done() <-chan struct{}

	// Observe can be used to transactionally ensure that
	// the current session is not finished. If the transaction
	// commits, any resources claimed under this session
	// are known to be valid.
	//
	// See discussion in Open Questions about whether this
	// is a good idea.
	Observe(context.Context, *kv.Txn) error
}

// Storage abstracts over the set of sessions in the cluster.
type Storage interface {
	// IsAlive is used to query the liveness of a Session.
	IsAlive(ctx context.Context, sessionID SessionID) (alive bool, err error)
}
```

Above is something of a straw-man interface to the `sqlliveness`
subsystem. Key is the idea that a `SQLInstance` may have multiple
`Session`s over its lifetime and that sessions may end. Further
note that a transaction may `Observe` a session. This API provides
a mechanism by which a transaction can ensure that, if it commits, the
observed session exists.
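
To illustrate how a subsystem might consume this interface, here is a minimal
sketch of taking a claim under the instance's current session and observing
that session in the claiming transaction. The `claims` table, the
`claimResource` helper, the import path of the proposed `sqlliveness` package,
and the use of a `sqlutil.InternalExecutor` are illustrative assumptions rather
than part of the proposal; only the straw-man interface above is being
exercised.

```go
package slexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" // assumed home of the interface above
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
)

// claimResource writes the current session's ID into a hypothetical claims
// table and observes the session in the same transaction so that, if the
// transaction commits, the claim is known to have been taken while the
// session was still live.
func claimResource(
	ctx context.Context,
	db *kv.DB,
	instance sqlliveness.SQLInstance,
	ie sqlutil.InternalExecutor,
	resourceID int64,
) error {
	session, err := instance.Session(ctx)
	if err != nil {
		return err
	}
	return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		// If the session has been removed, observing it fails and the
		// transaction will not commit, so a committed claim implies the
		// session was live at claim time.
		if err := session.Observe(ctx, txn); err != nil {
			return err
		}
		// Record the claim. Readers treat it as valid only while
		// Storage.IsAlive(session.ID()) returns true.
		_, err := ie.Exec(ctx, "claim-resource", txn,
			`UPDATE claims SET claim_session_id = $1 WHERE id = $2`,
			[]byte(session.ID()), resourceID,
		)
		return err
	})
}
```

The same shape applies to the jobs use case described below, where the claim
column lives on `system.jobs`.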
+ +# Reference-level explanation + +The creation and maintenance of session liveness is entrusted to +the `SQLInstance` (TBD whether the construction of the current `Session` +should be done lazily or eagerly). Each SQL server will have a handle +to an instance. The `SQLInstance.Session()` method will block until a +`Session` has been durably created (straw man). When the `Session` fails to +heartbeat its record and thus extend its expiration due to the detection of a +deletion it will close its `Done` channel. Resources claimed by that session +will no longer be held. Operations which need to ensure the validity of claims +for correctness + +## State + +This RFC proposes that liveness session state live in a new system +table, `system.sqlliveness`. Each row in this table is a `Record` as defined by +the key terms. The table will initially have the following schema: + +```sql +CREATE TABLE system.sqlliveness ( + -- TODO(ajwerner): Should there be a check constraint on some structure? + -- See open questions. + session_id BYTES PRIMARY KEY, + + -- expiration is the hlc.Timestamp before which nobody else will delete + -- this row. + expiration DECIMAL NOT NULL +); +``` + +## Liveness, expirations, and removal + +The `sqlliveness` subsystem maintains a set of sessions. Sessions +may be `live`, `expired`, or `done`. An `expired` session only +differs from a `live` session in that it can be moved to `done` by +another `SQLInstance`. A `Session` may be successfully observed even if +the current expiration has already passed. + +In this straw man, each instance's `Storage` implementation will periodically +poll all of the instances and remove entries for `Session`s which are +expired. This background bumping may not be ideal for rapid resource +reclaimation. Resource reclamation is left to the individual subsystems which +serialize the `SessionID` as a claim. + +### Storage implementation + +A straw-man implementation could look like the following. It will fetch +session status on-demand and cache that status. In a second iteration +it might update more rapidly with the use of a rangefeed whereby newly created +sessions can be added to the cache and and deletions can rapidly populate the +deleted sessions cache. + +```go +package slstorage + +// Storage implements sqlliveness.Storage. +type Storage struct { + db *kv.DB + ie *sql.InternalExecutor + g singleflight.Group + mu struct { + syncutil.Mutex + deadSessions *lru.ARCCache + liveSessions *lru.ARCCache + } +} + +// IsAlive returns whether the query session is currently alive. It may return +// true for a session which is no longer alive but will never return false for +// a session which is alive. +func (s *Storage) IsAlive( + ctx context.Context, sessionID sqlliveness.SessionID, +) (alive bool, err error) { + s.mu.Lock() + if _, ok := s.deadSessions.Get(sessionID); ok { + s.mu.Unlock() + return false, nil + } + if expiration, ok := s.liveSessions.Get(sessionID); ok { + expiration := expiration.(hlc.Timestamp) + if db.Clock().Now().Less(expiration) { + s.mu.Unlock() + return true, nil + } + s.liveSessions.Remove(sessionID) + } + // Launch singleflight to go read from the database. If it is found, we + // can add it and its expiration to the liveSessions cache. If it isn't + // found, we know it's dead and we can add that to the deadSessions cache. + resChan, _ := s.g.DoChan(string(sessionID), func() (interface{}, error) { + // store the result underneath the singleflight to avoid the need + // for additional synchronization. 
		live, expiration, err := s.fetchSession(ctx, sessionID)
		if err != nil {
			return nil, err
		}
		s.mu.Lock()
		defer s.mu.Unlock()
		if live {
			s.mu.liveSessions.Add(sessionID, expiration)
		} else {
			s.mu.deadSessions.Add(sessionID, nil)
		}
		return live, nil
	})
	s.mu.Unlock()
	res := <-resChan
	if res.Err != nil {
		return false, res.Err
	}
	return res.Val.(bool), nil
}

func (s *Storage) fetchSession(
	ctx context.Context, sessionID []byte,
) (live bool, expiration hlc.Timestamp, err error) {
	// ...
}
```

## Use in `jobs`

See [#47892](https://github.com/cockroachdb/cockroach/issues/47892).

### Some background on what's there today

Today jobs are "leased" by the `Registry` by writing a `NodeID` into a field
in the job's `Payload`. This forms something of a best-effort
prevention of jobs being adopted by more than one node. The `Registry`
periodically polls the set of jobs and checks whether the `NodeID` for
a job's lease is live; if not, it considers the job available for
adoption. Furthermore, the `Registry` periodically checks if it itself
is alive and, if not, it cancels the jobs it is currently running. It
remains wholly possible for a node running a `Registry` to become
non-live for a short period of time, during which another registry
adopts the job and begins to run it concurrently; by the time the first
`Registry` goes to check whether it should cancel its jobs, it
finds that it is again live and merrily keeps them running.

It's shocking the extent to which the `Registry`/`Job` subsystem does
not validate that it is, in fact, the current leaseholder node when
updating job state. I (Andrew Werner) feel relatively confident that
this has caused bugs in the past. It is worth noting that the original RFC
seemed more careful about ensuring leases remained valid, but alas, today's
implementation is not so careful.

### Overview

The basic idea is that we'll transition to running jobs only on behalf of a
`Session`. The `Registry` will have a handle to a `sqlliveness.SQLInstance` and
will hold on to its current `Session`. When the `Session` is `Done`, the
`Registry` will cancel execution of all jobs claimed under that `Session`.

When adopting a job, a registry will do so using its current active session
and will serialize the session into the job record.

### Jobs table changes

The jobs table will be augmented to include two new columns:

```sql
ALTER TABLE system.jobs ADD COLUMN claim_session_id BYTES FAMILY claim,
                        ADD COLUMN claim_instance_id INT8 FAMILY claim;
```

### Adopting jobs

When looking to adopt jobs, the `Registry` can construct a
query which excludes jobs claimed by sessions thought to
currently be live (or at least not finished).

As an aside, perhaps we can expose access to the implementation of
`sqlliveness.Storage` as a builtin function
`crdb_internal.sql_liveness_is_alive` and in that way avoid even giving a
`sqlliveness.Storage` to the `Registry` (great idea or bad idea?).

For example:

```sql
SELECT *
  FROM system.jobs
 WHERE (claim_session_id IS NULL
    OR NOT crdb_internal.sql_liveness_is_alive(claim_session_id))
   AND state IN ("");
```

### Mixed version migration

During the mixed version time period, we'll require that jobs maintain both
types of leases. In particular, we should filter out the set of nodes we know
to be live right now.
This condition is a bit annoying today because we cannot
evaluate the predicate over the lease field of the payload in the execution
engine (see this proposal for a protobuf type
[#47534](https://github.com/cockroachdb/cockroach/issues/47534)).

## Future use cases

### SQL leases

Today each SQL node maintains a set of leases for table descriptors in the
`system.lease` table. The `lease.Manager` is responsible for heartbeating the
leases on a per-descriptor basis. In most clusters only a small number of
leases are in use at a time, say, at most hundreds. Today leases last for 5
minutes but refresh every minute. At 100 leases that's just 5/3 renewals per
node per second.

As user-defined types flourish and the scale at which cockroach is used grows,
perhaps users might want to have tens of thousands of leased objects (fine,
it's sort of farfetched). At that point, the lease renewal load would be
something like 165 per node per second.

Maybe that 5m lease duration is excessive given that we anticipate running SQL
pods on preemptible instances, but shortening it too much is costly here.

We can also imagine some more advanced use cases which might be unlocked by
making large resource graphs leasable. Consider, for example, storing
protocol buffer types as a first-class type in the SQL type system. We could
imagine hosting protocol buffer definitions for a set of packages where each
message type is stored as a type. We could imagine allowing these types to
import each other such that there was a relatively large graph of type
references. See [#47534](https://github.com/cockroachdb/cockroach/issues/47534)
for some discussion, though it's worth noting that the above issue is far less
ambitious in its current thinking.

### Transaction Record Heartbeating

This is likely controversial. Long-running transactions have a relatively
substantial write load on the system; they write to their transaction record
once per second in order to keep the transaction alive. There is a good bit of
discussion in [#45013](https://github.com/cockroachdb/cockroach/issues/45013).

That issue ends up recommending that we coalesce heartbeats from nodes to
ranges. In our traditional deployment topology that approach is likely to
be effective for the workloads that we have historically cared about: TPC-C.

For a multi-tenant cluster, this approach would likely also be relatively
effective, as we anticipate that tenants will have a small number of ranges.
For a multi-tenant cluster, one concern is how txn heartbeats interact with
rate-limiting. It seems like we'll probably not want to rate-limit txn
heartbeats.

There are problems with this approach: we'll need to retain the heartbeat as is
for bootstrapping, liveness querying will become more complex, and transaction
liveness queries will now be another RPC.

### One-version schema leasing for CHANGEFEED

One big issue with `CHANGEFEED`s is that they buffer results while they wait
to poll the descriptors table. This prevents changefeeds from emitting results
at low latency and can lead to internal buffering. One idea which has been
proposed is to utilize an additional leasing mechanism which prevents any
schema changes. If such a lease existed, then the `CHANGEFEED` would be able
to emit entries without needing to poll the descriptors.

Implementing this would require coordination between user transactions
attempting to perform schema changes and active `CHANGEFEED`s.
This coordination
is not totally obvious, but the hack-a-thon project showed that it is possible.
In particular, the user transactions will:

1) Write (possibly with a delete) an intent indicating the intention to operate
   on a descriptor.
2) In a separate, child transaction, scan to see if any one-version leases
   exist.
3) If yes, write a key to inform the `CHANGEFEED`s that they should drop that
   lease.
4) Wait for the leases to be dropped and then delete the key.

The `CHANGEFEED` will attempt to delete the "intention" key before writing its
lease. This ensures proper sequencing of operations in the lease acquisition.

## Drawbacks

TODO(ajwerner)

## Rationale and Alternatives

Part of the rationale for this proposal is that it centralizes a distributed
coordination paradigm that has been shown to be broadly useful in etcd.

Etcd offers a low-level concept called a `lease` which roughly maps to our
`Session`. These leases are always stored as `UUID`s and the process of
keeping these leases alive is left to the client (though the interface has a
`KeepAlive` which should do the heavy lifting), see
[here](https://github.com/etcd-io/etcd/blob/d8c8f903eee10b8391abaef7758c38b2cd393c55/clientv3/lease.go#L108-L148).
Etcd then has a higher-level construct built on top of this `Leaser` called a
[`Session`](https://godoc.org/github.com/coreos/etcd/clientv3/concurrency#Session)
which represents, roughly, the non-metadata portions of what we propose here.


### Leaving leasing to subsystems (maybe with libraries)

The main alternative is to not build a subsystem but rather to ask each
individual subsystem which needs to lease resources to find a solution which
best fits its needs. This could, perhaps, be paired with the construction of
libraries which are flexible and can be plugged into subsystems with ease. For
example, the existing `sqlmigration/leasemanager` package provides an
implementation of a heartbeated lease which can be relatively generically
plugged into a column family of any sort of table.

In the end, however, the above is a maintainability nightmare and potentially
exposes cockroach to scalability problems, as it is unlikely
that projects whose sole purpose is anything other than leasing will
invest in a well-designed solution.

### Tighter coupling of Session to SQLInstance

One closely related idea is to buy into the idea that centralizing the concept
of heartbeated rows used to claim resources is generally good, but that it is
too opaque for the current use case. The unit of liveness with which this
proposal is concerned is the `SQLInstance`. The liveness of a `SQLInstance`
relates directly to other runtime properties of that instance. Centralizing the
liveness information of a `SQLInstance` with its other properties would improve
observability and simplify implementation for extended metadata of
`SQLInstance`s.

A main counter-argument against using `UUID`s for sessions is that it will make
observability more difficult for little obvious gain. The epoch-based leases in
KV have shown us that node epochs, given their monotonic nature, give state in
the cluster a sense of time and place. This may not be as true for ephemeral
SQL pods.

Given that we're going to map sessions to a `SQLInstanceID`, we may want to
know other things about that `SQLInstanceID` in order to make choices about how
to treat a given claim. If this is true, then we'll expect metadata to be
written somewhere else before a `Session` is created.
Ultimately we've decided that these concerns +do not obviously exist and do not justify coupling these separate purposes. + +It seems almost certain that network addresses for `SQLInstance`s will need to +be written somewhere. This does not have to be the same place as sessions. + +## Unresolved questions + +- How does epoch bumping happen? Should it be explicit? In this straw-man it + happens due to periodic polling. +- How long should expirations be? + - Maybe like 1m? Shorter than SQL schema leases but much longer than node + liveness. +- Are we sad about the names `Session`/`SessionID`? + - Pros: + - They align with the terminology in etcd/v3/concurrency. + - Cons: + - Overloads term used to mean the session of a SQL connection. + - We could use the full `sqlliveness.SesssionID` wherever it might be + ambiguous. +- How should transactions dependent upon claims verify the validity? There are +two different approaches offered above and another below. They seems like they +all might have valid use cases. Are any strictly better? Is there any reason to +avoid doing any of them? + + 1) Observing the session record + + - This approach is potentially expensive because you have to read from + the sqlliveness table. Today we don't have the APIs to attach the + logic of observing the session to some future batch; we'd need to + send an RPC to do the observation. In a single-region cluster this is + a good approach. For global clusters and latency sensitive operations, + it's pretty bad. + - This approach is correct if transactions are serializable. + - This approach doesn't rely on any hooks into the KV. + + + 2) Ensuring transactions commit below the expiration. + + * This is the approach used by SQL schema leases today. + * Today's use is primitive: the transaction deadline is the earliest of + the expirations seen for any table descriptor. Even if later in the + transaction that lease gets renewed. E.g. + ```sql + CREATE TABLE foo (i INT PRIMARY KEY); + INSERT INTO foo VALUES (1); + BEGIN; + SELECT * FROM foo; + SELECT pg_sleep('5m'::INTERVAL::FLOAT); + INSERT INTO foo VALUES (2); + COMMIT; + + ERROR: restart transaction: TransactionRetryWithProtoRefreshError: TransactionRetryError: retry txn (RETRY_COMMIT_DEADLINE_EXCEEDED - txn timestamp pushed too much; deadline exceeded by 1m2.355364715s (1592448539.221650880,2 > 1592448476.866286165,0)): "sql txn" meta={id=ca3a8484 key=/Table/59/1/2/0 pri=0.00314736 epo=0 ts=1592448539.221650880,2 min=1592448236.647092821,0 seq=2} lock=true stat=PENDING rts=1592448539.221650880,2 wto=false max=1592448236.647092821,0 + SQLSTATE: 40001 + ``` + + * This approach has the problem of potentially exposing tight coupling to + the KV layer. The KV layer independently decides when it's going to issue + EndTransaction requests. Ideally for this approach to work best, the + deadline would be determined as close to the sending as possible. + * It seems reasonable to inject a closure into the KV coord to call + as it constructs the EndTxn. + + 3) (not yet discussed) Removing resources by deleting entries + + * Approach discussed in [this blog post](https://dev.to/ajwerner/quick-and-easy-exactly-once-distributed-work-queues-using-serializable-transactions-jdp). + * Imagine if all "claims" had a FOREIGN KEY relationship with the + sqlliveness table with `ON DELETE SET NULL` + * Nice because it moves the cost of validation to the rare event of + bumping an epoch. Now you just need to read your claim which is likely + in the data you're already updating. 
  * Big bummer here is that we're talking about sticking foreign key
    relationships between system tables.
  * Also seems really bad to make marking a node as dead an expensive
    proposition.

 4) Perhaps there's a compromise between 1) and 3) to be made here.
   Let's call this one lazy resolution.
   * This is a lot like intent resolution or epoch-based, node-liveness
     derived leases. You must observe the transaction record or node liveness
     record in order to write the value.
   * In order to take a resource, you need to clear its claim.
   * In order to clear a claim you have to ensure that its session
     is done.
   * If you observe your claim in your transaction, then you know you
     hold the resource.
   * This is probably the approach that makes the most sense for the jobs
     use case (rather than 2). Perhaps 2 should not exist at all. A rough
     sketch of this approach for jobs follows below.
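
To make the lazy-resolution option a bit more concrete, here is a rough SQL
sketch of how a job claim could be cleared and re-taken under it; this is an
illustration, not a settled design. It assumes the
`claim_session_id`/`claim_instance_id` columns and the `system.sqlliveness`
schema proposed above; the use of the jobs `id` column, the bind parameters,
and `cluster_logical_timestamp()` as the liveness comparison are illustrative
choices.

```sql
BEGIN;

-- Clear the claim only if the claiming session is expired or gone. This is
-- the "rare event" cost: the sqlliveness table is read once per reclamation.
UPDATE system.jobs
   SET claim_session_id = NULL, claim_instance_id = NULL
 WHERE id = $1
   AND claim_session_id NOT IN (
         SELECT session_id FROM system.sqlliveness
          WHERE expiration > cluster_logical_timestamp()
       );

-- Take the claim under our own session. Subsequent updates to the job re-read
-- the claim in their own transaction to verify they still hold the resource.
UPDATE system.jobs
   SET claim_session_id = $2, claim_instance_id = $3
 WHERE id = $1 AND claim_session_id IS NULL;

COMMIT;
```

Under serializable isolation, a job update that re-reads its own claim in its
transaction knows the claim was not cleared concurrently, which is the property
the lazy-resolution option relies on.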