Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Load regex support #513

Merged
merged 23 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added data/sample_videos/1/1.mp4
Binary file not shown.
Binary file added data/sample_videos/1/2.mp4
Binary file not shown.
Binary file added data/sample_videos/2/1.mp4
Binary file not shown.
2 changes: 2 additions & 0 deletions eva/binder/binder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import TYPE_CHECKING, List

from eva.catalog.catalog_utils import is_video_table
from eva.catalog.sql_config import IDENTIFIER_COLUMN

if TYPE_CHECKING:
from eva.binder.statement_binder_context import StatementBinderContext
Expand Down Expand Up @@ -64,6 +65,7 @@ def extend_star(
[
TupleValueExpression(col_name=col_name, table_alias=alias)
for alias, col_name in col_objs
if col_name != IDENTIFIER_COLUMN
]
)
return target_list
Expand Down
15 changes: 9 additions & 6 deletions eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
)
from eva.binder.statement_binder_context import StatementBinderContext
from eva.catalog.catalog_manager import CatalogManager
from eva.catalog.catalog_type import ColumnType, NdArrayType
from eva.catalog.catalog_type import ColumnType, NdArrayType, TableType
from eva.expression.abstract_expression import AbstractExpression
from eva.expression.function_expression import FunctionExpression
from eva.expression.tuple_value_expression import TupleValueExpression
from eva.parser.alias import Alias
from eva.parser.create_index_statement import CreateIndexStatement
from eva.parser.create_mat_view_statement import CreateMaterializedViewStatement
from eva.parser.drop_statement import DropTableStatement
from eva.parser.explain_statement import ExplainStatement
from eva.parser.rename_statement import RenameTableStatement
from eva.parser.select_statement import SelectStatement
from eva.parser.statement import AbstractStatement
from eva.parser.table_ref import TableRef
Expand Down Expand Up @@ -126,10 +126,13 @@ def _bind_create_mat_statement(self, node: CreateMaterializedViewStatement):
self.bind(node.query)
# Todo Verify if the number projected columns matches table

@bind.register(DropTableStatement)
def _bind_drop_table_statement(self, node: DropTableStatement):
for table in node.table_refs:
self.bind(table)
@bind.register(RenameTableStatement)
def _bind_rename_table_statement(self, node: RenameTableStatement):
    """Bind a RENAME TABLE statement.

    Binds the existing table reference, then rejects the statement when the
    target is a structured-data table, since renaming those is not yet
    supported by the system.
    """
    self.bind(node.old_table_ref)
    bound_table = node.old_table_ref.table.table_obj
    if bound_table.table_type == TableType.STRUCTURED_DATA:
        err_msg = "Rename not yet supported on structured data"
        logger.exception(err_msg)
        raise BinderError(err_msg)

@bind.register(TableRef)
def _bind_tableref(self, node: TableRef):
Expand Down
70 changes: 59 additions & 11 deletions eva/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from eva.catalog.sql_config import IDENTIFIER_COLUMN
from eva.parser.create_statement import ColConstraintInfo, ColumnDefinition
from eva.parser.table_ref import TableInfo
from eva.utils.errors import CatalogError
from eva.utils.generic_utils import generate_file_path
from eva.utils.logging_manager import logger

Expand Down Expand Up @@ -153,7 +154,10 @@ def create_video_metadata(self, name: str) -> DataFrameMetadata:
return metadata

