ZT:如何用Python访问SQLServer的代码示例
speakitnow
|
1#
speakitnow 发表于 2006-12-31 10:05
ZT:如何用Python访问SQLServer的代码示例
import win32com.client
class operateData: def __init__(self): pass def __ado1(self): conn = self.getConn() DSN = 'PROVIDER=Microsoft.Jet.OLEDB.4.0;DATA SOURCE=C:/MyDB.mdb;' conn.Open(DSN) rs = self.getRs() Sql = "Sheet1" rs.Open('['+Sql+']',conn,1,3) rs.AddNew() rs.Fields.Item('name').Value = "bbb" rs.Update() conn.Close() def __select(self): import dbi, odbc # 导入ODBC模块 dbc = odbc.odbc('a//') crsr = dbc.cursor() # 产生一个cursor crsr.execute("select * from b") print 'Column descriptions:' # 显示行描述 for col in crsr.description: print ' ', col result = crsr.fetchall() # 一次取出所有的结果 print '\nFirst result row:\n ', result[0] # 显示结果的第一行 def getRs(self): rs = win32com.client.Dispatch(r'ADODB.Recordset') return rs def getConn(self): conn = win32com.client.Dispatch(r'ADODB.Connection') return conn def getConnstr(self,dbName,dbType,user,psw,ip): if dbName"" and dbType == 1: import sys,os (dir,name) = os.path.split(dbName) if name"": (name1,extend) = os.path.splitext(name) if extend"": if extend == ".mdb": #access connStr = "PROVIDER=Microsoft.Jet.OLEDB.4.0;" connStr = connStr+"DATA SOURCE="+dbName+";" elif dbType == 2: #sql server connStr = "Provider=SQLOLEDB.1;Integrated Security=SSPI;Persist Security Info=False;Initial Catalog=faxsy;Data Source="+ip+"" elif dbType == 3: #sysbase connStr = "" elif dbType == 4: #db2 connStr = "" else: connStr = "" return connStr def getTableName(self,conn): listTable = [] oCat = win32com.client.Dispatch(r'ADOX.Catalog') oCat.ActiveConnection = conn oTable = oCat.Tables for x in oTable: if x.Type == "Table" or x.Type == "TABLE": listTable.append(x.Name) return listTable def getItem(self,rs): flds_dict = {} for x in range(rs.Fields.Count): flds_dict[x] = rs.Fields.Item(x).Name return flds_dict def AdoInsert(self,Directory,tableName,gdbName,gdbType,guser,gpsw,gip): tableItem = {} conn = self.getConn() rs = self.getRs() connStr = self.getConnstr(gdbName,gdbType,guser,gpsw,gip) if connStrNone or connStr"": conn.Open(connStr) listTable = self.getTableName(conn) for x in listTable: if x == tableName: rs.Open('['+x+']',conn,1,3) tableItem = self.getItem(rs) try: rs.AddNew() for y in tableItem.keys(): for z in Directory.keys(): if tableItem[y] == z: rs.Fields.Item(tableItem[y]).Value = Directory[z] rs.Update() return 1 except: return -1 break else: return 0 def execSql(self,sql,gdbName,gdbType,guser,gpsw,gip): #操作成功返回11 if sql != "": try: conn = self.getConn() connStr = self.getConnstr(gdbName,gdbType,guser,gpsw,gip) if connStr!="": conn.Open(connStr) try: conn.Execute(sql) conn.Close() return 1 except: return sql #sql有错误 else: return -2 #传入参数有错 except: return 0 #操作数据库有错 else: return -1 #sql语句有错 def selData(self,sql,gdbName,gdbType,guser,gpsw,gip): #返回一个结果集 if sql != "": try: conn = self.getConn() rs = self.getRs() connStr = self.getConnstr(gdbName,gdbType,guser,gpsw,gip) if connStr!="": conn.Open(connStr) try: rs.Open('['+sql+']',conn,1,3) return rs except: return sql #sql有错误 else: return -2 #传入参数有错 except: return 0 #操作数据库有错 else: return -1 #sql语句有错 def s(): import operateData import time print time.ctime() a = operateData.operateData() b = a.selData('select * from b ',"c:/MyDB.mdb",1,"","","") b.MoveFirst() while 1: if b.Eof : break else: print b.Fields.Item('Name').Value b.MoveNext() print time.ctime() if __name__ =="__main__": s() |