sqlalchemy-core

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

# Engine for the local MySQL database "python", reached through the PyMySQL
# driver as user "root" with an empty password.
engine = create_engine('mysql+pymysql://root:@localhost/python')

# Registry that every Table declared below is attached to.
metadata = MetaData()

# users: integer primary key plus two 20-character string columns.
users = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
    Column('fullname', String(20)),
)

# addresses: user_id is declared with no explicit type (None) and a foreign
# key to users.id; email is a required 20-character string.
addresses = Table(
    'addresses',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', None, ForeignKey('users.id')),
    Column('email', String(20), nullable=False, key='email'),
)

创建所有表：metadata.create_all(engine)；删除所有表：metadata.drop_all(engine)；创建或删除单个表：users.create(engine) / users.drop(engine)

```
1
2
3
4
5
6
7
8
9
10
11
12
13

### 插入数据
# Build an INSERT whose values are bound when the statement is constructed.
ins = users.insert().values(name="mash", fullname='mashuai')
# Debugging aids: print(str(ins)) and print(ins.compile().params)

# Execute it; the `with` block closes the connection once the statement has
# run instead of leaving it open.
with engine.connect() as conn:
    result = conn.execute(ins)

# Alternative: build a bare INSERT and supply the values at execution time.
# The values are passed as a dict, the same shape the multi-row form uses.
ins = users.insert()
with engine.connect() as conn:
    conn.execute(ins, {"name": "mash1", "fullname": "mashuai1"})

执行多条语句（一次插入多行数据）

1
2
3
4
5
6
# A list of dicts is passed as the parameters: one dict per row for `ins`.
# The `with` block closes the connection when the inserts are done.
with engine.connect() as conn:
    conn.execute(ins, [
        {"name": "mash2", "fullname": "mashuai2"},
        {"name": "mash3", "fullname": "mashuai3"},
        {"name": "mash4", "fullname": "mashuai4"},
    ])

select

from sqlalchemy.sql import select

1
2
3
4
5
6
7
# NOTE(review): `s` is not defined in the visible text; presumably a select()
# over `users` built just before this snippet - confirm.
with engine.connect() as conn:
    result = conn.execute(s)
    # Fetch the rows once: a result can only be iterated a single time, so
    # a second loop over `result` itself would print nothing.
    rows = result.fetchall()

for row in rows:
    print(row)
# result.fetchone() would fetch a single row instead of all of them.

# Another way, whose usefulness will become apparent later, is to use the
# Column objects themselves as keys into each row:
for row in rows:
    print("name:", row[users.c.name], ", fullname:", row[users.c.fullname])

如果想要控制select的列

1
2
3
# NOTE(review): `s` is not defined in the visible text; going by the heading
# it is presumably a select() listing specific columns - confirm.
# Rows are consumed inside the `with` block, which then closes the connection.
with engine.connect() as conn:
    result = conn.execute(s)
    for row in result:
        print(row)

#### 加条件

1
2
# NOTE(review): `s` is not defined in the visible text; going by the heading
# it is presumably a select() with a where() condition - confirm.
# Only the first row is read, so the `with` block matters here: it closes the
# connection even though the result has not been fully consumed.
with engine.connect() as conn:
    result = conn.execute(s)
    print(result.fetchone())

运算符

1
# Comparing a Column with a value builds a SQL expression object rather than
# a plain bool; printing it shows the SQL text of that expression.
criterion = users.c.id == 2
print(criterion)

连词 and_ or_ not_

排序 及 函数 .order_by(users.c.name.desc())

from sqlalchemy import func, desc
1
2
3
4
5
# Select each name with a count of ids labelled 'count', grouped by id and
# ordered by id, then name, both descending.
stmt = (
    select([users.c.name, func.count(users.c.id).label('count')])
    .group_by('id')
    .order_by(desc('id'), desc('name'))
)

# Fetch everything inside the `with` block so the connection is closed
# before the rows are printed.
with engine.connect() as conn:
    result = conn.execute(stmt).fetchall()

for row in result:
    print(row)
    # The label given to the aggregate is also usable as the row key.
    print(row['count'])

使用别名

1
2
3
4
5
6
7
8
9
10
11
12
### 连接 join() outerjoin()
# join() with no explicit ON clause; the only declared link between the two
# tables is the foreign key addresses.user_id -> users.id.
print(users.join(addresses))

# join() with an explicit ON clause. The addresses table declares its column
# as `email`, so that is the attribute to use on `addresses.c`
# (`email_address` does not exist and raises AttributeError).
print(
    users.join(
        addresses,
        addresses.c.email.like(users.c.name + '%'),
    )
)

# Use the join as the FROM clause of a SELECT and run it; the `with` block
# closes the connection after the rows are fetched.
s = select([users.c.fullname]).select_from(
    users.join(addresses, addresses.c.email.like(users.c.name + '%'))
)
with engine.connect() as conn:
    result = conn.execute(s).fetchall()

# outerjoin() builds the same kind of FROM clause as a LEFT OUTER JOIN.
s = select([users.c.fullname]).select_from(users.outerjoin(addresses))
print(s)

绑定参数对象

from sqlalchemy.sql import bindparam

1
2
3
# NOTE(review): `s` is not defined in the visible text; presumably a select()
# containing bindparam('username') - confirm.
# The bound value is supplied at execution time as a dict keyed by the
# parameter name; the connection is closed once the rows are fetched.
with engine.connect() as conn:
    result = conn.execute(s, {"username": "mash4"}).fetchall()

for row in result:
    print(row)

排序 分组 限制 偏移

1
# Fragment, not a standalone statement: the expression these modifiers are
# chained onto is not visible here (presumably a grouped select() - confirm).
.having(users.c.name != 'mash1').distinct().limit(1).offset(1)

插入 更新 删除

1
# NOTE(review): `s` is not defined in the visible text; going by the heading
# it is presumably an insert()/update()/delete() statement - confirm.
# The `with` block closes the connection after the statement runs.
with engine.connect() as conn:
    conn.execute(s)
1
2
3
4
# Each dict supplies one set of 'oldname'/'newname' values for `s`.
# NOTE(review): `s` is not defined in the visible text; presumably an
# update() using bindparam('oldname') and bindparam('newname') - confirm.
with engine.connect() as conn:
    conn.execute(s, [
        {'oldname': "mash1", 'newname': 'mash11'},
        {'oldname': "mash2", 'newname': 'mash22'},
    ])

删除

# DELETE FROM users WHERE name = 'mash4'
# The `with` block closes the connection after the statement runs.
with engine.connect() as conn:
    conn.execute(users.delete().where(users.c.name == 'mash4'))

获取影响行数

1
# Affected-row count (per the heading above) of the statement that produced
# `result`. `result` must be the object returned by execute() itself, not
# the list returned by fetchall().
result.rowcount
分享