Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migration support group name #2457

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sea-orm-cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ you should provide the directory of that submodule.",
)]
database_url: Option<String>,

#[arg(
global = true,
short = 'g',
long,
env = "MIGRATION_GROUP",
help = "Migration group name, defaults to the 'default' group."
)]
group: Option<String>,

#[command(subcommand)]
command: Option<MigrateSubcommands>,
},
Expand Down Expand Up @@ -334,12 +343,14 @@ pub async fn main() {
migration_dir,
database_schema,
database_url,
group,
command,
} => run_migrate_command(
command,
&migration_dir,
database_schema,
database_url,
group,
verbose,
)
.unwrap_or_else(handle_error),
Expand Down
4 changes: 4 additions & 0 deletions sea-orm-cli/src/commands/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub fn run_migrate_command(
migration_dir: &str,
database_schema: Option<String>,
database_url: Option<String>,
group: Option<String>,
verbose: bool,
) -> Result<(), Box<dyn Error>> {
match command {
Expand Down Expand Up @@ -62,6 +63,9 @@ pub fn run_migrate_command(
if let Some(database_schema) = &database_schema {
args.extend(["-s", database_schema]);
}
if let Some(group) = &group {
args.extend(["-g", group]);
}
if verbose {
args.push("-v");
}
Expand Down
27 changes: 19 additions & 8 deletions sea-orm-migration/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ where
.database_url
.expect("Environment variable 'DATABASE_URL' not set");
let schema = cli.database_schema.unwrap_or_else(|| "public".to_owned());
let group = cli.group.unwrap_or_else(|| "default".to_owned());

let connect_options = ConnectOptions::new(url)
.set_schema_search_path(schema)
Expand All @@ -29,14 +30,15 @@ where
.await
.expect("Fail to acquire database connection");

run_migrate(migrator, db, cli.command, cli.verbose)
run_migrate(migrator, db, &group, cli.command, cli.verbose)
.await
.unwrap_or_else(handle_error);
}

pub async fn run_migrate<M>(
_: M,
db: &DbConn,
group: &str,
command: Option<MigrateSubcommands>,
verbose: bool,
) -> Result<(), Box<dyn Error>>
Expand Down Expand Up @@ -68,19 +70,19 @@ where
};

match command {
Some(MigrateSubcommands::Fresh) => M::fresh(db).await?,
Some(MigrateSubcommands::Refresh) => M::refresh(db).await?,
Some(MigrateSubcommands::Reset) => M::reset(db).await?,
Some(MigrateSubcommands::Status) => M::status(db).await?,
Some(MigrateSubcommands::Up { num }) => M::up(db, num).await?,
Some(MigrateSubcommands::Down { num }) => M::down(db, Some(num)).await?,
Some(MigrateSubcommands::Fresh) => M::fresh(db, group).await?,
Some(MigrateSubcommands::Refresh) => M::refresh(db, group).await?,
Some(MigrateSubcommands::Reset) => M::reset(db, group).await?,
Some(MigrateSubcommands::Status) => M::status(db, group).await?,
Some(MigrateSubcommands::Up { num }) => M::up(db, group, num).await?,
Some(MigrateSubcommands::Down { num }) => M::down(db, group, Some(num)).await?,
Some(MigrateSubcommands::Init) => run_migrate_init(MIGRATION_DIR)?,
Some(MigrateSubcommands::Generate {
migration_name,
universal_time: _,
local_time,
}) => run_migrate_generate(MIGRATION_DIR, &migration_name, !local_time)?,
_ => M::up(db, None).await?,
_ => M::up(db, group, None).await?,
};

Ok(())
Expand Down Expand Up @@ -112,6 +114,15 @@ pub struct Cli {
)]
database_url: Option<String>,

#[arg(
global = true,
short = 'g',
long,
env = "MIGRATION_GROUP",
help = "Migration group name, defaults to the 'default' group."
)]
group: Option<String>,

