Use new data model differentiating single line loop blocks from simple blocks (#36)

* all parsing tests passing with new data model

* all writing tests passing with new data model

* functional interface at parity

* move typealias import to backport for py3.8/3.9

* add check read/write round trip on a file with simple blocks and loop blocks
alisterburt authored Sep 21, 2023
1 parent 9be107a commit 0dd3666
Showing 12 changed files with 444 additions and 465 deletions.
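
A minimal sketch of the kind of round trip the last commit-message bullet describes, written against the functional interface changed below; the block names and field values are illustrative, not taken from the commit's test suite, and it assumes a data block can be either a plain dict (simple block) or a pandas DataFrame (loop block), as the new parser code suggests.

import pandas as pd
import starfile

# one simple block (key-value pairs) and one loop block (tabular data)
data = {
    'general': {'rlnImageSizeX': 64, 'rlnPixelSize': 1.1},  # simple block -> dict
    'particles': pd.DataFrame({
        'rlnCoordinateX': [10.0, 20.0],
        'rlnCoordinateY': [30.0, 40.0],
    }),  # loop block -> DataFrame
}

starfile.write(data, 'roundtrip.star')
blocks = starfile.read('roundtrip.star', always_dict=True)

assert isinstance(blocks['general'], dict)
assert isinstance(blocks['particles'], pd.DataFrame)
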
2 changes: 2 additions & 0 deletions setup.cfg
@@ -26,7 +26,9 @@ packages = find:
python_requires = >= 3.8
include_package_data = True
install_requires =
numpy
pandas
typing-extensions


[options.extras_require]
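The new typing-extensions requirement backs the "move typealias import to backport for py3.8/3.9" bullet above. starfile/typing.py itself is not shown in this excerpt, but the usual backport pattern, with a DataBlock alias inferred from the parser changes below, looks roughly like this:

# hypothetical sketch of starfile/typing.py; the real module is not part of this excerpt
import sys
from typing import Dict, Union

import pandas as pd

if sys.version_info >= (3, 10):
    from typing import TypeAlias
else:
    # TypeAlias only landed in typing in Python 3.10; use the backport on 3.8/3.9
    from typing_extensions import TypeAlias

# a data block is either a loop block (DataFrame) or a simple block (dict of scalars)
DataBlock: TypeAlias = Union[pd.DataFrame, Dict[str, Union[str, int, float]]]
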
41 changes: 23 additions & 18 deletions starfile/functions.py
@@ -8,6 +8,7 @@

from .parser import StarParser
from .writer import StarWriter
from .typing import DataBlock

if TYPE_CHECKING:
import pandas as pd
@@ -21,22 +21,26 @@ def read(filename: PathLike, read_n_blocks: int = None, always_dict: bool = False)
default behaviour in the case of only one data block being present in the STAR file is to
return only a dataframe, this can be changed by setting 'always_dict=True'
"""
star = StarParser(filename, read_n_blocks=read_n_blocks)
if len(star.dataframes) == 1 and always_dict is False:
return star.first_dataframe
parser = StarParser(filename, n_blocks_to_read=read_n_blocks)
if len(parser.data_blocks) == 1 and always_dict is False:
return list(parser.data_blocks.values())[0]
else:
return star.dataframes


def write(data: Union[pd.DataFrame, Dict[str, pd.DataFrame], List[pd.DataFrame]],
filename: PathLike,
float_format: str = '%.6f', sep: str = '\t', na_rep: str = '<NA>',
overwrite: bool = False, force_loop: bool = True):
"""
Write dataframes from pandas dataframe(s) to a star file
data can be a single dataframe, a list of dataframes or a dict of dataframes
float format defaults to 6 digits after the decimal point
"""
StarWriter(data, filename=filename, float_format=float_format, overwrite=overwrite,
na_rep=na_rep, sep=sep, force_loop=force_loop)
return parser.data_blocks


def write(
data: Union[DataBlock, Dict[str, DataBlock], List[DataBlock]],
filename: PathLike,
float_format: str = '%.6f',
sep: str = '\t',
na_rep: str = '<NA>',
**kwargs,
):
"""Write data blocks as STAR files."""
StarWriter(
data,
filename=filename,
float_format=float_format,
na_rep=na_rep,
separator=sep
)
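
To illustrate the behaviour documented in read() above, a short usage sketch against the new functional interface; the file names are placeholders.

import starfile

# a file containing a single block comes back as that block alone...
df = starfile.read('particles.star')

# ...unless always_dict=True, which always returns {block_name: block}
blocks = starfile.read('particles.star', always_dict=True)

# read_n_blocks stops parsing after the first n data blocks in the file
head = starfile.read('postprocess.star', read_n_blocks=2, always_dict=True)

# write() accepts a single block, a dict of named blocks, or a list of blocks
starfile.write(df, 'copy.star', float_format='%.6f', sep='\t', na_rep='<NA>')
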
295 changes: 110 additions & 185 deletions starfile/parser.py
@@ -1,217 +1,142 @@
from __future__ import annotations

from collections import OrderedDict
from collections import deque
from io import StringIO
from linecache import getline

import numpy as np
import pandas as pd
from pathlib import Path
from typing import TYPE_CHECKING, List, Union, Optional
from typing import TYPE_CHECKING, Union, Optional, Dict, Tuple

from .utils import TextBuffer, TextCrawler
from starfile.typing import DataBlock

if TYPE_CHECKING:
from os import PathLike


class StarParser:
def __init__(self, filename: PathLike, read_n_blocks: Optional[int] = None):
filename: Path
n_lines_in_file: int
n_blocks_to_read: int
current_line_number: int
data_blocks: Dict[str, DataBlock]

def __init__(self, filename: PathLike, n_blocks_to_read: Optional[int] = None):
# set filename, with path checking
filename = Path(filename)
if not filename.exists():
raise FileNotFoundError(filename)
self.filename = filename

# initialise attributes for parsing
self.text_buffer = TextBuffer()
self.crawler = TextCrawler(self.filename)
self.read_n_blocks = read_n_blocks
self._dataframes = OrderedDict()
self._current_dataframe_index = 0
self._initialise_n_lines()
# setup for parsing
self.data_blocks = {}
self.n_lines_in_file = count_lines(self.filename)
self.n_blocks_to_read = n_blocks_to_read

# parse file
self.current_line_number = 0
self.parse_file()

def parse_file(self):
while self.crawler.current_line_number <= self.n_lines:
if len(self.dataframes) == self.read_n_blocks:
break

elif self.crawler.current_line.startswith('data_'):
self._parse_data_block()

if not self.crawler.current_line.startswith('data_'):
self.crawler.increment_line_number()

self.dataframes_to_numeric()
return

def _parse_data_block(self):
self.current_block_name = self._block_name_from_current_line()

while self.crawler.current_line_number <= self.n_lines:
self.crawler.increment_line_number()
line = self.crawler.current_line

if line.startswith('loop_'):
self._parse_loop_block()
return

elif line.startswith(
'data_') or self.crawler.current_line_number == self.n_lines:
self._parse_simple_block_from_buffer()
return

self.text_buffer.add_line(line)
return

def _parse_simple_block_from_buffer(self):
data = self._clean_simple_block_in_buffer()

df = self._cleaned_simple_block_to_dataframe(data)
df.name = self._current_data_block_name
self._add_dataframe(df)

self.text_buffer.clear()

def _parse_loop_block(self):
self.crawler.increment_line_number()
header = self._parse_loop_header()
df = self._parse_loop_data()
if df is None:
df = pd.DataFrame({h: None for h in header}, index=[0])
df.columns = header
df.name = self._current_data_block_name
self._add_dataframe(df)
return

@property
def filename(self):
return self._filename

@filename.setter
def filename(self, filename: Union[str, Path]):
filename = Path(filename)
if filename.exists():
self._filename = filename
else:
raise FileNotFoundError

@property
def n_lines(self):
return self._n_lines

def _initialise_n_lines(self):
self._n_lines = self.crawler.count_lines()

@property
def dataframes(self):
return self._dataframes
def current_line(self) -> str:
return getline(str(self.filename), self.current_line_number).strip()

def _add_dataframe(self, df: pd.DataFrame):
key = self._get_dataframe_key(df)
self.dataframes[key] = df
self._increment_dataframe_index()

@property
def current_block_name(self):
return self._current_data_block_name

@current_block_name.setter
def current_block_name(self, name: str):
self._current_data_block_name = name

@property
def current_dataframe_index(self):
return self._current_dataframe_index

def _increment_dataframe_index(self):
self._current_dataframe_index += 1

def _get_dataframe_key(self, df):
name = df.name

if name == '' or isinstance(name, int) or name in self.dataframes.keys():
return self._current_dataframe_index
else:
return df.name

def _clean_simple_block_in_buffer(self):
clean_datablock = {}

for line in self.text_buffer.buffer:
if line == '' or line.startswith('#'):
continue

heading_name = self.heading_from_line(line)
value = line.split()[1]
clean_datablock[heading_name] = value

return clean_datablock

@staticmethod
def _cleaned_simple_block_to_dataframe(data: dict):
return pd.DataFrame(data, columns=data.keys(), index=[0])

def _parse_loop_header(self) -> List[str]:
self.text_buffer.clear()

while self.crawler.current_line.startswith('_'):
heading = self.heading_from_line(self.crawler.current_line)
self.text_buffer.add_line(heading)
self.crawler.increment_line_number()

return self.text_buffer.buffer

def _parse_loop_data(self) -> Union[pd.DataFrame, None]:
self.text_buffer.clear()

while self.crawler.current_line_number <= self.n_lines:
current_line = self.crawler.current_line
if current_line.startswith('data_'):
def parse_file(self):
while self.current_line_number <= self.n_lines_in_file:
if len(self.data_blocks) == self.n_blocks_to_read:
break
elif self.current_line.startswith('data_'):
block_name, block = self._parse_data_block()
self.data_blocks[block_name] = block
else:
self.current_line_number += 1

def _parse_data_block(self) -> Tuple[str, DataBlock]:
# current line starts with 'data_foo'
block_name = self.current_line[5:] # 'data_foo' -> 'foo'
self.current_line_number += 1

# iterate over file,
while self.current_line_number <= self.n_lines_in_file:
self.current_line_number += 1
if self.current_line.startswith('loop_'):
return block_name, self._parse_loop_block()
elif self.current_line.startswith('_'): # line is simple block
return block_name, self._parse_simple_block()

def _parse_simple_block(self) -> Dict[str, Union[str, int, float]]:
block = {}
while self.current_line_number <= self.n_lines_in_file:
if self.current_line.startswith('data'):
break
self.text_buffer.add_line(current_line)
self.crawler.increment_line_number()

# check whether the buffer is empty
if self.text_buffer.is_empty:
return None

df = pd.read_csv(
StringIO(self.text_buffer.as_str()),
delim_whitespace=True,
header=None,
comment='#'
)
elif self.current_line.startswith('_'): # '_foo bar'
k, v = self.current_line.split()
block[k[1:]] = numericise(v)
self.current_line_number += 1
return block

def _parse_loop_block(self) -> pd.DataFrame:
# parse loop header
loop_column_names = deque()
self.current_line_number += 1

while self.current_line.startswith('_'):
column_name = self.current_line.split()[0][1:]
loop_column_names.append(column_name)
self.current_line_number += 1

# now parse the loop block data
loop_data = deque()
while self.current_line_number <= self.n_lines_in_file:
if self.current_line.startswith('data_'):
break
loop_data.append(self.current_line)
self.current_line_number += 1
loop_data = '\n'.join(loop_data)
if loop_data[-2:] != '\n':
loop_data += '\n'

# put string data into a dataframe
if loop_data == '\n':
n_cols = len(loop_column_names)
df = pd.DataFrame(np.zeros(shape=(0, n_cols)))
else:
df = pd.read_csv(
StringIO(loop_data),
delim_whitespace=True,
header=None,
comment='#'
)
df = df.apply(pd.to_numeric, errors='ignore')
df.columns = loop_column_names
return df

def dataframes_to_numeric(self):
"""
Converts strings in dataframes into numerical values where possible

applying pd.to_numeric causes loss of 'name' attribute of DataFrame,
need to extract name and reapply inline
"""
for key, df in self.dataframes.items():
name = getattr(df, 'name', None)
self.dataframes[key] = df.apply(pd.to_numeric, errors='ignore')
if name is not None:
self.dataframes[key].name = name
def count_lines(file: Path) -> int:
with open(file, 'rb') as f:
return sum(1 for _ in f)

@staticmethod
def _block_name_from_line(line: str):
return line[5:]

def _block_name_from_current_line(self):
return self._block_name_from_line(self.crawler.current_line)
def block_name_from_line(line: str) -> str:
"""'data_general' -> 'general'"""
return line[5:]

@staticmethod
def heading_from_line(line: str):
return line.split()[0][1:]

@property
def first_dataframe(self):
return self.dataframe_at_index(0)
def heading_from_line(line: str) -> str:
"""'_rlnSpectralIndex #1' -> 'rlnSpectralIndex'."""
return line.split()[0][1:]

def dataframe_at_index(self, idx: int):
return self.dataframes_as_list()[idx]

def dataframes_as_list(self):
return list(self.dataframes.values())
def numericise(value: str) -> Union[str, int, float]:
try:
# Try to convert the string value to an integer
value = int(value)
except ValueError:
try:
# If it's not an integer, try to convert it to a float
value = float(value)
except ValueError:
# If it's not a float either, leave it as a string
value = value
return value
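
For reference, the module-level helpers above behave as follows on typical STAR lines (the example values are illustrative):

block_name_from_line('data_optics')         # -> 'optics'
heading_from_line('_rlnSpectralIndex #1')   # -> 'rlnSpectralIndex'
numericise('42')                            # -> 42 (int)
numericise('1.5')                           # -> 1.5 (float)
numericise('image_001.mrc')                 # -> 'image_001.mrc' (left as a string)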