SQLAlchemy是Python编程语言下的一款开源软件,提供了SQL工具包及对象关系映射(ORM)工具,使用MIT许可证发行。
SQLAlchemy“采用简单的Python语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型”。SQLAlchemy的理念是,SQL数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。因此,SQLAlchemy采用了类似于Java里Hibernate的数据映射模型,而不是其他ORM框架采用的Active Record模型。不过,Elixir和declarative等可选插件可以让用户使用声明语法。
SQLAlchemy首次发行于2006年2月,并迅速地成为Python社区中最广泛使用的ORM工具之一,不亚于Django的ORM框架。
1. 安装
1
2
|
# pip install sqlalchemy
# pip install pymysql
|
2. 创建引擎
1
2
|
>>> from sqlalchemy import create_engine
>>> engine = create_engine('mysql+pymysql://USERNAME:PASSWORD@DB_HOST:DB_PORT/DB_NAME', pool_recycle=3600)
|
pool_recycle 指定连接池中连接的回收时间(单位:秒):连接存活超过该时间后,下次取用时会被丢弃并重新建立,可避免MySQL因空闲超时(wait_timeout)断开连接而导致的错误。
引擎初始化后,就可以连接数据库了。
3. 连接数据库
1
|
>>> connection = engine.connect()
|
4. metadata
1
2
|
>>> from sqlalchemy import MetaData
>>> metadata = MetaData()
|
5. 定义表
1
2
3
4
5
6
7
8
9
10
11
|
>>> from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey, DateTime
>>> from datetime import datetime
>>> users = Table('users', metadata,
... Column('user_id', Integer(), primary_key=True),
... Column('username', String(15), nullable=False, unique=True),
... Column('email_address', String(255), nullable=False),
... Column('phone', String(20), nullable=False),
... Column('password', String(25), nullable=False),
... Column('created_on', DateTime(), default=datetime.now),
... Column('updated_on', DateTime(), default=datetime.now, onupdate=datetime.now)
... )
|
6. 创建表
1
|
>>> metadata.create_all(engine)
|
7. 插入数据
1
2
3
4
5
6
7
8
9
10
11
|
>>> ins = users.insert().values(
... user_id=8888,
... username='ttlsa.com',
... email_address = 'support@ttlsa.com',
... phone = 12345678901,
... password = 'www.ttlsa.com'
... )
>>> print ins
INSERT INTO users (user_id, username, email_address, phone, password, created_on, updated_on) VALUES (:user_id, :username, :email_address, :phone, :password, :created_on, :updated_on)
>>> ins.compile().params
{'username': 'ttlsa.com', 'user_id': 8888, 'phone': 12345678901, 'created_on': None, 'updated_on': None, 'password': 'www.ttlsa.com', 'email_address': 'support@ttlsa.com'}
|
8. 执行
1
2
3
|
>>> result = connection.execute(ins)
>>> result.inserted_primary_key
[8888]
|
9. 插入多条
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
>>> ins = users.insert()
>>> multi_data = [
... {
... 'user_id':1,
... 'username':'u1',
... 'email_address':'u1@ttlsa.com',
... 'phone' : 12345678901,
... 'password': 'www.ttlsa.com'
... },
... {
... 'user_id':2,
... 'username':'u2',
... 'email_address':'u2@ttlsa.com',
... 'phone' : 12345678901,
... 'password': 'www.ttlsa.com'
... }
... ]
>>> result = connection.execute(ins, multi_data)
>>> result.rowcount
2
|
10. 查询
1
2
3
4
5
6
7
8
9
|
>>> from sqlalchemy import select
>>> s = select([users])
>>> print str(s)
SELECT users.user_id, users.username, users.email_address, users.phone, users.password, users.created_on, users.updated_on
FROM users
>>> rp = connection.execute(s)
>>> results = rp.fetchall()
>>> print results
[(1, 'u1', 'u1@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 11, 26, 5)), (2, 'u2', 'u2@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 11, 26, 5)), (8888, 'ttlsa.com', 'support@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 13, 32), datetime.datetime(2016, 6, 22, 11, 13, 32))]
|
1
2
3
4
5
6
7
8
9
|
>>> first_row = results[0]
>>> print first_row
(1, 'u1', 'u1@ttlsa.com', '12345678901', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 11, 26, 5))
>>> first_row[1]
'u1'
>>> first_row.phone
'12345678901'
>>> first_row[users.c.user_id]
1
|
1
2
3
4
5
6
7
|
>>> rp = connection.execute(s)
>>> for record in rp:
... print record.username
...
u1
u2
ttlsa.com
|
1
2
3
4
5
6
7
8
9
10
11
12
|
>>> s = select([users.c.user_id, users.c.username])
>>> rp = connection.execute(s)
>>> print rp.keys()
['user_id', 'username']
>>> print rp.fetchone()
(8888, 'ttlsa.com')
>>> print rp.fetchone()
(1, 'u1')
>>> print rp.fetchone()
(2, 'u2')
>>> print rp.fetchone()
None
|
11. 排序
1
2
3
4
5
6
7
8
9
10
11
12
|
>>> s = select([users.c.user_id, users.c.username])
>>> s = s.order_by(users.c.user_id)
>>> print str(s)
SELECT users.user_id, users.username
FROM users ORDER BY users.user_id
>>> rp = connection.execute(s)
>>> for i in rp:
... print ('{} - {}'.format(i.user_id, i.username))
...
1 - u1
2 - u2
8888 - ttlsa.com
|
1
2
3
4
5
|
>>> from sqlalchemy import desc
>>> s = select([users.c.user_id, users.c.username])
>>> s = s.order_by(desc(users.c.user_id))
>>> rp = connection.execute(s)
>>> print (['{} - {}'.format(i.user_id, i.username) for i in rp])
['8888 - ttlsa.com', '2 - u2', '1 - u1']
|
1
2
3
4
|
>>> s = select([users.c.user_id, users.c.username]).where(users.c.user_id.in_([1,2]))
>>> rp = connection.execute(s)
>>> print (['{} - {}'.format(i.user_id, i.username) for i in rp])
['1 - u1', '2 - u2']
|
12. 更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
>>> from sqlalchemy import update
>>> u = update(users).where(users.c.username == 'u1')
>>> u = u.values(phone=00000000000)
>>> print str(u)
UPDATE users SET phone=:phone, updated_on=:updated_on WHERE users.username = :username_1
>>> result = connection.execute(u)
>>> print result
<sqlalchemy.engine.result.ResultProxy object at 0x7f6839f1a290>
>>> print result.rowcount
1
>>> s = select([users]).where(users.c.username == 'u1')
>>> result = connection.execute(s).first()
>>> print result
(1, 'u1', 'u1@ttlsa.com', '0', 'www.ttlsa.com', datetime.datetime(2016, 6, 22, 11, 26, 5), datetime.datetime(2016, 6, 22, 13, 51, 16))
>>> print result.keys()
['user_id', 'username', 'email_address', 'phone', 'password', 'created_on', 'updated_on']
>>> for key in result.keys():
... print('{:>20}: {}'.format(key, result[key]))
...
user_id: 1
username: u1
email_address: u1@ttlsa.com
phone: 0
password: www.ttlsa.com
created_on: 2016-06-22 11:26:05
updated_on: 2016-06-22 13:51:16
|
13. 删除
1
2
3
4
5
6
7
8
9
10
11
|
>>> from sqlalchemy import delete
>>> u = delete(users).where(users.c.username == 'u1')
>>> result = connection.execute(u)
>>> print result.rowcount
1
>>> print str(u)
DELETE FROM users WHERE users.username = :username_1
>>> s = select([users]).where(users.c.username == 'u1')
>>> result = connection.execute(s).fetchall()
>>> print result
[]
|
文章转载来自:ttlsa.com