Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tart {clone,pull}: make deduplication opt-in #924

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Sources/tart/Commands/Clone.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ struct Clone: AsyncParsableCommand {
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
var concurrency: UInt = 4

@Flag(help: .hidden)
var deduplicate: Bool = false

func validate() throws {
if newName.contains("/") {
throw ValidationError("<new-name> should be a local name")
Expand All @@ -45,7 +48,7 @@ struct Clone: AsyncParsableCommand {
if let remoteName = try? RemoteName(sourceName), !ociStorage.exists(remoteName) {
// Pull the VM in case it's OCI-based and doesn't exist locally yet
let registry = try Registry(host: remoteName.host, namespace: remoteName.namespace, insecure: insecure)
try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency)
try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency, deduplicate: deduplicate)
}

let sourceVM = try VMStorageHelper.open(sourceName)
Expand Down
5 changes: 4 additions & 1 deletion Sources/tart/Commands/Pull.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ struct Pull: AsyncParsableCommand {
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
var concurrency: UInt = 4

@Flag(help: .hidden)
var deduplicate: Bool = false

func validate() throws {
if concurrency < 1 {
throw ValidationError("network concurrency cannot be less than 1")
Expand All @@ -43,6 +46,6 @@ struct Pull: AsyncParsableCommand {

defaultLogger.appendNewLine("pulling \(remoteName)...")

try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency)
try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency, deduplicate: deduplicate)
}
}
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/Disk.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import Foundation

protocol Disk {
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer]
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?, deduplicate: Bool) async throws
}
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/DiskV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DiskV1: Disk {
return pushedLayers
}

static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil, deduplicate: Bool = false) async throws {
if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
throw OCIError.FailedToCreateVmFile
}
Expand Down
31 changes: 20 additions & 11 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ class DiskV2: Disk {
}
}

static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil, deduplicate: Bool = false) async throws {
// Support resumable pulls
let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)

if !pullResumed {
if let localLayerCache = localLayerCache {
if deduplicate, let localLayerCache = localLayerCache {
// Clone the local layer cache's disk and use it as a base, potentially
// reducing the space usage since some blocks won't be written at all
try FileManager.default.copyItem(at: localLayerCache.diskURL, to: diskURL)
Expand Down Expand Up @@ -151,26 +151,31 @@ class DiskV2: Disk {

// Also open the disk file for reading and verifying its contents
// in case deduplication is enabled and a local layer cache is available
let rdisk: FileHandle? = if localLayerCache != nil {
let rdisk: FileHandle? = if deduplicate && localLayerCache != nil {
try FileHandle(forReadingFrom: diskURL)
} else {
nil
}

// Check if we already have this layer contents in the local layer cache
if let localLayerCache = localLayerCache, let localLayerInfo = localLayerCache.findInfo(digest: diskLayer.digest, offsetHint: diskWritingOffset) {
// indicates that the locally cloned disk image has the same content at the given offset
let localHit = localLayerInfo.uncompressedContentDigest == uncompressedLayerContentDigest
&& localLayerInfo.range.lowerBound == diskWritingOffset
// doesn't seem that localHit can ever be false if the localLayerCache is not nil
// but let's just add extra safety here and check it
if !localHit {
// Check if we already have this layer's contents in the local layer cache,
// or perhaps even on the cloned disk (when deduplication is enabled)
if let localLayerCache = localLayerCache,
let localLayerInfo = localLayerCache.findInfo(digest: diskLayer.digest, offsetHint: diskWritingOffset),
localLayerInfo.uncompressedContentDigest == uncompressedLayerContentDigest {
if deduplicate && localLayerInfo.range.lowerBound == diskWritingOffset {
// Do nothing, because the data is already on the disk that we've cloned from the local layer cache
} else {
// Fulfil the layer contents from the local blob cache
let data = localLayerCache.subdata(localLayerInfo.range)
_ = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
}

try disk.close()

if let rdisk = rdisk {
try rdisk.close()
}

// Update the progress
progress.completedUnitCount += Int64(diskLayer.size)

Expand Down Expand Up @@ -198,6 +203,10 @@ class DiskV2: Disk {
try filter.finalize()

try disk.close()

if let rdisk = rdisk {
try rdisk.close()
}
}

globalDiskWritingOffset += uncompressedLayerSize
Expand Down
7 changes: 4 additions & 3 deletions Sources/tart/VMDirectory+OCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ enum OCIError: Error {
}

extension VMDirectory {
func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?, deduplicate: Bool) async throws {
// Pull VM's config file layer and re-serialize it into a config file
let configLayers = manifest.layers.filter {
$0.mediaType == configMediaType
Expand Down Expand Up @@ -54,12 +54,13 @@ extension VMDirectory {
do {
try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL,
concurrency: concurrency, progress: progress,
localLayerCache: localLayerCache)
localLayerCache: localLayerCache,
deduplicate: deduplicate)
} catch let error where error is FilterError {
throw RuntimeError.PullFailed("failed to decompress disk: \(error.localizedDescription)")
}

if let llc = localLayerCache {
if deduplicate, let llc = localLayerCache {
// set custom attribute to remember deduplicated bytes
diskURL.setDeduplicatedBytes(llc.deduplicatedBytes)
}
Expand Down
10 changes: 7 additions & 3 deletions Sources/tart/VMStorageOCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class VMStorageOCI: PrunableStorage {
try list().filter { (_, _, isSymlink) in !isSymlink }.map { (_, vmDir, _) in vmDir }
}

func pull(_ name: RemoteName, registry: Registry, concurrency: UInt) async throws {
func pull(_ name: RemoteName, registry: Registry, concurrency: UInt, deduplicate: Bool) async throws {
SentrySDK.configureScope { scope in
scope.setContext(value: ["imageName": name.description], key: "OCI")
}
Expand Down Expand Up @@ -203,10 +203,14 @@ class VMStorageOCI: PrunableStorage {
if let llc = localLayerCache {
let deduplicatedHuman = ByteCountFormatter.string(fromByteCount: Int64(llc.deduplicatedBytes), countStyle: .file)

defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
if deduplicate {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
} else {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to avoid fetching \(deduplicatedHuman), will try use it...")
}
}

try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache, deduplicate: deduplicate)
} recoverFromFailure: { error in
if error is Retryable {
print("Error: \(error.localizedDescription)")
Expand Down
Loading