diff --git a/dissect/esedb/btree.py b/dissect/esedb/btree.py new file mode 100644 index 0000000..1fa862f --- /dev/null +++ b/dissect/esedb/btree.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from dissect.esedb.exceptions import KeyNotFoundError, NoNeighbourPageError + +if TYPE_CHECKING: + from dissect.esedb.esedb import EseDB + from dissect.esedb.page import Node, Page + + +class BTree: + """A simple implementation for searching the ESE B+Trees. + + This is a stateful interactive class that moves an internal cursor to a position within the BTree. + + Args: + esedb: An instance of :class:`~dissect.esedb.esedb.EseDB`. + page: The page to open a BTree on. + """ + + def __init__(self, esedb: EseDB, root: int | Page): + self.esedb = esedb + + if isinstance(root, int): + page_num = root + root = esedb.page(page_num) + else: + page_num = root.num + + self.root = root + + self._page = root + self._page_num = page_num + self._node_num = 0 + + def reset(self) -> None: + """Reset the internal state to the root of the BTree.""" + self._page = self.root + self._page_num = self._page.num + self._node_num = 0 + + def node(self) -> Node: + """Return the node the BTree is currently on.""" + return self._page.node(self._node_num) + + def next(self) -> Node: + """Move the BTree to the next node and return it. + + Can move the BTree to the next page as a side effect. + """ + if self._node_num + 1 > self._page.node_count - 1: + self.next_page() + else: + self._node_num += 1 + + return self.node() + + def next_page(self) -> None: + """Move the BTree to the next page in the tree. + + Raises: + NoNeighbourPageError: If the current page has no next page. + """ + if self._page.next_page: + self._page = self.esedb.page(self._page.next_page) + self._node_num = 0 + else: + raise NoNeighbourPageError(f"{self._page} has no next page") + + def prev(self) -> Node: + """Move the BTree to the previous node and return it. + + Can move the BTree to the previous page as a side effect. + """ + if self._node_num - 1 < 0: + self.prev_page() + else: + self._node_num -= 1 + + return self.node() + + def prev_page(self) -> None: + """Move the BTree to the previous page in the tree. + + Raises: + NoNeighbourPageError: If the current page has no previous page. + """ + if self._page.previous_page: + self._page = self.esedb.page(self._page.previous_page) + self._node_num = self._page.node_count - 1 + else: + raise NoNeighbourPageError(f"{self._page} has no previous page") + + def search(self, key: bytes, exact: bool = True) -> Node: + """Search the tree for the given key. + + Moves the BTree to the matching node, or on the last node that is less than the requested key. + + Args: + key: The key to search for. + exact: Whether to only return successfully on an exact match. + + Raises: + KeyNotFoundError: If an ``exact`` match was requested but not found. + """ + page = self._page + while True: + node = find_node(page, key) + + if page.is_branch: + page = self.esedb.page(node.child) + else: + self._page = page + self._page_num = page.num + self._node_num = node.num + break + + if exact and key != node.key: + raise KeyNotFoundError(f"Can't find key: {key}") + + return self.node() + + +def find_node(page: Page, key: bytes) -> Node: + """Search a page for a node matching ``key``. + + Args: + page: The page to search. + key: The key to search. + """ + first_node_idx = 0 + last_node_idx = page.node_count - 1 + + node = None + while first_node_idx < last_node_idx: + node_idx = (first_node_idx + last_node_idx) // 2 + node = page.node(node_idx) + + # It turns out that the way BTree keys are compared matches 1:1 with how Python compares bytes + # First compare data, then length + if key < node.key: + last_node_idx = node_idx + elif key == node.key: + if page.is_branch: + # If there's an exact match on a key on a branch page, the actual leaf nodes are in the next branch + # Page keys for branch pages appear to be non-inclusive upper bounds + node_idx = min(node_idx + 1, page.node_count - 1) + node = page.node(node_idx) + + return node + else: + first_node_idx = node_idx + 1 + + # We're at the last node + return page.node(first_node_idx) diff --git a/dissect/esedb/c_esedb.py b/dissect/esedb/c_esedb.py index 018b7df..dfe3c15 100644 --- a/dissect/esedb/c_esedb.py +++ b/dissect/esedb/c_esedb.py @@ -426,6 +426,34 @@ DotNetGuid = 0x00040000, // index over GUID column according to .Net GUID sort order ImmutableStructure = 0x00080000, // Do not write to the input structures during a JetCreateIndexN call. }; + +flag IDBFLAG : uint16 { + Unique = 0x0001, // Duplicate keys not allowed + AllowAllNulls = 0x0002, // Make entries for NULL keys (all segments are null) + AllowFirstNull = 0x0004, // First index column NULL allowed in index + AllowSomeNulls = 0x0008, // Make entries for keys with some null segments + NoNullSeg = 0x0010, // Don't allow a NULL key segment + Primary = 0x0020, // Index is the primary index + LocaleSet = 0x0040, // Index locale information (locale name) is set (JET_bitIndexUnicode was specified). + Multivalued = 0x0080, // Has a multivalued segment + TemplateIndex = 0x0100, // Index of a template table + DerivedIndex = 0x0200, // Index derived from template table + // Note that this flag is persisted, but + // never used in an in-memory IDB, because + // we use the template index IDB instead. + LocalizedText = 0x0400, // Has a unicode text column? (code page is 1200) + SortNullsHigh = 0x0800, // NULL sorts after data + // Jan 2012: MSU is being removed. fidbUnicodeFixupOn should no longer be referenced. + UnicodeFixupOn_Deprecated = 0x1000, // Track entries with undefined Unicode codepoints + CrossProduct = 0x2000, // all combinations of multi-valued columns are indexed + DisallowTruncation = 0x4000, // fail update rather than allow key truncation + NestedTable = 0x8000, // combinations of multi-valued columns of same itagSequence are indexed +}; + +flag IDXFLAG : uint16 { + ExtendedColumns = 0x0001, // IDXSEGs are comprised of JET_COLUMNIDs, not FIDs + DotNetGuid = 0x0002, // GUIDs sort according to .Net rules +}; """ # noqa E501 c_esedb = cstruct().load(esedb_def) @@ -444,6 +472,8 @@ TAGFLD_HEADER = c_esedb.TAGFLD_HEADER CODEPAGE = c_esedb.CODEPAGE COMPRESSION_SCHEME = c_esedb.COMPRESSION_SCHEME +IDBFLAG = c_esedb.IDBFLAG +IDXFLAG = c_esedb.IDXFLAG CODEPAGE_MAP = { CODEPAGE.UNICODE: "utf-16-le", diff --git a/dissect/esedb/cursor.py b/dissect/esedb/cursor.py index 5cb0fbf..b65fd9e 100644 --- a/dissect/esedb/cursor.py +++ b/dissect/esedb/cursor.py @@ -2,145 +2,187 @@ from typing import TYPE_CHECKING -from dissect.esedb.exceptions import KeyNotFoundError, NoNeighbourPageError +from dissect.esedb.btree import BTree +from dissect.esedb.exceptions import NoNeighbourPageError +from dissect.esedb.record import Record if TYPE_CHECKING: - from dissect.esedb.esedb import EseDB - from dissect.esedb.page import Node, Page + from collections.abc import Iterator + + from dissect.esedb.index import Index + from dissect.esedb.page import Node class Cursor: - """A simple cursor implementation for searching the ESE B+Trees + """A simple cursor implementation for searching the ESE indexes. Args: - esedb: An instance of :class:`~dissect.esedb.esedb.EseDB`. - page: The page to open a cursor on. + index: The index to create the cursor for. """ - def __init__(self, esedb: EseDB, page: int | Page): - self.esedb = esedb + def __init__(self, index: Index): + self.index = index + self.table = index.table + self.esedb = index.esedb - if isinstance(page, int): - page_num = page - page = esedb.page(page_num) - else: - page_num = page.num + self._first = BTree(self.esedb, index.root) + self._secondary = None if index.is_primary else BTree(self.esedb, self.table.root) - self._page = page - self._page_num = page_num - self._node_num = 0 + def __iter__(self) -> Iterator[Record]: + while True: + yield self._record() - def node(self) -> Node: - """Return the node the cursor is currently on.""" - return self._page.node(self._node_num) + try: + self._first.next() + except NoNeighbourPageError: + break - def next(self) -> Node: - """Move the cursor to the next node and return it. + def _node(self) -> Node: + """Return the node the cursor is currently on. Resolves the secondary index if needed.""" + node = self._first.node() + if self._secondary: + self._secondary.reset() + node = self._secondary.search(node.data.tobytes(), exact=True) + return node - Can move the cursor to the next page as a side effect. - """ - if self._node_num + 1 > self._page.node_count - 1: - self.next_page() - else: - self._node_num += 1 + def _record(self) -> Record: + """Return the record the cursor is currently on.""" + return Record(self.table, self._node()) - return self.node() + def reset(self) -> None: + """Reset the internal state.""" + self._first.reset() + if self._secondary: + self._secondary.reset() - def next_page(self) -> None: - """Move the cursor to the next page in the tree. + def search(self, **kwargs) -> Record: + """Search the index for the requested values. - Raises: - NoNeighbourPageError: If the current page has no next page. + Searching modifies the cursor state. Searching again will search from the current position. + Reset the cursor with :meth:`reset` to start from the beginning. + + Args: + **kwargs: The columns and values to search for. """ - if self._page.next_page: - self._page = self.esedb.page(self._page.next_page) - self._node_num = 0 - else: - raise NoNeighbourPageError(f"{self._page} has no next page") + key = self.index.make_key(kwargs) + return self.search_key(key, exact=True) - def prev(self) -> Node: - """Move the cursor to the previous node and return it. + def search_key(self, key: bytes, exact: bool = True) -> Record: + """Search for a record with the given key. - Can move the cursor to the previous page as a side effect. + Args: + key: The key to search for. + exact: If ``True``, search for an exact match. If ``False``, sets the cursor on the + next record that is greater than or equal to the key. """ - if self._node_num - 1 < 0: - self.prev_page() - else: - self._node_num -= 1 + self._first.search(key, exact) + return self._record() - return self.node() + def seek(self, **kwargs) -> None: + """Seek to the record with the given values. - def prev_page(self) -> None: - """Move the cursor to the previous page in the tree. + Args: + **kwargs: The columns and values to seek to. + """ + key = self.index.make_key(kwargs) + self.search_key(key, exact=False) - Raises: - NoNeighbourPageError: If the current page has no previous page. + def seek_key(self, key: bytes) -> None: + """Seek to the record with the given key. + + Args: + key: The key to seek to. """ - if self._page.previous_page: - self._page = self.esedb.page(self._page.previous_page) - self._node_num = self._page.node_count - 1 - else: - raise NoNeighbourPageError(f"{self._page} has no previous page") + self._first.search(key, exact=False) - def search(self, key: bytes, exact: bool = True) -> Node: - """Search the tree for the given key. + def find(self, **kwargs) -> Record | None: + """Find a record in the index. - Moves the cursor to the matching node, or on the last node that is less than the requested key. + This differs from :meth:`search` in that it will allow additional filtering on non-indexed columns. Args: - key: The key to search for. - exact: Whether to only return successfully on an exact match. + **kwargs: The columns and values to search for. + """ + return next(self.find_all(**kwargs), None) + + def find_all(self, **kwargs) -> Iterator[Record]: + """Find all records in the index that match the given values. + + This differs from :meth:`search` in that it will allows additional filtering on non-indexed columns. + If you only search on indexed columns, this will yield all records that match the indexed columns. - Raises: - KeyNotFoundError: If an ``exact`` match was requested but not found. + Args: + **kwargs: The columns and values to search for. """ - page = self._page + indexed_columns = {c.name: kwargs.pop(c.name) for c in self.index.columns} + other_columns = kwargs + + # We need at least an exact match on the indexed columns + self.search(**indexed_columns) + + current_key = self._first.node().key + + # Check if we need to move the cursor back to find the first record + while True: + if current_key != self._first.node().key: + self._first.next() + break + + try: + self._first.prev() + except NoNeighbourPageError: + break + while True: - node = find_node(page, key) + # Entries with the same indexed columns are guaranteed to be adjacent + if current_key != self._first.node().key: + break - if page.is_branch: - page = self.esedb.page(node.child) + record = self._record() + for k, v in other_columns.items(): + value = record.get(k) + # If the record value is a list, we do a check based on the queried value + if isinstance(value, list): + # If the queried value is also a list, we check if they are equal + if isinstance(v, list): + if value != v: + break + # Otherwise we check if the queried value is in the record value + elif v not in value: + break + else: + if value != v: + break else: - self._page = page - self._page_num = page.num - self._node_num = node.num + yield record + + try: + self._first.next() + except NoNeighbourPageError: break - if exact and key != node.key: - raise KeyNotFoundError(f"Can't find key: {key}") + def record(self) -> Record: + """Return the record the cursor is currently on.""" + return self._record() - return self.node() + def next(self) -> Record: + """Move the cursor to the next record and return it. + Can move the cursor to the next page as a side effect. + """ + try: + self._first.next() + except NoNeighbourPageError: + raise IndexError("No next record") + return self._record() -def find_node(page: Page, key: bytes) -> Node: - """Search the tree, starting from the given ``page`` and search for ``key``. + def prev(self) -> Record: + """Move the cursor to the previous node and return it. - Args: - page: The page to start searching from. Should be a branch page. - key: The key to search. - """ - first_node_idx = 0 - last_node_idx = page.node_count - 1 - - node = None - while first_node_idx < last_node_idx: - node_idx = (first_node_idx + last_node_idx) // 2 - node = page.node(node_idx) - - # It turns out that the way BTree keys are compared matches 1:1 with how Python compares bytes - # First compare data, then length - if key < node.key: - last_node_idx = node_idx - elif key == node.key: - if page.is_branch: - # If there's an exact match on a key on a branch page, the actual leaf nodes are in the next branch - # Page keys for branch pages appear to be non-inclusive upper bounds - node_idx = min(node_idx + 1, page.node_count - 1) - node = page.node(node_idx) - - return node - else: - first_node_idx = node_idx + 1 - - # We're at the last node - return page.node(first_node_idx) + Can move the cursor to the previous page as a side effect. + """ + try: + self._first.prev() + except NoNeighbourPageError: + raise IndexError("No previous record") + return self._record() diff --git a/dissect/esedb/index.py b/dissect/esedb/index.py index acd0cec..648aa27 100644 --- a/dissect/esedb/index.py +++ b/dissect/esedb/index.py @@ -5,13 +5,13 @@ from functools import cached_property from typing import TYPE_CHECKING -from dissect.esedb.c_esedb import CODEPAGE, JET_bitIndex, JET_coltyp, RecordValue +from dissect.esedb.c_esedb import CODEPAGE, IDBFLAG, IDXFLAG, JET_coltyp, RecordValue from dissect.esedb.cursor import Cursor from dissect.esedb.lcmapstring import map_string -from dissect.esedb.record import Record if TYPE_CHECKING: from dissect.esedb.page import Node, Page + from dissect.esedb.record import Record from dissect.esedb.table import Column, Table @@ -29,16 +29,25 @@ class Index: record: The record in the catalog for this index. """ - def __init__(self, table: Table, record: Record = None): + def __init__(self, table: Table, record: Record | None = None): self.table = table self.record = record self.esedb = table.esedb self.name = record.get("Name") - self.flags = JET_bitIndex(record.get("Flags")) + flags = record.get("Flags") + self.idb_flags = IDBFLAG(flags & 0xFFFF) + self.idx_flags = IDXFLAG(flags >> 16) self._key_most = record.get("KeyMost") or JET_cbKeyMost_OLD self._var_seg_mac = record.get("VarSegMac") or self._key_most + def __repr__(self) -> str: + return f"" + + @property + def is_primary(self) -> bool: + return bool(self.idb_flags & IDBFLAG.Primary) + @cached_property def root(self) -> Page: """Return the root page of this index.""" @@ -60,14 +69,17 @@ def columns(self) -> list[Column]: """Return a list of all columns that are used in this index.""" return [self.table._column_id_map[cid] for cid in self.column_ids] + def cursor(self) -> Cursor: + """Create a new cursor for this index.""" + return Cursor(self) + def search(self, **kwargs) -> Record: """Search the index for the requested values. - Specify the column and value as a keyword argument. + Args: + **kwargs: The columns and values to search for. """ - key = self.make_key(kwargs) - node = self.search_key(key) - return Record(self.table, node) + return self.cursor().search(**kwargs) def search_key(self, key: bytes) -> Node: """Search the index for a specific key. @@ -75,8 +87,7 @@ def search_key(self, key: bytes) -> Node: Args: key: The key to search for. """ - cursor = Cursor(self.esedb, self.root) - return cursor.search(key) + return self.cursor().search_key(key) def key_from_record(self, record: Record) -> bytes: """Generate a key for this index from a record. @@ -112,9 +123,6 @@ def make_key(self, values: dict[str, RecordValue]) -> bytes: key = key[: self._key_most] return key - def __repr__(self) -> str: - return f"" - bPrefixNull = 0x00 bPrefixZeroLength = 0x40 diff --git a/dissect/esedb/page.py b/dissect/esedb/page.py index 83fbb04..538ed5f 100644 --- a/dissect/esedb/page.py +++ b/dissect/esedb/page.py @@ -203,7 +203,7 @@ def __init__(self, page: Page, num: int): self.flags = TAG_FLAG(flags) def __repr__(self) -> str: - return f"" + return f"" class Node: diff --git a/dissect/esedb/record.py b/dissect/esedb/record.py index 498db84..ddafab9 100644 --- a/dissect/esedb/record.py +++ b/dissect/esedb/record.py @@ -256,7 +256,7 @@ def _parse_value(self, column: Column, value: bytes, tag_field: TagField = None) return value - def _parse_multivalue(self, value: bytes, tag_field: TagField) -> bytes: + def _parse_multivalue(self, value: bytes, tag_field: TagField) -> list[bytes]: fSeparatedInstance = 0x8000 if tag_field.flags & TAGFLD_HEADER.TwoValues: @@ -281,6 +281,8 @@ def _parse_multivalue(self, value: bytes, tag_field: TagField) -> bytes: data = self.table.get_long_value(bytes(data)) values.append(data) value = values + else: + raise ValueError(f"Unknown flags for tag field: {tag_field}") if tag_field.flags & TAGFLD_HEADER.Compressed: # Only the first entry appears to be compressed diff --git a/dissect/esedb/table.py b/dissect/esedb/table.py index ff8ccef..917a8d4 100644 --- a/dissect/esedb/table.py +++ b/dissect/esedb/table.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any from dissect.esedb import compression +from dissect.esedb.btree import BTree from dissect.esedb.c_esedb import ( CODEPAGE, COLUMN_TYPE_MAP, @@ -12,7 +13,6 @@ ColumnType, JET_coltyp, ) -from dissect.esedb.cursor import Cursor from dissect.esedb.exceptions import NoNeighbourPageError from dissect.esedb.index import Index from dissect.esedb.record import Record @@ -20,6 +20,7 @@ if TYPE_CHECKING: from collections.abc import Iterator + from dissect.esedb.cursor import Cursor from dissect.esedb.esedb import EseDB from dissect.esedb.page import Page @@ -57,7 +58,7 @@ def __init__( self.name = name self.root_page = root_page self.columns: list[Column] = [] - self.indexes = [] + self.indexes: list[Index] = [] # Set by the catalog during parsing self._long_value_record: Record = None @@ -76,7 +77,7 @@ def __init__( self.record = record def __repr__(self) -> str: - return f"" + return f"
" @cached_property def root(self) -> Page: @@ -114,6 +115,21 @@ def column_names(self) -> list[str]: """Return a list of all the column names.""" return list(self._column_name_map.keys()) + @property + def primary_index(self) -> Index | None: + # It's generally the first index, but loop just in case + for index in self.indexes: + if index.is_primary: + return index + return None + + def cursor(self) -> Cursor | None: + """Create a new cursor for this table.""" + primary_idx = self.primary_index + if primary_idx: + return primary_idx.cursor() + return None + def index(self, name: str) -> Index: """Return the index with the given name. @@ -128,6 +144,39 @@ def index(self, name: str) -> Index: except KeyError: raise KeyError(f"No index with this name in table {self.name}: {name}") + def find_index(self, column_names: list[str]) -> Index | None: + """Find the most suitable index to search for the given columns. + + Args: + column_names: A list of column names to find the best index for. + """ + best_match = 0 + best_index = None + for index in self.indexes: + # We want to find the index that has the most matching columns in the order they are indexed + i = 0 + for column in index.columns: + if column.name not in column_names: + break + i += 1 + + if i > best_match: + best_index = index + best_match = i + + return best_index + + def search(self, **kwargs) -> Record | None: + """Search for a record in the table. + + Args: + **kwargs: The columns and values to search for. + + Returns: + The first record that matches the search criteria, or None if no record was found. + """ + return self.cursor().search(**kwargs) + def records(self) -> Iterator[Record]: """Return an iterator of all the records of the table.""" for node in self.root.iter_leaf_nodes(): @@ -140,8 +189,8 @@ def get_long_value(self, key: bytes) -> bytes: key: The lookup key for the long value. """ rkey = key[::-1] - cursor = Cursor(self.esedb, self.lv_page) - header = cursor.search(rkey) + btree = BTree(self.esedb, self.lv_page) + header = btree.search(rkey) _, size = struct.unpack("<2I", header.data) chunks = [] @@ -149,7 +198,7 @@ def get_long_value(self, key: bytes) -> bytes: while True: try: - node = cursor.next() + node = btree.next() if not node.key.startswith(rkey): break except NoNeighbourPageError: @@ -201,6 +250,9 @@ def __init__(self, identifier: int, name: str, type_: JET_coltyp, record: Record self.record = record + def __repr__(self) -> str: + return f"" + @property def offset(self) -> int: return self._offset @@ -247,9 +299,6 @@ def encoding(self) -> CODEPAGE | None: def ctype(self) -> ColumnType: return COLUMN_TYPE_MAP[self.type.value] - def __repr__(self) -> str: - return f"" - class Catalog: """Parse and interact with the catalog table. diff --git a/dissect/esedb/tools/sru.py b/dissect/esedb/tools/sru.py index c36a151..5a8c78d 100644 --- a/dissect/esedb/tools/sru.py +++ b/dissect/esedb/tools/sru.py @@ -146,7 +146,7 @@ def __getattr__(self, attr: str) -> RecordValue: def __repr__(self) -> str: column_values = serialise_record_column_values(self.record) - return f"" + return f"" def main() -> None: diff --git a/tests/test_cursor.py b/tests/test_cursor.py new file mode 100644 index 0000000..cf8e096 --- /dev/null +++ b/tests/test_cursor.py @@ -0,0 +1,59 @@ +from typing import BinaryIO + +from dissect.esedb.esedb import EseDB + + +def test_cursor(basic_db: BinaryIO) -> None: + db = EseDB(basic_db) + table = db.table("basic") + idx = table.index("IxId") + + cursor = idx.cursor() + record = cursor.search(Id=1) + assert record.Id == 1 + record = cursor.next() + assert record.Id == 2 + record = cursor.prev() + assert record.Id == 1 + assert record.Id == cursor.record().Id + + +def test_cursor_iterator(basic_db: BinaryIO) -> None: + db = EseDB(basic_db) + table = db.table("basic") + idx = table.index("IxId") + + cursor = idx.cursor() + records = list(cursor) + assert len(records) == 2 + assert records[0].Id == 1 + assert records[1].Id == 2 + + +def test_cursor_search(ual_db: BinaryIO) -> None: + db = EseDB(ual_db) + table = db.table("CLIENTS") + idx = table.index("Username_RoleGuid_TenantId_index") + + cursor = idx.cursor() + records = list( + cursor.find_all( + AuthenticatedUserName="blackclover\\administrator", + RoleGuid="ad495fc3-0eaa-413d-ba7d-8b13fa7ec598", + TenantId="2417e4c3-5467-40c5-809b-12b59a86c102", + ) + ) + + assert len(records) == 5 + + cursor.reset() + records = list( + cursor.find_all( + AuthenticatedUserName="blackclover\\administrator", + RoleGuid="ad495fc3-0eaa-413d-ba7d-8b13fa7ec598", + TenantId="2417e4c3-5467-40c5-809b-12b59a86c102", + Day204=4, + ) + ) + + assert len(records) == 1 diff --git a/tests/test_index.py b/tests/test_index.py index 80ec47a..02acc7b 100644 --- a/tests/test_index.py +++ b/tests/test_index.py @@ -13,76 +13,95 @@ def test_index(index_db: BinaryIO) -> None: assert table.indexes[0].name == "IxId" assert table.indexes[0].column_ids == [1] - assert table.indexes[0].search(Id=1) + record = table.indexes[0].search(Id=1) + assert record.Id == 1 assert table.indexes[1].name == "IxBit" assert table.indexes[1].column_ids == [2] - assert table.indexes[1].search(Bit=False) + record = table.indexes[1].search(Bit=False) + assert record.Bit is False assert table.indexes[2].name == "IxUnsignedByte" assert table.indexes[2].column_ids == [3] - assert table.indexes[2].search(UnsignedByte=213) + record = table.indexes[2].search(UnsignedByte=213) + assert record.UnsignedByte == 213 assert table.indexes[3].name == "IxShort" assert table.indexes[3].column_ids == [4] - assert table.indexes[3].search(Short=-1337) + record = table.indexes[3].search(Short=-1337) + assert record.Short == -1337 assert table.indexes[4].name == "IxLong" assert table.indexes[4].column_ids == [5] - assert table.indexes[4].search(Long=-13371337) + record = table.indexes[4].search(Long=-13371337) + assert record.Long == -13371337 assert table.indexes[5].name == "IxCurrenc" assert table.indexes[5].column_ids == [6] - assert table.indexes[5].search(Currency=1337133713371337) + record = table.indexes[5].search(Currency=1337133713371337) + assert record.Currency == 1337133713371337 assert table.indexes[6].name == "IxIEEESingle" assert table.indexes[6].column_ids == [7] - assert table.indexes[6].search(IEEESingle=1.0) + record = table.indexes[6].search(IEEESingle=1.0) + assert record.IEEESingle == 1.0 assert table.indexes[7].name == "IxIEEEDouble" assert table.indexes[7].column_ids == [8] - assert table.indexes[7].search(IEEEDouble=13371337.13371337) + record = table.indexes[7].search(IEEEDouble=13371337.13371337) + assert record.IEEEDouble == 13371337.13371337 assert table.indexes[8].name == "IxDateTime" assert table.indexes[8].column_ids == [9] - assert table.indexes[8].search(DateTime=4675210852477960192) + record = table.indexes[8].search(DateTime=4675210852477960192) + assert record.DateTime == 4675210852477960192 assert table.indexes[9].name == "IxUnsignedLong" assert table.indexes[9].column_ids == [10] - assert table.indexes[9].search(UnsignedLong=13371337) + record = table.indexes[9].search(UnsignedLong=13371337) + assert record.UnsignedLong == 13371337 assert table.indexes[10].name == "IxLongLong" assert table.indexes[10].column_ids == [11] - assert table.indexes[10].search(LongLong=-13371337) + record = table.indexes[10].search(LongLong=-13371337) + assert record.LongLong == -13371337 assert table.indexes[11].name == "IxGUID" assert table.indexes[11].column_ids == [12] - assert table.indexes[11].search(GUID="3f360af1-6766-46dc-9af2-0dacf295c2a1") + record = table.indexes[11].search(GUID="3f360af1-6766-46dc-9af2-0dacf295c2a1") + assert record.GUID == "3f360af1-6766-46dc-9af2-0dacf295c2a1" assert table.indexes[12].name == "IxUnsignedShort" assert table.indexes[12].column_ids == [13] - assert table.indexes[12].search(UnsignedShort=1337) + record = table.indexes[12].search(UnsignedShort=1337) + assert record.UnsignedShort == 1337 assert table.indexes[13].name == "IxBinary" assert table.indexes[13].column_ids == [128] - assert table.indexes[13].search(Binary=b"test binary data") + record = table.indexes[13].search(Binary=b"test binary data") + assert record.Binary == b"test binary data" assert table.indexes[14].name == "IxLongBinary" assert table.indexes[14].column_ids == [256] - assert table.indexes[14].search(LongBinary=b"test long binary data " + (b"a" * 1024)) + record = table.indexes[14].search(LongBinary=b"test long binary data " + (b"a" * 1000)) + assert record.LongBinary == b"test long binary data " + (b"a" * 1000) assert table.indexes[15].name == "IxASCII" assert table.indexes[15].column_ids == [129] - assert table.indexes[15].search(ASCII="Simple ASCII text") + record = table.indexes[15].search(ASCII="Simple ASCII text") + assert record.ASCII == "Simple ASCII text" assert table.indexes[16].name == "IxUnicode" assert table.indexes[16].column_ids == [130] - assert table.indexes[16].search(Unicode="Simple Unicode text 🦊") + record = table.indexes[16].search(Unicode="Simple Unicode text 🦊") + assert record.Unicode == "Simple Unicode text 🦊" assert table.indexes[17].name == "IxLongASCII" assert table.indexes[17].column_ids == [257] - assert table.indexes[17].search(LongASCII="Long ASCII text " + ("a" * 1024)) + record = table.indexes[17].search(LongASCII="Long ASCII text " + ("a" * 1024)) + assert record.LongASCII == "Long ASCII text " + ("a" * 1024) assert table.indexes[18].name == "IxLongUnicode" assert table.indexes[18].column_ids == [258] - assert table.indexes[18].search(LongUnicode="Long Unicode text 🦊 " + ("a" * 1024)) + record = table.indexes[18].search(LongUnicode="Long Unicode text 🦊 " + ("a" * 1024)) + assert record.LongUnicode == "Long Unicode text 🦊 " + ("a" * 1024) diff --git a/tests/test_table.py b/tests/test_table.py new file mode 100644 index 0000000..5260240 --- /dev/null +++ b/tests/test_table.py @@ -0,0 +1,32 @@ +from unittest.mock import MagicMock + +from dissect.esedb.table import Table + + +def test_find_index() -> None: + mock_column_id = MagicMock() + mock_column_id.name = "Id" + mock_column_bit = MagicMock() + mock_column_bit.name = "Bit" + mock_column_unsigned_byte = MagicMock() + mock_column_unsigned_byte.name = "UnsignedByte" + + mock_idx_id = MagicMock(name="IxId") + mock_idx_id.is_primary = True + mock_idx_id.columns = [mock_column_id] + mock_idx_bit = MagicMock(name="IxBit") + mock_idx_bit.is_primary = False + mock_idx_bit.columns = [mock_column_bit] + mock_idx_multiple = MagicMock(name="IxMultiple") + mock_idx_multiple.is_primary = False + mock_idx_multiple.columns = [mock_column_bit, mock_column_unsigned_byte] + + table = Table(MagicMock(), 69, "index", indexes=[mock_idx_id, mock_idx_bit, mock_idx_multiple]) + + assert table.find_index(["Id"]) == mock_idx_id + assert table.find_index(["Bit"]) == mock_idx_bit + assert table.find_index(["Bit", "UnsignedByte"]) == mock_idx_multiple + assert table.find_index(["UnsignedByte", "Bit"]) == mock_idx_multiple + assert table.find_index(["UnsignedByte"]) is None + assert table.find_index(["Id", "Bit"]) == mock_idx_id + assert table.find_index(["Bit", "SomethingElse"]) == mock_idx_bit