irpas技术客

Python—12、DB操作_蓝魔Y_python 数据库开发

大大的周 3392

文档目录 驱动选择接口设计Oracle连接单个连接连接池 MySql连接交互操作数据查询绑定变量批量操作日期时间列元数据行工厂PL/SQL调用

驱动选择

不同的数据库需要使用对应的驱动去连接;

数据库驱动下载链接MySqlmysql-connectorOraclecx_OraclePostgreSqlpsycopg2Greenplumpsycopg2GaussDBpsycopg2
接口设计 动态加载 import sys class UtilGetConnection: __DBtype="" __TypeTuple=("mysql","oracle","postgresql","gaussdb200","odps") def __init__(self,dbtype): self.__DBtype=dbtype.lower() if(self.__DBtype=="oracle"): pass elif(self.__DBtype in ("postrgres","greenplum")): pass elif(self.__DBtype=="gaussdb200"): pass else: print("the dbtype is not in {}".format(self.__DBtype)) sys.exit()

备注:此处实现按照不同的数据库类型进行驱动动态加载;

公共模块 # -*- coding= utf-8 -*- """ @DevTool : PyCharm @Author : xxx @DateTime : 2022/6/9 13:44 @FileName : BaseOperate.py """ class BaseOperate: def __init__(self): pass def sqlDqlOper(self, v_cursor, v_sql): pass def sqlDmlOper(self, v_cursor, v_sql): pass def sqlDdlOPer(self, v_cursor, v_sql): pass def sqlDclOper(self, v_cursor, v_sql): pass

备注:该模块作为公共模块或公共接口供不同类型数据库重写或重载;

Oracle连接 相关链接 cx_Oracle驱动下载:https://pypi.org/project/cx-Oracle/ cx_Oracle驱动文档:https://cx-oracle.readthedocs.io/en/8.3.0/ cx_Oracle文档下载:https://readthedocs.org/projects/cx-oracle/downloads/ Github下载链接:https://github.com/oracle/python-cx_Oracle/tags 单个连接 获取cursor import cx_Oracle # Establish the database connection connection = cx_Oracle.connect(user="username", password="password", dsn="ip:port/sid",encoding="UTF-8") # Obtain a cursor cursor = connection.cursor() doSomething # close cursor cursor.close() # close connection connection.close() 推荐写法: import cx_Oracle # Establish the database connection connection = cx_Oracle.connect(user="username", password="password", dsn="ip:port/sid",encoding="UTF-8") # Obtain a cursor with connection.cursor() as cursor: doSomething pass 连接池 # Create the session pool dbPool = cx_Oracle.SessionPool(user="username", password="password", dsn="ip:port/sid",min=2, max=5, increment=1, encoding="UTF-8") # Acquire a connection from the pool connection = dbPool.acquire() # Use the pooled connection cursor = connection.cursor() doSomething # close cursor cursor.close() # release connection dbPool.release(connection) # close dbPool dbPool.close()

备注:Oracle 实际性能组的建议是使用固定大小的连接池,最小值和最大值的值应相同,增量设置为零;

MySql连接

PyMySql驱动下载:https://pypi.org/project/PyMySQL/ PyMySql驱动文档:https://pymysql.readthedocs.io/en/latest/ Github下载地址:https://github.com/PyMySQL/PyMySQL/tags

交互操作 数据查询 方式1: totalResults=cursor.execute(sql) # Loop over the result set for row in totalResults: pass 方式2:fetchall() cursor.execute(sql) totalResults=cursor.fetchall() # Loop over the result set for row in totalResults: pass 方式3:fetchmany totalRows=0 batchSize=100 while True: manyRows=cursor.fetchmany(batchSize) if not manyRows: break for row in manyRows: totalRows+=1 pass print(totalRows) 方式4:fetchone totalRows=0 while True: row=cursor.fetchone() if not row: break totalRows+=1 pass print(totalRows) 绑定变量 v_sql = "select * from t_etl_col_mapping d where d.tab_id=:tab_id" totalResult=cursor.execute(v_sql,tab_id=1011200122)

备注:绑定变量不能使用在 DDL语句中;只能是在 DML和 DQL 的值和条件中进行参数绑定;

名称绑定 data = { "dept_id": 280, "dept_name": "Facility" } cursor.execute("insert into departments (department_id, department_name) values (:dept_id,:dept_name)", data)

