Module usage
Basic usage of the module is not very different from using Twisted’s adbapi:
from txpostgres import txpostgres
from twisted.internet import reactor
from twisted.python import log, util
# connect to the database; the Deferred returned by connect() is what the
# rest of the script chains its callbacks onto
conn = txpostgres.Connection()
d = conn.connect('dbname=postgres')
# run the query and print the result
d.addCallback(lambda _: conn.runQuery('select tablename from pg_tables'))
d.addCallback(lambda result: util.println('All tables:', result))
# close the connection, log any errors and stop the reactor
d.addCallback(lambda _: conn.close())
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())
# start the reactor to kick off establishing the connection
reactor.run()
If you want, you can use the Cursor
class directly, with an
interface closer to Psycopg. Note that using this method you have to make sure
never to execute a query before the previous one finishes, as that would
violate the PostgreSQL asynchronous protocol.
from txpostgres import txpostgres
from twisted.internet import reactor
from twisted.python import log, util
# define the libpq connection string and the query to use
connstr = 'dbname=postgres'
query = 'select tablename from pg_tables order by tablename'
# connect to the database using the connection string defined above, so that
# changing connstr is enough to point the example at another database
conn = txpostgres.Connection()
d = conn.connect(connstr)
def useCursor(cur):
    """
    Run the module-level query on the given cursor and print the first
    column of the first row, then close the cursor.

    Returns the Deferred that the individual steps are chained onto.
    """
    # execute the query
    running = cur.execute(query)
    # once it has finished, fetch the first row from the result
    running.addCallback(lambda _ignored: cur.fetchone())
    # output the first column of that row
    running.addCallback(
        lambda row: util.println('First table name:', row[0]))
    # and finally close the cursor
    running.addCallback(lambda _ignored: cur.close())
    return running
# create a cursor and use it; the value returned by conn.cursor() is handed
# to useCursor as its argument
d.addCallback(lambda _: conn.cursor())
d.addCallback(useCursor)
# log any errors and stop the reactor
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())
# start the reactor to kick off establishing the connection
reactor.run()
Using transactions
Every query executed by txpostgres is committed immediately. If you need to
execute a series of queries in a transaction, use the
runInteraction()
method:
from txpostgres import txpostgres
from twisted.internet import reactor
from twisted.python import log
# connect to the database; the Deferred returned by connect() is what the
# later callbacks are chained onto
conn = txpostgres.Connection()
d = conn.connect('dbname=postgres')
def interaction(cur):
    """
    A callable meant to be run inside a transaction: creates the test table
    and then inserts a single row into it.

    The parameter is a txpostgres Cursor. Returns the Deferred the two
    statements are chained onto.
    """
    creating = cur.execute('create table test(x integer)')
    # the insert is only issued after the create statement has finished
    return creating.addCallback(
        lambda _ignored: cur.execute('insert into test values (%s)', (1, )))
# run the interaction, making sure that if the insert fails, the table won't be
# left behind created but empty
d.addCallback(lambda _: conn.runInteraction(interaction))
# close the connection, log any errors and stop the reactor
d.addCallback(lambda _: conn.close())
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())
# start the reactor to kick off establishing the connection
reactor.run()
Customising the connection and cursor factories
You might want to customise the way txpostgres creates connections and cursors
to take advantage of Psycopg features like dictionary cursors. To do that, define a subclass of
Connection
and override the
connectionFactory
or cursorFactory
class attributes to use your
custom code. Here’s an example of how to use dict cursors:
import psycopg2
import psycopg2.extras
from txpostgres import txpostgres
from twisted.internet import reactor
from twisted.python import log, util
def dict_connect(*args, **kwargs):
    """
    Open a Psycopg connection that uses psycopg2.extras.DictConnection.

    All arguments are passed through to psycopg2.connect(); a
    connection_factory supplied by the caller is replaced.
    """
    kwargs.update(connection_factory=psycopg2.extras.DictConnection)
    return psycopg2.connect(*args, **kwargs)
class DictConnection(txpostgres.Connection):
    """
    A txpostgres Connection that uses dict_connect as its connection factory.
    """
    # staticmethod keeps the plain function from being turned into a bound
    # method when it is looked up on an instance
    connectionFactory = staticmethod(dict_connect)
# connect using the custom connection class
conn = DictConnection()
d = conn.connect('dbname=postgres')
# run a query and print the result
d.addCallback(lambda _: conn.runQuery('select * from pg_tablespace'))
# access the column by its name
d.addCallback(lambda result: util.println('All tablespace names:',
    [row['spcname'] for row in result]))
# close the connection, log any errors and stop the reactor
d.addCallback(lambda _: conn.close())
d.addErrback(log.err)
d.addBoth(lambda _: reactor.stop())
# start the reactor to kick off establishing the connection
reactor.run()
Listening for database notifications
Being an asynchronous driver, txpostgres supports the PostgreSQL NOTIFY feature for sending asynchronous notifications to connections. Here is an example script that connects to the database and listens for notifications on the list channel. Every time a notification is received, it interprets the payload as part of the name of a table and outputs a list of tables with names containing that payload.
from txpostgres import txpostgres
from twisted.internet import reactor
from twisted.python import util
def outputResults(results, payload):
    """
    Print a header naming the payload, followed by the first column of every
    row in results, one per line.
    """
    # a parenthesised single-argument print works both as a Python 2 print
    # statement and as a Python 3 function call; the one-element tuple keeps
    # the formatting correct whatever type the payload is
    print("Tables with `%s' in their name:" % (payload,))
    for result in results:
        print(result[0])
