Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include schema migrations for 0.5.0+ #25

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/pybiocfilecache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
RnameExistsError,
RpathTimeoutError,
)
from .migrations import Migrator
from .models import Base, Resource
from .utils import (
calculate_file_hash,
Expand Down Expand Up @@ -69,6 +70,29 @@ def __init__(self, cache_dir: Optional[Union[str, Path]] = None, config: Optiona
self._setup_database()
self._last_cleanup = datetime.now()

# Initialize migrator
self.migrator = Migrator(self.engine)

# Run migrations
try:
self.migrator.migrate()
except Exception as e:
logger.error(f"Failed to run migrations: {e}")
raise

def get_schema_version(self) -> str:
"""Get current schema version."""
return self.migrator.get_current_version()

def migrate_schema(self, target_version: Optional[str] = None) -> None:
"""Migrate schema to specified version.

Args:
target_version: Version to migrate to.
If None, migrates to latest version.
"""
self.migrator.migrate(target_version)

def _setup_cache_dir(self) -> None:
if not self.config.cache_dir.exists():
self.config.cache_dir.mkdir(parents=True, exist_ok=True)
Expand Down
1 change: 0 additions & 1 deletion src/pybiocfilecache/const.py

This file was deleted.

1 change: 1 addition & 0 deletions src/pybiocfilecache/migrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .migration import Migrator
139 changes: 139 additions & 0 deletions src/pybiocfilecache/migrations/migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import logging
from typing import Optional

from sqlalchemy import text
from sqlalchemy.engine import Engine

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


logger = logging.getLogger(__name__)


class Migration:
"""Base class for migrations."""

version: str
description: str

@staticmethod
def up(engine: Engine) -> None:
"""Upgrade to this version."""
raise NotImplementedError

@staticmethod
def down(engine: Engine) -> None:
"""Downgrade from this version."""
raise NotImplementedError


class Migrator:
"""Handles database schema migrations."""

def __init__(self, engine: Engine):
from .migrationV0_5_0 import MigrationV0_5_0

self.engine = engine
self.migrations = [MigrationV0_5_0]

def _detect_version_from_structure(self) -> str:
"""Detect schema version by examining table structure."""
with self.engine.connect() as conn:
# Check table structure
columns = conn.execute(
text("""
PRAGMA table_info(resource);
""")
).fetchall()
column_names = {col[1] for col in columns}

# Check for columns that indicate version
# if "is_compressed" in column_names:
# return "0.5.0"
if "tags" in column_names and "size_bytes" in column_names:
return "0.5.0"
else:
return "0.4.1"

def get_current_version(self) -> Optional[str]:
"""Get current schema version from database.

Will attempt to detect version if `schema_version` is not in metadata.
"""
with self.engine.connect() as conn:
result = conn.execute(
text("""
SELECT value FROM metadata
WHERE key = 'schema_version'
""")
)
row = result.fetchone()

if row is not None:
return row[0]

detected_version = self._detect_version_from_structure()
conn.execute(
text("""
INSERT INTO metadata (key, value)
VALUES ('schema_version', :version);
"""),
{"version": detected_version},
)

return detected_version

def migrate(self, target_version: Optional[str] = None) -> None:
"""Migrate schema to target version.

Args:
target_version:
Version to migrate to.
If None, migrates to latest version.
"""
try:
current = self.get_current_version()
latest_version = self.migrations[-1].version
target_version = target_version or latest_version

if current == target_version:
logger.info("Already at target version")
return

if current == "0.4.1" and target_version == "0.5.0":
logger.info("Upgrading from 0.4.1 to 0.5.0")
self.migrations[0].up(self.engine)
elif current == "0.5.0" and target_version == "0.4.1":
logger.info("Downgrading from 0.5.0 to 0.4.1")
self.migrations[0].down(self.engine)
else:
raise ValueError(f"Unsupported migration path: {current} -> {target_version}")

except Exception as e:
logger.error(f"Migration failed: {e}")
raise

# When we have multiple migrations
# current_idx = next((i for i, m in enumerate(self.migrations) if m.version == current), None)
# target_idx = next((i for i, m in enumerate(self.migrations) if m.version == target_version), None)

# if current_idx is None:
# raise ValueError(f"Unknown current version: {current}")
# if target_idx is None:
# raise ValueError(f"Unknown target version: {target_version}")

# if current_idx < target_idx:
# # Upgrade
# for migration in self.migrations[current_idx : target_idx + 1]:
# logger.info(f"Upgrading to {migration.version}")
# migration.up(self.engine)
# elif current_idx > target_idx:
# # Downgrade
# for migration in reversed(self.migrations[target_idx + 1 : current_idx + 1]):
# logger.info(f"Downgrading from {migration.version}")
# migration.down(self.engine)

def __repr__(self) -> str:
return f"Migrator(current_version={self.get_current_version()})"
102 changes: 102 additions & 0 deletions src/pybiocfilecache/migrations/migrationV0_5_0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from sqlalchemy import text
from sqlalchemy.engine import Engine

from .migration import Migration

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


class MigrationV0_5_0(Migration):
"""Migration from v0.4.1 to 0.5.0."""

version = "0.5.0"
description = "Add tags, size_bytes, compression flag, and update indexes"

@staticmethod
def up(engine: Engine) -> None:
"""Upgrade from v0.4.1 to v0.5.0."""
with engine.begin() as conn:
# Add new columns
conn.execute(
text("""
ALTER TABLE resource
ADD COLUMN tags TEXT;
""")
)
conn.execute(
text("""
ALTER TABLE resource
ADD COLUMN size_bytes INTEGER;
""")
)
# conn.execute(
# text("""
# ALTER TABLE resource
# ADD COLUMN is_compressed BOOLEAN DEFAULT FALSE;
# """)
# )

# Calculate size_bytes for existing resources
conn.execute(
text("""
UPDATE resource
SET size_bytes = (
SELECT length(readfile(rpath))
FROM resource r2
WHERE r2.id = resource.id
)
WHERE EXISTS (
SELECT 1
FROM resource r3
WHERE r3.id = resource.id
AND r3.rpath IS NOT NULL
);
""")
)

# Add indexes
conn.execute(
text("""
CREATE UNIQUE INDEX IF NOT EXISTS ix_resource_rname
ON resource(rname);
""")
)
conn.execute(
text("""
CREATE INDEX IF NOT EXISTS ix_resource_rid
ON resource(rid);
""")
)

# Update metadata
conn.execute(
text("""
UPDATE metadata
SET value = '0.5.0'
WHERE key = 'schema_version';
""")
)

@staticmethod
def down(engine: Engine) -> None:
"""Downgrade from v0.5.0 to 0.4.1."""
with engine.begin() as conn:
# Remove indexes
conn.execute(text("DROP INDEX IF EXISTS ix_resource_rname;"))
conn.execute(text("DROP INDEX IF EXISTS ix_resource_rid;"))

# Remove columns
# conn.execute(text("ALTER TABLE resource DROP COLUMN is_compressed;"))
conn.execute(text("ALTER TABLE resource DROP COLUMN size_bytes;"))
conn.execute(text("ALTER TABLE resource DROP COLUMN tags;"))

# Update metadata
conn.execute(
text("""
UPDATE metadata
SET value = '0.4.1'
WHERE key = 'schema_version';
""")
)
Loading