ZT:如何用Python访问SQLServer的代码示例

import   win32com.client  
  class   operateData:  
    def   __init__(self):  
      pass  
     
    def   __ado1(self):  
      conn   =   self.getConn()     
      DSN   =   'PROVIDER=Microsoft.Jet.OLEDB.4.0;DATA   SOURCE=C:/MyDB.mdb;'  
      conn.Open(DSN)  
      rs   =   self.getRs()  
      Sql   =   "Sheet1"  
      rs.Open('['+Sql+']',conn,1,3)  
      rs.AddNew()  
      rs.Fields.Item('name').Value   =   "bbb"  
      rs.Update()  
      conn.Close()  
      
    def   __select(self):  
      import   dbi,   odbc               #   导入ODBC模块  
      dbc   =   odbc.odbc('a//')  
      crsr   =   dbc.cursor()            #   产生一个cursor  
      crsr.execute("select   *   from   b")  
      print   'Column   descriptions:'   #   显示行描述  
      for   col   in   crsr.description:  
        print   '   ',   col  
      result   =   crsr.fetchall()       #   一次取出所有的结果  
      print   '\nFirst   result   row:\n   ',   result[0]     #   显示结果的第一行     
   
    def   getRs(self):  
      rs   =   win32com.client.Dispatch(r'ADODB.Recordset')  
      return   rs  
   
    def   getConn(self):  
      conn   =   win32com.client.Dispatch(r'ADODB.Connection')  
      return   conn  
   
    def   getConnstr(self,dbName,dbType,user,psw,ip):  
      if   dbName""   and   dbType   ==   1:  
        import   sys,os  
        (dir,name)   =   os.path.split(dbName)  
        if   name"":  
          (name1,extend)   =   os.path.splitext(name)  
          if   extend"":  
            if   extend   ==   ".mdb":      #access  
              connStr   =   "PROVIDER=Microsoft.Jet.OLEDB.4.0;"  
              connStr   =   connStr+"DATA   SOURCE="+dbName+";"  
        elif   dbType   ==   2:      #sql   server  
          connStr   =   "Provider=SQLOLEDB.1;Integrated   Security=SSPI;Persist   Security   Info=False;Initial   Catalog=faxsy;Data   Source="+ip+""  
        elif   dbType   ==   3:      #sysbase  
          connStr   =   ""  
        elif   dbType   ==   4:      #db2  
          connStr   =   ""  
        else:           
          connStr   =   ""  
        return   connStr  
   
    def   getTableName(self,conn):   
      listTable   =   []  
      oCat   =   win32com.client.Dispatch(r'ADOX.Catalog')  
      oCat.ActiveConnection   =   conn  
      oTable   =   oCat.Tables  
      for   x   in   oTable:  
        if   x.Type   ==   "Table"   or   x.Type   ==   "TABLE":  
            listTable.append(x.Name)  
      return   listTable  
     
    def   getItem(self,rs):  
      flds_dict   =   {}  
      for   x   in   range(rs.Fields.Count):  
        flds_dict[x]   =   rs.Fields.Item(x).Name  
      return   flds_dict  
   
    def   AdoInsert(self,Directory,tableName,gdbName,gdbType,guser,gpsw,gip):  
      tableItem   =   {}  
      conn   =   self.getConn()  
      rs   =   self.getRs()  
      connStr   =   self.getConnstr(gdbName,gdbType,guser,gpsw,gip)  
      if   connStrNone   or   connStr"":  
        conn.Open(connStr)  
        listTable     =   self.getTableName(conn)  
        for   x   in   listTable:  
          if   x     ==   tableName:  
            rs.Open('['+x+']',conn,1,3)  
            tableItem   =   self.getItem(rs)  
            try:  
              rs.AddNew()  
              for   y   in   tableItem.keys():  
                for   z   in   Directory.keys():  
                  if   tableItem[y]   ==   z:  
                    rs.Fields.Item(tableItem[y]).Value   =   Directory[z]  
              rs.Update()  
              return   1  
            except:  
              return   -1  
            break  
      else:  
        return   0  
         
    def   execSql(self,sql,gdbName,gdbType,guser,gpsw,gip):     #操作成功返回11  
      if   sql   !=   "":  
        try:  
          conn   =   self.getConn()  
          connStr   =   self.getConnstr(gdbName,gdbType,guser,gpsw,gip)  
          if   connStr!="":  
            conn.Open(connStr)  
            try:  
              conn.Execute(sql)  
              conn.Close()  
              return   1  
            except:  
              return   sql     #sql有错误   
          else:  
            return   -2     #传入参数有错      
        except:  
          return   0     #操作数据库有错  
      else:  
        return   -1      #sql语句有错  
   
    def   selData(self,sql,gdbName,gdbType,guser,gpsw,gip):        #返回一个结果集  
      if   sql   !=   "":  
        try:  
          conn   =   self.getConn()  
          rs   =   self.getRs()  
          connStr   =   self.getConnstr(gdbName,gdbType,guser,gpsw,gip)  
          if   connStr!="":  
            conn.Open(connStr)  
            try:  
              rs.Open('['+sql+']',conn,1,3)         
              return   rs  
            except:  
              return   sql     #sql有错误   
          else:  
            return   -2     #传入参数有错      
        except:  
          return   0     #操作数据库有错  
      else:  
        return   -1      #sql语句有错  
   
   
  def   s():  
    import   operateData  
    import   time  
    print   time.ctime()  
    a   =   operateData.operateData()  
    b   =   a.selData('select   *   from   b   ',"c:/MyDB.mdb",1,"","","")  
    b.MoveFirst()  
    while   1:  
      if   b.Eof   :  
        break  
      else:  
        print   b.Fields.Item('Name').Value  
        b.MoveNext()  
     
    print   time.ctime()  
   
if   __name__   =="__main__":  
  s()