Skip to content

Commit

Permalink
include schema migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
jkanche committed Dec 13, 2024
1 parent b2361a0 commit 05b46f4
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 1 deletion.
Binary file added cache/BiocFileCache.sqlite
Binary file not shown.
24 changes: 24 additions & 0 deletions src/pybiocfilecache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
RnameExistsError,
RpathTimeoutError,
)
from .migrations import Migrator
from .models import Base, Resource
from .utils import (
calculate_file_hash,
Expand Down Expand Up @@ -69,6 +70,29 @@ def __init__(self, cache_dir: Optional[Union[str, Path]] = None, config: Optiona
self._setup_database()
self._last_cleanup = datetime.now()

# Initialize migrator
self.migrator = Migrator(self.engine)

# Run migrations
try:
self.migrator.migrate()
except Exception as e:
logger.error(f"Failed to run migrations: {e}")
raise

def get_schema_version(self) -> str:
    """Return the schema version reported by this cache's migrator.

    Returns:
        Whatever ``self.migrator.get_current_version()`` returns.
    """
    migrator = self.migrator
    current_version = migrator.get_current_version()
    return current_version

def migrate_schema(self, target_version: Optional[str] = None) -> None:
    """Ask this cache's migrator to move the schema to ``target_version``.

    Args:
        target_version: Version to migrate to.
            If None, migrates to latest version.
    """
    migrator = self.migrator
    migrator.migrate(target_version)

def _setup_cache_dir(self) -> None:
if not self.config.cache_dir.exists():
self.config.cache_dir.mkdir(parents=True, exist_ok=True)
Expand Down
1 change: 0 additions & 1 deletion src/pybiocfilecache/const.py

This file was deleted.

1 change: 1 addition & 0 deletions src/pybiocfilecache/migrations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .migration import Migrator
139 changes: 139 additions & 0 deletions src/pybiocfilecache/migrations/migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import logging
from typing import Optional

from sqlalchemy import text
from sqlalchemy.engine import Engine

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


logger = logging.getLogger(__name__)


class Migration:
    """Base class for migrations.

    A concrete migration sets ``version`` and ``description`` and overrides
    both ``up`` and ``down``; the implementations here only raise.
    """

    # Schema version this migration upgrades the database to.
    version: str
    # Short human-readable summary of the migration.
    description: str

    @staticmethod
    def up(engine: Engine) -> None:
        """Upgrade to this version.

        Args:
            engine: Engine for the database to upgrade.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError()

    @staticmethod
    def down(engine: Engine) -> None:
        """Downgrade from this version.

        Args:
            engine: Engine for the database to downgrade.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError()


class Migrator:
    """Handles database schema migrations.

    Reads the schema version recorded in the ``metadata`` table (detecting it
    from the layout of the ``resource`` table when no version is recorded) and
    applies the registered migrations through the given engine.
    """

    def __init__(self, engine: Engine):
        """Bind the migrator to ``engine`` and register the known migrations.

        Args:
            engine: Engine for the cache database.
        """
        # Imported here rather than at module level: the migration module
        # imports ``Migration`` from this module.
        from .migrationV0_5_0 import MigrationV0_5_0

        self.engine = engine
        # ``migrate`` treats the last entry as the latest version.
        self.migrations = [MigrationV0_5_0]

    def _detect_version_from_structure(self) -> str:
        """Detect schema version by examining table structure.

        Returns:
            ``"0.5.0"`` when the ``resource`` table has an ``is_compressed``
            column, or both ``tags`` and ``size_bytes``; ``"0.4.1"`` otherwise.
        """
        with self.engine.connect() as conn:
            columns = conn.execute(
                text("""
                    PRAGMA table_info(resource);
                """)
            ).fetchall()

        # Index 1 of each ``PRAGMA table_info`` row is the column name.
        column_names = {col[1] for col in columns}

        # Columns that only exist from 0.5.0 onwards.
        if "is_compressed" in column_names or {"tags", "size_bytes"} <= column_names:
            return "0.5.0"
        return "0.4.1"

    def get_current_version(self) -> Optional[str]:
        """Get current schema version from database.

        Will attempt to detect version if `schema_version` is not in metadata,
        and records the detected version in the ``metadata`` table.

        Returns:
            The stored ``schema_version`` value, or the detected version when
            none was stored.
        """
        with self.engine.connect() as conn:
            row = conn.execute(
                text("""
                    SELECT value FROM metadata
                    WHERE key = 'schema_version'
                """)
            ).fetchone()

        if row is not None:
            return row[0]

        detected_version = self._detect_version_from_structure()

        # Write inside ``engine.begin()`` so the INSERT is committed. Under
        # SQLAlchemy 2.x a connection from ``connect()`` does not autocommit,
        # so the row would be rolled back when the connection closes.
        with self.engine.begin() as conn:
            conn.execute(
                text("""
                    INSERT INTO metadata (key, value)
                    VALUES ('schema_version', :version);
                """),
                {"version": detected_version},
            )

        return detected_version

    def migrate(self, target_version: Optional[str] = None) -> None:
        """Migrate schema to target version.

        Args:
            target_version:
                Version to migrate to.
                If None, migrates to latest version.

        Raises:
            ValueError: If there is no supported path from the current
                version to ``target_version``.
        """
        try:
            current = self.get_current_version()
            latest_version = self.migrations[-1].version
            target_version = target_version or latest_version

            if current == target_version:
                logger.info("Already at target version")
                return

            # Only the single 0.4.1 <-> 0.5.0 step is supported for now.
            if current == "0.4.1" and target_version == "0.5.0":
                logger.info("Upgrading from 0.4.1 to 0.5.0")
                self.migrations[0].up(self.engine)
            elif current == "0.5.0" and target_version == "0.4.1":
                logger.info("Downgrading from 0.5.0 to 0.4.1")
                self.migrations[0].down(self.engine)
            else:
                raise ValueError(f"Unsupported migration path: {current} -> {target_version}")

        except Exception as e:
            # Log for visibility, then let the caller decide how to react.
            logger.error(f"Migration failed: {e}")
            raise

        # When we have multiple migrations
        # NOTE(review): sketch only -- "0.4.1" has no entry in self.migrations,
        # so the index lookups below need a baseline entry before this is enabled.
        # current_idx = next((i for i, m in enumerate(self.migrations) if m.version == current), None)
        # target_idx = next((i for i, m in enumerate(self.migrations) if m.version == target_version), None)

        # if current_idx is None:
        #     raise ValueError(f"Unknown current version: {current}")
        # if target_idx is None:
        #     raise ValueError(f"Unknown target version: {target_version}")

        # if current_idx < target_idx:
        #     # Upgrade
        #     for migration in self.migrations[current_idx : target_idx + 1]:
        #         logger.info(f"Upgrading to {migration.version}")
        #         migration.up(self.engine)
        # elif current_idx > target_idx:
        #     # Downgrade
        #     for migration in reversed(self.migrations[target_idx + 1 : current_idx + 1]):
        #         logger.info(f"Downgrading from {migration.version}")
        #         migration.down(self.engine)

    def __repr__(self) -> str:
        """Return a representation that includes the current schema version.

        Note that this calls ``get_current_version`` and therefore queries
        the database.
        """
        return f"Migrator(current_version={self.get_current_version()})"
102 changes: 102 additions & 0 deletions src/pybiocfilecache/migrations/migrationV0_5_0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from sqlalchemy import text
from sqlalchemy.engine import Engine

from .migration import Migration

__author__ = "Jayaram Kancherla"
__copyright__ = "Jayaram Kancherla"
__license__ = "MIT"


class MigrationV0_5_0(Migration):
    """Migration from v0.4.1 to v0.5.0.

    Upgrading adds the ``tags`` and ``size_bytes`` columns to ``resource``,
    backfills ``size_bytes`` from the files on disk, creates indexes on
    ``rname`` and ``rid``, and records the new schema version. Downgrading
    reverses those steps.
    """

    version = "0.5.0"
    description = "Add tags, size_bytes, compression flag, and update indexes"

    @staticmethod
    def _resource_columns(conn) -> set:
        """Return the column names currently present on ``resource``."""
        # Index 1 of each ``PRAGMA table_info`` row is the column name.
        return {row[1] for row in conn.execute(text("PRAGMA table_info(resource);"))}

    @staticmethod
    def _set_schema_version(conn, version: str) -> None:
        """Record ``version`` as ``schema_version`` in the ``metadata`` table."""
        result = conn.execute(
            text("""
                UPDATE metadata
                SET value = :version
                WHERE key = 'schema_version';
            """),
            {"version": version},
        )
        # A bare UPDATE silently does nothing when the row is missing, so
        # insert it in that case.
        if result.rowcount == 0:
            conn.execute(
                text("""
                    INSERT INTO metadata (key, value)
                    VALUES ('schema_version', :version);
                """),
                {"version": version},
            )

    @staticmethod
    def up(engine: Engine) -> None:
        """Upgrade from v0.4.1 to v0.5.0.

        Args:
            engine: Engine for the cache database. All statements run in a
                single ``engine.begin()`` transaction.
        """
        import os

        with engine.begin() as conn:
            existing_columns = MigrationV0_5_0._resource_columns(conn)

            # Add new columns. SQLite has no ``ADD COLUMN IF NOT EXISTS``, so
            # skip columns that are already present to keep this step as
            # re-runnable as the ``IF NOT EXISTS`` index statements below.
            if "tags" not in existing_columns:
                conn.execute(
                    text("""
                        ALTER TABLE resource
                        ADD COLUMN tags TEXT;
                    """)
                )
            if "size_bytes" not in existing_columns:
                conn.execute(
                    text("""
                        ALTER TABLE resource
                        ADD COLUMN size_bytes INTEGER;
                    """)
                )
            # NOTE: no ``is_compressed`` column is created here, although
            # ``description`` mentions a compression flag.

            # Calculate size_bytes for existing resources. This is done in
            # Python because SQLite's ``readfile()`` is provided by the
            # sqlite3 command-line shell, not by the connections opened from
            # Python. Paths are resolved exactly as stored in ``rpath``.
            rows = conn.execute(
                text("""
                    SELECT id, rpath
                    FROM resource
                    WHERE rpath IS NOT NULL;
                """)
            ).fetchall()
            size_updates = []
            for resource_id, rpath in rows:
                try:
                    size_updates.append({"id": resource_id, "size": os.path.getsize(rpath)})
                except (OSError, ValueError):
                    # Missing or unreadable file: leave size_bytes as NULL.
                    continue
            if size_updates:
                conn.execute(
                    text("""
                        UPDATE resource
                        SET size_bytes = :size
                        WHERE id = :id;
                    """),
                    size_updates,
                )

            # Add indexes
            conn.execute(
                text("""
                    CREATE UNIQUE INDEX IF NOT EXISTS ix_resource_rname
                    ON resource(rname);
                """)
            )
            conn.execute(
                text("""
                    CREATE INDEX IF NOT EXISTS ix_resource_rid
                    ON resource(rid);
                """)
            )

            # Update metadata
            MigrationV0_5_0._set_schema_version(conn, "0.5.0")

    @staticmethod
    def down(engine: Engine) -> None:
        """Downgrade from v0.5.0 to 0.4.1.

        Args:
            engine: Engine for the cache database. All statements run in a
                single ``engine.begin()`` transaction.
        """
        with engine.begin() as conn:
            # Remove indexes
            conn.execute(text("DROP INDEX IF EXISTS ix_resource_rname;"))
            conn.execute(text("DROP INDEX IF EXISTS ix_resource_rid;"))

            # Remove columns, skipping any that are already absent.
            # NOTE(review): DROP COLUMN needs a reasonably recent SQLite
            # (3.35+ per the SQLite release notes) -- confirm the minimum
            # supported runtime.
            existing_columns = MigrationV0_5_0._resource_columns(conn)
            if "size_bytes" in existing_columns:
                conn.execute(text("ALTER TABLE resource DROP COLUMN size_bytes;"))
            if "tags" in existing_columns:
                conn.execute(text("ALTER TABLE resource DROP COLUMN tags;"))

            # Update metadata
            MigrationV0_5_0._set_schema_version(conn, "0.4.1")

0 comments on commit 05b46f4

Please sign in to comment.