Skip to content

Commit

Permalink
fix: fix hybrid chunker token constraint
Browse files Browse the repository at this point in the history
Signed-off-by: Panos Vagenas <[email protected]>
  • Loading branch information
vagenas committed Jan 17, 2025
1 parent 618df13 commit f763699
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 106 deletions.
185 changes: 79 additions & 106 deletions docling_core/transforms/chunker/hybrid_chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ def _patch_tokenizer_and_max_tokens(self) -> Self:
)
return self

def _count_tokens(self, text: Optional[Union[str, list[str]]]):
def _count_text_tokens(self, text: Optional[Union[str, list[str]]]):
if text is None:
return 0
elif isinstance(text, list):
total = 0
for t in text:
total += self._count_tokens(t)
total += self._count_text_tokens(t)
return total
return len(self._tokenizer.tokenize(text, max_length=None))

Expand All @@ -80,102 +80,83 @@ class _ChunkLengthInfo(BaseModel):
text_len: int
other_len: int

def _count_chunk_tokens(self, doc_chunk: DocChunk):
ser_txt = self.serialize(chunk=doc_chunk)
return len(self._tokenizer.tokenize(text=ser_txt, max_length=None))

def _doc_chunk_length(self, doc_chunk: DocChunk):
text_length = self._count_tokens(doc_chunk.text)
headings_length = self._count_tokens(doc_chunk.meta.headings)
captions_length = self._count_tokens(doc_chunk.meta.captions)
total = text_length + headings_length + captions_length
text_length = self._count_text_tokens(doc_chunk.text)
total = self._count_chunk_tokens(doc_chunk=doc_chunk)
return self._ChunkLengthInfo(
total_len=total,
text_len=text_length,
other_len=total - text_length,
)

def _make_chunk_from_doc_items(
self, doc_chunk: DocChunk, window_text: str, window_start: int, window_end: int
self, doc_chunk: DocChunk, window_start: int, window_end: int
):
doc_items = doc_chunk.meta.doc_items[window_start : window_end + 1]
meta = DocMeta(
doc_items=doc_chunk.meta.doc_items[window_start : window_end + 1],
doc_items=doc_items,
headings=doc_chunk.meta.headings,
captions=doc_chunk.meta.captions,
origin=doc_chunk.meta.origin,
)
window_text = (
doc_chunk.text
if len(doc_chunk.meta.doc_items) == 1
else self.delim.join(
[
doc_item.text
for doc_item in doc_items
if isinstance(doc_item, TextItem)
]
)
)
new_chunk = DocChunk(text=window_text, meta=meta)
return new_chunk

def _merge_text(self, t1, t2):
if t1 == "":
return t2
elif t2 == "":
return t1
else:
return f"{t1}{self.delim}{t2}"

