Skip to content

Commit

Permalink
Use keyword paramaters for migration methods for mssql
Browse files Browse the repository at this point in the history
The new alembic (1.11.0) requires parameters to be keyword
parameters in methods used in migrations. Part of the problem has
been fixed in apache#31306 and apache#31302 but still some migrations that are
specifically run in case of mssql need to use migration parameters.
  • Loading branch information
potiuk committed May 16, 2023
1 parent 4dacab3 commit ea1220d
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def upgrade():

conn = op.get_bind()
if conn.dialect.name == "mssql":
op.drop_index("ti_pool", table_name="task_instance")
op.drop_index(index_name="ti_pool", table_name="task_instance")

# use batch_alter_table to support SQLite workaround
with op.batch_alter_table("task_instance") as batch_op:
Expand All @@ -75,14 +75,16 @@ def upgrade():
)

if conn.dialect.name == "mssql":
op.create_index("ti_pool", "task_instance", ["pool", "state", "priority_weight"])
op.create_index(
index_name="ti_pool", table_name="task_instance", columns=["pool", "state", "priority_weight"]
)


def downgrade():
"""Make TaskInstance.pool field nullable."""
conn = op.get_bind()
if conn.dialect.name == "mssql":
op.drop_index("ti_pool", table_name="task_instance")
op.drop_index(index_name="ti_pool", table_name="task_instance")

# use batch_alter_table to support SQLite workaround
with op.batch_alter_table("task_instance") as batch_op:
Expand All @@ -93,7 +95,9 @@ def downgrade():
)

if conn.dialect.name == "mssql":
op.create_index("ti_pool", "task_instance", ["pool", "state", "priority_weight"])
op.create_index(
index_name="ti_pool", table_name="task_instance", columns=["pool", "state", "priority_weight"]
)

with create_session() as session:
session.query(TaskInstance).filter(TaskInstance.pool == "default_pool").update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ class SerializedDagModel(Base):
conn = op.get_bind()
if conn.dialect.name != "sqlite":
if conn.dialect.name == "mssql":
op.drop_index("idx_fileloc_hash", "serialized_dag")
op.drop_index(index_name="idx_fileloc_hash", table_name="serialized_dag")

op.alter_column(
table_name="serialized_dag", column_name="fileloc_hash", type_=sa.BigInteger(), nullable=False
)
if conn.dialect.name == "mssql":
op.create_index("idx_fileloc_hash", "serialized_dag", ["fileloc_hash"])
op.create_index(
index_name="idx_fileloc_hash", table_name="serialized_dag", columns=["fileloc_hash"]
)

sessionmaker = sa.orm.sessionmaker()
session = sessionmaker(bind=conn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def upgrade():
if "id" in xcom_columns:
if conn.dialect.name == "mssql":
constraint_dict = get_table_constraints(conn, "xcom")
drop_column_constraints(bop, "id", constraint_dict)
drop_column_constraints(operator=bop, column_name="id", constraint_dict=constraint_dict)
bop.drop_column("id")
bop.drop_index("idx_xcom_dag_task_date")
# mssql doesn't allow primary keys with nullable columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def upgrade():
bop.alter_column("key", type_=StringID(length=512), nullable=False)
bop.alter_column("execution_date", type_=TIMESTAMP, nullable=False)
if conn.dialect.name == "mssql":
bop.create_primary_key("pk_xcom", ["dag_id", "task_id", "key", "execution_date"])
bop.create_primary_key(
constraint_name="pk_xcom", columns=["dag_id", "task_id", "key", "execution_date"]
)


def downgrade():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,8 @@ def downgrade():
constraints = get_mssql_table_constraints(conn, "xcom")
pk, _ = constraints["PRIMARY KEY"].popitem()
op.drop_constraint(pk, "xcom", type_="primary")
op.create_primary_key("pk_xcom", "xcom", ["dag_id", "task_id", "execution_date", "key"])
op.create_primary_key(
constraint_name="pk_xcom",
table_name="xcom",
columns=["dag_id", "task_id", "execution_date", "key"],
)
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4995c94849a5848e551e85ffeb99d3dda9f652f3c2d7b3cf3aa16c02911690d3
4987842fd67d29e194f1117e127d3291ba60d3fbc3e81cba75ce93884c263321
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# specific language governing permissions and limitations
# under the License.
"""Setup.py for the Airflow project."""
# This file can be modified if you want to make sure the CI build is using "upgrade to newer dependencies"
# Which is useful when you want to check if the dependencies are still compatible with the latest versions
# And they seem to break some unrelated tests in main. You can modify this number = 00001 to trigger it.
from __future__ import annotations

import glob
Expand Down

0 comments on commit ea1220d

Please sign in to comment.