You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
As a proposal, here is my implementation of CTE + WITH RECURSIVE
(now even sqlite can do this)
Usage example
from pydal.objects import Expression
from whatever_see_below import CTE, XQuery


def exp(e, as_, type='string'):
    """Shortcut: wrap the raw SQL text `e` in an aliased pyDAL Expression.

    `e` is passed to `Expression(db, e, type=type)` and the result is aliased
    as `as_`.  The keyword name `type` shadows the builtin on purpose: it is
    kept so that existing calls such as `exp("1", as_='xdepth',
    type='integer')` keep working.
    """
    return Expression(db, e, type=type).with_alias(as_)


# Non-recursive (anchor) term of the CTE, aliased as 'works_for'.
works_for = db(org.name == 'Alica').for_CTE(
    org.id,
    org.name.with_alias('top_boss'),  # i.e. Alica is top_boss
    org.name,
    exp("1", as_='xdepth', type='integer')
).with_alias('works_for')

# turn into CTE and add recursive term by .union() or .union_all()
works_for = CTE(works_for).union(
    db(org.boss == works_for.id).for_CTE(
        org.id,
        works_for.top_boss,
        org.name,
        (works_for.xdepth + 1).with_alias('xdepth')  # how many bosses are up to the top boss?
    )
)

# query with CTE should be in format
# XQuery(foo_cte, bar_cte, ..., baz_cte, regular_query).select(...) - or ._select(...)
XQuery(
    works_for,
    (works_for.name == 'John') & (works_for.xdepth < 4)
).select(works_for.ALL)
Implementation
# --------------- monkey patch ------------------------
# Small monkey patch so that anyone can try the proposal;
# it could be placed in db.py.
def monkey_patch():
    """Inject a `for_CTE` method into pyDAL's `Set` class.

    `Set` is the class whose instances are returned by `db(...)`.
    `for_CTE()` returns the same object as `nested_select()`, except that its
    `with_alias` is wrapped: the clone produced by `with_alias(...)` gets a
    `query_name` that returns the one-element tuple `(clone.sql_shortref,)`.
    Per the original author this makes
    `for_CTE().with_alias('foo').query_name(...)` yield just 'foo' instead of
    '(SELECT ...) AS foo'.
    """
    def for_CTE(self, *args, **kwargs):
        nested = self.nested_select(*args, **kwargs)
        original_with_alias = nested.with_alias

        # Already wrapped: hand the nested select back untouched.
        if hasattr(original_with_alias, '_patched'):
            return nested

        def with_alias(s, *a, **kw):
            # Delegate to the original bound method to obtain the clone ...
            aliased = original_with_alias(*a, **kw)
            # ... then override `query_name` on that clone only.
            aliased.query_name = lambda *a, **kw: (aliased.sql_shortref,)
            return aliased

        # Marker checked by the `hasattr` guard above.
        with_alias._patched = True
        # Bind the wrapper to this particular nested-select instance.
        nested.with_alias = with_alias.__get__(nested, nested.__class__)
        return nested

    # Expose `for_CTE` on every Set instance.
    pydal.objects.Set.for_CTE = for_CTE


# don't forget to call it
monkey_patch()
# --------------- the core: CTE and XQuery ------------------------
def _strip_statement_terminator(sql):
    """Return `sql` without its trailing ';', if it has one."""
    return sql[:-1] if sql.endswith(';') else sql


