tart push: use fixed size chunks to allow for better deduplication #821

Merged
merged 1 commit into from
May 14, 2024
56 changes: 6 additions & 50 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
@@ -13,25 +13,22 @@ class DiskV2: Disk {
     let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])

     // Compress the disk file as multiple individually decompressible streams,
-    // each equal ``Self.layerLimitBytes`` bytes or slightly larger due to the
-    // internal compressor's buffer
-    var offset: UInt64 = 0
-
-    while let (compressedData, uncompressedSize, uncompressedDigest) = try compressNextLayerOfLimitBytesOrMore(mappedDisk: mappedDisk, offset: offset) {
-      offset += uncompressedSize
+    // each equal ``Self.layerLimitBytes`` bytes or less due to LZ4 compression
+    for data in mappedDisk.chunks(ofCount: layerLimitBytes) {
Contributor

Should we put layerLimitBytes in the manifest in case it's gonna change at some point?

Collaborator Author
@edigaryev, May 14, 2024

I think it's already there by virtue of passing UInt64(data.count) as the uncompressedSize argument to OCIManifestLayer(...):

if let uncompressedSize = uncompressedSize {
  annotations[uncompressedSizeAnnotation] = String(uncompressedSize)
}
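
Because every layer carries its own uncompressed size, a consumer would not need to know layerLimitBytes at all: it could derive each layer's position in the reassembled disk by summing the sizes of the layers before it. The following is a minimal sketch of that idea, not tart's pull code. LayerEntry, diskRanges, and the annotation key string are hypothetical names chosen for illustration.

```swift
// Hypothetical, simplified stand-in for a manifest layer entry.
struct LayerEntry {
    let digest: String
    let annotations: [String: String]
}

// Assumed annotation key, for illustration only.
let uncompressedSizeKey = "example.uncompressed-size"

// Returns the byte range each layer occupies in the reassembled disk,
// or nil if any layer lacks a parseable uncompressed size.
func diskRanges(for layers: [LayerEntry]) -> [Range<UInt64>]? {
    var offset: UInt64 = 0
    var ranges: [Range<UInt64>] = []
    for layer in layers {
        guard let raw = layer.annotations[uncompressedSizeKey],
              let size = UInt64(raw) else {
            return nil
        }
        ranges.append(offset..<(offset + size))
        offset += size
    }
    return ranges
}

// Two full-size layers followed by a shorter final one.
let layers = [
    LayerEntry(digest: "sha256:aaa", annotations: [uncompressedSizeKey: "500"]),
    LayerEntry(digest: "sha256:bbb", annotations: [uncompressedSizeKey: "500"]),
    LayerEntry(digest: "sha256:ccc", annotations: [uncompressedSizeKey: "120"]),
]

if let ranges = diskRanges(for: layers) {
    for range in ranges {
        print(range)
    }
}
```

If the chunk size changed in a later release, images pushed with the old size would still resolve to the right offsets under this scheme, since nothing depends on a global constant.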

+      let compressedData = try (data as NSData).compressed(using: .lz4) as Data

       let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)

       pushedLayers.append(OCIManifestLayer(
         mediaType: diskV2MediaType,
         size: compressedData.count,
         digest: layerDigest,
-        uncompressedSize: uncompressedSize,
-        uncompressedContentDigest: uncompressedDigest
+        uncompressedSize: UInt64(data.count),
+        uncompressedContentDigest: Digest.hash(data)
       ))

       // Update progress using a relative value
-      progress.completedUnitCount += Int64(uncompressedSize)
+      progress.completedUnitCount += Int64(data.count)
     }

     return pushedLayers
@@ -144,45 +141,4 @@ class DiskV2: Disk {
       }
     }
   }
-
-  private static func compressNextLayerOfLimitBytesOrMore(mappedDisk: Data, offset: UInt64) throws -> (Data, UInt64, String)? {
-    var compressedData = Data()
-    var bytesRead: UInt64 = 0
-    let digest = Digest()
-
-    // Create a compressing filter that we will terminate upon
-    // reaching ``Self.layerLimitBytes`` of compressed data
-    let compressingFilter = try InputFilter(.compress, using: .lz4, bufferCapacity: bufferSizeBytes) { (length: Int) -> Data? in
-      if compressedData.count >= Self.layerLimitBytes {
-        return nil
-      }
-
-      let readFromByte = Int(offset + bytesRead)
-
-      let numBytesToRead = min(mappedDisk.count - readFromByte, bufferSizeBytes)
-      if numBytesToRead == 0 {
-        return nil
-      }
-
-      let uncompressedChunk = mappedDisk.subdata(in: readFromByte ..< (readFromByte + numBytesToRead))
-
-      bytesRead += UInt64(uncompressedChunk.count)
-      digest.update(uncompressedChunk)
-
-      return uncompressedChunk
-    }
-
-    // Retrieve compressed data chunks, but normally no more than ``Self.layerLimitBytes`` bytes
-    while let compressedChunk = try compressingFilter.readData(ofLength: Self.bufferSizeBytes) {
-      compressedData.append(compressedChunk)
-    }
-
-    // Nothing was read this time from the disk,
-    // signal that to the consumer
-    if bytesRead == 0 {
-      return nil
-    }
-
-    return (compressedData, bytesRead, digest.finalize())
-  }
 }
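
To illustrate the motivation in the PR title: the removed function ended a layer once the compressed output reached layerLimitBytes, so where each layer began in the uncompressed disk depended on how well the preceding data compressed. With fixed-size uncompressed chunks the boundaries stay at the same offsets, so a local change should only alter the digest of the chunk that contains it. The sketch below shows this on toy data. It is an illustration under stated assumptions, not the PR's code: it uses stride instead of chunks(ofCount:), an FNV-1a hash as a stand-in for the real content digest, and hypothetical helper names (fnv1a, fixedSizeChunks).

```swift
import Foundation

// FNV-1a hash, used here only as a stand-in for a real content digest.
func fnv1a(_ data: Data) -> UInt64 {
    var hash: UInt64 = 0xcbf29ce484222325
    for byte in data {
        hash ^= UInt64(byte)
        hash = hash &* 0x100000001b3
    }
    return hash
}

// Splits `data` into chunks of `size` bytes; the last chunk may be shorter.
func fixedSizeChunks(of data: Data, size: Int) -> [Data] {
    precondition(size > 0, "chunk size must be positive")
    return stride(from: 0, to: data.count, by: size).map { (start: Int) -> Data in
        let end = min(start + size, data.count)
        return data.subdata(in: (data.startIndex + start)..<(data.startIndex + end))
    }
}

// Two toy 40-byte "disks" that differ in a single byte at offset 17.
let before = Data(repeating: 0xAA, count: 40)
var after = before
after[17] = 0xBB

let chunkSize = 8
let digestsBefore = fixedSizeChunks(of: before, size: chunkSize).map(fnv1a)
let digestsAfter = fixedSizeChunks(of: after, size: chunkSize).map(fnv1a)

// Indices of chunks whose digest changed between the two versions.
let changedChunks = zip(digestsBefore, digestsAfter)
    .enumerated()
    .filter { $0.element.0 != $0.element.1 }
    .map { $0.offset }

print(changedChunks)
```

Only the chunk covering bytes 16 to 23 gets a new digest here; the other four chunks keep theirs, which is what would let a registry reuse the blobs it already stores.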