How to use

Mongo

def test_using_mongo(mongodb):
    db = mongodb['test_database']
    db.test.insert({'woof': 'woof'})
    documents = db.test.find_one()


from pytest_dbfixtures import factories
mongo_proc2 = factories.mongo_proc(port=27070, params='--nojournal --noauth --nohttpinterface --noprealloc')
mongodb2 = factories.mongodb('mongo_proc2')

def test_second_mongo(mongodb, mongodb2):
    test_data_one = {
        "test1": "test1",
    }
    db = mongodb['test_db']
    db.test.insert(test_data_one)
    assert db.test.find_one()['test1'] == 'test1'

    test_data_two = {
        "test2": "test2",
    }
    db = mongodb2['test_db']
    db.test.insert(test_data_two)
    assert db.test.find_one()['test2'] == 'test2'

MySQL

def test_using_mysql(mysql):
    mysql.query("SELECT CURRENT_USER()")


# second database
from pytest_dbfixtures import factories

query = '''CREATE TABLE pet (name VARCHAR(20), owner VARCHAR(20),
species VARCHAR(20), sex CHAR(1), birth DATE, death DATE);'''

mysql_proc2 = factories.mysql_proc(port=3308, params='--skip-sync-frm')
mysql2 = factories.mysql('mysql_proc2')

def test_mysql_newfixture(mysql, mysql2):
    cursor = mysql.cursor()
    cursor.execute(query)
    mysql.commit()
    cursor.close()

    cursor = mysql2.cursor()
    cursor.execute(query)
    mysql2.commit()
    cursor.close()

RabbitMQ

def test_using_rabbit(rabbitmq):
    channel = rabbitmq.channel()

Redis

Minimum supported version is 2.6

def test_using_redis(redisdb):
    redisdb.set('woof', 'woof')
    woof = redisdb.get('woof')

from pytest_dbfixtures import factories

redis_proc2 = factories.redis_proc(port=6381)
redisdb2 = factories.redisdb('redis_proc2')

def test_using_two_redis(redisdb, redisdb2):
    redisdb.set('woof1', 'woof1')
    redisdb2.set('woof2', 'woof12')

    woof1 = redisdb.get('woof1')
    woof2 = redisdb2.get('woof2')

PostgreSQL

Minimum supported version is 8.4

def test_main_postgres(postgresql):
    cur = postgresql.cursor()
    cur.execute('CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);')
    postgresql.commit()
    cur.close()

from pytest_dbfixtures import factories


postgresql_proc2 = factories.postgresql_proc(port=9876)
postgresql2 = factories.postgresql('postgresql_proc2')


def test_two_postgreses(postgresql, postgresql2):
    cur = postgresql.cursor()
    cur.execute('CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);')
    postgresql.commit()
    cur.close()

    cur = postgresql2.cursor()
    cur.execute('CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);')
    postgresql2.commit()
    cur.close()

PostgreSQL factory is based on psycopg2

Elasticsearch

def test_elastic_process(elasticsearch_proc):
    """Simple test for starting elasticsearch_proc."""
    assert elasticsearch_proc.running() is True


def test_elasticsarch(elasticsearch):
    """Tests if elasticsearch fixtures connects to process."""

    info = elasticsearch.info()
    assert info['status'] == 200

Random process port

Instead of specifing precise port that process will be bound to you can pass ‘?’ in port argument or specify port range e.g. ‘2000-3000’ or comma-separated list or ranges e.g. ‘2000-3000,4000-4500,5000’. Library will randomly choose a port that is not used by any other application.

from pytest_dbfixtures import factories

redis_rand_proc = factories.redis_proc(port='?')
redisdb_rand = factories.redisdb('redis_rand_proc')

def test_using_random_ports(redisdb_rand, redisdb):
    print redisdb_rand.port  # will print randomly selected redis port
    print redisdb.port  # will print default redis port