irpas技术客

Python数据库操作【三】—— SQLServer_虚梦年华_python sqlserver

未知 6752

SQLServer简介

以下摘自百度百科:

SQL Server是由Microsoft开发和推广的关系数据库管理系统(DBMS),它最初是由Microsoft、Sybase和Ashton-Tate三家公司共同开发的,并于1988年推出了第一个OS/2版本。Microsoft SQL Server近年来不断更新版本,1996年,Microsoft 推出了SQL Server 6.5版本;1998年,SQL Server 7.0版本和用户见面;SQL Server 2000是Microsoft公司于2000年推出,目前最新版本是2019年份推出的SQL SERVER 2019。

SQL Server是微软推出的重量级的数据库,目前有多个版本,如2000、2008、2012等,这些版本名字均为该版本推出的年份,每个版本的差异并不是特别大。本文使用SQL Server2016。

传送门:

Python数据库操作【一】—— Sqlite

Python数据库操作【二】—— MySQL?

本文代码已上传至GitHub,项目地址如下:

https://github.com/XMNHCAS/SQLServerPythonDemo


安装pymssql

python内未集成SQLServer的操作库,需要安装pymssql库。

pip3 install pymssql

?

安装完成后使用import pymssql导入。

pymssql有官方文档,里面有更详细的使用示例,链接地址如下:

GitHub

pymssql文档


增删改查(CRUD) 数据库远程访问配置

使用pymssql访问SQLServer,需要启用SQLServer的TCP/IP协议。

如下图所示,打开SQLServer配置管理器,选择“SQL Server网络配置”中对应的需要使用的实例,在右侧窗口中启用TCP/IP协议。

?

接着重启SQL Server服务,就可以通过IP的形式访问数据库了。

?

?服务重启完成后,使用Navicat测试,可以看到连接成功。

??

新建数据库

新建CreateDatabase.py,使用pymssql创建TestDB数据库。

db.cursor()创建的游标对象,默认返回的是元组,通过设置as_dict=True可以将返回的数据改成字典类型。

如果最后再通过db.commit()来提交事务操作,可能会出现创建数据库失败的情况,故此处设置为autocommit,在游标执行sql的时候就提交操作。

import pymssql # 打开数据库连接 db = pymssql.connect(server='localhost', user='test', password='123456') # 创建游标对象,并设置返回数据的类型为字典 cursor = db.cursor(as_dict=True) # 设置立即操作 db.autocommit(True) # 查看现有数据库 sql = "SELECT * FROM SYSDATABASES" cursor.execute(sql) print("现有数据库:") while True: msg = cursor.fetchone() if msg is None: break print(msg['name']) # 创建数据库 sql = ''' IF NOT EXISTS(SELECT * FROM SYSDATABASES WHERE NAME = 'TestDB') CREATE DATABASE TestDB; ''' cursor.execute(sql) # 查看现有数据库 sql = "SELECT * FROM SYSDATABASES" cursor.execute(sql) print("创建后的数据库:") while True: msg = cursor.fetchone() if msg is None: break print(msg['name']) # 关闭游标与数据库连接 cursor.close() db.close()

运行结果如下:

?

新建数据表?

新建CreateTable.py,使用pymssql在TestDB数据库中新建一个Persons表。

import pymssql # 打开数据库连接 db = pymssql.connect(server='localhost', user='test', password='123456', database='TestDB') # 创建一个游标对象 cursor = db.cursor() # 查看现有表 sql = "SELECT NAME FROM TESTDB.SYS.TABLES" msg = cursor.execute(sql) print(cursor.fetchall()) # 创建Person表 sql = """ USE TestDB CREATE TABLE [Persons]( ID INT PRIMARY KEY IDENTITY(1, 1), Name VARCHAR(50), Age INT, ) """ cursor.execute(sql) # 查看现有表 sql = "SELECT NAME FROM TESTDB.SYS.TABLES" cursor.execute(sql) print(cursor.fetchall()) # 提交事务 db.commit() # 关闭游标与数据库连接 cursor.close() db.close()

运行结果如下:

?

新增记录(Create)

使用游标执行插入语句,不同于pymysql,pymssql不会返回游标执行后影响的行数。

而且由于编码问题,pymssql查询出来的中文数据,如果不进行转码则会输出乱码,所以后面均采用循环输出数据,同时将中文数据改成gbk的编码。该乱码问题可以通过设置数据库字段的编码格式的方式来解决,但是会稍微影响数据库存储性能,故此处不进行演示。

