-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathcompression.jl
155 lines (136 loc) · 4.84 KB
/
compression.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Compressor Codec
# ================
struct ZstdCompressor <: TranscodingStreams.Codec
cstream::CStream
level::Int
endOp::LibZstd.ZSTD_EndDirective
end
function Base.show(io::IO, codec::ZstdCompressor)
if codec.endOp == LibZstd.ZSTD_e_end
print(io, "ZstdFrameCompressor(level=$(codec.level))")
else
print(io, summary(codec), "(level=$(codec.level))")
end
end
# Same as the zstd command line tool (v1.2.0).
const DEFAULT_COMPRESSION_LEVEL = 3
"""
ZstdCompressor(;level=$(DEFAULT_COMPRESSION_LEVEL))
Create a new zstd compression codec.
Arguments
---------
- `level`: compression level (1..$(MAX_CLEVEL))
"""
function ZstdCompressor(;level::Integer=DEFAULT_COMPRESSION_LEVEL)
if !(1 ≤ level ≤ MAX_CLEVEL)
throw(ArgumentError("level must be within 1..$(MAX_CLEVEL)"))
end
return ZstdCompressor(CStream(), level)
end
ZstdCompressor(cstream, level) = ZstdCompressor(cstream, level, :continue)
"""
ZstdFrameCompressor(;level=$(DEFAULT_COMPRESSION_LEVEL))
Create a new zstd compression codec that reads the available input and then
closes the frame, encoding the decompressed size of that frame.
Arguments
---------
- `level`: compression level (1..$(MAX_CLEVEL))
"""
function ZstdFrameCompressor(;level::Integer=DEFAULT_COMPRESSION_LEVEL)
if !(1 ≤ level ≤ MAX_CLEVEL)
throw(ArgumentError("level must be within 1..$(MAX_CLEVEL)"))
end
return ZstdCompressor(CStream(), level, :end)
end
# pretend that ZstdFrameCompressor is a compressor type
function TranscodingStreams.transcode(C::typeof(ZstdFrameCompressor), args...)
codec = C()
initialize(codec)
try
return transcode(codec, args...)
finally
finalize(codec)
end
end
const ZstdCompressorStream{S} = TranscodingStream{ZstdCompressor,S} where S<:IO
"""
ZstdCompressorStream(stream::IO; kwargs...)
Create a new zstd compression stream (see `ZstdCompressor` for `kwargs`).
"""
function ZstdCompressorStream(stream::IO; kwargs...)
x, y = splitkwargs(kwargs, (:level,))
return TranscodingStream(ZstdCompressor(;x...), stream; y...)
end
# Methods
# -------
function TranscodingStreams.finalize(codec::ZstdCompressor)
if codec.cstream.ptr != C_NULL
code = free!(codec.cstream)
if iserror(code)
zstderror(codec.cstream, code)
end
codec.cstream.ptr = C_NULL
end
return
end
function TranscodingStreams.startproc(codec::ZstdCompressor, mode::Symbol, error::Error)
if codec.cstream.ptr == C_NULL
codec.cstream.ptr = LibZstd.ZSTD_createCStream()
if codec.cstream.ptr == C_NULL
throw(OutOfMemoryError())
end
i_code = initialize!(codec.cstream, codec.level)
if iserror(i_code)
error[] = ErrorException("zstd initialization error")
return :error
end
end
code = reset!(codec.cstream, 0 #=unknown source size=#)
if iserror(code)
error[] = ErrorException("zstd error")
return :error
end
return :ok
end
function TranscodingStreams.process(codec::ZstdCompressor, input::Memory, output::Memory, error::Error)
if codec.cstream.ptr == C_NULL
error("startproc must be called before process")
end
cstream = codec.cstream
ibuffer_starting_pos = UInt(0)
if codec.endOp == LibZstd.ZSTD_e_end &&
cstream.ibuffer.size != cstream.ibuffer.pos
# While saving a frame, the prior process run did not finish writing the frame.
# A positive code indicates the need for additional output buffer space.
# Re-run with the same cstream.ibuffer.size as pledged for the frame,
# otherwise a "Src size is incorrect" error will occur.
# For the current frame, cstream.ibuffer.size - cstream.ibuffer.pos
# must reflect the remaining data. Thus neither size or pos can change.
# Store the starting pos since it will be non-zero.
ibuffer_starting_pos = cstream.ibuffer.pos
# Set the pointer relative to input.ptr such that
# cstream.ibuffer.src + cstream.ibuffer.pos == input.ptr
cstream.ibuffer.src = input.ptr - cstream.ibuffer.pos
else
cstream.ibuffer.src = input.ptr
cstream.ibuffer.size = input.size
cstream.ibuffer.pos = 0
end
cstream.obuffer.dst = output.ptr
cstream.obuffer.size = output.size
cstream.obuffer.pos = 0
if input.size == 0
code = finish!(cstream)
else
code = compress!(cstream; endOp = codec.endOp)
end
Δin = Int(cstream.ibuffer.pos - ibuffer_starting_pos)
Δout = Int(cstream.obuffer.pos)
if iserror(code)
ptr = LibZstd.ZSTD_getErrorName(code)
error[] = ErrorException("zstd error: " * unsafe_string(ptr))
return Δin, Δout, :error
else
return Δin, Δout, input.size == 0 && code == 0 ? :end : :ok
end
end