class CTE(object):
    """Common table expression built from aliased nested selects.

    Calling the instance renders the
    `alias(col1,col2,...) AS (select [UNION [ALL] recursive]) ` fragment that
    `XQuery` places after `WITH RECURSIVE`.

    Attribute access falls back to the wrapped select: `cte.ALL` returns
    `select.ALL`, and `cte.<name>` returns `select[<name>]` when `<name>` is
    listed in `select.fields`.  The CTE's own attributes and methods
    (`select`, `recursive`, `union_type`, `union`, `union_all`, `fields`)
    take precedence over same-named columns, so such columns cannot break
    the object's internals.
    """

    def __init__(self, select, recursive=None):
        """
        select    - must be db(...).for_CTE(...).with_alias(...)
        recursive - may be .nested_select() or .for_CTE(...)
        """
        self.select = select
        self.recursive = recursive
        self.union_type = 'UNION ALL'

    def union(self, recursive):
        """Set `recursive` as the recursive term joined by UNION; return self."""
        self.union_type = 'UNION'
        self.recursive = recursive
        return self

    def union_all(self, recursive):
        """Set `recursive` as the recursive term joined by UNION ALL; return self."""
        self.union_type = 'UNION ALL'
        self.recursive = recursive
        return self

    def __call__(self):
        """Render this CTE as an SQL fragment (note the trailing space)."""
        sql = '{t}({fields}) AS ({select}) '
        recursive = ''
        if self.recursive:
            # We can't use just str(self.recursive), because it is in the
            # format '(SELECT ...) AS alias'; `_sql_cache` holds the
            # non-aliased definition.  str() is called only to force
            # `_sql_cache` to be generated.
            str(self.recursive)
            recursive = _strip_statement_terminator(self.recursive._sql_cache)
            sql = sql.replace('{select}', '{select} {union_type} {recursive}')
        # Same trick for the anchor term.
        str(self.select)
        return sql.format(
            t=self.select._tablename,  # alias in fact
            fields=','.join(self.select.fields),
            select=_strip_statement_terminator(self.select._sql_cache),
            union_type=self.union_type,
            recursive=recursive,
        )

    def __getattr__(self, k):
        """Resolve `ALL` and column names through the wrapped select.

        Only invoked when normal attribute lookup fails.  Raises
        AttributeError for names that are neither `ALL` nor listed in
        `select.fields`.
        """
        try:
            sel = object.__getattribute__(self, 'select')
        except AttributeError:
            # `select` is not set yet: avoid recursing into __getattr__.
            raise AttributeError(k)
        if k == 'ALL':
            return sel.ALL
        if k in sel.fields:
            return sel[k]
        raise AttributeError(
            "%r object has no attribute %r" % (type(self).__name__, k)
        )

    @property
    def fields(self):
        """The `fields` of the wrapped select."""
        return self.select.fields

    def __iter__(self):
        """Iterate over the wrapped select."""
        return iter(self.select)


class XQuery(object):
    """A regular query preceded by zero or more CTEs.

    Usage: XQuery(foo_cte, bar_cte, ..., baz_cte, regular_query); the last
    positional argument is the query, everything before it is a CTE.
    """

    def __init__(self, *args):
        self.ctes = list(args[:-1])
        self.q = args[-1]
        self.db = self.q._db

    def _select(self, *fields, **kw):
        """Return the SQL text, without a trailing ';'.

        The regular select is prefixed with `WITH RECURSIVE <ctes>` when at
        least one CTE was given.
        """
        db = self.db
        sql = 'WITH RECURSIVE {ctes} {select}' if self.ctes else '{select}'
        return sql.format(
            ctes=','.join([c() for c in self.ctes]),
            select=_strip_statement_terminator(db(self.q)._select(*fields, **kw)),
        )

    def select(self, *fields, **kw):
        """Execute the query through `db.executesql` and return its result.

        Column names passed to `executesql` are derived from `fields`:
        an `SQLALL` is expanded into the fields of its table, and for a
        non-`Field` expression rendered as '... AS alias' the alias is used.
        """
        # Local imports: the original snippet used these names without
        # importing them.
        import re
        from pydal.objects import Field, SQLALL

        db = self.db
        sql = self._select(*fields, **kw)
        colnames = []
        _fields = []
        for f in fields:
            if isinstance(f, SQLALL):
                for tf in f._table:
                    _fields.append(tf)
                    colnames.append(str(tf))
                continue
            _fields.append(f)
            c = str(f)
            if not isinstance(f, Field):
                as_alias = re.match(r'^.+? AS (\w+[\w0-9]*)$', c)
                if as_alias:
                    c = as_alias.group(1)
            colnames.append(c)
        return db.executesql(sql, fields=_fields, colnames=colnames)
The text was updated successfully, but these errors were encountered:
As a proposal, here is my implementation of CTE + WITH RECURSIVE
(now even sqlite can do this)
Usage example
Implementation
The text was updated successfully, but these errors were encountered: