How to use¶
Mongo¶
def test_using_mongo(mongodb):
    """Insert a document via the ``mongodb`` fixture, then fetch one back."""
    # Collection ``test`` inside the ``test_database`` database.
    collection = mongodb['test_database'].test
    collection.insert({'woof': 'woof'})
    document = collection.find_one()
from pytest_dbfixtures import factories
# Second MongoDB process fixture: port 27070, with extra parameters passed
# through the ``params`` string.
mongo_proc2 = factories.mongo_proc(port=27070, params='--nojournal --noauth --nohttpinterface --noprealloc')
# Second MongoDB fixture, created by ``factories.mongodb`` from the *name* of
# the process fixture above (passed as a string, not as the object itself).
mongodb2 = factories.mongodb('mongo_proc2')
def test_second_mongo(mongodb, mongodb2):
    """Write a distinct document through each Mongo fixture and read it back.

    The default fixture is exercised first, then the second one; each uses
    the ``test`` collection of its own ``test_db`` database.
    """
    for client, key in ((mongodb, 'test1'), (mongodb2, 'test2')):
        payload = {key: key}
        collection = client['test_db'].test
        collection.insert(payload)
        assert collection.find_one()[key] == key
MySQL¶
def test_using_mysql(mysql):
    """Send a trivial query through the default ``mysql`` fixture."""
    statement = "SELECT CURRENT_USER()"
    mysql.query(statement)
# second database
from pytest_dbfixtures import factories
# CREATE TABLE statement used by test_mysql_newfixture, which executes it
# once through each MySQL fixture.
query = '''CREATE TABLE pet (name VARCHAR(20), owner VARCHAR(20),
species VARCHAR(20), sex CHAR(1), birth DATE, death DATE);'''
# Second MySQL process fixture: port 3308, with an extra parameter passed
# through the ``params`` string.
mysql_proc2 = factories.mysql_proc(port=3308, params='--skip-sync-frm')
# Second MySQL fixture, created by ``factories.mysql`` from the *name* of the
# process fixture above (passed as a string).
mysql2 = factories.mysql('mysql_proc2')
def test_mysql_newfixture(mysql, mysql2):
    """Execute the module-level ``query`` through both MySQL fixtures.

    For each connection in turn: open a cursor, run the statement, commit
    on the connection, and close the cursor.
    """
    for connection in (mysql, mysql2):
        cur = connection.cursor()
        cur.execute(query)
        connection.commit()
        cur.close()
RabbitMQ¶
def test_using_rabbit(rabbitmq):
    """Request a channel from the ``rabbitmq`` fixture."""
    opened_channel = rabbitmq.channel()
Redis¶
Minimum supported version is 2.6
def test_using_redis(redisdb):
    """Set a key through the ``redisdb`` fixture and get it again."""
    key = value = 'woof'
    redisdb.set(key, value)
    fetched = redisdb.get(key)
from pytest_dbfixtures import factories
# Second Redis process fixture, configured with port 6381.
redis_proc2 = factories.redis_proc(port=6381)
# Second Redis fixture, created by ``factories.redisdb`` from the *name* of
# the process fixture above (passed as a string).
redisdb2 = factories.redisdb('redis_proc2')
def test_using_two_redis(redisdb, redisdb2):
    """Set one key on each Redis fixture, then read both keys back."""
    # Both writes happen before either read, first fixture first.
    writes = (
        (redisdb, 'woof1', 'woof1'),
        (redisdb2, 'woof2', 'woof12'),
    )
    for client, key, value in writes:
        client.set(key, value)
    first_value = redisdb.get('woof1')
    second_value = redisdb2.get('woof2')
PostgreSQL¶
Minimum supported version is 8.4
def test_main_postgres(postgresql):
    """Create a table through the default ``postgresql`` fixture and commit."""
    create_table = 'CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);'
    cursor = postgresql.cursor()
    cursor.execute(create_table)
    postgresql.commit()
    cursor.close()
from pytest_dbfixtures import factories
# Second PostgreSQL process fixture, configured with port 9876.
postgresql_proc2 = factories.postgresql_proc(port=9876)
# Second PostgreSQL fixture, created by ``factories.postgresql`` from the
# *name* of the process fixture above (passed as a string).
postgresql2 = factories.postgresql('postgresql_proc2')
def test_two_postgreses(postgresql, postgresql2):
    """Create the same table through both PostgreSQL fixtures.

    For each connection in turn: open a cursor, run the CREATE TABLE
    statement, commit on the connection, and close the cursor.
    """
    create_table = 'CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);'
    for connection in (postgresql, postgresql2):
        cursor = connection.cursor()
        cursor.execute(create_table)
        connection.commit()
        cursor.close()
The PostgreSQL factory is based on psycopg2.
Elasticsearch¶
def test_elastic_process(elasticsearch_proc):
    """Check that the ``elasticsearch_proc`` fixture reports it is running."""
    is_running = elasticsearch_proc.running()
    assert is_running is True
def test_elasticsarch(elasticsearch):
    """Check that ``info()`` on the ``elasticsearch`` fixture has status 200."""
    assert elasticsearch.info()['status'] == 200
Random process port¶
Instead of specifying the precise port that a process will be bound to, you can pass ‘?’ in the port argument, or specify a port range, e.g. ‘2000-3000’, or a comma-separated list of ports and ranges, e.g. ‘2000-3000,4000-4500,5000’. The library will randomly choose a port that is not used by any other application.
from pytest_dbfixtures import factories
# Redis process fixture whose port argument is '?', i.e. the port is picked
# at random as described in the paragraph above.
redis_rand_proc = factories.redis_proc(port='?')
# Redis fixture created by ``factories.redisdb`` from the *name* of the
# process fixture above (passed as a string).
redisdb_rand = factories.redisdb('redis_rand_proc')
def test_using_random_ports(redisdb_rand, redisdb):
    """Print the ports of the random-port and the default Redis fixtures.

    Both fixtures are expected to expose a ``port`` attribute.
    """
    # The call form print(x) with a single argument produces the same output
    # on Python 2 and Python 3; the bare ``print x`` statement is a
    # SyntaxError on Python 3.
    print(redisdb_rand.port)  # will print randomly selected redis port
    print(redisdb.port)  # will print default redis port