#[command(subcommand)]
command: Option<MigrateSubcommands>,
}
Expand Down
99 changes: 67 additions & 32 deletions sea-orm-migration/src/migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ use sea_orm::sea_query::{
self, extension::postgres::Type, Alias, Expr, ForeignKey, IntoIden, JoinType, Order, Query,
SelectStatement, SimpleExpr, Table,
};
use sea_orm::{
ActiveModelTrait, ActiveValue, Condition, ConnectionTrait, DbBackend, DbErr, DeriveIden,
DynIden, EntityTrait, FromQueryResult, Iterable, QueryFilter, Schema, Statement,
TransactionTrait,
};
use sea_orm::{ActiveModelTrait, ActiveValue, ColumnTrait, Condition, ConnectionTrait, DbBackend, DbErr, DeriveIden, DynIden, EntityTrait, FromQueryResult, Iterable, QueryFilter, Schema, Statement, TransactionTrait};
use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite};

use super::{seaql_migrations, IntoSchemaManagerConnection, MigrationTrait, SchemaManager};
Expand Down Expand Up @@ -77,13 +73,17 @@ pub trait MigratorTrait: Send {
}

/// Get list of applied migrations from database
async fn get_migration_models<C>(db: &C) -> Result<Vec<seaql_migrations::Model>, DbErr>
async fn get_migration_models<C>(
db: &C,
group: &str,
) -> Result<Vec<seaql_migrations::Model>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
let stmt = Query::select()
.table_name(Self::migration_table_name())
.cond_where(seaql_migrations::Column::Group.eq(group))
.columns(seaql_migrations::Column::iter().map(IntoIden::into_iden))
.order_by(seaql_migrations::Column::Version, Order::Asc)
.to_owned();
Expand All @@ -94,13 +94,13 @@ pub trait MigratorTrait: Send {
}

/// Get list of migrations with status
async fn get_migration_with_status<C>(db: &C) -> Result<Vec<Migration>, DbErr>
async fn get_migration_with_status<C>(db: &C, group: &str) -> Result<Vec<Migration>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
let mut migration_files = Self::get_migration_files();
let migration_models = Self::get_migration_models(db).await?;
let migration_models = Self::get_migration_models(db, group).await?;

let migration_in_db: HashSet<String> = migration_models
.into_iter()
Expand Down Expand Up @@ -133,25 +133,25 @@ pub trait MigratorTrait: Send {
}

/// Get list of pending migrations
async fn get_pending_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
async fn get_pending_migrations<C>(db: &C, group: &str) -> Result<Vec<Migration>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
Ok(Self::get_migration_with_status(db)
Ok(Self::get_migration_with_status(db, group)
.await?
.into_iter()
.filter(|file| file.status == MigrationStatus::Pending)
.collect())
}

/// Get list of applied migrations
async fn get_applied_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
async fn get_applied_migrations<C>(db: &C, group: &str) -> Result<Vec<Migration>, DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;
Ok(Self::get_migration_with_status(db)
Ok(Self::get_migration_with_status(db, group)
.await?
.into_iter()
.filter(|file| file.status == MigrationStatus::Applied)
Expand All @@ -174,75 +174,98 @@ pub trait MigratorTrait: Send {
}

/// Check the status of all migrations
async fn status<C>(db: &C) -> Result<(), DbErr>
async fn status<C>(db: &C, group: &str) -> Result<(), DbErr>
where
C: ConnectionTrait,
{
Self::install(db).await?;

info!("Checking migration status");

for Migration { migration, status } in Self::get_migration_with_status(db).await? {
info!("Migration '{}'... {}", migration.name(), status);
for Migration { migration, status } in Self::get_migration_with_status(db, group).await? {
info!(
"Migration '{}' => '{}'... {}",
group,
migration.name(),
status
);
}

Ok(())
}

/// Drop all tables from the database, then reapply all migrations
async fn fresh<'c, C>(db: C) -> Result<(), DbErr>
async fn fresh<'c, C, G: Into<String> + Send>(db: C, group: G) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let group = group.into();
exec_with_connection::<'_, _, _>(db, move |manager| {
Box::pin(async move { exec_fresh::<Self>(manager).await })
let group = group.clone();
Box::pin(async move { exec_fresh::<Self>(manager, &group).await })
})
.await
}

