Skip to content

Commit

Permalink
Remove fast-io and use flexi-streams for in-memory streams.
Browse files Browse the repository at this point in the history
  • Loading branch information
hraberg committed Nov 11, 2023
1 parent 5fbb6eb commit 84461f0
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 68 deletions.
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
[submodule "_build/named-readtables"]
path = _build/named-readtables
url = https://github.com/melisgl/named-readtables
[submodule "_build/fast-io"]
path = _build/fast-io
url = https://github.com/rpav/fast-io
[submodule "_build/static-vectors"]
path = _build/static-vectors
url = https://github.com/sionescu/static-vectors
Expand Down
1 change: 0 additions & 1 deletion _build/fast-io
Submodule fast-io deleted from a4c5ad
2 changes: 1 addition & 1 deletion endb.asd
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"cl-murmurhash"
"cl-ppcre"
"com.inuoe.jzon"
"fast-io"
"flexi-streams"
"fset"
"local-time"
"periods"
Expand Down
6 changes: 4 additions & 2 deletions src/lib/arrow.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@
(defun buffer-to-vector (buffer-ptr buffer-size &optional out)
(let ((out (or out (make-array buffer-size :element-type '(unsigned-byte 8)))))
(assert (<= buffer-size (vector-byte-size out)))
(cffi:with-pointer-to-vector-data (out-ptr out)
(cffi:with-pointer-to-vector-data (out-ptr #+sbcl (sb-ext:array-storage-vector out)
#-sbcl out)
(memcpy out-ptr buffer-ptr buffer-size))
out))

Expand Down Expand Up @@ -382,7 +383,8 @@

(defun read-arrow-arrays-from-ipc-buffer (buffer)
(check-type buffer (vector (unsigned-byte 8)))
(cffi:with-pointer-to-vector-data (buffer-ptr buffer)
(cffi:with-pointer-to-vector-data (buffer-ptr #+sbcl (sb-ext:array-storage-vector buffer)
#-sbcl buffer)
(read-arrow-arrays-from-ipc-pointer buffer-ptr (length buffer))))

(defun read-arrow-arrays-from-ipc-file (file)
Expand Down
11 changes: 5 additions & 6 deletions src/storage/object-store.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(:export #:open-tar-object-store #:object-store-get #:object-store-put #:object-store-list #:object-store-close #:make-directory-object-store #:make-memory-object-store)
(:import-from :alexandria)
(:import-from :archive)
(:import-from :fast-io)
(:import-from :flexi-streams)
(:import-from :uiop)
(:import-from :bordeaux-threads))
(in-package :endb/storage/object-store)
Expand All @@ -15,16 +15,15 @@

(defvar *tar-object-store-lock* (bt:make-lock))

(defun open-tar-object-store (&key (stream (make-instance 'fast-io:fast-input-stream)))
(defun open-tar-object-store (&key (stream (flex:make-in-memory-input-stream (make-array 0 :element-type '(unsigned-byte 8)))))
(let ((archive (archive:open-archive 'archive:tar-archive stream)))
(when (typep stream 'fast-io:fast-input-stream)
(when (typep stream 'flex:vector-stream)
(setf (slot-value archive 'archive::skippable-p) t))
archive))

(defun %extract-entry (archive entry)
(let* ((out (make-instance 'fast-io:fast-output-stream :buffer-size (archive::size entry))))
(archive::transfer-entry-data-to-stream archive entry out)
(fast-io:finish-output-stream out)))
(flex:with-output-to-sequence (out)
(archive::transfer-entry-data-to-stream archive entry out)))

(defun %wal-read-entry-safe (archive)
(when (listen (archive::archive-stream archive))
Expand Down
17 changes: 8 additions & 9 deletions src/storage/wal.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(:use :cl)
(:export #:open-tar-wal #:tar-wal-position-stream-at-end #:wal-append-entry #:wal-read-next-entry #:wal-find-entry #:wal-fsync #:wal-close #:make-memory-wal)
(:import-from :archive)
(:import-from :fast-io)
(:import-from :flexi-streams)
(:import-from :local-time))
(in-package :endb/storage/wal)

Expand All @@ -11,9 +11,9 @@
(defgeneric wal-fsync (wal))
(defgeneric wal-close (wal))

(defun open-tar-wal (&key (stream (make-instance 'fast-io:fast-output-stream)) (direction :output))
(defun open-tar-wal (&key (stream (flex:make-in-memory-output-stream)) (direction :output))
(let ((archive (archive:open-archive 'archive:tar-archive stream :direction direction)))
(when (input-stream-p stream)
(when (typep stream 'flex:vector-stream)
(setf (slot-value archive 'archive::skippable-p) t))
archive))

Expand All @@ -36,14 +36,13 @@
:mode +wal-file-mode+
:typeflag archive::+tar-regular-file+
:size (length buffer)
:mtime (local-time:timestamp-to-unix (local-time:now))))
(stream (make-instance 'fast-io:fast-input-stream :vector buffer)))
(archive:write-entry-to-archive archive entry :stream stream)))
:mtime (local-time:timestamp-to-unix (local-time:now)))))
(flex:with-input-from-sequence (in buffer)
(archive:write-entry-to-archive archive entry :stream in))))

(defun %extract-entry (archive entry)
(let* ((out (make-instance 'fast-io:fast-output-stream :buffer-size (archive::size entry))))
(archive::transfer-entry-data-to-stream archive entry out)
(fast-io:finish-output-stream out)))
(flex:with-output-to-sequence (out)
(archive::transfer-entry-data-to-stream archive entry out)))

(defmethod wal-read-next-entry ((archive archive:tar-archive) &key skip-if)
(let* ((entry (archive:read-entry-from-archive archive))
Expand Down
84 changes: 38 additions & 46 deletions test/storage.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(:import-from :endb/arrow)
(:import-from :endb/lib/arrow)
(:import-from :archive)
(:import-from :fast-io)
(:import-from :flexi-streams)
(:import-from :fset)
(:import-from :trivial-gray-streams)
(:import-from :trivial-utf-8)
Expand All @@ -13,65 +13,57 @@

(in-suite* :storage)

(defmethod trivial-gray-streams:stream-listen ((stream fast-io:fast-input-stream))
(not (eq :eof (fast-io::peek-byte stream nil nil :eof))))

(test fast-io-gray-stream-listen
(let ((in (make-instance 'fast-io:fast-input-stream :vector (make-array 1 :initial-element 42 :element-type '(unsigned-byte 8)))))
(is (listen in))
(is (= 42 (read-byte in)))
(is (null (listen in)))))

(test tar-wal-and-object-store
(let* ((out (make-instance 'fast-io:fast-output-stream))
(let* ((out (flex:make-in-memory-output-stream))
(wal (open-tar-wal :stream out)))

(wal-append-entry wal "foo.txt" (trivial-utf-8:string-to-utf-8-bytes "foo"))
(wal-append-entry wal "bar.txt" (trivial-utf-8:string-to-utf-8-bytes "bar"))
(wal-fsync wal)
(wal-close wal)

(let* ((in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out)))
(wal (open-tar-wal :stream in :direction :input))
(skip-pred (lambda (path)
(equal "foo.txt" path))))
(let ((out-buffer (flex:get-output-stream-sequence out)))
(let* ((in (flex:make-in-memory-input-stream out-buffer))
(wal (open-tar-wal :stream in :direction :input))
(skip-pred (lambda (path)
(equal "foo.txt" path))))

(is (archive::skippable-p wal))
(is (archive::skippable-p wal))

(multiple-value-bind (buffer name pos)
(wal-read-next-entry wal :skip-if skip-pred)
(is (null buffer))
(is (equal "foo.txt" name))
(is (= 1024 pos)))
(multiple-value-bind (buffer name pos)
(wal-read-next-entry wal :skip-if skip-pred)
(is (null buffer))
(is (equal "foo.txt" name))
(is (= 1024 pos)))

(multiple-value-bind (buffer name pos)
(wal-read-next-entry wal :skip-if skip-pred)
(is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar") buffer))
(is (equal "bar.txt" name))
(is (= 2048 pos)))
(multiple-value-bind (buffer name pos)
(wal-read-next-entry wal :skip-if skip-pred)
(is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar") buffer))
(is (equal "bar.txt" name))
(is (= 2048 pos)))

(multiple-value-bind (buffer name pos)
(wal-read-next-entry wal :skip-if skip-pred)
(is (null buffer))
(is (null name))
(is (= 2560 pos))))
(multiple-value-bind (buffer name pos)
(wal-read-next-entry wal :skip-if skip-pred)
(is (null buffer))
(is (null name))
(is (= 2560 pos))))

(let* ((in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out)))
(wal (open-tar-object-store :stream in)))
(let* ((in (flex:make-in-memory-input-stream out-buffer))
(wal (open-tar-object-store :stream in)))

(is (archive::skippable-p wal))
(is (archive::skippable-p wal))

(is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar")
(object-store-get wal "bar.txt")))
(is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar")
(object-store-get wal "bar.txt")))

(is (equalp (trivial-utf-8:string-to-utf-8-bytes "foo")
(object-store-get wal "foo.txt")))
(is (equalp (trivial-utf-8:string-to-utf-8-bytes "foo")
(object-store-get wal "foo.txt")))

(is (null (object-store-get wal "baz.txt")))
(is (null (object-store-get wal "baz.txt")))

(is (equal '("bar.txt" "foo.txt") (object-store-list wal)))
(is (equal '("foo.txt") (object-store-list wal :prefix "foo")))
(is (equal '("foo.txt") (object-store-list wal :start-after "bar.txt"))))))
(is (equal '("bar.txt" "foo.txt") (object-store-list wal)))
(is (equal '("foo.txt") (object-store-list wal :prefix "foo")))
(is (equal '("foo.txt") (object-store-list wal :start-after "bar.txt")))))))

(test tar-wal-reopen-and-append
(let* ((target-dir (asdf:system-relative-pathname :endb-test "target/"))
Expand Down Expand Up @@ -156,7 +148,7 @@
(uiop:delete-directory-tree test-dir :validate t)))))

(test buffer-pool
(let* ((out (make-instance 'fast-io:fast-output-stream))
(let* ((out (flex:make-in-memory-output-stream))
(wal (open-tar-wal :stream out))
(batches (list (list (fset:map ("x" 1))
(fset:map ("x" 2))
Expand All @@ -174,7 +166,7 @@
(mapcar #'endb/arrow:to-arrow batches)))
(wal-close wal)

(let* ((in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out)))
(let* ((in (flex:make-in-memory-input-stream (flex:get-output-stream-sequence out)))
(os (open-tar-object-store :stream in))
(bp (make-buffer-pool :object-store os))
(actual (buffer-pool-get bp "foo.arrow")))
Expand All @@ -186,11 +178,11 @@
(is (null (buffer-pool-get bp "bar.arrow"))))))

(test writable-buffer-pool
(let* ((out (make-instance 'fast-io:fast-output-stream))
(let* ((out (flex:make-in-memory-output-stream))
(batches (list (list (fset:map ("x" 1))
(fset:map ("x" 2))
(fset:map ("x" 3)))))
(in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out)))
(in (flex:make-in-memory-input-stream (flex:get-output-stream-sequence out)))
(os (open-tar-object-store :stream in))
(bp (make-buffer-pool :object-store os))
(wbp (make-writeable-buffer-pool :parent-pool bp)))
Expand Down

0 comments on commit 84461f0

Please sign in to comment.