Try   HackMD

Flask實作_ext_18_Flask-SQLAlchemy_Query

tags: flask flask_ext python sqlalchemy

SQLAlchemy的query類有著很多的method,了解它對我們利用orm操作資料庫有著非常大的幫助,這邊也單純的記錄著個人可以理解的method說明。

註:以專案中的資料表為例
註:Blog_Main是blog的主表、Blog_Post是blog的明細表

作業說明

add_column

add_columns

add_entity

all

  • 回傳所有查詢結果
>>> post = Blog_Post.query.all() >>> len(post) 17 >>> type(post) <class 'list'>

as_scalar

autoflush

column_descriptions

  • 回傳資料表的欄位屬性說明
>>> post = Blog_Main.query.with_entities(Blog_Main.id, Blog_Main.author) >>> pprint(post.column_descriptions) [{'aliased': False, 'entity': <class 'app_blog.blog.model.Blog_Main'>, 'expr': <sqlalchemy.orm.attributes.InstrumentedAttribute object at 0x05661540>, 'name': 'id', 'type': Integer()}, {'aliased': False, 'entity': <class 'app_blog.blog.model.Blog_Main'>, 'expr': <sqlalchemy.orm.attributes.InstrumentedAttribute object at 0x05661870>, 'name': 'author', 'type': Integer()}]

correlate

count

  • 效果同sql的count
>>> post = Blog_Post.query >>> post.count() 17
post = Blog_Post.query.filter_by(blog_main_id=1) post.count() 10

cte

delete

distinct

enable_assertions

enable_eagerloads

except_

except_all

execution_options

exists

filter

  • 能夠較為直述式的使用語法
    • filter_by無法使用>,<
    • 等號的話要使用==
>>> post = Blog_Post.query.filter(Blog_Post.id>1, Blog_Post.blog_main_id==1) >>> str(post.statement) 'SELECT "Blog_Posts".id, "Blog_Posts".title, "Blog_Posts".body, "Blog_Posts".category_id, "Blog_Posts".author_id, "Blog_Posts".blog_main_id, "Blog_Posts".create_date, "Blog_Posts".edit_date, "Blog_Posts".slug, "Blog_Posts".flag \nFROM "Blog_Posts" WHERE "Blog_Posts".id > :id_1 AND "Blog_Posts".blog_main_id = :blog_main_id_1'

filter_by

  • 如同sql的where
  • 有人說是語法糖,可以用來做簡單的查詢
>>> post = Blog_Post.query.filter_by(id=1).all() >>> post [<POST> Flask]
  • 多條件
>>> post = Blog_Post.query.filter_by(id=1, blog_main_id=1) >>> str(post.statement) 'SELECT "Blog_Posts".id, "Blog_Posts".title, "Blog_Posts".body, "Blog_Posts".category_id, "Blog_Posts".author_id, "Blog_Posts".blog_main_id, "Blog_Posts".create_date, "Blog_Posts".edit_date, "Blog_Posts".slug, "Blog_Posts".flag \nFROM "Blog_Posts" WHERE "Blog_Posts".id = :id_1 AND "Blog_Posts".blog_main_id = :blog_main_id_1'

first

  • 回傳第一筆資料,若無查詢資料則回傳None
>>> post = Blog_Post.query.first() >>> post <POST> Flask

from_self

  • 把查詢結果回傳給自己做子查詢
>>> post = Blog_Main.query.with_entities(Blog_Main.id, Blog_Main.author) >>> print(post.statement) SELECT "Blog_Main".id, "Blog_Main".author FROM "Blog_Main" >>> print(post.from_self().statement) SELECT anon_1."Blog_Main_id", anon_1."Blog_Main_author" FROM (SELECT "Blog_Main".id AS "Blog_Main_id", "Blog_Main".author AS "Blog_Main_author" FROM "Blog_Main") AS anon_1
>>> post = Blog_Main.query.filter(Blog_Main.blog_name.like('Flask%')).with_entities(Blog_Main.id, Blog_Main.author) >>> print(post.from_self().statement) SELECT anon_1."Blog_Main_id", anon_1."Blog_Main_author" FROM (SELECT "Blog_Main".id AS "Blog_Main_id", "Blog_Main".author AS "Blog_Main_author" FROM "Blog_Main" WHERE "Blog_Main".blog_name LIKE :blog_name_1) AS anon_1