此方式支持单条数据插入和批量插入。

import pymssql # 打开数据库连接 db = pymssql.connect(server='localhost', user='test', password='123456', database='TestDB') # 创建游标对象,并设置返回数据的类型为字典 cursor = db.cursor(as_dict=True) # 插入单条数据 cursor.execute("INSERT INTO Persons(Name, Age) VALUES ('张三',18 )") print("插入单条数据后的数据") cursor.execute("SELECT * FROM Persons") for item in cursor.fetchall(): item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 批量插入数据 sql = "INSERT INTO Persons(Name, Age) VALUES (%s,%s)" cursor.executemany(sql, [('李四', 20), ('王五', 10)]) print("批量插入后的数据") cursor.execute("SELECT * FROM Persons") for item in cursor.fetchall(): item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 提交事务 db.commit() # 关闭游标与数据库连接 cursor.close() db.close()

运行结果如下:

检索记录(Retrieve)

执行查询语句后,可以使用游标的fetchone方法获取单条数据,也可以使用fetchmany获取指定数量的数据,还可以用fetchall直接获取全部数据。为了结果显示得更加清晰,此处全部使用循环进行数据输出。

import pymssql # 打开数据库连接 db = pymssql.connect(server='localhost', user='test', password='123456', database='TestDB') # 创建游标对象,并设置返回数据的类型为字典 cursor = db.cursor(as_dict=True) # 获取全部记录 cursor.execute("SELECT * FROM Persons") allData = cursor.fetchall() print("直接获取全部记录:") for item in allData: item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 获取前N条记录 cursor.execute("SELECT * FROM Persons") manyData = cursor.fetchmany(2) print("获取部分结果:") for item in manyData: item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 一次读取一条结果,循环获取所有记录 cursor.execute("SELECT * FROM Persons") print("一次读取一条结果,循环获取所有记录:") while True: singleData = cursor.fetchone() if singleData is None: break singleData["Name"] = singleData["Name"].encode("latin1").decode("gbk") print(singleData) # 关闭游标 cursor.close() # 提交事务 db.commit() # 关闭数据库连接 db.close()

运行结果如下:

更新记录(Update)

使用游标执行update语句。此方法支持单条数据修改及批量修改。

import pymssql # 打开数据库连接 db = pymssql.connect(server='localhost', user='test', password='123456', database='TestDB') # 创建游标对象,并设置返回数据的类型为字典 cursor = db.cursor(as_dict=True) cursor.execute("SELECT * FROM Persons") print("修改前的数据:") cursor.execute("SELECT * FROM Persons") for item in cursor.fetchall(): item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 执行单条数据修改 cursor.execute("UPDATE Persons SET Age = 20 WHERE Name = '张三'") print("修改单条数据后的数据:") cursor.execute("SELECT * FROM Persons") for item in cursor.fetchall(): item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 执行批量修改 sql = "UPDATE Persons SET Age = %s WHERE Name = %s" batchUpdate = cursor.executemany(sql, [(25, '李四'), (35, '王五')]) print("批量修改后的数据:") cursor.execute("SELECT * FROM Persons") for item in cursor.fetchall(): item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 关闭游标 cursor.close() # 提交事务 db.commit() # 关闭数据库连接 db.close()

运行结果如下:

删除数据(Delete)

使用游标执行delete语句。此方法也支持单条数据删除以及批量删除。

import pymssql # 打开数据库连接 db = pymssql.connect(server='localhost', user='test', password='123456', database='TestDB') # 创建游标对象,并设置返回数据的类型为字典 cursor = db.cursor(as_dict=True) print("删除前的数据:") cursor.execute("SELECT * FROM Persons") data = cursor.fetchall() if len(data) == 0: print(data) else: for item in data: item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 执行单条数据删除 cursor.execute("DELETE FROM Persons WHERE ID = 1") print("删除单条数据后的数据:") cursor.execute("SELECT * FROM Persons") data = cursor.fetchall() if len(data) == 0: print(data) else: for item in data: item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 执行批量删除 sql = "DELETE FROM Persons WHERE ID = %s" cursor.executemany(sql, [('2'), ('3')]) print("批量删除后的的数据:") cursor.execute("SELECT * FROM Persons") data = cursor.fetchall() if len(data) == 0: print(data) else: for item in data: item["Name"] = item["Name"].encode("latin1").decode("gbk") print(item) # 关闭游标 cursor.close() # 提交事务 db.commit() # 关闭数据库连接 db.close()

运行结果如下:


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Python #sqlserver #SQL #Server