def observer(notify):
    """
    Handle a notification by listing the tables whose name contains its
    payload.

    When the payload is empty only a message is printed and no query is run.
    Otherwise the query is run on the module-level conn and its rows are
    passed to outputResults together with the payload.
    """
    if not notify.payload:
        # parenthesised so the line is valid on both Python 2 and Python 3
        print("No payload")
        return
    # the payload is passed as a query parameter rather than interpolated
    # into the SQL; %% stands for a literal percent sign in the query
    query = ("select tablename from pg_tables "
             "where tablename like '%%' || %s || '%%'")
    d = conn.runQuery(query, (notify.payload, ))
    d.addCallback(outputResults, notify.payload)
# connect to the database
conn = txpostgres.Connection()
d = conn.connect('dbname=postgres')
# add a NOTIFY observer; it is called with each notification received
conn.addNotifyObserver(observer)
# start listening for NOTIFY events on the 'list' channel
d.addCallback(lambda _: conn.runOperation("listen list"))
d.addCallback(lambda _: util.println("Listening on the `list' channel"))
# process events until killed
reactor.run()
To try it, execute the example code, then open another session using psql and send some NOTIFY events:
$ psql postgres
psql (9.1.2)
Type "help" for help.
postgres=> notify list, 'user';
NOTIFY
postgres=> notify list, 'auth';
NOTIFY
You should see the example program outputting lists of table names containing the payload:
$ python notify_example.py
Listening on the `list' channel
Tables with `user' in their name:
pg_user_mapping
Tables with `auth' in their name:
pg_authid
pg_auth_members
Automatic reconnection
The module includes provision for automatically reconnecting to the database in
case the connection gets broken. To use it, pass a
DeadConnectionDetector
instance to
Connection. You can customise the detector
instance or subclass it to add custom logic. See the documentation for
DeadConnectionDetector
for details.
When a Connection
is configured with a
detector, it will automatically start the reconnection process whenever it
encounters a certain class of errors indicative of a disconnect. See
defaultDeathChecker()
for more.
While the connection is down, all attempts to use it will result in immediate
failures with ConnectionDead. This is to
prevent sending additional queries down a link that’s known to be down.
Here’s an example of using automatic reconnection in txpostgres:
from txpostgres import txpostgres, reconnection
from twisted.internet import reactor, task
class LoggingDetector(reconnection.DeadConnectionDetector):
    """
    A DeadConnectionDetector that prints a line at each stage of the
    reconnection process.

    Every override prints its message and then calls the base class
    implementation, returning whatever that returns.
    """

    def startReconnecting(self, f):
        # parenthesised print is valid on both Python 2 and Python 3; the
        # one-element tuple keeps %r correct even if f.value is a tuple
        print('[*] database connection is down (error: %r)' % (f.value,))
        return reconnection.DeadConnectionDetector.startReconnecting(self, f)

    def reconnect(self):
        print('[*] reconnecting...')
        return reconnection.DeadConnectionDetector.reconnect(self)

    def connectionRecovered(self):
        print('[*] connection recovered')
        return reconnection.DeadConnectionDetector.connectionRecovered(self)
def result(res):
    """
    Print the result of a successful query.
    """
    # the one-element tuple stops a tuple-valued res from being unpacked as
    # several format arguments; the parenthesised print is valid on both
    # Python 2 and Python 3
    print('-> query returned result: %s' % (res,))
def error(f):
    """
    Print the value carried by the failure of a query.
    """
    # parenthesised print is valid on both Python 2 and Python 3
    print('-> query failed with %r' % (f.value,))
def connectionError(f):
    """
    Print the value carried by the failure of a connection attempt.

    Returns None, so when used as an errback the failure is treated as
    handled.
    """
    # parenthesised print is valid on both Python 2 and Python 3
    print('-> connecting failed with %r' % (f.value,))
def runLoopingQuery(conn):
    """
    Run a trivial query on conn and report how it went: result is called on
    success and error on failure.

    Nothing is returned.
    """
    conn.runQuery('select 1').addCallbacks(result, error)
def connected(_, conn):
    """
    Start calling runLoopingQuery with conn on a 2 second interval.

    The first argument (the result of the previous callback) is ignored.
    Returns whatever LoopingCall.start() returns.
    """
    # parenthesised print is valid on both Python 2 and Python 3
    print('-> connected, running a query periodically')
    lc = task.LoopingCall(runLoopingQuery, conn)
    return lc.start(2)
# connect to the database using reconnection
conn = txpostgres.Connection(detector=LoggingDetector())
d = conn.connect('dbname=postgres')
# if the connection failed, log the error and start reconnecting
d.addErrback(conn.detector.checkForDeadConnection)
d.addErrback(connectionError)
# connectionError returns None, so this callback runs whether or not the
# initial connection attempt succeeded
d.addCallback(connected, conn)
# process events until killed
reactor.run()
You can run this snippet and then try restarting the database. Logging lines should appear, as the connection gets automatically recovered.
Choosing a Psycopg implementation
To use txpostgres, you will need a recent enough version of Psycopg, namely 2.2.0 or later. Since parts of Psycopg are written in C, it is not available on some Python implementations, like PyPy. When first imported, txpostgres will try to detect if an API-compatible implementation of Psycopg is available.
You can force a certain implementation to be used by exporting an environment variable TXPOSTGRES_PSYCOPG_IMPL. Recognized values are:
- psycopg2
- Force using Psycopg, do not try any fallbacks.
- psycopg2cffi
- Use psycopg2cffi, a psycopg2 implementation based on cffi, known to work on PyPy.
- psycopg2ct
- Use psycopg2ct, an older psycopg2 implementation using ctypes, also compatible with PyPy.