From 84461f0d91363242111ef758a81d21b7c1d059ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kan=20R=C3=A5berg?= Date: Sat, 11 Nov 2023 18:06:59 +0100 Subject: [PATCH] Remove fast-io and use flexi-streams for in-memory streams. --- .gitmodules | 3 -- _build/fast-io | 1 - endb.asd | 2 +- src/lib/arrow.lisp | 6 ++- src/storage/object-store.lisp | 11 +++-- src/storage/wal.lisp | 17 ++++--- test/storage.lisp | 84 ++++++++++++++++------------------- 7 files changed, 56 insertions(+), 68 deletions(-) delete mode 160000 _build/fast-io diff --git a/.gitmodules b/.gitmodules index eea01099..1087cff3 100644 --- a/.gitmodules +++ b/.gitmodules @@ -41,9 +41,6 @@ [submodule "_build/named-readtables"] path = _build/named-readtables url = https://github.com/melisgl/named-readtables -[submodule "_build/fast-io"] - path = _build/fast-io - url = https://github.com/rpav/fast-io [submodule "_build/static-vectors"] path = _build/static-vectors url = https://github.com/sionescu/static-vectors diff --git a/_build/fast-io b/_build/fast-io deleted file mode 160000 index a4c5ad60..00000000 --- a/_build/fast-io +++ /dev/null @@ -1 +0,0 @@ -Subproject commit a4c5ad600425842e8b6233b1fa22610ffcd874c3 diff --git a/endb.asd b/endb.asd index 8fbf5f4f..ca0eaa87 100644 --- a/endb.asd +++ b/endb.asd @@ -16,7 +16,7 @@ "cl-murmurhash" "cl-ppcre" "com.inuoe.jzon" - "fast-io" + "flexi-streams" "fset" "local-time" "periods" diff --git a/src/lib/arrow.lisp b/src/lib/arrow.lisp index f7c61481..8cba5f0c 100644 --- a/src/lib/arrow.lisp +++ b/src/lib/arrow.lisp @@ -255,7 +255,8 @@ (defun buffer-to-vector (buffer-ptr buffer-size &optional out) (let ((out (or out (make-array buffer-size :element-type '(unsigned-byte 8))))) (assert (<= buffer-size (vector-byte-size out))) - (cffi:with-pointer-to-vector-data (out-ptr out) + (cffi:with-pointer-to-vector-data (out-ptr #+sbcl (sb-ext:array-storage-vector out) + #-sbcl out) (memcpy out-ptr buffer-ptr buffer-size)) out)) @@ -382,7 +383,8 @@ (defun read-arrow-arrays-from-ipc-buffer (buffer) (check-type buffer (vector (unsigned-byte 8))) - (cffi:with-pointer-to-vector-data (buffer-ptr buffer) + (cffi:with-pointer-to-vector-data (buffer-ptr #+sbcl (sb-ext:array-storage-vector buffer) + #-sbcl buffer) (read-arrow-arrays-from-ipc-pointer buffer-ptr (length buffer)))) (defun read-arrow-arrays-from-ipc-file (file) diff --git a/src/storage/object-store.lisp b/src/storage/object-store.lisp index 1866755a..98c03e52 100644 --- a/src/storage/object-store.lisp +++ b/src/storage/object-store.lisp @@ -3,7 +3,7 @@ (:export #:open-tar-object-store #:object-store-get #:object-store-put #:object-store-list #:object-store-close #:make-directory-object-store #:make-memory-object-store) (:import-from :alexandria) (:import-from :archive) - (:import-from :fast-io) + (:import-from :flexi-streams) (:import-from :uiop) (:import-from :bordeaux-threads)) (in-package :endb/storage/object-store) @@ -15,16 +15,15 @@ (defvar *tar-object-store-lock* (bt:make-lock)) -(defun open-tar-object-store (&key (stream (make-instance 'fast-io:fast-input-stream))) +(defun open-tar-object-store (&key (stream (flex:make-in-memory-input-stream (make-array 0 :element-type '(unsigned-byte 8))))) (let ((archive (archive:open-archive 'archive:tar-archive stream))) - (when (typep stream 'fast-io:fast-input-stream) + (when (typep stream 'flex:vector-stream) (setf (slot-value archive 'archive::skippable-p) t)) archive)) (defun %extract-entry (archive entry) - (let* ((out (make-instance 'fast-io:fast-output-stream :buffer-size (archive::size entry)))) - (archive::transfer-entry-data-to-stream archive entry out) - (fast-io:finish-output-stream out))) + (flex:with-output-to-sequence (out) + (archive::transfer-entry-data-to-stream archive entry out))) (defun %wal-read-entry-safe (archive) (when (listen (archive::archive-stream archive)) diff --git a/src/storage/wal.lisp b/src/storage/wal.lisp index 055fa076..a0e0a2d1 100644 --- a/src/storage/wal.lisp +++ b/src/storage/wal.lisp @@ -2,7 +2,7 @@ (:use :cl) (:export #:open-tar-wal #:tar-wal-position-stream-at-end #:wal-append-entry #:wal-read-next-entry #:wal-find-entry #:wal-fsync #:wal-close #:make-memory-wal) (:import-from :archive) - (:import-from :fast-io) + (:import-from :flexi-streams) (:import-from :local-time)) (in-package :endb/storage/wal) @@ -11,9 +11,9 @@ (defgeneric wal-fsync (wal)) (defgeneric wal-close (wal)) -(defun open-tar-wal (&key (stream (make-instance 'fast-io:fast-output-stream)) (direction :output)) +(defun open-tar-wal (&key (stream (flex:make-in-memory-output-stream)) (direction :output)) (let ((archive (archive:open-archive 'archive:tar-archive stream :direction direction))) - (when (input-stream-p stream) + (when (typep stream 'flex:vector-stream) (setf (slot-value archive 'archive::skippable-p) t)) archive)) @@ -36,14 +36,13 @@ :mode +wal-file-mode+ :typeflag archive::+tar-regular-file+ :size (length buffer) - :mtime (local-time:timestamp-to-unix (local-time:now)))) - (stream (make-instance 'fast-io:fast-input-stream :vector buffer))) - (archive:write-entry-to-archive archive entry :stream stream))) + :mtime (local-time:timestamp-to-unix (local-time:now))))) + (flex:with-input-from-sequence (in buffer) + (archive:write-entry-to-archive archive entry :stream in)))) (defun %extract-entry (archive entry) - (let* ((out (make-instance 'fast-io:fast-output-stream :buffer-size (archive::size entry)))) - (archive::transfer-entry-data-to-stream archive entry out) - (fast-io:finish-output-stream out))) + (flex:with-output-to-sequence (out) + (archive::transfer-entry-data-to-stream archive entry out))) (defmethod wal-read-next-entry ((archive archive:tar-archive) &key skip-if) (let* ((entry (archive:read-entry-from-archive archive)) diff --git a/test/storage.lisp b/test/storage.lisp index b0ee1479..f822aa62 100644 --- a/test/storage.lisp +++ b/test/storage.lisp @@ -3,7 +3,7 @@ (:import-from :endb/arrow) (:import-from :endb/lib/arrow) (:import-from :archive) - (:import-from :fast-io) + (:import-from :flexi-streams) (:import-from :fset) (:import-from :trivial-gray-streams) (:import-from :trivial-utf-8) @@ -13,17 +13,8 @@ (in-suite* :storage) -(defmethod trivial-gray-streams:stream-listen ((stream fast-io:fast-input-stream)) - (not (eq :eof (fast-io::peek-byte stream nil nil :eof)))) - -(test fast-io-gray-stream-listen - (let ((in (make-instance 'fast-io:fast-input-stream :vector (make-array 1 :initial-element 42 :element-type '(unsigned-byte 8))))) - (is (listen in)) - (is (= 42 (read-byte in))) - (is (null (listen in))))) - (test tar-wal-and-object-store - (let* ((out (make-instance 'fast-io:fast-output-stream)) + (let* ((out (flex:make-in-memory-output-stream)) (wal (open-tar-wal :stream out))) (wal-append-entry wal "foo.txt" (trivial-utf-8:string-to-utf-8-bytes "foo")) @@ -31,47 +22,48 @@ (wal-fsync wal) (wal-close wal) - (let* ((in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out))) - (wal (open-tar-wal :stream in :direction :input)) - (skip-pred (lambda (path) - (equal "foo.txt" path)))) + (let ((out-buffer (flex:get-output-stream-sequence out))) + (let* ((in (flex:make-in-memory-input-stream out-buffer)) + (wal (open-tar-wal :stream in :direction :input)) + (skip-pred (lambda (path) + (equal "foo.txt" path)))) - (is (archive::skippable-p wal)) + (is (archive::skippable-p wal)) - (multiple-value-bind (buffer name pos) - (wal-read-next-entry wal :skip-if skip-pred) - (is (null buffer)) - (is (equal "foo.txt" name)) - (is (= 1024 pos))) + (multiple-value-bind (buffer name pos) + (wal-read-next-entry wal :skip-if skip-pred) + (is (null buffer)) + (is (equal "foo.txt" name)) + (is (= 1024 pos))) - (multiple-value-bind (buffer name pos) - (wal-read-next-entry wal :skip-if skip-pred) - (is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar") buffer)) - (is (equal "bar.txt" name)) - (is (= 2048 pos))) + (multiple-value-bind (buffer name pos) + (wal-read-next-entry wal :skip-if skip-pred) + (is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar") buffer)) + (is (equal "bar.txt" name)) + (is (= 2048 pos))) - (multiple-value-bind (buffer name pos) - (wal-read-next-entry wal :skip-if skip-pred) - (is (null buffer)) - (is (null name)) - (is (= 2560 pos)))) + (multiple-value-bind (buffer name pos) + (wal-read-next-entry wal :skip-if skip-pred) + (is (null buffer)) + (is (null name)) + (is (= 2560 pos)))) - (let* ((in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out))) - (wal (open-tar-object-store :stream in))) + (let* ((in (flex:make-in-memory-input-stream out-buffer)) + (wal (open-tar-object-store :stream in))) - (is (archive::skippable-p wal)) + (is (archive::skippable-p wal)) - (is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar") - (object-store-get wal "bar.txt"))) + (is (equalp (trivial-utf-8:string-to-utf-8-bytes "bar") + (object-store-get wal "bar.txt"))) - (is (equalp (trivial-utf-8:string-to-utf-8-bytes "foo") - (object-store-get wal "foo.txt"))) + (is (equalp (trivial-utf-8:string-to-utf-8-bytes "foo") + (object-store-get wal "foo.txt"))) - (is (null (object-store-get wal "baz.txt"))) + (is (null (object-store-get wal "baz.txt"))) - (is (equal '("bar.txt" "foo.txt") (object-store-list wal))) - (is (equal '("foo.txt") (object-store-list wal :prefix "foo"))) - (is (equal '("foo.txt") (object-store-list wal :start-after "bar.txt")))))) + (is (equal '("bar.txt" "foo.txt") (object-store-list wal))) + (is (equal '("foo.txt") (object-store-list wal :prefix "foo"))) + (is (equal '("foo.txt") (object-store-list wal :start-after "bar.txt"))))))) (test tar-wal-reopen-and-append (let* ((target-dir (asdf:system-relative-pathname :endb-test "target/")) @@ -156,7 +148,7 @@ (uiop:delete-directory-tree test-dir :validate t))))) (test buffer-pool - (let* ((out (make-instance 'fast-io:fast-output-stream)) + (let* ((out (flex:make-in-memory-output-stream)) (wal (open-tar-wal :stream out)) (batches (list (list (fset:map ("x" 1)) (fset:map ("x" 2)) @@ -174,7 +166,7 @@ (mapcar #'endb/arrow:to-arrow batches))) (wal-close wal) - (let* ((in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out))) + (let* ((in (flex:make-in-memory-input-stream (flex:get-output-stream-sequence out))) (os (open-tar-object-store :stream in)) (bp (make-buffer-pool :object-store os)) (actual (buffer-pool-get bp "foo.arrow"))) @@ -186,11 +178,11 @@ (is (null (buffer-pool-get bp "bar.arrow")))))) (test writable-buffer-pool - (let* ((out (make-instance 'fast-io:fast-output-stream)) + (let* ((out (flex:make-in-memory-output-stream)) (batches (list (list (fset:map ("x" 1)) (fset:map ("x" 2)) (fset:map ("x" 3))))) - (in (make-instance 'fast-io:fast-input-stream :vector (fast-io:finish-output-stream out))) + (in (flex:make-in-memory-input-stream (flex:get-output-stream-sequence out))) (os (open-tar-object-store :stream in)) (bp (make-buffer-pool :object-store os)) (wbp (make-writeable-buffer-pool :parent-pool bp)))