from_statement

  • 直接使用標準sql語法查詢
  • 需配合from sqlalchemy import text
>>> from sqlalchemy import text >>> str = 'select id from Blog_posts' >>> post = Blog_Post.query.from_statement(text(str)) >>> print(post.statement) select id from Blog_posts
>>> from sqlalchemy import text >>> str = 'select id from Blog_posts' >>> post = Blog_Main.query.from_statement(text(str)) >>> print(post.statement) select id from Blog_posts

雖然是Blog_Main,但是沒有實際query之前這語法還是會過,後面只要接了all、first就會異常。

get

  • 取回get內的索引值資料,若無則回傳None
>>> post = Blog_Post.query.get(1) >>> post <POST> Flask >>> post.id 1

group_by

  • 效果如標準sql語法的gorup_by
>>> post = Blog_Post.query.with_entities(Blog_Post.blog_main_id).group_by(Blog_Post.blog_main_id) >>> print(post.statement) SELECT "Blog_Posts".blog_main_id FROM "Blog_Posts" GROUP BY "Blog_Posts".blog_main_id

having

  • 效果如標準sql語法的having
  • from sqlalchemy import func
>>> post = Blog_Post.query.with_entities(Blog_Post.blog_main_id).group_by(Blog_Post.blog_main_id).having(func.count(Blog_Post.blog_main_id)>1) >>> print(post.statement) SELECT "Blog_Posts".blog_main_id FROM "Blog_Posts" GROUP BY "Blog_Posts".blog_main_id HAVING count("Blog_Posts".blog_main_id) > :count_1

instances

intersect

intersect_all

join

  • join_relationship
    • 利用Model設置的relationship來做join
>>> blog = Blog_Main.query.join(Blog_Main.posts) >>> print(blog.statement) SELECT "Blog_Main".id, "Blog_Main".blog_name, "Blog_Main".blog_descri, "Blog_Main".blog_create_date, "Blog_Main".blog_cover_url, "Blog_Main".author FROM "Blog_Main" JOIN "Blog_Posts" ON "Blog_Main".id = "Blog_Posts".blog_main_id
  • join_entity
    • 利用模型來做join,會自動以fk來做join
    • 如果模型間沒有fk的話會造成異常
>>> blog = Blog_Main.query.join(Blog_Post) >>> print(blog.statement) SELECT "Blog_Main".id, "Blog_Main".blog_name, "Blog_Main".blog_descri, "Blog_Main".blog_create_date, "Blog_Main".blog_cover_url, "Blog_Main".author FROM "Blog_Main" JOIN "Blog_Posts" ON "Blog_Main".id = "Blog_Posts".blog_main_id

下面是故意以沒有fk值的model來做join

>>> blog = Blog_Main.query.join(Role) Traceback (most recent call last): File "D:\proPycharm\app_blog\venv\lib\site-packages\sqlalchemy\orm\query.py", line 2386, in _join_to_left clause, right, onclause, isouter=outerjoin, full=full)
  • join_on
    • 手動設置join on
>>> blog = Blog_Main.query.join(Blog_Post, Blog_Main.id==Blog_Post.blog_main_id) >>> print(blog.statement) SELECT "Blog_Main".id, "Blog_Main".blog_name, "Blog_Main".blog_descri, "Blog_Main".blog_create_date, "Blog_Main".blog_cover_url, "Blog_Main".author FROM "Blog_Main" JOIN "Blog_Posts" ON "Blog_Main".id = "Blog_Posts".blog_main_id
  • join_subquery
    • 可將另一個subquery物件拿來做join
    • 有設置relationship
>>> blog = Blog_Main.query.filter_by(author=1).subquery() >>> user = UserRegister.query.join(blog, UserRegister.blog_mains) >>> print(user.statement) SELECT "UserRgeisters".id, "UserRgeisters".username, "UserRgeisters".email, "UserRgeisters".password_hash, "UserRgeisters".confirm, "UserRgeisters".about_me, "UserRgeisters".location, "UserRgeisters".gender, "UserRgeisters".regist_date, "UserRgeisters".last_login FROM "UserRgeisters" JOIN (SELECT "Blog_Main".id AS id, "Blog_Main".blog_name AS blog_name, "Blog_Main".blog_descri AS blog_descri, "Blog_Main".blog_create_date AS blog_create_date, "Blog_Main".blog_cover_url AS blog_cover_url, "Blog_Main".author AS author FROM "Blog_Main" WHERE "Blog_Main".author = :author_1) AS anon_1 ON "UserRgeisters".id = anon_1.author