/// Rollback all applied migrations, then reapply all migrations
async fn refresh<'c, C>(db: C) -> Result<(), DbErr>
async fn refresh<'c, C, G: Into<String> + Send>(db: C, group: G) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let group = group.into();
exec_with_connection::<'_, _, _>(db, move |manager| {
let group = group.clone();
Box::pin(async move {
exec_down::<Self>(manager, None).await?;
exec_up::<Self>(manager, None).await
exec_down::<Self>(manager, &group, None).await?;
exec_up::<Self>(manager, &group, None).await
})
})
.await
}

/// Rollback all applied migrations
async fn reset<'c, C>(db: C) -> Result<(), DbErr>
async fn reset<'c, C, G: Into<String> + Send>(db: C, group: G) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let group = group.into();
exec_with_connection::<'_, _, _>(db, move |manager| {
Box::pin(async move { exec_down::<Self>(manager, None).await })
let group = group.clone();
Box::pin(async move { exec_down::<Self>(manager, &group, None).await })
})
.await
}

/// Apply pending migrations
async fn up<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
async fn up<'c, C, G: Into<String> + Send>(
db: C,
group: G,
steps: Option<u32>,
) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let group = group.into();
exec_with_connection::<'_, _, _>(db, move |manager| {
Box::pin(async move { exec_up::<Self>(manager, steps).await })
let group = group.clone();
Box::pin(async move { exec_up::<Self>(manager, &group, steps).await })
})
.await
}

/// Rollback applied migrations
async fn down<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
async fn down<'c, C, G: Into<String> + Send>(
db: C,
group: G,
steps: Option<u32>,
) -> Result<(), DbErr>
where
C: IntoSchemaManagerConnection<'c>,
{
let group = group.into();
exec_with_connection::<'_, _, _>(db, move |manager| {
Box::pin(async move { exec_down::<Self>(manager, steps).await })
let group = group.clone();
Box::pin(async move { exec_down::<Self>(manager, &group, steps).await })
})
.await
}
Expand Down Expand Up @@ -271,7 +294,7 @@ where
}
}

async fn exec_fresh<M>(manager: &SchemaManager<'_>) -> Result<(), DbErr>
async fn exec_fresh<M>(manager: &SchemaManager<'_>, group: &str) -> Result<(), DbErr>
where
M: MigratorTrait + ?Sized,
{
Expand Down Expand Up @@ -353,10 +376,14 @@ where
}

// Reapply all migrations
exec_up::<M>(manager, None).await
exec_up::<M>(manager, group, None).await
}

async fn exec_up<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
async fn exec_up<M>(
manager: &SchemaManager<'_>,
group: &str,
mut steps: Option<u32>,
) -> Result<(), DbErr>
where
M: MigratorTrait + ?Sized,
{
Expand All @@ -370,7 +397,7 @@ where
info!("Applying all pending migrations");
}

let migrations = M::get_pending_migrations(db).await?.into_iter();
let migrations = M::get_pending_migrations(db, group).await?.into_iter();
if migrations.len() == 0 {
info!("No pending migrations");
}
Expand All @@ -388,6 +415,7 @@ where
.duration_since(SystemTime::UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!");
seaql_migrations::Entity::insert(seaql_migrations::ActiveModel {
group: ActiveValue::Set(group.to_owned()),
version: ActiveValue::Set(migration.name().to_owned()),
applied_at: ActiveValue::Set(now.as_secs() as i64),
})
Expand All @@ -399,7 +427,11 @@ where
Ok(())
}

async fn exec_down<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
async fn exec_down<M>(
manager: &SchemaManager<'_>,
group: &str,
mut steps: Option<u32>,
) -> Result<(), DbErr>
where
M: MigratorTrait + ?Sized,
{
Expand All @@ -413,7 +445,10 @@ where
info!("Rolling back all applied migrations");
}

let migrations = M::get_applied_migrations(db).await?.into_iter().rev();
let migrations = M::get_applied_migrations(db, group)
.await?
.into_iter()
.rev();
if migrations.len() == 0 {
info!("No applied migrations");
}
Expand Down
1 change: 1 addition & 0 deletions sea-orm-migration/src/seaql_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use sea_orm::entity::prelude::*;
// One should override the name of migration table via `MigratorTrait::migration_table_name` method
#[sea_orm(table_name = "seaql_migrations")]
pub struct Model {
pub group: String,
#[sea_orm(primary_key, auto_increment = false)]
pub version: String,
pub applied_at: i64,
Expand Down
Loading