说明:使用字典形式传值,执行名称绑定;

位置绑定 data = [280,"Facility"] cursor.execute("insert into departments (department_id, department_name) values (:dept_id,:dept_name)", data)

说明:使用列表形式传值,执行位置绑定;

批量操作 with connection.cursor() as cursor: v_sql = 'insert into t_dml_oper(uuid,je,update_time,memo) values(:uuid,:je,:update_time,:memo)' data = [{"uuid": "1011101191", "update_time": datetime.datetime(2022, 6, 9, 21, 23, 39, 234098), "je": 19.7, "memo": "数据批量生成-A"}, {"uuid": "1011101192", "je": 11.3, "update_time": datetime.datetime(2022, 6, 9, 21, 23, 39, 234345), "memo": "数据批量生成-B"}, {"uuid": "1011101193", "update_time": datetime.datetime(2022, 6, 9, 21, 23, 39, 234098), "memo": "数据批量生成-C", "je": 987}] try: cursor.executemany(v_sql, data) connection.commit() except cx_Oracle.DatabaseError as e: connection.rollback() info = e.args print("errorCode:{};errorMessage:{}".format(info.code, info.message))

说明:此处数据可以以列表的形式传入,元素为字典形式,实现参数名字绑定;

日期时间

场景:比如 Oracle数据库一个表 t_dml_oper(update_time timestamp(6)),字段 Update_time为时间戳类型 timestamp(6); 操作:现在需要将 datetime.datetime(2022, 6, 9, 21, 23, 39, 234098)插入 update_time列,但是插入后表的 update_time列毫秒为都是 000000;

可以转换的方式:

v_sql = 'insert into t_dml_oper(uuid,je,update_time,memo) values(:uuid,:je,:update_time,:memo)' data = {"uuid": "1011101192", "je": 11.3, "update_time": datetime.datetime(2022, 6, 9, 21, 23, 39, 234345), "memo": "数据批量生成-B"} cursor.prepare(v_sql) cursor.setinputsizes(update_time=cx_Oracle.TIMESTAMP) cursor.executemany(None, data) connection.commit()

说明:在实际执行前在 cursor.prepare和 cursor.execute之间加 cursor.setinputsizes(update_time=cx_Oracle.TIMESTAMP),最终目标表的毫秒位数保留,否则毫秒位数将被截取;

列元数据

使用 cursor.description 返回的是 list 类型,每个元素是 tuple ;

for column in cursor.description: print(column)

备注:输出结果为包含7个元素的元组;

获取列名 columns = [tup[0] for tup in cursor.description] tuple(ele_1,ele_2,ele_3,ele_4,ele_5,ele_6,ele_7) 元组位含义ele_1代表column_nameele_2代表 data_typeele_3ele_4ele_5ele_6ele_7代表列是否可为空
行工厂

可以实现列名和列值的键值对字段;

v_sql = "select * from t_etl_col_mapping d where d.tab_id=:tab_id" totalResult=cursor.execute(v_sql,tab_id=1011200122) # get column_name columns=[col[0] for col in cursor.description] # rowFactory convert cursor.rowfactory=lambda *args:dict(zip(columns,args)) totalRows=0 batchSize=10 while True: row=cursor.fetchone() if not row: break totalRows+=1 print(row) print(totalRows) PL/SQL调用 存过调用

如下存过

begin -- Call the procedure PKG_ETL_SHELL.P_ETL_SHELL_CALL(I_JOB_NAME => :I_JOB_NAME, I_BATCH_ID => :I_BATCH_ID, O_SUCC_FLAG => :O_SUCC_FLAG, O_SHELL_FLAG => :O_SHELL_FLAG); end;

Python调用

cursor = connection.cursor() # declare variable o_succ_flag=cursor.var(str) o_shell_flag=cursor.var(str) cursor.callproc("PKG_ETL_SHELL.P_ETL_SHELL_CALL",["SJZBQ_DJ_NSRXX__ZJ","",o_succ_flag,o_shell_flag]) # o_succ_flag.getvalue() print("o_succ_flag={},o_shell_flag={}".format(o_succ_flag.getvalue(),o_shell_flag.getvalue()))

================================ over ========================================


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Python #数据库开发 #操作数据库