或是利用一般的on的作法

user = UserRegister.query.join(blog, blog.c.author==UserRegister.id)

c的意義

  • join_select_from
    • 注意到join是以select_from為主,不再是query的物件
>>> user = UserRegister.query.select_from(Blog_Main).join(Blog_Main.user) >>> print(user.statement) SELECT "UserRgeisters".id, "UserRgeisters".username, "UserRgeisters".email, "UserRgeisters".password_hash, "UserRgeisters".confirm, "UserRgeisters".about_me, "UserRgeisters".location, "UserRgeisters".gender, "UserRgeisters".regist_date, "UserRgeisters".last_login FROM "Blog_Main" JOIN "UserRgeisters" ON "UserRgeisters".id = "Blog_Main".author

label

lazy_loaded_from

limit

merge_result

offset

one

  • 回傳一筆資料集,但是查詢結果若超過一筆資料或無資料會拋出異常
    • sqlalchemy.orm.exc.MultipleResultsFound
    • sqlalchemy.orm.exc.NoResultFound
>>> user = UserRegister.query.filter_by(id=1).one() >>> user username:Shaoe.chen

one_or_none

  • 回傳一筆資料集,但是查詢結果若超過一筆資料會拋出異常,無資料則回傳None
>>> user = UserRegister.query.filter_by(id=11).one_or_none() >>> type(user) <class 'NoneType'>

only_return_tuples

  • 回傳tuple,應用於對單一元素的查詢
>>> user = UserRegister.query.only_return_tuples(True) >>> user <flask_sqlalchemy.BaseQuery object at 0x0060CF30> >>> for u in user: ... print(u) ... (username:Shaoe.chen,)

options

order_by

  • 排序
>>> post = Blog_Post.query.order_by(Blog_Post.blog_main_id) >>> print(post.statement) SELECT "Blog_Posts".id, "Blog_Posts".title, "Blog_Posts".body, "Blog_Posts".category_id, "Blog_Posts".author_id, "Blog_Posts".blog_main_id, "Blog_Posts".create_date, "Blog_Posts".edit_date, "Blog_Posts".slug, "Blog_Posts".flag FROM "Blog_Posts" ORDER BY "Blog_Posts".blog_main_id

outerjoin

  • 用法與join同,就是select的主表不同的差異而以
    • 創建一個left outer join的關聯

params

  • 搜尋中以參數來做條件
>>> role = Role.query.filter("id=:id").params(id=1)
>>> print(role.statement)
SELECT roles.id, roles.name 
FROM roles 
WHERE id=:id
>>> role.all()
[Role is ADMIN]

populate_existing

prefix_with

reset_joinpoint

scalar

select_entity_from

select_from

  • 通常跟join搭配一起使用,可看join的使用說明

selectable

slice

statement

  • 回傳解析的sql語法
>>> post = Blog_Post.query >>> print(post.statement) SELECT "Blog_Posts".id, "Blog_Posts".title, "Blog_Posts".body, "Blog_Posts".category_id, "Blog_Posts".author_id, "Blog_Posts".blog_main_id, "Blog_Posts".create_date, "Blog_Posts".edit_date, "Blog_Posts".slug, "Blog_Posts".flag FROM "Blog_Posts"

subquery

suffix_with

union

union_all

update

value

values

whereclause

with_entities

  • 可將query物件再來做再一次的搜尋,在flask_sqlalchemy中可以用來做搜尋欄位選擇
>>> post = Blog_Post.query.with_entities(Blog_Post.id, Blog_Post.author_id) >>> print(post.statement) SELECT "Blog_Posts".id, "Blog_Posts".author_id FROM "Blog_Posts"

參考_stackoverflow

with_for_update

with_hint

with_labels

with_lockmode

with_parent

with_polymorphic

with_session

with_statement_hint

with_transformation

yield_per