A command line tool for manipulating and querying bunch of counters stored in an SQLite database. This demonstrates basic use of dbkit.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | """A counter management tool."""
from contextlib import closing
from os import path
import sqlite3
import sys
from dbkit import connect, transaction, \
execute, query, query_value, query_column
def get_counter(counter):
"""Get the value of a counter."""
print query_value(
'SELECT value FROM counters WHERE counter = ?',
(counter,),
default=0)
def set_counter(counter, value):
"""Set a counter."""
with transaction():
execute(
'REPLACE INTO counters (counter, value) VALUES (?, ?)',
(counter, value))
def increment_counter(counter, by):
"""Modify the value of a counter by a certain amount."""
with transaction():
execute(
'UPDATE counters SET value = value + ? WHERE counter = ?',
(by, counter))
def delete_counter(counter):
"""Delete a counter."""
with transaction():
execute(
'DELETE FROM counters WHERE counter = ?',
(counter,))
def list_counters():
"""List the names of all the stored counters."""
print "\n".join(query_column('SELECT counter FROM counters'))
def dump_counters():
"""Query the database for all counters and their values."""
return query('SELECT counter, value FROM counters')
def print_counters_and_values():
"""List all the counters and their values."""
for counter, value in dump_counters():
print "%s: %d" % (counter, value)
def print_help(filename, table, dest=sys.stdout):
"""Print help to the given destination file object."""
cmds = '|'.join(sorted(table.keys()))
print >> dest, "Syntax: %s %s [args]" % (path.basename(filename), cmds)
def dispatch(table, args):
"""Dispatches to a function based on the contents of `args`."""
# No arguments: print help.
if len(args) == 1:
print_help(args[0], table)
sys.exit(0)
# Bad command or incorrect number of arguments: print help to stderr.
if args[1] not in table or len(args) != len(table[args[1]]) + 1:
print_help(args[0], table, dest=sys.stderr)
sys.exit(1)
# Cast all the arguments to fit their function's signature to ensure
# they're correct and to make them safe for consumption.
sig = table[args[1]]
try:
fixed_args = [type_(arg) for arg, type_ in zip(args[2:], sig[1:])]
except TypeError:
# If any are wrong, complain to stderr.
print_help(args[0], table, dest=sys.stderr)
sys.exit(1)
# Dispatch the call to the correct function.
sig[0](*fixed_args)
def main():
# This table tells us the subcommands, the functions to dispatch to,
# and their signatures.
command_table = {
'set': (set_counter, str, int),
'del': (delete_counter, str),
'get': (get_counter, str),
'list': (list_counters,),
'incr': (increment_counter, str, int),
'dump': (print_counters_and_values,),
}
with connect(sqlite3, 'counters.sqlite') as ctx:
with closing(ctx):
dispatch(command_table, sys.argv)
if __name__ == '__main__':
main()
|
A small web application, built using web.py, pystache, and psycopg2, to say that prints “Hello, name” based on the URL fetched, and which records how many times it’s said hello to a particular name.
This demonstrates use of connection pools.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | import web
import psycopg2
import pystache
from dbkit import Pool, transactional, query, query_value, execute, dict_set
urls = (
'/(.*)', 'hello'
)
app = web.application(urls, globals())
pool = Pool(psycopg2, 2, "dbname=namecounter user=keith")
TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<title>Hello!</title>
</head>
<body>
<p>Hello, {{name}}!</p>
<p>Previously, I've said hello to:</p>
<ul>
{{#hellos}}
<li>{{name}}, {{n}} times</li>
{{/hellos}}
</ul>
</body>
</html>"""
@transactional
def save_name(name):
if query_value("SELECT n FROM greeted WHERE name = %s", (name,), 0) == 0:
execute("INSERT INTO greeted (name, n) VALUES (%s, 1)", (name,))
else:
execute("UPDATE greeted SET n = n + 1 WHERE name = %s", (name,))
def get_names():
return query("SELECT name, n FROM greeted ORDER BY n", factory=dict_set)
class hello(object):
def GET(self, name):
ctx = pool.connect()
if not name:
name = 'World'
with ctx:
hellos = list(get_names())
save_name(name)
return pystache.render(TEMPLATE, {'name': name, 'hellos': hellos})
if __name__ == '__main__':
try:
app.run()
finally:
pool.finalise()
|