def create_table_metadata(
self, table_info: TableInfo, columns: List[ColumnDefinition]
self,
table_info: TableInfo,
columns: List[ColumnDefinition],
identifier_column: str = "id",
) -> DataFrameMetadata:
table_name = table_info.table_name
column_metadata_list = self.create_columns_metadata(columns)
Expand All @@ -162,6 +166,7 @@ def create_table_metadata(
table_name,
file_url,
column_metadata_list,
identifier_column=identifier_column,
table_type=TableType.STRUCTURED_DATA,
)
return metadata
Expand Down Expand Up @@ -193,7 +198,7 @@ def create_column_metadata(
data_type: ColumnType,
array_type: NdArrayType,
dimensions: List[int],
cci: ColConstraintInfo,
cci: ColConstraintInfo = ColConstraintInfo(),
) -> DataFrameColumn:
"""Create a dataframe column object this column.
This function won't commit this object in the catalog database.
Expand Down Expand Up @@ -342,18 +347,18 @@ def get_udf_outputs(self, udf_obj: UdfMetadata) -> List[UdfIO]:
)
return self._udf_io_service.get_outputs_by_udf_id(udf_obj.id)

def drop_dataset_metadata(self, obj: DataFrameMetadata) -> bool:
    """Remove a table from the catalog.

    Deletes the table entry (and, via the service, its columns) from
    df_metadata and df_columns respectively.

    Arguments:
        obj: dataframe metadata entry to remove

    Returns:
        True if successfully deleted else False
    """
    return self._dataset_service.drop_dataset(obj)

def drop_udf(self, udf_name: str) -> bool:
"""
Expand All @@ -368,12 +373,8 @@ def drop_udf(self, udf_name: str) -> bool:
"""
return self._udf_service.drop_udf_by_name(udf_name)

def rename_table(self, new_name: TableInfo, curr_table: TableInfo):
return self._dataset_service.rename_dataset_by_name(
new_name.table_name,
curr_table.database_name,
curr_table.table_name,
)
def rename_table(self, curr_table: DataFrameMetadata, new_name: TableInfo):
    """Rename the catalog entry for *curr_table* to ``new_name.table_name``.

    Delegates to the dataset service; only the table name from *new_name*
    is used.
    """
    return self._dataset_service.rename_dataset(curr_table, new_name.table_name)

def check_table_exists(self, database_name: str, table_name: str):
metadata = self._dataset_service.dataset_object_by_name(
Expand All @@ -387,6 +388,53 @@ def check_table_exists(self, database_name: str, table_name: str):
def get_all_udf_entries(self):
    """Return every UDF entry registered in the catalog."""
    return self._udf_service.get_all_udfs()

def get_video_metadata_table(
    self, input_table: DataFrameMetadata
) -> DataFrameMetadata:
    """Look up the system-maintained metadata table for a video table.

    The metadata table is keyed by the video table's ``file_url``.

    Args:
        input_table (DataFrameMetadata): input video table

    Returns:
        DataFrameMetadata: metadata table maintained by the system

    Raises:
        CatalogError: if no metadata table exists for *input_table*.
    """
    # the metadata table is named after the video table's file_url
    video_metadata_name = input_table.file_url
    entry = self.get_dataset_metadata(None, video_metadata_name)
    if not entry:
        err = f"Table with name {video_metadata_name} does not exist in catalog"
        logger.exception(err)
        raise CatalogError(err)
    return entry

def create_video_metadata_table(
    self, input_table: DataFrameMetadata
) -> DataFrameMetadata:
    """Create the system metadata table for a video table.

    We use this table to store all the video filenames and corresponding
    information. The table is named after the video table's ``file_url``.

    Args:
        input_table (DataFrameMetadata): input video table

    Returns:
        DataFrameMetadata: newly created metadata table maintained by the system

    Raises:
        CatalogError: if a metadata table with that name already exists.
    """
    # use file_url as the metadata table name
    video_metadata_name = input_table.file_url
    existing = self.get_dataset_metadata(None, video_metadata_name)
    if existing:
        err_msg = f"Table with name {video_metadata_name} already exists"
        logger.exception(err_msg)
        raise CatalogError(err_msg)

    columns = [ColumnDefinition("file_url", ColumnType.TEXT, None, None)]
    return self.create_table_metadata(
        TableInfo(video_metadata_name), columns, identifier_column=columns[0].name
    )

""" Index related services. """

def create_index(
Expand Down
6 changes: 3 additions & 3 deletions eva/catalog/schema_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

class SchemaUtils(object):
@staticmethod
def get_sqlalchemy_column(df_column: DataFrameColumn) -> Column:
def xform_to_sqlalchemy_column(df_column: DataFrameColumn) -> Column:
column_type = df_column.type

sqlalchemy_column = None
Expand All @@ -41,7 +41,7 @@ def get_sqlalchemy_column(df_column: DataFrameColumn) -> Column:
return sqlalchemy_column

@staticmethod
def get_sqlalchemy_schema(
def xform_to_sqlalchemy_schema(
column_list: List[DataFrameColumn],
) -> Dict[str, Column]:
"""Converts the list of DataFrameColumns to SQLAlchemyColumns
Expand All @@ -53,6 +53,6 @@ def get_sqlalchemy_schema(
Dict[str, Column]: mapping from column_name to sqlalchemy column object
"""
return {
column.name: SchemaUtils.get_sqlalchemy_column(column)
column.name: SchemaUtils.xform_to_sqlalchemy_column(column)
for column in column_list
}
46 changes: 23 additions & 23 deletions eva/catalog/services/df_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from eva.catalog.catalog_type import TableType
from eva.catalog.models.df_metadata import DataFrameMetadata
from eva.catalog.services.base_service import BaseService
from eva.utils.errors import CatalogError
from eva.utils.logging_manager import logger


Expand All @@ -38,14 +39,21 @@ def create_dataset(
Returns:
DataFrameMetadata object
"""
metadata = self.model(
name=name,
file_url=file_url,
identifier_id=identifier_id,
table_type=int(table_type),
)
metadata = metadata.save()
return metadata
try:
metadata = self.model(
name=name,
file_url=file_url,
identifier_id=identifier_id,
table_type=int(table_type),
)
metadata = metadata.save()
except Exception as e:
logger.exception(
f"Failed to create catalog dataset with exception {str(e)}"
)
raise CatalogError(e)
else:
return metadata

def dataset_by_name(self, name: str) -> int:
"""
Expand Down Expand Up @@ -94,35 +102,27 @@ def dataset_object_by_name(
"""
return self.model.query.filter(self.model._name == dataset_name).one_or_none()

def drop_dataset(self, dataset: DataFrameMetadata):
    """Delete a dataset entry from the db.

    Arguments:
        dataset (DataFrameMetadata): dataset to delete

    Returns:
        True if successfully removed

    Raises:
        CatalogError: if the underlying delete fails.
    """
    try:
        dataset.delete()
    except Exception as e:
        err_msg = f"Delete dataset failed for {dataset} with error {str(e)}."
        logger.exception(err_msg)
        raise CatalogError(err_msg)
    else:
        return True

def rename_dataset(self, dataset: DataFrameMetadata, new_name: str):
    """Rename *dataset* to *new_name* in the catalog database.

    Raises:
        RuntimeError: if the underlying update fails.
    """
    try:
        dataset.update(_name=new_name)
    except Exception as exc:
        # message text matches the original .format() output exactly
        err_msg = (
            f"Update dataset name failed for {dataset.name} with error {str(exc)}"
        )
        logger.error(err_msg)
        raise RuntimeError(err_msg)
1 change: 1 addition & 0 deletions eva/configuration/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from eva.version import VERSION

EVA_INSTALLATION_DIR = Path(eva.__file__).parent
EVA_ROOT_DIR = Path(eva.__file__).parent.parent
# Using eva version to govern the EVA_DEFAULT_DIR
# This means we won't support backward compatibility as each version will maintain its own copy of database.
# Ideally, if the new release is not breaking backward compatibilty, we can keep using the same copy.
Expand Down
1 change: 0 additions & 1 deletion eva/eva.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ storage:
upload_dir: ""
structured_data_engine: "eva.storage.sqlite_storage_engine.SQLStorageEngine"
video_engine: "eva.storage.opencv_storage_engine.OpenCVStorageEngine"
video_engine_version: 0
server:
host: "0.0.0.0"
port: 5432
Expand Down
26 changes: 15 additions & 11 deletions eva/executor/drop_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import ExecutorError
from eva.models.storage.batch import Batch
from eva.parser.table_ref import TableInfo
from eva.planner.drop_plan import DropPlan
from eva.storage.storage_engine import StorageEngine
from eva.utils.logging_manager import logger
Expand All @@ -33,37 +34,40 @@ def validate(self):
def exec(self):
    """Drop table executor.

    Looks the table up in the catalog, drops its backing storage through the
    matching storage engine, then removes its catalog metadata.

    Yields:
        Batch: a single-row frame with the outcome message.

    Raises:
        ExecutorError: if the table does not exist (and IF EXISTS was not
            given), the storage engine cannot be resolved, or the catalog
            removal fails.
    """
    catalog_manager = CatalogManager()
    # NOTE(review): only logs — execution continues with the first table.
    if len(self.node.table_infos) > 1:
        logger.exception("Drop supports only single table")
    table_info: TableInfo = self.node.table_infos[0]

    if not catalog_manager.check_table_exists(
        table_info.database_name, table_info.table_name
    ):
        err_msg = "Table: {} does not exist".format(table_info)
        if self.node.if_exists:
            # logger.warn is a deprecated alias of logger.warning
            logger.warning(err_msg)
            # yield (not `return X`): this method is a generator, and a bare
            # `return X` inside a generator discards X — callers iterating
            # the executor would never receive the message batch.
            yield Batch(pd.DataFrame([err_msg]))
            return
        else:
            logger.exception(err_msg)
            raise ExecutorError(err_msg)

    try:
        table_obj = catalog_manager.get_dataset_metadata(
            table_info.database_name, table_info.table_name
        )
        storage_engine = StorageEngine.factory(table_obj)
    except RuntimeError as err:
        # surface storage-engine resolution failures as executor errors
        raise ExecutorError(str(err)) from err
    storage_engine.drop(table=table_obj)

    success = catalog_manager.drop_dataset_metadata(table_obj)

    if not success:
        err_msg = "Failed to drop {}".format(table_info)
        logger.exception(err_msg)
        raise ExecutorError(err_msg)

    yield Batch(
        pd.DataFrame(
            {"Table Successfully dropped: {}".format(table_info.table_name)},
            index=[0],
        )
    )
23 changes: 22 additions & 1 deletion eva/executor/executor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import glob
import os
from pathlib import Path
from typing import Generator, List

import cv2

from eva.catalog.catalog_manager import CatalogManager
from eva.expression.abstract_expression import AbstractExpression
Expand Down Expand Up @@ -53,3 +58,19 @@ def handle_if_not_exists(table_info: TableInfo, if_not_exist=False):
raise ExecutorError(err_msg)
else:
return False


def iter_path_regex(path_regex: Path) -> Generator[str, None, None]:
    """Lazily yield every filesystem path matching *path_regex*.

    ``~`` is expanded to the user's home directory, and ``**`` patterns
    recurse into subdirectories.
    """
    pattern = os.path.expanduser(path_regex)
    return glob.iglob(pattern, recursive=True)


def validate_video(video_path: Path) -> bool:
try:
vid = cv2.VideoCapture(str(video_path))
if not vid.isOpened():
return False
return True
except Exception as e:
logger.warning(
f"Unexpected Exception {e} occured while reading video file {video_path}"
)
Loading