前言
Model层最基本的功能是操作数据库,一般的框架(比如java web中的SSH)都具有ORM(对象关系映射)的功能,也就是不需要使用sql语句而是直接通过面向对象的方式来读写数据。为了简单起见,下面将要实现的model层只提供了底层的数据库接口以及简单的ORM功能,当然可以通过配置使用SQLAlchemy,使得它支持更全面的ORM功能。
使用mysql-connector-python
最流行的数据库是mysql,python操作mysql需要调用mysql的python驱动(MySQL Connector/Python),可以从这里下载安装。也可以使用pip install mysql-connector-python安装。先看几个使用该驱动的例子:
1. 建立连接
import mysql.connector
# 默认连接的是localhost:3307,use_unicode=True,让MySQL的DB-API始终返回Unicode
conn = mysql.connector.connect(user='root', password='', database='test', use_unicode=True)
2. 创建表
所有的DDL语句都是通过游标来执行的
cursor = conn.cursor()
cursor.execute('create table user (id int unsigned primary key auto_increment, name varchar(20), birthday date)')
此时test数据库中就已经创建好了user表
3. 插入数据
DML语句也是通过游标来执行的,excute函数的第一个参数是要执行的语句,使用%s作为占位符,python驱动会自动将python的数据类型转换成mysql的类型并加上必要的引号,下面的日期会转化成'199308-09';参数通过list或tuple传进去
cursor.execute('insert into user (name,birthday) values (%s, %s)', ['Asoka',date(1993, 8, 9)])
同样的可以一次插入多条记录
cursor.executemany('insert into user (name,birthday) values (%s, %s)', [('Kitty',date(1993, 8, 10)),('Michael',date(1993,8,11))])
返回受影响的行数
cursor.rowcount
也可以使用dict作为参数
args={'name':'Mila','birth':date(1993,8,8)}
cursor.execute('insert into user (name,birthday) values (%(name)s, %(birth)s)', args)
可以通过cursor的lastrowid获得自增的id
cursor.lastrowid
执行修改后一定要执行commit方法,因为mysql默认使用的数据表是事务类型的InnoDB,而且mysql驱动的auto-commit是关闭的,此时user表中还没有这条记录,不信可以在mysql.exe中执行select * from user;结果为empty。
conn.commit()
MySQL的官网上给出了如下说明:
When you use a transactional storage engine such as InnoDB (the default in MySQL 5.5 and later), you must commit the data after a sequence of INSERT, DELETE, and UPDATE statements.
更新和删除操作同上。
4. 查询
cursor.execute('select * from user')
从cursor中读取查询结果有多种方式:
-
取出游标指向的当前行,返回的结果是一个tuple
cursor.fetchone()
此时返回第一个结果,如果再次执行fetchOne,返回下一条结果
-
还可以返回所有行(从游标当前位置开始),返回的结果是一个由tuple组成的list,每个tuple代表一条记录
cursor.fetchall()
如果要指定返回的行数可以调用fetchmany
-
还可以对cursor直接迭代所有行
for (id, name, birth) in cursor: print "%s(id:%s) was born on %s"%(name, id, birth)
注意:如果当前cursor没有读完就执行execute或close方法将会抛出异常:
mysql.connector.errors.InternalError: Unread result found
避免这种情况的方法是创建CursorBuffered,这种cursor将查询结果一次性传输到客户端,而不是每次执行fetch的时候再进行数据传输
cursor = conn.cursor(buffered=True)
最后关闭游标
cursor.close()
5. 事务回滚
撤销当前事务所做的所有更改
cursor.execute('insert into user (name,birthday) values (%s, %s)', ['Anonymous',date(1993, 8, 9)])
conn.rollback()
6. 关闭连接
conn.close()
封装DB
由上可见,很多操作都是固定的,比如数据库建立连接,关闭连接,获得游标,关闭游标、提交等,如果再抽象一层,可以将这些固定的操作省去。我将实现一个mysql_db模块,该模块提供select、update和insert三个函数,当然调用这些函数还是需要sql语句的,这个模块带来的好处是我们只需要建立一个数据库的实例,就可以对他执行任何sql语句,不需要考虑其他的事情,而且它还支持事务处理。
先从需求入手,我希望提供一个数据库名就可以得到一个数据库实例,之后调用该实例的方法可以轻松的操作数据库,比如:
db = DbCtx('default')
db.update('insert into user (name,birthday) values (%s, %s)', 'Kitty', date(1993, 8, 10))
1. DbCtx
构造函数和析构函数如下:
class DbCtx:
'''
A database context which use lazy connection.
'''
def __init__(self, db_name):
'''
common args are user, password, database, host, port
'''
# paras holds the connection info
if not db_name:
db_name='default'
self.params = DATABASES[db_name]
# turn auto commit on
self.params.update(autocommit=True)
self._conn = None
self.transactions = None
def __del__(self):
self.close()
print 'connection closed'
DATABASES配置信息保存在config.py配置文件中,
# config database info
DATABASES = {
'default': {
'user': 'root',
'password': '',
'database': 'test'
}
}
autocommit=True意味着每次执行update都及时生效,无需调用commit,mysql的驱动默认是False。
该类并非单实例类,因为数据库连接需要并发,所以在不同线程中创建该类的示例就建立了多个连接。
我们的构造函数并未创建连接,而是仅仅保存了配置信息。操作数据库是通过cursor完成的,所以第一次获取cursor时才真正连接数据库,这就是所谓的lazy connection,目的是节省资源,代码如下:
2. connect
@property
def connection(self):
if self._conn is None:
self._conn = mysql.connector.connect(**self.params)
return self._conn
def cursor(self, buf=False):
return self.connection.cursor(buffered=buf)
获取了游标,下面让我们来完善该类的功能
3. select
def select(self, sql, *args, **kw):
'''
The raw select method which return cursor directly
if you specific the buffered to be False then the cursor must be closed after use
'''
buffered = kw.get('buffered', True)
curs = self.cursor(buffered)
curs.execute(sql, args)
return curs
def select_dict(self, sql,*args,**kw):
'''
Return list of dict
'''
try:
cursor=self.select(sql,*args)
names = [x[0] for x in cursor.description]
first=kw.get('first',False)
if first:
values = cursor.fetchone()
if not values:
return None
return dict(zip(names, values))
return [dict(zip(names, x)) for x in cursor.fetchall()]
finally:
if cursor:
cursor.close()
第一个select返回的是游标对象,如果创建cursor时buffered参数为False,则意味着用户使用完查询的结果必须要手动关闭该游标,否则发起下一次查询将会报错,因为此时的cursor还在和数据库服务器建立者连接。如果使用了buffered cursor则可以不必关闭。
第二个select返回的是包含dict的list的查询结果,可以通过传入first=True来限定值获取第一个记录。
4. update
def update(self, sql, *args):
'''
For update and delete
Return the row number affected
'''
try:
cur = self.cursor()
cur.execute(sql, args)
return cur.rowcount
finally:
cur.close()
def insert(self,table,**kw):
keys=kw.keys()
values=kw.values()
import pdb
sql='INSERT INTO %s(%s) VALUES(%s)'%(table, ','.join(keys), ','.join('%s' for i in range(len(keys))))
try:
cur=self.cursor()
cur.execute(sql,values)
return cur.lastrowid
finally:
cur.close()
update返回受影响的行数,insert返回插入的记录的id,这个非常有用。 insert的第一个参数是表名,然后传入键值对。
5. close
def close(self):
self.connection.close()
self._conn = None
该方法不需要手动调用,在Dbctx实例销毁的时候会被自动调用。
6. transaction
class TransactionCtx:
'''
This class can be used by 'with' statement to execute transaction
'''
def __init__(self, db_ctx):
self.db = db_ctx
def __enter__(self):
logging.info('start transaction...')
self.db.connection.start_transaction()
def __exit__(self, exctype, excvalue, traceback):
if exctype is None:
logging.info('commit transaction...')
self.db.connection.commit()
logging.info('commit ok.')
else:
self.db.connection.rollback()
logging.warning('rollback ok.')
在事务开始之前,发送start_trasction命令,如果遇到异常,则调用rollback回滚。
使用很简单:
db = DbCtx()
with TransactionCtx(db):
db.update(
'insert into user (name,birthday) values (%s, %s)', 'Asoka', date.today())
db.update(
'insert into user (name,birthday) values (%s, %s)', 'Alice', date.today())
说明
为什么connection、cursor、close函数前面没有_?将这些函数直接暴露给用户是为了提供最大的灵活性,总会有些情况是必须要直接操作连接对象或游标对象才能完成的。
编写ORM
直接写sql的缺点是工作量大,而且容易出粗,尤其是查询结果转化成对象,或者将对象保存到数据库都很麻烦,于是ORM应运而生。 ORM(对象关系映射)能让我们直接调用对象的方法来操作数据库,而无须小心的构造每一条sql语句。 我将实现一个Model类,web应用中的实体类只要继承该类就和数据库中的表对应起来了。 先来看Model类的定义吧:
class Model(dict):
__metaclass__ = ModelMetaclass
def __init__(self, **kw):
super(Model, self).__init__(**kw)
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError("'%s' instance has no attribute '%s'" % (self.__class__.__name__, key))
def __setattr__(self, key, value):
self[key]=value
Model类继承了dict,使得可以像dict构造函数一样初始化model实例,比如User(name='yxr',age=22)
通过__metaclass__
指定了它的元类,一般元指的就是用来定义一种东西的东西。类似于装饰器用来增强函数的功能一样,元类可以修改类的行为。
ModelMetaclass的定义如下:
class ModelMetaclass(type):
def __new__(cls, name, bases, attrs):
# skip base Model class:
if name == 'Model':
return type.__new__(cls, name, bases, attrs)
if not '__database__' in attrs:
attrs['__database__'] = 'default'
if not '__table__' in attrs:
attrs['__table__'] = name.lower()
attrs['db'] = DbCtx(attrs['__database__'])
return type.__new__(cls, name, bases, attrs)
这个类有什么用呢?它在实体类中保存了有关数据库连接的信息。在创建一个实体类的时候,比如:
class User(Model):
__table__='user'
__database__='default'
__fields__=('name', 'birthday')
通过以下三个参数控制ORM的对应关系:
__table__
:指定了该实体对应的表明,默认为小写的实体名__database__
:指定了数据库名,默认为default__fields__
:必填,指定了该实体类所使用的字段
ModelMetaclass类会在User类中保存四个类变量,除了以上三还包括db变量,该变量就是该实体类的数据库连接上下文(DbCtx),但是并未真正建立连接。 在Model类中定义的方法将被所有实体类继承,让我们来丰富它的功能吧:
@classmethod
def get(cls, id):
'''
Fetch from database by primary key id
'''
d=cls.db.select_dict(
'select * from %s where id=%s' % (cls.__table__, id), first=True)
return cls(**d) if d else None
@classmethod
def filter(cls, where='', *args):
'''
Fetch a list of object according to the condition
'''
sql='select * from `%s` %s' % (cls.__table__,'where %s'%where if where else '')
L=cls.db.select_dict(sql,*args)
return [cls(**d) for d in L]
@classmethod
def filter_one(cls, where='', *args):
'''
Fetch the first object according to the condition
'''
L=cls.filter(where,*args)
if len(L)>0:
return L[0]
return None
def save(self):
'''
Persisted to the database
'''
params={}
for k in self.__fields__:
if hasattr(self, k):
params[k]=getattr(self, k)
# update or insert
if hasattr(self, 'id'):
sql='UPDATE %s SET %s WHERE id=%s' % (self.__table__,
','.join(
['{}=%s'.format(key) for key in params.keys()]),
self.id)
self.db.update(sql, *params.values())
else:
self.id=self.db.insert(self.__table__, **params)
def remove(self):
sql='DELETE FROM {} WHERE id=%s'.format(self.__table__)
self.db.update(sql,self.id)
self=None
每个方法都有注释,这里不再解释。