Examples

counters.py

A command line tool for manipulating and querying bunch of counters stored in an SQLite database. This demonstrates basic use of dbkit.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
A counter management tool.
"""

from contextlib import closing
from os import path
import sqlite3
import sys

from dbkit import (
    connect,
    execute,
    query,
    query_column,
    query_value,
    transactional,
)


def get_counter(counter):
    """
    Get the value of a counter.
    """
    print query_value(
        'SELECT value FROM counters WHERE counter = ?',
        (counter,),
        default=0)


@transactional
def set_counter(counter, value):
    """
    Set a counter.
    """
    execute(
        'REPLACE INTO counters (counter, value) VALUES (?, ?)',
        (counter, value))


@transactional
def increment_counter(counter, by):
    """
    Modify the value of a counter by a certain amount.
    """
    execute(
        'UPDATE counters SET value = value + ? WHERE counter = ?',
        (by, counter))


@transactional
def delete_counter(counter):
    """
    Delete a counter.
    """
    execute(
        'DELETE FROM counters WHERE counter = ?',
        (counter,))


def list_counters():
    """
    List the names of all the stored counters.
    """
    print "\n".join(query_column('SELECT counter FROM counters'))


def dump_counters():
    """
    Query the database for all counters and their values.
    """
    return query('SELECT counter, value FROM counters')


def print_counters_and_values():
    """
    List all the counters and their values.
    """
    for counter, value in dump_counters():
        print "%s: %d" % (counter, value)


def print_help(filename, table, dest=sys.stdout):
    """
    Print help to the given destination file object.
    """
    cmds = '|'.join(sorted(table.keys()))
    print >> dest, "Syntax: %s %s [args]" % (path.basename(filename), cmds)


def dispatch(table, args):
    """
    Dispatches to a function based on the contents of `args`.
    """
    # No arguments: print help.
    if len(args) == 1:
        print_help(args[0], table)
        sys.exit(0)

    # Bad command or incorrect number of arguments: print help to stderr.
    if args[1] not in table or len(args) != len(table[args[1]]) + 1:
        print_help(args[0], table, dest=sys.stderr)
        sys.exit(1)

    # Cast all the arguments to fit their function's signature to ensure
    # they're correct and to make them safe for consumption.
    sig = table[args[1]]
    try:
        fixed_args = [type_(arg) for arg, type_ in zip(args[2:], sig[1:])]
    except TypeError:
        # If any are wrong, complain to stderr.
        print_help(args[0], table, dest=sys.stderr)
        sys.exit(1)

    # Dispatch the call to the correct function.
    sig[0](*fixed_args)


def main():
    # This table tells us the subcommands, the functions to dispatch to,
    # and their signatures.
    command_table = {
        'set': (set_counter, str, int),
        'del': (delete_counter, str),
        'get': (get_counter, str),
        'list': (list_counters,),
        'incr': (increment_counter, str, int),
        'dump': (print_counters_and_values,),
    }
    with connect(sqlite3, 'counters.sqlite') as ctx:
        with closing(ctx):
            dispatch(command_table, sys.argv)


if __name__ == '__main__':
    main()

pools.py

A small web application, built using web.py, pystache, and psycopg2, to say that prints “Hello, name” based on the URL fetched, and which records how many times it’s said hello to a particular name.

This demonstrates use of connection pools.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import web
import psycopg2
import pystache
from dbkit import (
    dict_set,
    execute,
    Pool,
    query,
    query_value,
    transactional,
)


urls = (
    '/(.*)', 'hello',
)
app = web.application(urls, globals())
pool = Pool(psycopg2, 2, "dbname=namecounter user=keith")


TEMPLATE = """<!DOCTYPE html>
<html>
    <head>
        <title>Hello!</title>
    </head>
    <body>
        <p>Hello, {{name}}!</p>
        <p>Previously, I've said hello to:</p>
        <ul>
        {{#hellos}}
            <li>{{name}}, {{n}} times</li>
        {{/hellos}}
        </ul>
    </body>
</html>"""


@transactional
def save_name(name):
    if query_value("SELECT n FROM greeted WHERE name = %s", (name,), 0) == 0:
        execute("INSERT INTO greeted (name, n) VALUES (%s, 1)", (name,))
    else:
        execute("UPDATE greeted SET n = n + 1 WHERE name = %s", (name,))


def get_names():
    return query("SELECT name, n FROM greeted ORDER BY n", factory=dict_set)


class hello(object):

    def GET(self, name):
        ctx = pool.connect()
        if not name:
            name = 'World'
        with ctx:
            hellos = list(get_names())
            save_name(name)
        return pystache.render(TEMPLATE, {'name': name, 'hellos': hellos})


if __name__ == '__main__':
    try:
        app.run()
    finally:
        pool.finalise()