def _split_by_doc_items(self, doc_chunk: DocChunk) -> list[DocChunk]:
if doc_chunk.meta.doc_items is None or len(doc_chunk.meta.doc_items) <= 1:
return [doc_chunk]
length = self._doc_chunk_length(doc_chunk)
if length.total_len <= self.max_tokens:
return [doc_chunk]
else:
chunks = []
window_start = 0
window_end = 0
window_text = ""
window_text_length = 0
other_length = length.other_len
num_items = len(doc_chunk.meta.doc_items)
while window_end < num_items:
doc_item = doc_chunk.meta.doc_items[window_end]
if isinstance(doc_item, TextItem):
text = doc_item.text
else:
raise RuntimeError("Non-TextItem split not implemented yet")
text_length = self._count_tokens(text)
if (
text_length + window_text_length + other_length < self.max_tokens
and window_end < num_items - 1
):
chunks = []
window_start = 0
window_end = 0 # an inclusive index
num_items = len(doc_chunk.meta.doc_items)
while window_end < num_items:
new_chunk = self._make_chunk_from_doc_items(
doc_chunk=doc_chunk,
window_start=window_start,
window_end=window_end,
)
if self._count_chunk_tokens(doc_chunk=new_chunk) <= self.max_tokens:
if window_end < num_items - 1:
window_end += 1
# Still room left to add more to this chunk AND still at least one
# item left
window_end += 1
window_text_length += text_length
window_text = self._merge_text(window_text, text)
elif text_length + window_text_length + other_length < self.max_tokens:
continue
else:
# All the items in the window fit into the chunk and there are no
# other items left
window_text = self._merge_text(window_text, text)
new_chunk = self._make_chunk_from_doc_items(
doc_chunk, window_text, window_start, window_end
)
chunks.append(new_chunk)
window_end = num_items
elif window_start == window_end:
# Only one item in the window and it doesn't fit into the chunk. So
# we'll just make it a chunk for now and it will get split in the
# plain text splitter.
window_text = self._merge_text(window_text, text)
new_chunk = self._make_chunk_from_doc_items(
doc_chunk, window_text, window_start, window_end
)
chunks.append(new_chunk)
window_start = window_end + 1
window_end = window_start
window_text = ""
window_text_length = 0
else:
# Multiple items in the window but they don't fit into the chunk.
# However, the existing items must have fit or we wouldn't have
# gotten here. So we put everything but the last item into the chunk
# and then start a new window INCLUDING the current window end.
new_chunk = self._make_chunk_from_doc_items(
doc_chunk, window_text, window_start, window_end - 1
)
chunks.append(new_chunk)
window_start = window_end
window_text = ""
window_text_length = 0
return chunks
window_end = num_items # signalizing the last loop
elif window_start == window_end:
# Only one item in the window and it doesn't fit into the chunk. So
# we'll just make it a chunk for now and it will get split in the
# plain text splitter.
window_end += 1
window_start = window_end
else:
# Multiple items in the window but they don't fit into the chunk.
# However, the existing items must have fit or we wouldn't have
# gotten here. So we put everything but the last item into the chunk
# and then start a new window INCLUDING the current window end.
new_chunk = self._make_chunk_from_doc_items(
doc_chunk=doc_chunk,
window_start=window_start,
window_end=window_end - 1,
)
window_start = window_end
chunks.append(new_chunk)
return chunks

def _split_using_plain_text(
self,
Expand Down Expand Up @@ -204,53 +185,45 @@ def _split_using_plain_text(
def _merge_chunks_with_matching_metadata(self, chunks: list[DocChunk]):
output_chunks = []
window_start = 0
window_end = 0
window_end = 0 # an inclusive index
num_chunks = len(chunks)
while window_end < num_chunks:
chunk = chunks[window_end]
lengths = self._doc_chunk_length(chunk)
headings_and_captions = (chunk.meta.headings, chunk.meta.captions)
ready_to_append = False
if window_start == window_end:
# starting a new block of chunks to potentially merge
current_headings_and_captions = headings_and_captions
window_text = chunk.text
window_other_length = lengths.other_len
window_text_length = lengths.text_len
window_items = chunk.meta.doc_items
window_end += 1
first_chunk_of_window = chunk
elif (
headings_and_captions == current_headings_and_captions
and window_text_length + window_other_length + lengths.text_len
<= self.max_tokens
):
# there is room to include the new chunk so add it to the window and
# continue
window_text = self._merge_text(window_text, chunk.text)
window_text_length += lengths.text_len
window_items = window_items + chunk.meta.doc_items
window_end += 1
else:
ready_to_append = True

chks = chunks[window_start : window_end + 1]
doc_items = [it for chk in chks for it in chk.meta.doc_items]
candidate = DocChunk(
text=self.delim.join([chk.text for chk in chks]),
meta=DocMeta(
doc_items=doc_items,
headings=current_headings_and_captions[0],
captions=current_headings_and_captions[1],
origin=chunk.meta.origin,
),
)
if (
headings_and_captions == current_headings_and_captions
and self._count_chunk_tokens(doc_chunk=candidate) <= self.max_tokens
):
# there is room to include the new chunk so add it to the window and
# continue
window_end += 1
new_chunk = candidate
else:
ready_to_append = True
if ready_to_append or window_end == num_chunks:
# no more room OR the start of new metadata. Either way, end the block
# and use the current window_end as the start of a new block
if window_start + 1 == window_end:
# just one chunk so use it as is
output_chunks.append(first_chunk_of_window)
else:
new_meta = DocMeta(
doc_items=window_items,
headings=current_headings_and_captions[0],
captions=current_headings_and_captions[1],
origin=chunk.meta.origin,
)
new_chunk = DocChunk(
text=window_text,
meta=new_meta,
)
output_chunks.append(new_chunk)
# no need to reset window_text, etc. because that will be reset in the
# next iteration in the if window_start == window_end block
Expand Down
Loading

0 comments on commit f763699

Please sign in to comment.