python sqlite3
skypp
|
1#
skypp 发表于 2007-04-01 22:49
python sqlite3
Python 2.5 Documentation=>Library Reference=>Data Persistence=>sqlite3
conn = sqlite3.connect('/tmp/example') ####### c = conn.cursor() # Create table c.execute('''create table stocks (date text, trans text, symbol text, qty real, price real)''') # Insert a row of data c.execute("""insert into stocks values ('2006-01-05','BUY','RHAT',100,35.14)""") ####### # Never do this -- symbol = 'IBM' c.execute("... where symbol = '%s'" % symbol) # Do this instead t = (symbol,) c.execute('select * from stocks where symbol=?', t) # Larger example for t in (('2006-03-28', 'BUY', 'IBM', 1000, 45.00), ('2006-04-05', 'BUY', 'MSOFT', 1000, 72.00), ('2006-04-06', 'SELL', 'IBM', 500, 53.00), ): c.execute('insert into stocks values (?,?,?,?,?)', t) ####### # A minimal SQLite shell for experiments import sqlite3 con = sqlite3.connect(":memory:") con.isolation_level = None cur = con.cursor() buffer = "" print "Enter your SQL commands to execute in sqlite3." print "Enter a blank line to exit." while True: line = raw_input() if line == "": break buffer += line if sqlite3.complete_statement(buffer): try: buffer = buffer.strip() cur.execute(buffer) if buffer.lstrip().upper().startswith("SELECT"): print cur.fetchall() except sqlite3.Error, e: print "An error occurred:", e.args[0] buffer = "" con.close() ####### import sqlite3 import md5 def md5sum(t): return md5.md5(t).hexdigest() con = sqlite3.connect(":memory:") con.create_function("md5", 1, md5sum) cur = con.cursor() cur.execute("select md5(?)", ("foo",)) print cur.fetchone()[0] ####### import sqlite3 class MySum: def __init__(self): self.count = 0 def step(self, value): self.count += value def finalize(self): return self.count con = sqlite3.connect(":memory:") con.create_aggregate("mysum", 1, MySum) cur = con.cursor() cur.execute("create table test(i)") cur.execute("insert into test(i) values (1)") cur.execute("insert into test(i) values (2)") cur.execute("select mysum(i) from test") print cur.fetchone()[0] ####### import sqlite3 def collate_reverse(string1, string2): return -cmp(string1, string2) con = sqlite3.connect(":memory:") con.create_collation("reverse", collate_reverse) cur = con.cursor() cur.execute("create table test(x)") cur.executemany("insert into test(x) values (?)", [("a",), ("b",)]) cur.execute("select x from test order by x collate reverse") for row in cur: print row con.close() ####### import sqlite3 def dict_factory(cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d con = sqlite3.connect(":memory:") con.row_factory = dict_factory cur = con.cursor() cur.execute("select 1 as a") print cur.fetchone()["a"] ####### import sqlite3 con = sqlite3.connect(":memory:") cur = con.cursor() # Create the table con.execute("create table person(lastname, firstname)") AUSTRIA = u"\xd6sterreich" # by default, rows are returned as Unicode cur.execute("select ?", (AUSTRIA,)) row = cur.fetchone() assert row[0] == AUSTRIA # but we can make pysqlite always return bytestrings ... con.text_factory = str cur.execute("select ?", (AUSTRIA,)) row = cur.fetchone() assert type(row[0]) == str # the bytestrings will be encoded in UTF-8, unless you stored garbage in the # database ... assert row[0] == AUSTRIA.encode("utf-8") # we can also implement a custom text_factory ... # here we implement one that will ignore Unicode characters that cannot be # decoded from UTF-8 con.text_factory = lambda x: unicode(x, "utf-8", "ignore") cur.execute("select ?", ("this is latin1 and would normally create errors" + u"\xe4\xf6\xfc".encode("latin1"),)) row = cur.fetchone() assert type(row[0]) == unicode # pysqlite offers a builtin optimized text_factory that will return bytestring # objects, if the data is in ASCII only, and otherwise return unicode objects con.text_factory = sqlite3.OptimizedUnicode cur.execute("select ?", (AUSTRIA,)) row = cur.fetchone() assert type(row[0]) == unicode cur.execute("select ?", ("Germany",)) row = cur.fetchone() assert type(row[0]) == str |