tart pull: try to re-use APFS blocks by cloning the base image #864

Merged · 10 commits · Jul 25, 2024
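In short: instead of creating each pulled disk from scratch, tart now clones the disk of the best-matching local image and then only writes the chunks that differ. On APFS the clone is copy-on-write, so every chunk that gets skipped keeps sharing the base image's blocks. Below is a minimal sketch of that cloning idea, with hypothetical paths; it is not the PR's exact code (the real call site is in DiskV2.swift further down):

import Foundation

// On APFS, FileManager.copyItem(at:to:) clones the file copy-on-write
// (clonefile(2) under the hood), so the new disk initially shares all of
// its blocks with the base image. Both paths here are made up.
let baseDiskURL = URL(fileURLWithPath: "/hypothetical/base/disk.img")
let newDiskURL = URL(fileURLWithPath: "/hypothetical/new/disk.img")

try FileManager.default.copyItem(at: baseDiskURL, to: newDiskURL)
// From here on, only the chunks that differ from the base need to be
// written; untouched ranges keep sharing the base image's APFS blocks.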
10 changes: 9 additions & 1 deletion Sources/tart/LocalLayerCache.swift
@@ -1,10 +1,18 @@
import Foundation

struct LocalLayerCache {
let name: String
let deduplicatedBytes: UInt64
let diskURL: URL

private let mappedDisk: Data
private var digestToRange: [String : Range<Data.Index>] = [:]

init?(_ diskURL: URL, _ manifest: OCIManifest) throws {
init?(_ name: String, _ deduplicatedBytes: UInt64, _ diskURL: URL, _ manifest: OCIManifest) throws {
self.name = name
self.deduplicatedBytes = deduplicatedBytes
self.diskURL = diskURL

// mmap(2) the disk that contains the layers from the manifest
self.mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])

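For context, LocalLayerCache mmap(2)s the base image's disk and maps each layer digest from the base manifest to the byte range that layer occupies on disk. The call localLayerCache.find(diskLayer.digest) used in DiskV2.swift below could plausibly be implemented along these lines; the body here is an assumption, since only the call site appears in this diff:

extension LocalLayerCache {
  // Hypothetical implementation: slice the layer's contents out of the
  // mmap(2)-ed disk instead of re-reading the file. Assumes it lives in
  // the same file, because digestToRange and mappedDisk are private.
  func find(_ digest: String) -> Data? {
    guard let range = self.digestToRange[digest] else {
      return nil
    }
    return self.mappedDisk.subdata(in: range)
  }
}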
72 changes: 65 additions & 7 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
@@ -1,5 +1,6 @@
import Foundation
import Compression
import System

class DiskV2: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
@@ -37,8 +38,17 @@ class DiskV2: Disk {
// Support resumable pulls
let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)

if !pullResumed && !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
throw OCIError.FailedToCreateVmFile
if !pullResumed {
if let localLayerCache = localLayerCache {
// Clone the local layer cache's disk and use it as a base, potentially
// reducing the space usage since some blocks won't be written at all
try FileManager.default.copyItem(at: localLayerCache.diskURL, to: diskURL)
} else {
// Otherwise create an empty disk
if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
throw OCIError.FailedToCreateVmFile
}
}
}

// Calculate the uncompressed disk size
@@ -58,6 +68,15 @@ class DiskV2: Disk {
try disk.truncate(atOffset: uncompressedDiskSize)
try disk.close()

// Determine the file system block size
var st = stat()
if stat(diskURL.path, &st) == -1 {
let details = Errno(rawValue: errno)

throw RuntimeError.PullFailed("failed to stat(2) disk \(diskURL.path): \(details)")
}
let fsBlockSize = UInt64(st.st_blksize)

// Concurrently fetch and decompress layers
try await withThrowingTaskGroup(of: Void.self) { group in
var globalDiskWritingOffset: UInt64 = 0
@@ -89,13 +108,21 @@ class DiskV2: Disk {
return
}

// Open the disk file
// Open the disk file for writing
let disk = try FileHandle(forWritingTo: diskURL)

// Also open the disk file for reading and verifying
// its contents in case the local layer cache is used
let rdisk: FileHandle? = if localLayerCache != nil {
try FileHandle(forReadingFrom: diskURL)
} else {
nil
}

// Check if we already have this layer contents in the local layer cache
if let localLayerCache = localLayerCache, let data = localLayerCache.find(diskLayer.digest), Digest.hash(data) == uncompressedLayerContentDigest {
// Fulfil the layer contents from the local blob cache
_ = try zeroSkippingWrite(disk, diskWritingOffset, data)
_ = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
try disk.close()

// Update the progress
@@ -112,7 +139,7 @@
return
}

diskWritingOffset = try zeroSkippingWrite(disk, diskWritingOffset, data)
diskWritingOffset = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
}

try await registry.pullBlob(diskLayer.digest) { data in
@@ -132,7 +159,7 @@
}
}

private static func zeroSkippingWrite(_ disk: FileHandle, _ offset: UInt64, _ data: Data) throws -> UInt64 {
private static func zeroSkippingWrite(_ disk: FileHandle, _ rdisk: FileHandle?, _ fsBlockSize: UInt64, _ offset: UInt64, _ data: Data) throws -> UInt64 {
let holeGranularityBytes = 64 * 1024

// A zero chunk that makes comparisons faster than byte-by-byte
@@ -152,7 +179,38 @@
var offset = offset

for chunk in data.chunks(ofCount: holeGranularityBytes) {
// Only write chunks that are not zero
// If the local layer cache is used, only write chunks that differ
// since the base disk can contain anything at any position
if let rdisk = rdisk {
// F_PUNCHHOLE requires the holes to be aligned to file system block boundaries
let isHoleAligned = (offset % fsBlockSize) == 0 && (UInt64(chunk.count) % fsBlockSize) == 0

if isHoleAligned && chunk == zeroChunk {
var arg = fpunchhole_t(fp_flags: 0, reserved: 0, fp_offset: off_t(offset), fp_length: off_t(chunk.count))

if fcntl(disk.fileDescriptor, F_PUNCHHOLE, &arg) == -1 {
let details = Errno(rawValue: errno)

throw RuntimeError.PullFailed("failed to punch hole: \(details)")
}
} else {
try rdisk.seek(toOffset: offset)
let actualContentsOnDisk = try rdisk.read(upToCount: chunk.count)

if chunk != actualContentsOnDisk {
try disk.seek(toOffset: offset)
disk.write(chunk)
}
}

offset += UInt64(chunk.count)

continue
}

// Otherwise, only write chunks that are not zero
// since the base disk is created from scratch and
// is zeroed via truncate(2)
if chunk != zeroChunk {
try disk.seek(toOffset: offset)
disk.write(chunk)
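Note that the F_PUNCHHOLE branch above only fires for chunks that are all-zero and aligned to file system block boundaries, because the fcntl(2) command deallocates whole blocks. A standalone, hedged sketch of the same call outside of zeroSkippingWrite(...):

import Foundation
import System

// Punch a hole into an open file, deallocating the given range so it no
// longer occupies blocks on disk. offset and length must be multiples of
// the file system block size (st_blksize), mirroring the isHoleAligned
// check in the diff above. This is a sketch, not the PR's code.
func punchHole(in handle: FileHandle, offset: off_t, length: off_t) throws {
  var arg = fpunchhole_t(fp_flags: 0, reserved: 0, fp_offset: offset, fp_length: length)

  if fcntl(handle.fileDescriptor, F_PUNCHHOLE, &arg) == -1 {
    throw Errno(rawValue: errno)
  }
}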
5 changes: 1 addition & 4 deletions Sources/tart/VMDirectory+OCI.swift
@@ -11,9 +11,6 @@ enum OCIError: Error {
}

extension VMDirectory {
private static let bufferSizeBytes = 64 * 1024 * 1024
private static let layerLimitBytes = 500 * 1000 * 1000

func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
// Pull VM's config file layer and re-serialize it into a config file
let configLayers = manifest.layers.filter {
@@ -80,7 +77,7 @@ extension VMDirectory {
}
try nvram.close()

// Serialize VM's manifest to enable better de-duplication on subsequent "tart pull"'s
// Serialize VM's manifest to enable better deduplication on subsequent "tart pull"'s
try manifest.toJSON().write(to: manifestURL)
}

22 changes: 16 additions & 6 deletions Sources/tart/VMStorageOCI.swift
@@ -200,8 +200,18 @@ class VMStorageOCI: PrunableStorage {
// Choose the base image with the highest deduplication ratio
let localLayerCache = try await chooseLocalLayerCache(name, manifest, registry)

if let llc = localLayerCache {
let deduplicatedHuman = ByteCountFormatter.string(fromByteCount: Int64(llc.deduplicatedBytes), countStyle: .file)

defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
}

try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
} recoverFromFailure: { error in
if error is RuntimeError {
return .throw
}

print("Error: \(error.localizedDescription)")
print("Attempting to re-try...")

@@ -246,15 +256,15 @@

func chooseLocalLayerCache(_ name: RemoteName, _ manifest: OCIManifest, _ registry: Registry) async throws -> LocalLayerCache? {
// Establish a closure that will calculate how many bytes
// we'll de-duplicate if we re-use the given manifest
// we'll deduplicate if we re-use the given manifest
let target = Swift.Set(manifest.layers)

let calculateDeduplicatedBytes = { (manifest: OCIManifest) -> Int in
target.intersection(manifest.layers).map({ $0.size }).reduce(0, +)
let calculateDeduplicatedBytes = { (manifest: OCIManifest) -> UInt64 in
target.intersection(manifest.layers).map({ UInt64($0.size) }).reduce(0, +)
}

// Load OCI VM images and their manifests (if present)
var candidates: [(name: String, vmDir: VMDirectory, manifest: OCIManifest, deduplicatedBytes: Int)] = []
var candidates: [(name: String, vmDir: VMDirectory, manifest: OCIManifest, deduplicatedBytes: UInt64)] = []

for (name, vmDir, isSymlink) in try list() {
if isSymlink {
@@ -285,13 +295,13 @@
candidates.append((name.description, vmDir, manifest, calculateDeduplicatedBytes(manifest)))
}

// Now, find the best match based on how many bytes we'll de-duplicate
// Now, find the best match based on how many bytes we'll deduplicate
let choosen = candidates.max { left, right in
return left.deduplicatedBytes < right.deduplicatedBytes
}

return try choosen.flatMap({ choosen in
try LocalLayerCache(choosen.vmDir.diskURL, choosen.manifest)
try LocalLayerCache(choosen.name, choosen.deduplicatedBytes, choosen.vmDir.diskURL, choosen.manifest)
})
}
}
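To make the scoring in chooseLocalLayerCache(...) concrete: a candidate's deduplicated byte count is the total size of the layers it shares with the manifest being pulled, and the candidate with the largest count wins. A small worked sketch of that calculation, with made-up digests and sizes:

struct Layer: Hashable {
  let digest: String
  let size: UInt64
}

// The image being pulled and one local candidate; all values are invented.
let pulled: Set<Layer> = [
  Layer(digest: "sha256:aaa", size: 100),
  Layer(digest: "sha256:bbb", size: 200),
]
let candidate: Set<Layer> = [
  Layer(digest: "sha256:bbb", size: 200),
  Layer(digest: "sha256:ccc", size: 300),
]

// Only the shared sha256:bbb layer counts, so choosing this candidate
// would deduplicate 200 bytes.
let deduplicatedBytes = pulled.intersection(candidate).map(\.size).reduce(0, +)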