Playhouse, a collection of addons

Peewee comes with numerous extras which I didn’t really feel like including in the main source module, but which might be interesting to implementers or fun to mess around with.

The playhouse includes modules for different database drivers or database specific functionality:

Modules which expose higher-level python constructs:

As well as tools for working with databases:

apsw, an advanced sqlite driver

The apsw_ext module contains a database class suitable for use with the apsw sqlite driver.

APSW Project page: https://code.google.com/p/apsw/

APSW is a really neat library that provides a thin wrapper on top of SQLite’s C interface, making it possible to use all of SQLite’s advanced features.

Here are just a few reasons to use APSW, taken from the documentation:

  • APSW gives all functionality of SQLite, including virtual tables, virtual file system, blob i/o, backups and file control.
  • Connections can be shared across threads without any additional locking.
  • Transactions are managed explicitly by your code.
  • APSW can handle nested transactions.
  • Unicode is handled correctly.
  • APSW is faster than pysqlite.

For more information on the differences between apsw and pysqlite, check the apsw docs.

How to use the APSWDatabase

from playhouse.apsw_ext import *

db = APSWDatabase(':memory:')

class BaseModel(Model):
    class Meta:
        database = db

class SomeModel(BaseModel):
    col1 = CharField()
    col2 = DateTimeField()

apsw_ext API notes

class APSWDatabase(database, **connect_kwargs)
Parameters:
  • database (string) – filename of sqlite database
  • connect_kwargs – keyword arguments passed to apsw when opening a connection

transaction([lock_type='deferred'])

Functions just like the Database.transaction() context manager, but accepts an additional parameter specifying the type of lock to use.

Parameters: lock_type (string) – type of lock to use when opening a new transaction

register_module(mod_name, mod_inst)

Provides a way of globally registering a module. For more information, see the documentation on virtual tables.

Parameters:
  • mod_name (string) – name to use for module
  • mod_inst (object) – an object implementing the Virtual Table interface

unregister_module(mod_name)

Unregister a module.

Parameters: mod_name (string) – name of the module to unregister

Note

Be sure to use the Field subclasses defined in the apsw_ext module, as they will properly handle adapting the data types for storage.

Postgresql Extensions

The postgresql extensions module provides a number of “postgres-only” functions, currently:

In the future I would like to add support for more of postgresql’s features. If there is a particular feature you would like to see added, please open a Github issue.

Warning

In order to start using the features described below, you will need to use the extension PostgresqlExtDatabase class instead of PostgresqlDatabase.

The code below will assume you are using the following database and base model:

from playhouse.postgres_ext import *

ext_db = PostgresqlExtDatabase('peewee_test', user='postgres')

class BaseExtModel(Model):
    class Meta:
        database = ext_db

hstore support

Postgresql hstore is an embedded key/value store. With hstore, you can store arbitrary key/value pairs in your database alongside structured relational data.

Currently the postgres_ext module supports the following operations:

  • Store and retrieve arbitrary dictionaries
  • Filter by key(s) or partial dictionary
  • Update/add one or more keys to an existing dictionary
  • Delete one or more keys from an existing dictionary
  • Select keys, values, or zip keys and values
  • Retrieve a slice of keys/values
  • Test for the existence of a key
  • Test that a key has a non-NULL value

Using hstore

To start with, you will need to import the custom database class and the hstore functions from playhouse.postgres_ext (see the code snippet above). Then, it is as simple as adding an HStoreField to your model:

class House(BaseExtModel):
    address = CharField()
    features = HStoreField()

You can now store arbitrary key/value pairs on House instances:

>>> h = House.create(address='123 Main St', features={'garage': '2 cars', 'bath': '2 bath'})
>>> h_from_db = House.get(House.id == h.id)
>>> h_from_db.features
{'bath': '2 bath', 'garage': '2 cars'}

You can filter by keys or partial dictionary:

>>> f = House.features
>>> House.select().where(f.contains('garage')) # <-- all houses w/garage key
>>> House.select().where(f.contains(['garage', 'bath'])) # <-- all houses w/garage & bath
>>> House.select().where(f.contains({'garage': '2 cars'})) # <-- houses w/2-car garage

Suppose you want to do an atomic update to the house:

>>> f = House.features
>>> new_features = House.features.update(bath='2.5 bath', sqft='1100')
>>> query = House.update(features=new_features)
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'bath': '2.5 bath', 'garage': '2 cars', 'sqft': '1100'}

Or, alternatively an atomic delete:

>>> query = House.update(features=f.delete('bath'))
>>> query.where(House.id == h.id).execute()
1
>>> h = House.get(House.id == h.id)
>>> h.features
{'garage': '2 cars', 'sqft': '1100'}

Multiple keys can be deleted at the same time:

>>> query = House.update(features=f.delete('garage', 'sqft'))

You can select just keys, just values, or zip the two:

>>> f = House.features
>>> for h in House.select(House.address, f.keys().alias('keys')):
...     print h.address, h.keys

123 Main St [u'bath', u'garage']

>>> for h in House.select(House.address, f.values().alias('vals')):
...     print h.address, h.vals

123 Main St [u'2 bath', u'2 cars']

>>> for h in House.select(House.address, f.items().alias('mtx')):
...     print h.address, h.mtx

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]

You can retrieve a slice of data, for example, all the garage data:

>>> f = House.features
>>> for h in House.select(House.address, f.slice('garage').alias('garage_data')):
...     print h.address, h.garage_data

123 Main St {'garage': '2 cars'}

You can check for the existence of a key and filter rows accordingly:

>>> for h in House.select(House.address, f.exists('garage').alias('has_garage')):
...     print h.address, h.has_garage

123 Main St True

>>> for h in House.select().where(f.exists('garage')):
...     print h.address, h.features['garage'] # <-- just houses w/garage data

123 Main St 2 cars
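As a rough mental model of the operations above, an hstore value can be treated as a Python dict of strings. The sketch below (Python 3) mimics the semantics of contains(), update(), delete(), slice(), exists() and defined() on plain dicts. The hstore_* helper names are hypothetical and are not part of peewee; the real operations are executed by Postgres, so treat this only as an illustration.

```python
# Illustrative model only: these helpers are hypothetical and are not
# part of peewee or playhouse. An hstore value is modelled as a dict.

def hstore_contains(features, value):
    """Mimic f.contains(): a single key, a list of keys, or a partial dict."""
    if isinstance(value, dict):
        return all(k in features and features[k] == v for k, v in value.items())
    if isinstance(value, (list, tuple)):
        return all(k in features for k in value)
    return value in features

def hstore_update(features, **data):
    """Mimic f.update(): add or overwrite keys, leaving other keys intact."""
    merged = dict(features)
    merged.update(data)
    return merged

def hstore_delete(features, *keys):
    """Mimic f.delete(): drop the given keys."""
    return {k: v for k, v in features.items() if k not in keys}

def hstore_slice(features, *keys):
    """Mimic f.slice(): keep only the given keys."""
    return {k: v for k, v in features.items() if k in keys}

def hstore_exists(features, key):
    """Mimic f.exists(): the key is present (its value may be NULL/None)."""
    return key in features

def hstore_defined(features, key):
    """Mimic f.defined(): the key is present and has a non-NULL value."""
    return features.get(key) is not None

house = {'garage': '2 cars', 'bath': '2 bath'}
print(hstore_contains(house, ['garage', 'bath']))   # True
print(hstore_contains(house, {'garage': '3 cars'})) # False
print(hstore_delete(house, 'bath'))                 # {'garage': '2 cars'}
print(hstore_slice(house, 'garage'))                # {'garage': '2 cars'}
```

The update and delete helpers return new dicts rather than mutating their argument, which mirrors how the field expressions produce a new value that is then written back with an UPDATE query.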

JSON Support

peewee has basic support for Postgres’ native JSON data type, in the form of JSONField.

Warning

Postgres supports a JSON data type natively as of 9.2 (full support in 9.3). In order to use this functionality you must be using the correct version of Postgres with psycopg2 version 2.5 or greater.

Note

You must be sure your database is an instance of PostgresqlExtDatabase in order to use the JSONField.

Here is an example of how you might declare a model with a JSON field:

import json
import urllib2
from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('my_database')  # note: must be PostgresqlExtDatabase

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

    @classmethod
    def request(cls, url):
        fh = urllib2.urlopen(url)
        return cls.create(url=url, response=json.loads(fh.read()))

APIResponse.create_table()

# Store a JSON response.
offense = APIResponse.request('http://wtf.charlesleifer.com/api/offense/')
booking = APIResponse.request('http://wtf.charlesleifer.com/api/booking/')

# Query a JSON data structure using a nested key lookup:
offense_responses = APIResponse.select().where(
  APIResponse.response['meta']['model'] == 'offense')

# Retrieve a sub-key for each APIResponse. By calling .as_json(), the
# data at the sub-key will be returned as Python objects (dicts, lists,
# etc) instead of serialized JSON.
q = (APIResponse
     .select(
       APIResponse.response['booking']['person'].as_json().alias('person'))
     .where(
       APIResponse.response['meta']['model'] == 'booking'))

for result in q:
    print result.person['name'], result.person['dob']

For more examples, see the JSONField API documentation below.

Server-side cursors

When psycopg2 executes a query, normally all results are fetched and returned to the client by the backend. This can cause your application to use a lot of memory when making large queries. Using server-side cursors, results are returned a little at a time (by default 2000 records). For the definitive reference, please see the psycopg2 documentation.

Note

To use server-side (or named) cursors, you must be using PostgresqlExtDatabase.

To execute a query using a server-side cursor, simply wrap your select query using the ServerSide() helper:

large_query = PageView.select()  # Build query normally.

# Iterate over large query inside a transaction.
for page_view in ServerSide(large_query):
    # do some interesting analysis here.
    pass

# Server-side resources are released.

If you would like all SELECT queries to automatically use a server-side cursor, you can specify this when creating your PostgresqlExtDatabase:

from playhouse.postgres_ext import PostgresqlExtDatabase

ss_db = PostgresqlExtDatabase('my_db', server_side_cursors=True)

Note

Server-side cursors live only as long as the transaction, so for this reason peewee will not automatically call commit() after executing a SELECT query. If you do not commit after you are done iterating, you will not release the server-side resources until the connection is closed (or the transaction is committed later). Furthermore, since peewee will by default cache rows returned by the cursor, you should always call .iterator() when iterating over a large query.

If you are using the ServerSide() helper, the transaction and call to iterator() will be handled transparently.
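To illustrate the idea of consuming a large result set a little at a time instead of loading every row at once, here is a small sketch using the standard library sqlite3 module (Python 3). SQLite has no server-side cursors, so this is only an analogy for the batching behaviour; the iterate_in_batches helper and the page_view table are made up for the example and are not part of peewee or psycopg2.

```python
import sqlite3

def iterate_in_batches(cursor, batch_size=2000):
    """Yield rows one by one while fetching them from the cursor in batches."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        for row in rows:
            yield row

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE page_view (id INTEGER PRIMARY KEY, url TEXT)')
conn.executemany('INSERT INTO page_view (url) VALUES (?)',
                 [('/page/%d' % i,) for i in range(5000)])

cursor = conn.execute('SELECT id, url FROM page_view ORDER BY id')

# Rows are never collected into a list, so at most one batch is held
# in memory by this loop at any time.
total = sum(1 for _ in iterate_in_batches(cursor))
print(total)  # 5000
```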

postgres_ext API notes

class PostgresqlExtDatabase(database[, server_side_cursors=False[, register_hstore=True[, ...]]])

Identical to PostgresqlDatabase, but required in order to use the features described in this section, such as hstore, JSONField and server-side cursors.

Parameters:
  • database (str) – Name of database to connect to.
  • server_side_cursors (bool) – Whether SELECT queries should utilize server-side cursors.
  • register_hstore (bool) – Register the HStore extension with the connection.

If using server_side_cursors, also be sure to wrap your queries with ServerSide().

If you do not wish to use the HStore extension, you can specify register_hstore=False.

ServerSide(select_query)

Wrap the given select query in a transaction, and call its iterator() method to avoid caching row instances. In order for the server-side resources to be released, be sure to exhaust the generator (iterate over all the rows).

Parameters: select_query – a SelectQuery instance.
Return type: generator

Usage:

large_query = PageView.select()
for page_view in ServerSide(large_query):
    # Do something interesting.
    pass

# At this point server side resources are released.

class ArrayField([field_class=IntegerField[, dimensions=1]])

Field capable of storing arrays of the provided field_class.

Parameters:
  • field_class – a subclass of Field, e.g. IntegerField.
  • dimensions (int) – dimensions of array.

You can store and retrieve lists (or lists-of-lists):

class BlogPost(BaseExtModel):
    content = TextField()
    tags = ArrayField(CharField)


post = BlogPost(content='awesome', tags=['foo', 'bar', 'baz'])

Additionally, you can use the __getitem__ API to query values or slices in the database:

# Get the first tag on a given blog post.
first_tag = (BlogPost
             .select(BlogPost.tags[0].alias('first_tag'))
             .where(BlogPost.id == 1)
             .dicts()
             .get())

# first_tag = {'first_tag': 'foo'}

Get a slice of values:

# Get the first two tags.
two_tags = (BlogPost
            .select(BlogPost.tags[:2].alias('two'))
            .dicts()
            .get())
# two_tags = {'two': ['foo', 'bar']}

contains(*items)

Parameters: items – One or more items that must be in the given array field.

# Get all blog posts that are tagged with both "python" and "django".
BlogPost.select().where(BlogPost.tags.contains('python', 'django'))

contains_any(*items)

Parameters: items – One or more items to search for in the given array field.

Like contains(), except will match rows where the array contains any of the given items.

# Get all blog posts that are tagged with "flask" and/or "django".
BlogPost.select().where(BlogPost.tags.contains_any('flask', 'django'))

class DateTimeTZField(*args, **kwargs)

A timezone-aware subclass of DateTimeField.

class HStoreField(*args, **kwargs)

A field for storing and retrieving arbitrary key/value pairs. For details on usage, see hstore support.

keys()

Returns the keys for a given row.

>>> f = House.features
>>> for h in House.select(House.address, f.keys().alias('keys')):
...     print h.address, h.keys

123 Main St [u'bath', u'garage']

values()

Return the values for a given row.

>>> for h in House.select(House.address, f.values().alias('vals')):
...     print h.address, h.vals

123 Main St [u'2 bath', u'2 cars']

items()

Like python’s dict, return the keys and values in a list-of-lists:

>>> for h in House.select(House.address, f.items().alias('mtx')):
...     print h.address, h.mtx

123 Main St [[u'bath', u'2 bath'], [u'garage', u'2 cars']]

slice(*args)

Return a slice of data given a list of keys.

>>> f = House.features
>>> for h in House.select(House.address, f.slice('garage').alias('garage_data')):
...     print h.address, h.garage_data

123 Main St {'garage': '2 cars'}

exists(key)

Query for whether the given key exists.

>>> for h in House.select(House.address, f.exists('garage').alias('has_garage')):
...     print h.address, h.has_garage

123 Main St True

>>> for h in House.select().where(f.exists('garage')):
...     print h.address, h.features['garage'] # <-- just houses w/garage data

123 Main St 2 cars

defined(key)

Query for whether the given key has a value associated with it.

update(**data)

Perform an atomic update to the keys/values for a given row or rows.

>>> query = House.update(features=House.features.update(
...     sqft=2000,
...     year_built=2012))
>>> query.where(House.id == 1).execute()

delete(*keys)

Delete the provided keys for a given row or rows.

Note

We will use an UPDATE query.

>>> query = House.update(features=House.features.delete(
...     'sqft', 'year_built'))
>>> query.where(House.id == 1).execute()

contains(value)

Parameters: value – Either a dict, a list of keys, or a single key.

Query rows for the existence of either:

  • a partial dictionary.
  • a list of keys.
  • a single key.

>>> f = House.features
>>> House.select().where(f.contains('garage')) # <-- all houses w/garage key
>>> House.select().where(f.contains(['garage', 'bath'])) # <-- all houses w/garage & bath
>>> House.select().where(f.contains({'garage': '2 cars'})) # <-- houses w/2-car garage

contains_any(*keys)

Parameters: keys – One or more keys to search for.

Query rows for the existence of any key.

class JSONField(dumps=None, *args, **kwargs)

Field class suitable for storing and querying arbitrary JSON. When using this on a model, set the field’s value to a Python object (either a dict or a list). When you retrieve your value from the database it will be returned as a Python data structure.

Parameters: dumps – function used to serialize data to JSON. Defaults to json.dumps(); supply your own function to customize serialization.

Note

You must be using Postgres 9.2 / psycopg2 2.5 or greater.

Example model declaration:

db = PostgresqlExtDatabase('my_db')

class APIResponse(Model):
    url = CharField()
    response = JSONField()

    class Meta:
        database = db

Example of storing JSON data:

url = 'http://foo.com/api/resource/'
resp = json.loads(urllib2.urlopen(url).read())
APIResponse.create(url=url, response=resp)

APIResponse.create(url='http://foo.com/baz/', response={'key': 'value'})

To query, use Python’s [] operators to specify nested key or array lookups:

APIResponse.select().where(
    APIResponse.response['key1']['nested-key'] == 'some-value')

To illustrate the use of the [] operators, imagine we have the following data stored in an APIResponse:

{
  "foo": {
    "bar": ["i1", "i2", "i3"],
    "baz": {
      "huey": "mickey",
      "peewee": "nugget"
    }
  }
}

Here are the results of a few queries:

def get_data(expression):
    # Helper function to just retrieve the results of a
    # particular expression.
    query = (APIResponse
             .select(expression.alias('my_data'))
             .dicts()
             .get())
    return query['my_data']

# Accessing the foo -> bar subkey will return a JSON
# representation of the list.
get_data(APIResponse.response['foo']['bar'])
# '["i1", "i2", "i3"]'

# In order to retrieve this list as a Python list,
# we will call .as_json() on the expression.
get_data(APIResponse.response['foo']['bar'].as_json())
# ['i1', 'i2', 'i3']

# Similarly, accessing the foo -> baz subkey will
# return a JSON representation of the dictionary.
get_data(APIResponse.response['foo']['baz'])
# '{"huey": "mickey", "peewee": "nugget"}'

# Again, calling .as_json() will return an actual
# python dictionary.
get_data(APIResponse.response['foo']['baz'].as_json())
# {'huey': 'mickey', 'peewee': 'nugget'}

# When dealing with simple values, either way works as
# you expect.
get_data(APIResponse.response['foo']['bar'][0])
# 'i1'

# Calling .as_json() when the result is a simple value
# will return the same thing as the previous example.
get_data(APIResponse.response['foo']['bar'][0].as_json())
# 'i1'
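The difference between a plain lookup and a lookup with .as_json() can be modelled in pure Python (Python 3). The json_lookup helper below is hypothetical and only imitates the results listed above: without as_json, containers come back as JSON text, while with as_json they come back as Python objects. The exact whitespace of the text returned by Postgres may differ from what json.dumps() produces.

```python
import json

def json_lookup(document, path, as_json=False):
    """Follow `path` (dict keys and list indexes) through `document`.

    Hypothetical helper for illustration; the real lookup is performed
    by Postgres.
    """
    value = document
    for step in path:
        value = value[step]
    if as_json or isinstance(value, str):
        # Simple string values look the same either way.
        return value
    # Containers are returned as serialized JSON text.
    return json.dumps(value)

stored = {
    'foo': {
        'bar': ['i1', 'i2', 'i3'],
        'baz': {'huey': 'mickey', 'peewee': 'nugget'},
    }
}

print(json_lookup(stored, ['foo', 'bar']))                # ["i1", "i2", "i3"]
print(json_lookup(stored, ['foo', 'bar'], as_json=True))  # ['i1', 'i2', 'i3']
print(json_lookup(stored, ['foo', 'bar', 0]))             # i1
```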
Match(field, query)

Generate a full-text search expression, automatically converting the left-hand operand to a tsvector, and the right-hand operand to a tsquery.

Example:

def blog_search(query):
    return Blog.select().where(
        (Blog.status == Blog.STATUS_PUBLISHED) &
        Match(Blog.content, query))

class TSVectorField

Field type suitable for storing tsvector data. This field will automatically be created with a GIN index for improved search performance.

Note

Data stored in this field will still need to be manually converted to the tsvector type.

Example usage:

class Blog(Model):
    content = TextField()
    search_content = TSVectorField()

content = 'this is a sample blog entry.'
blog_entry = Blog.create(
    content=content,
    search_content=fn.to_tsvector(content))  # Note `to_tsvector()`.

Sqlite Extensions

The SQLite extensions module provides support for some interesting sqlite-only features:

  • Define custom aggregates, collations and functions.
  • Support for FTS3/4 (sqlite full-text search).
  • Specify isolation level in transactions.
  • Basic support for virtual tables.

sqlite_ext API notes

class SqliteExtDatabase(database, **kwargs)

Subclass of the SqliteDatabase that provides some advanced features only offered by Sqlite.

  • Register custom aggregates, collations and functions
  • Specify a row factory
  • Advanced transactions (specify isolation level)

aggregate(num_params[, name])

Class-decorator for registering custom aggregation functions.

Parameters:
  • num_params – integer representing number of parameters the aggregate function accepts.
  • name – string name for the aggregate, defaults to the name of the class.

@db.aggregate(1, 'product')
class Product(object):
    """Like sum, except calculate the product of a series of numbers."""
    def __init__(self):
        self.product = 1

    def step(self, value):
        self.product *= value

    def finalize(self):
        return self.product

# To use this aggregate:
product = (Score
           .select(fn.product(Score.value))
           .scalar())
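For comparison, the same step/finalize protocol can be exercised directly with the standard library sqlite3 module (Python 3). This is a sketch of the underlying SQLite mechanism, not of how peewee registers the aggregate internally; the score table is made up for the example.

```python
import sqlite3

class Product:
    """Like sum, except calculate the product of a series of numbers."""
    def __init__(self):
        self.product = 1

    def step(self, value):
        # Called once per row in the group.
        self.product *= value

    def finalize(self):
        # Called once at the end to produce the aggregate's result.
        return self.product

conn = sqlite3.connect(':memory:')
conn.create_aggregate('product', 1, Product)  # name, number of params, class

conn.execute('CREATE TABLE score (value INTEGER)')
conn.executemany('INSERT INTO score (value) VALUES (?)', [(2,), (3,), (4,)])

result = conn.execute('SELECT product(value) FROM score').fetchone()[0]
print(result)  # 24
```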
collation([name])

Function decorator for registering a custom collation.

Parameters: name – string name to use for this collation.

@db.collation()
def collate_reverse(s1, s2):
    return -cmp(s1, s2)

# To use this collation:
Book.select().order_by(collate_reverse.collation(Book.title))

As you might have noticed, the original collate_reverse function has a special attribute called collation attached to it. This extra attribute provides a shorthand way to generate the SQL necessary to use our custom collation.
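The collation example above relies on cmp(), which exists only in Python 2. As a sketch of the same idea on Python 3, the snippet below registers a reverse collation with the standard library sqlite3 module and uses it in an ORDER BY clause. The book table is made up for the example, and this shows the raw SQLite mechanism rather than peewee's decorator.

```python
import sqlite3

def collate_reverse(s1, s2):
    # (s1 > s2) - (s1 < s2) evaluates to -1, 0 or 1, like Python 2's cmp().
    # Negating it reverses the sort order.
    return -((s1 > s2) - (s1 < s2))

conn = sqlite3.connect(':memory:')
conn.create_collation('collate_reverse', collate_reverse)

conn.execute('CREATE TABLE book (title TEXT)')
conn.executemany('INSERT INTO book (title) VALUES (?)',
                 [('Alpha',), ('Gamma',), ('Beta',)])

titles = [row[0] for row in conn.execute(
    'SELECT title FROM book ORDER BY title COLLATE collate_reverse')]
print(titles)  # ['Gamma', 'Beta', 'Alpha']
```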

func([name[, num_params]])

Function decorator for registering user-defined functions.

Parameters:
  • name – name to use for this function.
  • num_params – number of parameters this function accepts. If not provided, peewee will introspect the function for you.

@db.func()
def title_case(s):
    return s.title()

# Use in the select clause...
titled_books = Book.select(fn.title_case(Book.title))

@db.func()
def sha1(s):
    return hashlib.sha1(s).hexdigest()

# Use in the where clause...
user = User.select().where(
    (User.username == username) &
    (fn.sha1(User.password) == password_hash)).get()
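The two functions above can also be tried out without peewee by registering them through the standard library sqlite3 module (Python 3). This is only a sketch of the SQLite feature that user-defined functions build on. Note that on Python 3 hashlib.sha1() expects bytes, so the string is encoded first; the UTF-8 encoding is an assumption made for this example.

```python
import hashlib
import sqlite3

def title_case(s):
    return s.title()

def sha1(s):
    # hashlib works on bytes in Python 3, so encode the text first.
    return hashlib.sha1(s.encode('utf-8')).hexdigest()

conn = sqlite3.connect(':memory:')
conn.create_function('title_case', 1, title_case)  # name, number of params, callable
conn.create_function('sha1', 1, sha1)

row = conn.execute(
    "SELECT title_case('the old man and the sea'), sha1('secret')").fetchone()
print(row[0])  # The Old Man And The Sea
print(row[1] == hashlib.sha1(b'secret').hexdigest())  # True
```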
granular_transaction([lock_type='deferred'])

With the granular_transaction helper, you can specify the isolation level for an individual transaction. The valid options are:

  • exclusive
  • immediate
  • deferred

Example usage:

with db.granular_transaction('exclusive'):
    # no other readers or writers!
    (Account
     .update(balance=Account.balance - 100)
     .where(Account.id == from_acct)
     .execute())

    (Account
     .update(balance=Account.balance + 100)
     .where(Account.id == to_acct)
     .execute())
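At the SQL level, the three lock types correspond to SQLite's BEGIN DEFERRED, BEGIN IMMEDIATE and BEGIN EXCLUSIVE statements. The sketch below (Python 3, standard library sqlite3) performs the same transfer with an explicit BEGIN EXCLUSIVE. The transfer function and the account table are made up for the example, and this is not necessarily how granular_transaction is implemented.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode, so the
# BEGIN / COMMIT / ROLLBACK statements below are sent exactly as written.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE account (id INTEGER PRIMARY KEY, balance INTEGER)')
conn.executemany('INSERT INTO account (id, balance) VALUES (?, ?)',
                 [(1, 500), (2, 100)])

def transfer(conn, from_acct, to_acct, amount, lock_type='exclusive'):
    if lock_type not in ('deferred', 'immediate', 'exclusive'):
        raise ValueError('unknown lock type: %r' % lock_type)
    conn.execute('BEGIN %s' % lock_type.upper())
    try:
        conn.execute('UPDATE account SET balance = balance - ? WHERE id = ?',
                     (amount, from_acct))
        conn.execute('UPDATE account SET balance = balance + ? WHERE id = ?',
                     (amount, to_acct))
    except Exception:
        conn.execute('ROLLBACK')
        raise
    else:
        conn.execute('COMMIT')

transfer(conn, 1, 2, 100)
balances = dict(conn.execute('SELECT id, balance FROM account'))
print(balances)  # {1: 400, 2: 200}
```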
class VirtualModel

Subclass of Model that signifies the model operates using a virtual table provided by a sqlite extension.

_extension = 'name of sqlite extension'

class FTSModel

Model class that provides support for Sqlite’s full-text search extension. Models should be defined normally, however there are a couple caveats:

  • Indexes are ignored completely
  • Sqlite will treat all column types as TextField (although you can store other data types, Sqlite will treat them as text).

Therefore it usually makes sense to index the content you intend to search and a single link back to the original document, since all SQL queries except full-text searches and rowid lookups will be slow.

Example:

class Document(FTSModel):
    title = TextField()  # type affinities are ignored by FTS, so use TextField
    content = TextField()

Document.create_table(tokenize='porter')  # use the porter stemmer.

# populate documents using normal operations.
for doc in list_of_docs_to_index:
    Document.create(title=doc['title'], content=doc['content'])

# use the "match" operation for FTS queries.
matching_docs = (Document
                 .select()
                 .where(Document.match('some query')))

# to sort by best match, use the custom "rank" function.
best = (Document
        .select(Document, Rank(Document).alias('score'))
        .where(Document.match('some query'))
        .order_by(SQL('score').desc()))

# or use the shortcut method:
best = Document.search('some phrase')

# you can also use the BM25 algorithm to rank documents:
best = (Document
        .select(
            Document,
            Document.bm25(Document.content).alias('score'))
        .where(Document.match('some query'))
        .order_by(SQL('score').desc()))

# There is a shortcut method for bm25 as well:
best_bm25 = Document.search_bm25('some phrase')

# BM25 allows you to specify a column if your FTS model contains
# multiple fields.
best_bm25 = Document.search_bm25('some phrase', Document.content)

If you have an existing table and would like to add search for a column on that table, you can specify it using the content option:

class Blog(Model):
    title = CharField()
    pub_date = DateTimeField()
    content = TextField()  # we want to search this.

class FTSBlog(FTSModel):
    content = TextField()

Blog.create_table()
FTSBlog.create_table(content=Blog.content)

# Now, we can manage content in the FTSBlog.  To populate it with
# content:
FTSBlog.rebuild()

# Optimize the index.
FTSBlog.optimize()

The content option accepts either a single Field or a Model and can reduce the amount of storage used. However, content will need to be manually moved to/from the associated FTSModel.

classmethod create_table([fail_silently=False[, **options]])
Parameters:
  • fail_silently (boolean) – do not re-create if table already exists.
  • options – options passed along when creating the table, e.g. content.

classmethod rebuild()

Rebuild the search index – this only works when the content option was specified during table creation.

classmethod optimize()

Optimize the search index.

classmethod match(term)

Shorthand for generating a MATCH expression for the given term.

query = Document.select().where(Document.match('search phrase'))
for doc in query:
    print 'match: ', doc.title

classmethod rank()

Calculate the rank based on the quality of the match.

query = (Document
         .select(Document, Document.rank().alias('score'))
         .where(Document.match('search phrase'))
         .order_by(SQL('score').desc()))

for search_result in query:
    print search_result.title, search_result.score

classmethod bm25([field=None[, k=1.2[, b=0.75]]])

Calculate the rank based on the quality of the match using the BM25 algorithm.

Note

If no field is specified, then the first TextField on the model will be used. If no TextField is present, the first CharField will be used. Failing either of those conditions, the last overall field on the model will be used.

query = (Document
         .select(
             Document,
             Document.bm25(Document.content).alias('score'))
         .where(Document.match('search phrase'))
         .order_by(SQL('score').desc()))

for search_result in query:
    print search_result.title, search_result.score
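To give a feel for what the k and b parameters do, here is a pure-Python sketch (Python 3) of the textbook Okapi BM25 formula for a single search term. It is an assumption that this matches the general shape of the ranking; the exact variant, sign convention and tokenization used by the FTS extension may differ. The bm25_scores function and the sample documents are made up for the example.

```python
import math

def bm25_scores(documents, term, k=1.2, b=0.75):
    """Score each document for a single term using textbook Okapi BM25.

    k controls how quickly repeated occurrences of the term saturate;
    b controls how strongly scores are normalized by document length.
    """
    tokenized = [doc.lower().split() for doc in documents]
    n_docs = len(tokenized)
    avg_len = sum(len(tokens) for tokens in tokenized) / n_docs
    containing = sum(1 for tokens in tokenized if term in tokens)
    # Inverse document frequency: rarer terms carry more weight.
    idf = math.log((n_docs - containing + 0.5) / (containing + 0.5) + 1)

    scores = []
    for tokens in tokenized:
        freq = tokens.count(term)
        norm = freq + k * (1 - b + b * len(tokens) / avg_len)
        scores.append(idf * freq * (k + 1) / norm)
    return scores

docs = [
    'peewee is a small orm',
    'huey is a task queue',
    'peewee peewee peewee',
]
scores = bm25_scores(docs, 'peewee')

# Documents that do not contain the term score zero; more occurrences in
# a shorter document score higher.
ranked = sorted(range(len(docs)), key=lambda i: scores[i], reverse=True)
print(ranked)  # [2, 0, 1]
```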
classmethod search(term[, alias='score'])

Shorthand way of searching for a term and sorting results by the quality of the match. This is equivalent to the rank() example code presented above.

Parameters:
  • term (str) – Search term to use.
  • alias (str) – Alias to use for the calculated rank score.

docs = Document.search('search term')
for result in docs:
    print result.title, result.score

classmethod search_bm25(term[, field=None[, k=1.2[, b=0.75[, alias='score']]]])

Shorthand way of searching for a term and sorting results by the quality of the match, as determined by the BM25 algorithm. This is equivalent to the bm25() example code presented above.

Parameters:
  • term (str) – Search term to use.
  • field (Field) – A field on the model.
  • k (float) – Parameter for BM25
  • b (float) – Parameter for BM25
  • alias (str) – Alias to use for the calculated rank score.

Note

If no field is specified, then the first TextField on the model will be used. If no TextField is present, the first CharField will be used. Failing either of those conditions, the last overall field on the model will be used.

Note

BM25 only works with FTS4 tables.

docs = Document.search_bm25('search term')
for result in docs:
    print result.title, result.score

match(lhs, rhs)

Generate a SQLite MATCH expression for use in full-text searches.

Document.select().where(match(Document.content, 'search term'))

Rank(model_class)

Calculate the rank of the search results, for use with FTSModel queries using the MATCH operator.

# Search for documents and return results ordered by quality
# of match.
docs = (Document
        .select(Document, Rank(Document).alias('score'))
        .where(Document.match('some search term'))
        .order_by(SQL('score').desc()))

BM25(model_class, field_index)

Calculate the rank of the search results using the BM25 algorithm, for use with FTSModel queries using the MATCH operator.

Parameters:
  • model_class (Model) – The FTSModel on which the query is being performed.
  • field_index (int) – The 0-based index of the field being queried.

# Assuming the `content` field has index=2 (0=pk, 1=title, 2=content),
# calculate the BM25 score for each result.
docs = (Document
        .select(Document, BM25(Document, 2).alias('score'))
        .where(Document.match('search term'))
        .order_by(SQL('score').desc()))

Note

BM25 only works with FTS4 tables.

BerkeleyDB backend

BerkeleyDB provides a SQLite-compatible API. BerkeleyDB’s SQL API has many advantages over SQLite:

  • Higher transactions-per-second in multi-threaded environments.
  • Built-in replication and hot backup.
  • Fewer system calls, less resource utilization.
  • Multi-version concurrency control.

For more details, Oracle has published a short technical overview.

In order to use peewee with BerkeleyDB, you need to compile BerkeleyDB with the SQL API enabled. Then compile the Python SQLite driver against BerkeleyDB’s sqlite replacement.

Begin by downloading and compiling BerkeleyDB:

wget http://download.oracle.com/berkeley-db/db-6.0.30.tar.gz
tar xzf db-6.0.30.tar.gz
cd db-6.0.30/build_unix
export CFLAGS='-DSQLITE_ENABLE_FTS3=1 -DSQLITE_ENABLE_RTREE=1 -fPIC'
../dist/configure --enable-static --disable-shared --enable-sql --enable-sql-compat
make
sudo make prefix=/usr/local/ install

Then get a copy of the standard library SQLite driver and build it against BerkeleyDB:

git clone https://github.com/ghaering/pysqlite
cd pysqlite
sed -i "s|#||g" setup.cfg
python setup.py build
sudo python setup.py install

To simplify this process, peewee comes with a script that will automatically build the appropriate libraries for you. The berkeley_build.sh script can be found in the playhouse directory (or you can view the source online).

You can also find step by step instructions on my blog.

class BerkeleyDatabase(database, **kwargs)

Subclass of the SqliteExtDatabase that supports connecting to BerkeleyDB-backed version of SQLite.

Sqlcipher backend

Warning

This module is experimental.

  • Although this extension’s code is short, it has not been properly peer-reviewed yet and may have introduced vulnerabilities.
  • The code contains minimum values for passphrase length and kdf_iter, as well as a default value for the latter. Do not regard these numbers as advice. Consult the docs at http://sqlcipher.net/sqlcipher-api/ and security experts.

Also note that this code relies on pysqlcipher and sqlcipher, and the code there might have vulnerabilities as well, but since these are widely used crypto modules, we can expect “short zero days” there.

sqlcipher_ext API notes

class SqlCipherDatabase(database, passphrase, kdf_iter=64000, **kwargs)

Subclass of SqliteDatabase that stores the database encrypted. Instead of the standard sqlite3 backend, it uses pysqlcipher, a Python wrapper for sqlcipher, which in turn is an encrypted wrapper around sqlite3. The API is therefore identical to that of SqliteDatabase, except for the object construction parameters:

Parameters:
  • database – Path to encrypted database filename to open [or create].
  • passphrase – Database encryption passphrase: should be at least 8 characters long (or an error is raised), but it is strongly advised to enforce better passphrase strength criteria in your implementation.
  • kdf_iter – [Optional] number of PBKDF2 iterations.
  • If the database file doesn’t exist, it will be created with encryption by a key derived from passphrase with kdf_iter PBKDF2 iterations.
  • When trying to open an existing database, passphrase and kdf_iter should be identical to the ones used when it was created.

Notes:

  • [Hopefully] there’s no way to tell whether the passphrase is wrong or the file is corrupt. In both cases, the first time we try to access the database, a DatabaseError is raised with the exact message: "file is encrypted or is not a database".

    As mentioned above, this only happens when you access the database, so if you need to know right away whether the passphrase was correct, you can trigger this check by calling [e.g.] get_tables() (see example below).

  • Most applications can expect failed attempts to open the database (common case: prompting the user for passphrase), so the database can’t be hardwired into the Meta of model classes, and a Proxy should be used instead.

Example:

from peewee import *
from playhouse.sqlcipher_ext import SqlCipherDatabase

db_proxy = Proxy()

class BaseModel(Model):
    """Parent for all app's models"""
    class Meta:
        # We won't have a valid db until the user enters a passphrase,
        # so we use a Proxy() instead.
        database = db_proxy

# Derive our model subclasses
class Person(BaseModel):
    name = CharField(primary_key=True)

while True:
    db = SqlCipherDatabase('testsqlcipher.db',
                           get_passphrase_from_user())
    try:  # Error only gets triggered when we access the db
        db.get_tables()
        break  # No error, so the passphrase was right.
    except DatabaseError as exc:
        # We only allow a specific [somewhat cryptic] error message.
        if str(exc) != 'file is encrypted or is not a database':
            raise
        # Only complain when the attempt actually failed.
        tell_user_the_passphrase_was_wrong()

# If we're here, db is ok, we can connect it to Model subclasses
db_proxy.initialize(db)

See also: a slightly more elaborate example.
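
The deferred binding used in the example can be sketched in a few lines. This is a rough illustration of the idea only: LazyProxy and FakeDatabase are hypothetical names, and the real Proxy class may behave differently in its details.

```python
class LazyProxy:
    """Placeholder that forwards attribute access once a target is set."""

    def __init__(self):
        self._target = None

    def initialize(self, target):
        self._target = target

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself.
        if self._target is None:
            raise AttributeError('proxy has not been initialized')
        return getattr(self._target, name)


class FakeDatabase:
    """Stand-in for a real database object."""

    def get_tables(self):
        return ['person']


db_proxy = LazyProxy()
try:
    db_proxy.get_tables()
    failed_before_init = False
except AttributeError:
    failed_before_init = True

db_proxy.initialize(FakeDatabase())
tables = db_proxy.get_tables()  # forwarded to the FakeDatabase instance
```

Models can hold on to the proxy from the moment they are defined, and the real database only has to exist by the time the first query runs.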

Django Integration

The Django ORM provides a very high-level abstraction over SQL and as a consequence is in some ways limited in terms of flexibility or expressiveness. I wrote a blog post describing my search for a “missing link” between Django’s ORM and the SQL it generates, concluding that no such layer exists. The djpeewee module attempts to provide an easy-to-use, structured layer for generating SQL queries for use with Django’s ORM.

Some possible use cases:

  • Joining on fields that are not related by foreign key (for example UUID fields).
  • Performing aggregate queries on calculated values.
  • Features that Django does not support such as CASE statements.
  • Utilizing SQL functions that Django does not support, such as SUBSTR.
  • Replacing nearly-identical SQL queries with reusable, composable data-structures.

Below is an example of how you might use this:

# Django model.
class Event(models.Model):
    start_time = models.DateTimeField()
    end_time = models.DateTimeField()
    title = models.CharField(max_length=255)

# Suppose we want to find all events that are longer than an hour.  Django
# does not support this, but we can use peewee.
from datetime import timedelta
from playhouse.djpeewee import translate
P = translate(Event)
query = (P.Event
         .select()
         .where(
             (P.Event.end_time - P.Event.start_time) > timedelta(hours=1)))

# Now feed our peewee query into Django's `raw()` method:
sql, params = query.sql()
Event.objects.raw(sql, params)

Foreign keys and Many-to-many relationships

The translate() function will recursively traverse the graph of models and return a dictionary populated with everything it finds. Back-references are not searched by default, but can be included by specifying backrefs=True.

Example:

>>> from django.contrib.auth.models import User, Group
>>> from playhouse.djpeewee import translate
>>> translate(User, Group)
{'ContentType': peewee.ContentType,
 'Group': peewee.Group,
 'Group_permissions': peewee.Group_permissions,
 'Permission': peewee.Permission,
 'User': peewee.User,
 'User_groups': peewee.User_groups,
 'User_user_permissions': peewee.User_user_permissions}

As you can see in the example above, although only User and Group were passed in to translate(), several other models which are related by foreign key were also created. Additionally, the many-to-many “through” tables were created as separate models since peewee does not abstract away these types of relationships.

Using the above models it is possible to construct joins. The following example will get all users who belong to a group whose name starts with the letter “A”:

>>> from peewee import fn
>>> P = translate(User, Group)
>>> query = P.User.select().join(P.User_groups).join(P.Group).where(
...     fn.Lower(fn.Substr(P.Group.name, 1, 1)) == 'a')
>>> sql, params = query.sql()
>>> print sql  # formatted for legibility
SELECT t1."id", t1."password", ...
FROM "auth_user" AS t1
INNER JOIN "auth_user_groups" AS t2 ON (t1."id" = t2."user_id")
INNER JOIN "auth_group" AS t3 ON (t2."group_id" = t3."id")
WHERE (Lower(Substr(t3."name", %s, %s)) = %s)

djpeewee API

translate(*models, **options)

Translate the given Django models into roughly equivalent peewee models suitable for constructing queries. Foreign keys and many-to-many relationships will be followed and models generated; back-references are not traversed unless backrefs=True is specified.

Parameters:
  • models – One or more Django model classes.
  • options – A dictionary of options, see note below.
Returns:

A dict-like object containing the generated models, but which supports dotted-name style lookups.

The following are valid options:

  • recurse: Follow foreign keys and many to many (default: True).
  • max_depth: Maximum depth to recurse (default: None, unlimited).
  • backrefs: Follow backrefs (default: False).
  • exclude: A list of models to exclude.
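
How these options might interact can be sketched with a toy traversal over a hand-written relationship graph. Everything here is hypothetical: FOREIGN_KEYS, backrefs_of and collect_models are illustration names, the reading of max_depth as "number of relationship hops" is an assumption, and this is not the djpeewee implementation.

```python
from collections import deque

# Toy relationship graph: model name -> names it points to by foreign key.
FOREIGN_KEYS = {
    'User': [],
    'Tweet': ['User'],
    'Favorite': ['User', 'Tweet'],
}

def backrefs_of(name):
    """Models holding a foreign key that points at `name`."""
    return [src for src, targets in FOREIGN_KEYS.items() if name in targets]

def collect_models(*models, recurse=True, max_depth=None, backrefs=False,
                   exclude=()):
    """Return the set of model names reachable from `models`."""
    found = set()
    # Breadth-first, so each model is first seen at its smallest depth.
    queue = deque((name, 0) for name in models)
    while queue:
        name, depth = queue.popleft()
        if name in found or name in exclude:
            continue
        found.add(name)
        if not recurse or (max_depth is not None and depth >= max_depth):
            continue
        related = list(FOREIGN_KEYS[name])
        if backrefs:
            related += backrefs_of(name)
        queue.extend((other, depth + 1) for other in related)
    return found

reachable = collect_models('Tweet')                       # follows Tweet -> User
with_backrefs = collect_models('User', backrefs=True)     # also walks "upwards"
```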

Generic foreign keys

The gfk module provides a Generic ForeignKey (GFK), similar to Django. A GFK is composed of two columns: an object ID and an object type identifier. The object types are collected in a global registry (all_models).

How a GFKField is resolved:

  1. Look up the object type in the global registry (returns a model class)
  2. Look up the model instance by object ID
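
The two steps above can be sketched with plain Python objects. This is a simplified stand-in, not the gfk module's code: the Blog class, its in-memory rows dictionary and resolve_gfk are all hypothetical names used for illustration.

```python
# Global registry mapping an object type name to a model class.
all_models = {}

class Blog:
    rows = {}  # stands in for a table: primary key -> instance

    def __init__(self, id, name):
        self.id, self.name = id, name
        Blog.rows[id] = self

all_models['blog'] = Blog

def resolve_gfk(object_type, object_id):
    """Resolve an (object_type, object_id) pair to an instance, or None."""
    model_class = all_models.get(object_type)   # step 1: type -> class
    if model_class is None:
        return None
    return model_class.rows.get(object_id)      # step 2: id -> instance

post = Blog(1, 'awesome post')
resolved = resolve_gfk('blog', 1)
```

Because the lookup goes through the registry, a model that was never registered simply cannot be resolved, which is why subclassing the right base class matters.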

Note

In order to use Generic ForeignKeys, your application’s models must subclass playhouse.gfk.Model. This ensures that the model class will be added to the global registry.

Note

GFKs themselves are not actually a field and will not add a column to your table.

Like regular ForeignKeys, GFKs support a “back-reference” via the ReverseGFK descriptor.

How to use GFKs

  1. Be sure your model subclasses playhouse.gfk.Model
  2. Add a CharField to store the object_type
  3. Add a field to store the object_id (usually an IntegerField)
  4. Add a GFKField and instantiate it with the names of the object_type and object_id fields.
  5. (optional) On any other models, add a ReverseGFK descriptor

Example:

from playhouse.gfk import *

class Tag(Model):
    tag = CharField()
    object_type = CharField(null=True)
    object_id = IntegerField(null=True)
    object = GFKField('object_type', 'object_id')

class Blog(Model):
    name = CharField()
    tags = ReverseGFK(Tag, 'object_type', 'object_id')

class Photo(Model):
    name = CharField()
    tags = ReverseGFK(Tag, 'object_type', 'object_id')

How you use these is pretty straightforward hopefully:

>>> b = Blog.create(name='awesome post')
>>> Tag.create(tag='awesome', object=b)
>>> b2 = Blog.create(name='whiny post')
>>> Tag.create(tag='whiny', object=b2)

>>> b.tags # <-- a select query
<class '__main__.Tag'> SELECT t1."id", t1."tag", t1."object_type", t1."object_id" FROM "tag" AS t1 WHERE ((t1."object_type" = ?) AND (t1."object_id" = ?)) [u'blog', 1]

>>> [x.tag for x in b.tags]
[u'awesome']

>>> [x.tag for x in b2.tags]
[u'whiny']

>>> p = Photo.create(name='picture of cat')
>>> Tag.create(object=p, tag='kitties')
>>> Tag.create(object=p, tag='cats')

>>> [x.tag for x in p.tags]
[u'kitties', u'cats']

>>> [x.tag for x in Blog.tags]
[u'awesome', u'whiny']

>>> t = Tag.get(Tag.tag == 'awesome')
>>> t.object
<__main__.Blog at 0x268f450>

>>> t.object.name
u'awesome post'

GFK API

class GFKField([model_type_field='object_type'[, model_id_field='object_id']])

Provide a clean API for storing “generic” foreign keys. Generic foreign keys are comprised of an object type, which maps to a model class, and an object id, which maps to the primary key of the related model class.

Setting the GFKField on a model will automatically populate the model_type_field and model_id_field. Similarly, getting the GFKField on a model instance will “resolve” the two fields, first looking up the model class, then looking up the instance by ID.

class ReverseGFK(model[, model_type_field='object_type'[, model_id_field='object_id']])

Back-reference support for GFKField.

Key/Value Store

Provides a simple key/value store, using a dictionary API. By default the KeyStore will use an in-memory sqlite database, but any database will work.

To start using the key-store, create an instance and pass it a field to use for the values.

>>> kv = KeyStore(TextField())
>>> kv['a'] = 'A'
>>> kv['a']
'A'

Note

To store arbitrary python objects, use the PickledKeyStore, which stores values in a pickled BlobField.

Using the KeyStore it is possible to use “expressions” to retrieve values from the dictionary. For instance, imagine you want to get all keys which contain a certain substring:

>>> keys_matching_substr = kv[kv.key % '%substr%']
>>> keys_start_with_a = kv[fn.Lower(fn.Substr(kv.key, 1, 1)) == 'a']

KeyStore API

class KeyStore(value_field[, ordered=False[, database=None]])

Lightweight dictionary interface to a model containing a key and value. Implements common dictionary methods, such as __getitem__, __setitem__, get, pop, items, keys, and values.

Parameters:
  • value_field (Field) – Field instance to use as value field, e.g. an instance of TextField.
  • ordered (boolean) – Whether the keys should be returned in sorted order
  • database (Database) – Database class to use for the storage backend. If none is supplied, an in-memory Sqlite DB will be used.

Example:

>>> from playhouse.kv import KeyStore
>>> kv = KeyStore(TextField())
>>> kv['a'] = 'foo'
>>> for k, v in kv:
...     print k, v
a foo

>>> 'a' in kv
True
>>> 'b' in kv
False

class PickledKeyStore([ordered=False[, database=None]])

Identical to the KeyStore except anything can be stored as a value in the dictionary. The storage for the value will be a pickled BlobField.

Example:

>>> from playhouse.kv import PickledKeyStore
>>> pkv = PickledKeyStore()
>>> pkv['a'] = 'A'
>>> pkv['b'] = 1.0
>>> list(pkv.items())
[(u'a', 'A'), (u'b', 1.0)]
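
The idea of a dictionary that pickles its values into a blob column can be sketched with the standard library alone. PickledDict below is a hypothetical class written for illustration; it uses sqlite3 directly and is not how the kv module is implemented.

```python
import pickle
import sqlite3

class PickledDict:
    """Tiny dict-like store that pickles values into a BLOB column."""

    def __init__(self, database=':memory:'):
        self.conn = sqlite3.connect(database)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value BLOB)')

    def __setitem__(self, key, value):
        blob = pickle.dumps(value)
        # INSERT OR REPLACE overwrites any existing row with the same key.
        with self.conn:
            self.conn.execute(
                'INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)',
                (key, blob))

    def __getitem__(self, key):
        row = self.conn.execute(
            'SELECT value FROM kv WHERE key = ?', (key,)).fetchone()
        if row is None:
            raise KeyError(key)
        return pickle.loads(row[0])

    def __contains__(self, key):
        row = self.conn.execute(
            'SELECT 1 FROM kv WHERE key = ?', (key,)).fetchone()
        return row is not None

store = PickledDict()
store['a'] = 'A'
store['b'] = {'nested': [1, 2.0]}
```

As with any use of pickle, values should only be loaded from a database you trust.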

Shortcuts

This module contains helper functions for expressing things that would otherwise be somewhat verbose or cumbersome using peewee’s APIs.

case(predicate, expression_tuples, default=None)
Parameters:
  • predicate – A SQL expression or can be None.
  • expression_tuples – An iterable containing one or more 2-tuples comprised of an expression and return value.
  • default – default if none of the cases match.

Example SQL case statements:

-- case with predicate --
SELECT "username",
  CASE "user_id"
    WHEN 1 THEN "one"
    WHEN 2 THEN "two"
    ELSE "?"
  END
FROM "users";

-- case with no predicate (inline expressions) --
SELECT "username",
  CASE
    WHEN "user_id" = 1 THEN "one"
    WHEN "user_id" = 2 THEN "two"
    ELSE "?"
  END
FROM "users";

Equivalent function invocations:

User.select(User.username, case(User.user_id, (
  (1, "one"),
  (2, "two")), "?"))

User.select(User.username, case(None, (
  (User.user_id == 1, "one"),  # note the double equals
  (User.user_id == 2, "two")), "?"))

You can specify a value for the CASE expression using the alias() method:

User.select(User.username, case(User.user_id, (
  (1, "one"),
  (2, "two")), "?").alias("id_string"))
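
To check that the two SQL forms above really are equivalent, they can be run against a throwaway SQLite database. This sketch uses the sqlite3 module directly rather than peewee; the table contents are made up for the example, and string literals are written with single quotes, as standard SQL expects.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (user_id INTEGER PRIMARY KEY, username TEXT)')
conn.executemany('INSERT INTO users VALUES (?, ?)',
                 [(1, 'huey'), (2, 'mickey'), (3, 'zaizee')])

# CASE with a predicate: user_id is compared against each WHEN value.
with_predicate = conn.execute("""
    SELECT username,
      CASE user_id WHEN 1 THEN 'one' WHEN 2 THEN 'two' ELSE '?' END
      AS id_string
    FROM users ORDER BY user_id""").fetchall()

# CASE without a predicate: each WHEN carries a full boolean expression.
without_predicate = conn.execute("""
    SELECT username,
      CASE WHEN user_id = 1 THEN 'one' WHEN user_id = 2 THEN 'two' ELSE '?' END
      AS id_string
    FROM users ORDER BY user_id""").fetchall()
```

Both queries return the same rows; the third user falls through to the ELSE branch.
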

model_to_dict(model[, recurse=True[, backrefs=False[, only=None[, exclude=None]]]])

Convert a model instance (and optionally any related instances) to a dictionary.

Parameters:
  • recurse (bool) – Whether foreign-keys should be recursed.
  • backrefs (bool) – Whether lists of related objects should be recursed.
  • only – A list (or set) of field instances which should be included in the result dictionary.
  • exclude – A list (or set) of field instances which should be excluded from the result dictionary.

Examples:

>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}

>>> model_to_dict(user, backrefs=True)
{'id': 1, 'tweets': [], 'username': 'charlie'}

>>> t1 = Tweet.create(user=user, message='tweet-1')
>>> t2 = Tweet.create(user=user, message='tweet-2')
>>> model_to_dict(user, backrefs=True)
{
  'id': 1,
  'tweets': [
    {'id': 1, 'message': 'tweet-1'},
    {'id': 2, 'message': 'tweet-2'},
  ],
  'username': 'charlie'
}

>>> model_to_dict(t1)
{
  'id': 1,
  'message': 'tweet-1',
  'user': {
    'id': 1,
    'username': 'charlie'
  }
}

>>> model_to_dict(t2, recurse=False)
{'id': 2, 'message': 'tweet-2', 'user': 1}

Signal support

Models with hooks for signals (a-la django) are provided in playhouse.signals. To use the signals, you will need all of your project’s models to be a subclass of playhouse.signals.Model, which overrides the necessary methods to provide support for the various signals.

from playhouse.signals import Model, post_save


class MyModel(Model):
    data = IntegerField()

@post_save(sender=MyModel)
def on_save_handler(model_class, instance, created):
    put_data_in_cache(instance.data)

The following signals are provided:

pre_save
Called immediately before an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
post_save
Called immediately after an object is saved to the database. Provides an additional keyword argument created, indicating whether the model is being saved for the first time or updated.
pre_delete
Called immediately before an object is deleted from the database when Model.delete_instance() is used.
post_delete
Called immediately after an object is deleted from the database when Model.delete_instance() is used.
pre_init
Called when a model class is first instantiated.
post_init
Called after a model class has been instantiated and the fields have been populated, for example when being selected as part of a database query.

Connecting handlers

Whenever a signal is dispatched, it will call any handlers that have been registered. This allows totally separate code to respond to events like model save and delete.

The Signal class provides a connect() method, which takes a callback function and two optional parameters for “sender” and “name”. If specified, the “sender” parameter should be a single model class and allows your callback to only receive signals from that one model class. The “name” parameter is used as a convenient alias in the event you wish to unregister your signal handler.

Example usage:

from playhouse.signals import *

def post_save_handler(sender, instance, created):
    print '%s was just saved' % instance

# our handler will only be called when we save instances of SomeModel
post_save.connect(post_save_handler, sender=SomeModel)

All signal handlers accept as their first two arguments sender and instance, where sender is the model class and instance is the actual model being acted upon.

If you’d like, you can also use a decorator to connect signal handlers. This is functionally equivalent to the above example:

@post_save(sender=SomeModel)
def post_save_handler(sender, instance, created):
    print '%s was just saved' % instance

Signal API

class Signal

Stores a list of receivers (callbacks) and calls them when the “send” method is invoked.

connect(receiver[, sender=None[, name=None]])

Add the receiver to the internal list of receivers, which will be called whenever the signal is sent.

Parameters:
  • receiver (callable) – a callable that takes at least two parameters, a “sender”, which is the Model subclass that triggered the signal, and an “instance”, which is the actual model instance.
  • sender (Model) – if specified, only instances of this model class will trigger the receiver callback.
  • name (string) – a short alias
from playhouse.signals import post_save
from project.handlers import cache_buster

post_save.connect(cache_buster, name='project.cache_buster')

disconnect([receiver=None[, name=None]])

Disconnect the given receiver (or the receiver with the given name alias) so that it no longer is called. Either the receiver or the name must be provided.

Parameters:
  • receiver (callable) – the callback to disconnect
  • name (string) – a short alias
post_save.disconnect(name='project.cache_buster')

send(instance, *args, **kwargs)

Iterates over the receivers and will call them in the order in which they were connected. If the receiver specified a sender, it will only be called if the instance is an instance of the sender.

Parameters:instance – a model instance
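
The behaviour described by this API can be sketched in a few lines of plain Python. This is an illustration under assumptions, not the playhouse.signals source: in particular, defaulting the alias to the receiver's function name is a guess made for the sketch, and Note and Other are stand-in classes.

```python
class Signal:
    """Minimal signal: stores receivers and calls them in connection order."""

    def __init__(self):
        self._receivers = []  # list of (name, receiver, sender) tuples

    def connect(self, receiver, sender=None, name=None):
        # Assumption: fall back to the function name when no alias is given.
        alias = name or getattr(receiver, '__name__', repr(receiver))
        self._receivers.append((alias, receiver, sender))

    def disconnect(self, receiver=None, name=None):
        if receiver is None and name is None:
            raise ValueError('a receiver or a name must be provided')
        self._receivers = [
            (n, r, s) for (n, r, s) in self._receivers
            if not (r is receiver or n == name)]

    def send(self, instance, *args, **kwargs):
        for _name, receiver, sender in self._receivers:
            # Receivers bound to a sender only fire for its instances.
            if sender is None or isinstance(instance, sender):
                receiver(type(instance), instance, *args, **kwargs)


class Note:
    pass

class Other:
    pass

saved = []

def log_notes(sender, instance, created):
    saved.append((sender.__name__, created))

post_save = Signal()
post_save.connect(log_notes, sender=Note, name='log_notes')
post_save.send(Note(), created=True)     # handler runs
post_save.send(Other(), created=True)    # filtered out by sender
post_save.disconnect(name='log_notes')
post_save.send(Note(), created=False)    # no receivers left
```

After these calls saved holds a single entry, ('Note', True).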

pwiz, a model generator

pwiz is a little script that ships with peewee and is capable of introspecting an existing database and generating model code suitable for interacting with the underlying data. If you have a database already, pwiz can give you a nice boost by generating skeleton code with correct column affinities and foreign keys.

If you install peewee using setup.py install, pwiz will be installed as a “script” and you can just run:

pwiz.py -e postgresql -u postgres my_postgres_db

This will print a bunch of models to standard output. So you can do this:

pwiz.py -e postgresql my_postgres_db > mymodels.py
python # <-- fire up an interactive shell
>>> from mymodels import Blog, Entry, Tag, Whatever
>>> print [blog.name for blog in Blog.select()]

Option  Meaning              Example
-h      show help
-e      database backend     -e mysql
-H      host to connect to   -H remote.db.server
-p      port to connect on   -p 9001
-u      database user        -u postgres
-P      database password    -P secret
-s      postgres schema      -s public

The following are valid parameters for the engine:

  • sqlite
  • mysql
  • postgresql

Schema Migrations

Peewee now supports schema migrations, with well-tested support for Postgresql, SQLite and MySQL. Unlike other schema migration tools, peewee’s migrations do not handle introspection and database “versioning”. Rather, peewee provides a number of helper functions for generating and running schema-altering statements. This engine provides the basis on which a more sophisticated tool could some day be built.

Migrations can be written as simple python scripts and executed from the command-line. Since the migrations only depend on your application's Database object, it should be easy to manage changing your model definitions and maintaining a set of migration scripts without introducing dependencies.

Example usage

Begin by importing the helpers from the migrate module:

from playhouse.migrate import *

Instantiate a migrator. The SchemaMigrator class is responsible for generating schema altering operations, which can then be run sequentially by the migrate() helper.

# Postgres example:
my_db = PostgresqlDatabase(...)
migrator = PostgresqlMigrator(my_db)

# SQLite example:
my_db = SqliteDatabase('my_database.db')
migrator = SqliteMigrator(my_db)

Use migrate() to execute one or more operations:

title_field = CharField(default='')
status_field = IntegerField(null=True)

migrate(
    migrator.add_column('some_table', 'title', title_field),
    migrator.add_column('some_table', 'status', status_field),
    migrator.drop_column('some_table', 'old_column'),
)

Warning

Migrations are not run inside a transaction. If you wish the migration to run in a transaction you will need to wrap the call to migrate in a transaction block, e.g.

with my_db.transaction():
    migrate(...)
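
What the transaction buys you can be seen with plain sqlite3, outside of peewee. In this sketch the second statement fails on purpose and the first one is rolled back with it. The table names are made up, and whether schema changes can be rolled back at all depends on the database: SQLite and PostgreSQL allow it, while MySQL generally commits implicitly on schema changes.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so the explicit
# BEGIN / COMMIT / ROLLBACK statements below are the only transaction control.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE story (id INTEGER PRIMARY KEY, title TEXT)')

def column_names(table):
    # Row index 1 of PRAGMA table_info holds the column name.
    return [row[1] for row in conn.execute('PRAGMA table_info(%s)' % table)]

operations = [
    'ALTER TABLE story ADD COLUMN status INTEGER',
    'ALTER TABLE no_such_table ADD COLUMN x INTEGER',  # fails on purpose
]

conn.execute('BEGIN')
try:
    for statement in operations:
        conn.execute(statement)
    conn.execute('COMMIT')
except sqlite3.OperationalError:
    # The first ALTER TABLE is undone together with the failed one.
    conn.execute('ROLLBACK')

columns_after = column_names('story')
```

After the rollback the story table still has only its original id and title columns.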

Supported Operations

Add new field(s) to an existing model:

# Create your field instances. For non-null fields you must specify a
# default value.
pubdate_field = DateTimeField(null=True)
comment_field = TextField(default='')

# Run the migration, specifying the database table, field name and field.
migrate(
    migrator.add_column('comment_tbl', 'pub_date', pubdate_field),
    migrator.add_column('comment_tbl', 'comment', comment_field),
)

Renaming a field:

# Specify the table, original name of the column, and its new name.
migrate(
    migrator.rename_column('story', 'pub_date', 'publish_date'),
    migrator.rename_column('story', 'mod_date', 'modified_date'),
)

Dropping a field:

migrate(
    migrator.drop_column('story', 'some_old_field'),
)

Making a field nullable or not nullable:

# Note that when making a field not null that field must not have any
# NULL values present.
migrate(
    # Make `pub_date` allow NULL values.
    migrator.drop_not_null('story', 'pub_date'),

    # Prevent `modified_date` from containing NULL values.
    migrator.add_not_null('story', 'modified_date'),
)

Renaming a table:

migrate(
    migrator.rename_table('story', 'stories_tbl'),
)

Adding an index:

# Specify the table, column names, and whether the index should be
# UNIQUE or not.
migrate(
    # Create an index on the `pub_date` column.
    migrator.add_index('story', ('pub_date',), False),

    # Create a multi-column index on the `pub_date` and `status` fields.
    migrator.add_index('story', ('pub_date', 'status'), False),

    # Create a unique index on the category and title fields.
    migrator.add_index('story', ('category_id', 'title'), True),
)

Dropping an index:

# Specify the index name.
migrate(migrator.drop_index('story', 'story_pub_date_status'))

Migrations API

migrate(*operations)

Execute one or more schema altering operations.

Usage:

migrate(
    migrator.add_column('some_table', 'new_column', CharField(default='')),
    migrator.add_index('some_table', ('new_column',)),
)

class SchemaMigrator(database)
Parameters:database – a Database instance.

The SchemaMigrator is responsible for generating schema-altering statements.

add_column(table, column_name, field)
Parameters:
  • table (str) – Name of the table to add column to.
  • column_name (str) – Name of the new column.
  • field (Field) – A Field instance.

Add a new column to the provided table. The field provided will be used to generate the appropriate column definition.

Note

If the field is not nullable it must specify a default value.

Note

For non-null fields, the field will initially be added as a null field, then an UPDATE statement will be executed to populate the column with the default value. Finally, the column will be marked as not null.
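
The three steps in the note above can be written out by hand. The sketch below runs the first two against an in-memory SQLite database using the sqlite3 module; the third is only shown as a string, because SQLite has no ALTER COLUMN and the PostgreSQL-style statement used here is an assumption about what a migrator might emit, not peewee's actual output.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE comment_tbl (id INTEGER PRIMARY KEY)')
conn.executemany('INSERT INTO comment_tbl (id) VALUES (?)', [(1,), (2,)])

# Step 1: add the column as nullable, so existing rows remain valid.
conn.execute('ALTER TABLE comment_tbl ADD COLUMN comment TEXT')

# Step 2: back-fill existing rows with the field's default value.
conn.execute('UPDATE comment_tbl SET comment = ?', ('',))

# Step 3: mark the column NOT NULL (shown only, not executed on SQLite).
step_3 = 'ALTER TABLE comment_tbl ALTER COLUMN comment SET NOT NULL'

null_rows = conn.execute(
    'SELECT COUNT(*) FROM comment_tbl WHERE comment IS NULL').fetchone()[0]
```

Once step 2 has run no NULL values remain, so the constraint in step 3 can be applied safely.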

drop_column(table, column_name[, cascade=True])
Parameters:
  • table (str) – Name of the table to drop column from.
  • column_name (str) – Name of the column to drop.
  • cascade (bool) – Whether the column should be dropped with CASCADE.
rename_column(table, old_name, new_name)
Parameters:
  • table (str) – Name of the table containing column to rename.
  • old_name (str) – Current name of the column.
  • new_name (str) – New name for the column.
add_not_null(table, column)
Parameters:
  • table (str) – Name of table containing column.
  • column (str) – Name of the column to make not nullable.
drop_not_null(table, column)
Parameters:
  • table (str) – Name of table containing column.
  • column (str) – Name of the column to make nullable.
rename_table(old_name, new_name)
Parameters:
  • old_name (str) – Current name of the table.
  • new_name (str) – New name for the table.
add_index(table, columns[, unique=False])
Parameters:
  • table (str) – Name of table on which to create the index.
  • columns (list) – List of columns which should be indexed.
  • unique (bool) – Whether the new index should specify a unique constraint.
drop_index(table, index_name)
Parameters:
  • table (str) – Name of the table containing the index to be dropped.
  • index_name (str) – Name of the index to be dropped.

class PostgresqlMigrator(database)

Generate migrations for Postgresql databases.

class SqliteMigrator(database)

Generate migrations for SQLite databases.

class MySQLMigrator(database)

Generate migrations for MySQL databases.

Warning

The MySQL migrations are not well tested.

CSV Loader

This module contains helpers for loading CSV data into a database. CSV files can be introspected to generate an appropriate model class for working with the data. This makes it really easy to explore the data in a CSV file using Peewee and SQL.

Here is how you would load a CSV file into an in-memory SQLite database. The call to load_csv() returns a Model class suitable for working with the CSV data:

from peewee import *
from playhouse.csv_loader import load_csv
db = SqliteDatabase(':memory:')
ZipToTZ = load_csv(db, 'zip_to_tz.csv')

Now we can run queries using the new model.

# Get the timezone for a zipcode.
>>> ZipToTZ.get(ZipToTZ.zip == 66047).timezone
'US/Central'

# Get all the zipcodes for my town.
>>> [row.zip for row in ZipToTZ.select().where(
...     (ZipToTZ.city == 'Lawrence') & (ZipToTZ.state == 'KS'))]
[66044, 66045, 66046, 66047, 66049]

For more information and examples check out this blog post.

CSV Loader API

load_csv(db_or_model, filename[, fields=None[, field_names=None[, has_header=True[, sample_size=10[, converter=None[, db_table=None[, **reader_kwargs]]]]]]])

Load a CSV file into the provided database or model class, returning a Model suitable for working with the CSV data.

Parameters:
  • db_or_model – Either a Database instance or a Model class. If a model is not provided, one will be automatically generated for you.
  • filename (str) – Path of CSV file to load.
  • fields (list) – A list of Field instances mapping to each column in the CSV. This allows you to manually specify the column types. If not provided, and a model is not provided, the field types will be determined automatically.
  • field_names (list) – A list of strings to use as field names for each column in the CSV. If not provided, and a model is not provided, the field names will be determined by looking at the header row of the file. If no header exists, then the fields will be given generic names.
  • has_header (bool) – Whether the first row is a header.
  • sample_size (int) – Number of rows to look at when introspecting data types. If set to 0, then a generic field type will be used for all fields.
  • converter (RowConverter) – a RowConverter instance to use for introspecting the CSV. If not provided, one will be created.
  • db_table (str) – The name of the database table to load data into. If this value is not provided, it will be determined using the filename of the CSV file. If a model is provided, this value is ignored.
  • reader_kwargs – Arbitrary keyword arguments to pass to the csv.reader object, such as the dialect, separator, etc.
Return type:

A Model suitable for querying the CSV data.
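
The roles of sample_size and has_header are easier to see in a small sketch of type introspection. guess_column_types is a hypothetical helper written for this illustration; the type names, the field_N naming scheme and the "try int, then float, else text" rule are assumptions, not the csv_loader algorithm.

```python
import csv
import io

def guess_column_types(csv_text, sample_size=10, has_header=True):
    """Guess 'int', 'float' or 'text' per column from the first rows."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header = rows[0] if has_header else None
    data = rows[1:] if has_header else rows
    # Without a header row, fall back to generic names.
    names = header or ['field_%d' % i for i in range(len(data[0]))]
    if sample_size == 0:
        return [(name, 'text') for name in names]

    def fits(values, cast):
        try:
            for value in values:
                cast(value)
        except ValueError:
            return False
        return True

    result = []
    for index, name in enumerate(names):
        sample = [row[index] for row in data[:sample_size]]
        if fits(sample, int):
            kind = 'int'
        elif fits(sample, float):
            kind = 'float'
        else:
            kind = 'text'
        result.append((name, kind))
    return result

sample_csv = ('zip,timezone,lat\n'
              '66044,US/Central,38.97\n'
              '66047,US/Central,38.94\n')
guessed = guess_column_types(sample_csv)
```

A larger sample_size makes the guess more reliable at the cost of reading more rows; a value of 0 skips the guessing and treats every column as text.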

Basic example – field names and types will be introspected:

from peewee import *
from playhouse.csv_loader import *
db = SqliteDatabase(':memory:')
User = load_csv(db, 'users.csv')

Using a pre-defined model:

class ZipToTZ(Model):
    zip = IntegerField()
    timezone = CharField()

load_csv(ZipToTZ, 'zip_to_tz.csv')

Specifying fields:

fields = [DecimalField(), IntegerField(), IntegerField(), DateField()]
field_names = ['amount', 'from_acct', 'to_acct', 'timestamp']
Payments = load_csv(db, 'payments.csv', fields=fields, field_names=field_names, has_header=False)

Connection pool

Warning

This module should be considered experimental.

The pool module contains a helper class to pool database connections, as well as implementations for PostgreSQL and MySQL. The pool works by overriding the methods on the Database class that open and close connections to the backend. The pool can specify a timeout after which connections are recycled, as well as an upper bound on the number of open connections.

If your application is single-threaded, only one connection will be opened.

If your application is multi-threaded (this includes green threads) and you specify threadlocals=True when instantiating your database, then up to max_connections will be opened.

Note

If you intend to open multiple concurrent connections, specify threadlocals=True when creating your database, e.g.

db = PooledPostgresqlDatabase(
    'my_db',
    max_connections=8,
    stale_timeout=600,
    user='postgres',
    threadlocals=True)

class PooledDatabase(database[, max_connections=20[, stale_timeout=None[, **kwargs]]])

Mixin class intended to be used with a subclass of Database.

Parameters:
  • database (str) – The name of the database or database file.
  • max_connections (int) – Maximum number of connections. Provide None for unlimited.
  • stale_timeout (int) – Number of seconds to allow connections to be used.
  • kwargs – Arbitrary keyword arguments passed to database class.

Note

Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.

Note

If the number of open connections exceeds max_connections, a ValueError will be raised.

manual_close()

Close the currently-open connection without returning it to the pool.

_connect(*args, **kwargs)

Request a connection from the pool. If there are no available connections a new one will be opened.

_close(conn[, close_conn=False])

By default conn will not be closed and instead will be returned to the pool of available connections. If close_conn=True, then conn will be closed and not be returned to the pool.
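
The behaviour described in this section (reuse on close, stale connections discarded only when a new one is requested, ValueError at the limit) can be sketched with sqlite3 connections. SimplePool is a hypothetical class for illustration, not the pool module's code; measuring staleness from the time a connection was created is an assumption, as is the injectable clock, which is there only to make the example deterministic.

```python
import sqlite3
import time

class SimplePool:
    """Toy connection pool with max_connections and stale_timeout."""

    def __init__(self, database, max_connections=20, stale_timeout=None,
                 clock=time.time):
        self.database = database
        self.max_connections = max_connections
        self.stale_timeout = stale_timeout
        self.clock = clock
        self._idle = []     # (created_at, connection) pairs ready for reuse
        self._in_use = {}   # id(connection) -> created_at

    def connect(self):
        while self._idle:
            created_at, conn = self._idle.pop(0)
            age = self.clock() - created_at
            if self.stale_timeout and age > self.stale_timeout:
                conn.close()  # stale connections are only closed here
                continue
            self._in_use[id(conn)] = created_at
            return conn
        if self.max_connections and len(self._in_use) >= self.max_connections:
            raise ValueError('Exceeded maximum connections.')
        conn = sqlite3.connect(self.database)
        self._in_use[id(conn)] = self.clock()
        return conn

    def close(self, conn, close_conn=False):
        created_at = self._in_use.pop(id(conn))
        if close_conn:
            conn.close()
        else:
            self._idle.append((created_at, conn))

now = [0.0]  # fake clock
pool = SimplePool(':memory:', max_connections=1, stale_timeout=10,
                  clock=lambda: now[0])
first = pool.connect()
pool.close(first)        # returned to the pool, not closed
reused = pool.connect()  # the same connection object comes back
pool.close(reused)
now[0] = 11.0            # the idle connection is now older than stale_timeout
fresh = pool.connect()   # stale one is closed, a new one is opened
```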

class PooledPostgresqlDatabase

Subclass of PostgresqlDatabase that mixes in the PooledDatabase helper.

class PooledPostgresqlExtDatabase

Subclass of PostgresqlExtDatabase that mixes in the PooledDatabase helper. The PostgresqlExtDatabase is a part of the Postgresql Extensions module and provides support for many Postgres-specific features.

class PooledMySQLDatabase

Subclass of MySQLDatabase that mixes in the PooledDatabase helper.

Read Slaves

The read_slave module contains a Model subclass that can be used to automatically execute SELECT queries against different database(s). This might be useful if you have your databases in a master / slave configuration.

class ReadSlaveModel

Model subclass that will route SELECT queries to a different database.

Master and read-slaves are specified using Model.Meta:

# Declare a master and two read-replicas.
master = PostgresqlDatabase('master')
replica_1 = PostgresqlDatabase('replica_1')
replica_2 = PostgresqlDatabase('replica_2')

# Declare a BaseModel, the normal best-practice.
class BaseModel(ReadSlaveModel):
    class Meta:
        database = master
        read_slaves = (replica_1, replica_2)

# Declare your models.
class User(BaseModel):
    username = CharField()

When you execute writes (or deletes), they will be executed against the master database:

User.create(username='Peewee')  # Executed against master.

When you execute a read query, it will run against one of the replicas:

users = User.select().where(User.username == 'Peewee')

Note

To force a SELECT query against the master database, manually create the SelectQuery.

SelectQuery(User)  # master database.

Note

Queries will be dispatched among the read_slaves in round-robin fashion.
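
Round-robin dispatch is simple to sketch with itertools.cycle. ReplicaRouter is a hypothetical class, the databases are represented by plain strings, and detecting reads by looking for a leading SELECT is a simplification for the example rather than how the read_slave module decides.

```python
import itertools

class ReplicaRouter:
    """Send reads to replicas in round-robin order and writes to the master."""

    def __init__(self, master, read_slaves):
        self.master = master
        # itertools.cycle repeats the replicas endlessly in the given order.
        self._replicas = itertools.cycle(read_slaves) if read_slaves else None

    def database_for(self, sql):
        is_select = sql.lstrip().upper().startswith('SELECT')
        if is_select and self._replicas is not None:
            return next(self._replicas)
        return self.master

router = ReplicaRouter('master', ('replica_1', 'replica_2'))
targets = [router.database_for(query) for query in (
    'SELECT * FROM user',
    'INSERT INTO user (username) VALUES (?)',
    'SELECT * FROM user',
    'SELECT * FROM user',
)]
```

The writes in between do not advance the rotation, so the three reads land on replica_1, replica_2 and replica_1 again.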

Test Utils

Contains utilities helpful when testing peewee projects.

class test_database(db, models[, create_tables=True[, fail_silently=False]])

Context manager that lets you use a different database with a set of models. Models can also be automatically created and dropped.

This context manager helps make it possible to test your peewee models using a “test-only” database.

Parameters:
  • db (Database) – Database to use with the given models
  • models – a list of Model classes to use with the db
  • create_tables (boolean) – Whether tables should be automatically created and dropped.
  • fail_silently (boolean) – Whether the table create / drop should fail silently.

Example:

from unittest import TestCase
from playhouse.test_utils import test_database
from peewee import *

from my_app.models import User, Tweet

test_db = SqliteDatabase(':memory:')

class TestUsersTweets(TestCase):
    def create_test_data(self):
        # ... create a bunch of users and tweets
        for i in range(10):
            User.create(username='user-%d' % i)

    def test_timeline(self):
        with test_database(test_db, (User, Tweet)):
            # This data will be created in `test_db`
            self.create_test_data()

            # Perform assertions on test data inside ctx manager.
            self.assertEqual(Tweet.timeline('user-0') [...])

        # once we exit the context manager, we're back to using the normal database

class count_queries([only_select=False])

Context manager that will count the number of queries executed within the context.

Parameters:only_select (bool) – Only count SELECT queries.
with count_queries() as counter:
    huey = User.get(User.username == 'huey')
    huey_tweets = [tweet.message for tweet in huey.tweets]

assert counter.count == 2

count

The number of queries executed.

get_queries()

Return a list of 2-tuples consisting of the SQL query and a list of parameters.
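
A query counter of this kind can be sketched for a plain sqlite3 connection using its trace callback. count_statements and QueryCounter are hypothetical names, and this is not how test_utils is implemented; the connection is opened in autocommit mode so that no implicit BEGIN statements are counted.

```python
import sqlite3
from contextlib import contextmanager

class QueryCounter:
    def __init__(self):
        self.queries = []

    @property
    def count(self):
        return len(self.queries)

@contextmanager
def count_statements(conn, only_select=False):
    """Count the SQL statements that `conn` runs inside the with-block."""
    counter = QueryCounter()

    def trace(statement):
        is_select = statement.lstrip().upper().startswith('SELECT')
        if is_select or not only_select:
            counter.queries.append(statement)

    conn.set_trace_callback(trace)
    try:
        yield counter
    finally:
        conn.set_trace_callback(None)  # stop tracing on exit

conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE user (username TEXT)')

with count_statements(conn) as counter:
    conn.execute("INSERT INTO user VALUES ('huey')")
    conn.execute('SELECT * FROM user').fetchall()

with count_statements(conn, only_select=True) as select_counter:
    conn.execute("INSERT INTO user VALUES ('mickey')")
    conn.execute('SELECT * FROM user').fetchall()
```

The first block counts both statements, while the second counts only the SELECT.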

assert_query_count(expected[, only_select=False])

Function or method decorator that will raise an AssertionError if the number of queries executed in the decorated function does not equal the expected number.

class TestMyApp(unittest.TestCase):
    @assert_query_count(1)
    def test_get_popular_blogs(self):
        popular_blogs = Blog.get_popular()
        self.assertEqual(
            [blog.title for blog in popular_blogs],
            ["Peewee's Playhouse!", "All About Huey", "Mickey's Adventures"])

This function can also be used as a context manager:

class TestMyApp(unittest.TestCase):
    def test_expensive_operation(self):
        with assert_query_count(1):
            perform_expensive_operation()

pskel

I often find myself writing very small scripts with peewee. pskel will generate the boilerplate code for a basic peewee script.

Usage:

pskel [options] model1 model2 ...

pskel accepts the following options:

Option          Default    Description
-l, --logging   False      Log all queries to stdout.
-e, --engine    sqlite     Database driver to use.
-d, --database  :memory:   Database to connect to.

Example:

$ pskel -e postgres -d my_database User Tweet

This will print the following code to stdout (which you can redirect into a file using >):

#!/usr/bin/env python

import logging

from peewee import *
from peewee import create_model_tables

db = PostgresqlDatabase('my_database')

class BaseModel(Model):
    class Meta:
        database = db

class User(BaseModel):
    pass

class Tweet(BaseModel):
    pass

def main():
    create_model_tables([User, Tweet], fail_silently=True)

if __name__ == '__main__':
    main()