SSIS_Util:从 SSIS 包文件 (.dtsx) 中提取 SQL 的工具
我们将了解一个从 SSIS 包文件中提取 SQL 的工具,该文件是最近在使用 SSIS(SQL Server 集成服务)时为 SQL 分析而创建的。您可以查看执行方法、源代码、sql文件和csv文件输出结果示例。
1.SSIS和SSIS_Util概述
1.1. SSIS概述
SSIS 代表“SQL Server Integration Services”,在 Microsoft 文档中定义如下。
资源: SQL Server 集成服务 – SQL Server 集成服务 (SSIS) | SQL Server 集成服务微软文档
SQL Server Integration Services 是构建企业级数据集成和数据转换解决方案所需的平台。使用集成服务通过复制或下载文件、加载数据仓库、清理和挖掘数据以及管理 SQL Server 对象和数据来解决复杂的业务问题。
Integration Services 可以从多种来源(例如 XML 数据文件、平面文件和关系数据源)提取和转换数据,然后将其加载到一个或多个目标中。
Integration Services 包括一组丰富的内置操作和转换、用于构建包的图形工具以及用于存储、运行和管理包的 Integration Services 目录数据库。
您可以使用图形集成服务工具来创建解决方案,而无需编写任何代码。您还可以对扩展的 Integration Services 对象模型进行编程,以编程方式创建包并编写自定义任务和其他包对象。
简而言之,SSIS 是一个 ETL 工具,可以从各种来源提取、转换和加载数据。
1.2. SSIS_Util 概述
SSIS_Util 是一个从 SSIS 包文件 (.dtsx) 中提取 SQL 的工具。除了 SQL 之外,还提取工作流信息和 DB ConnectionString。
可以在 Visual Studio 中打开 SSIS 包文件 (.dtsx)。虽然能够直观地检查数据处理过程有一个好处,但要搜索整个SQL或检查整个DB连接信息很麻烦。
我创建它是因为我认为可以通过使用 SSIS_Util 将必要的信息提取到单独的文件中来提高分析效率。
源码已上传至github。
https://github.com/DAToolset/SSIS_Util
2.SSIS包文件(.dtsx)结构
dtsx 文件的内容以 XML 结构定义。公开的dtsx文件中,摘录了以下文件的部分内容。
https://github.com/LearningTechStuff/SSIS-Tutorial/blob/master/Lesson%201.dtsx
<?xml version="1.0"?> <DTS:Executable DTS:refId="Package" xmlns:DTS="www.microsoft.com/SqlServer/Dts" DTS:ExecutableType="SSIS.Package.3" DTS:CreatorName="REDWOOD\Barry" DTS:CreatorComputerName="REDWOOD" DTS:CreationDate="6/9/2012 4:03:57 PM" DTS:PackageType="5" DTS:VersionBuild="3" DTS:VersionGUID="{3A510DCF-C9DB-4039-9817-E408E5D2BACF}" DTS:LastModifiedProductVersion="11.0.2100.60" DTS:LocaleID="1033" DTS:ObjectName="Lesson 1" DTS:DTSID="{7691D130-E411-4B14-9676-A5DB6B8B4ADE}" DTS:CreationName="SSIS.Package.3"> <DTS:Property DTS:Name="PackageFormatVersion">6</DTS:Property> <DTS:ConnectionManagers> <DTS:ConnectionManager DTS:refId="Package.ConnectionManagers[localhost.AdventureWorksDW2012]" DTS:ObjectName="localhost.AdventureWorksDW2012" DTS:DTSID="{F094926F-2384-48F7-9515-DB8ACC225303}" DTS:CreationName="OLEDB"> <DTS:ObjectData> <DTS:ConnectionManager DTS:ConnectionString="Data Source=localhost;Initial Catalog=AdventureWorksDW2012;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" /> </DTS:ObjectData> </DTS:ConnectionManager> <DTS:ConnectionManager DTS:refId="Package.ConnectionManagers[Sample Flat File Source Data]" DTS:ObjectName="Sample Flat File Source Data" DTS:DTSID="{6DF5747F-57C2-4173-89B6-1A14A807DA3E}" DTS:CreationName="FLATFILE"> <DTS:ObjectData> <DTS:ConnectionManager DTS:Format="Delimited" DTS:LocaleID="1033" DTS:HeaderRowDelimiter="_x000D__x000A_" DTS:RowDelimiter="" DTS:TextQualifier="_x003C_none_x003E_" DTS:CodePage="1252" DTS:ConnectionString="C:\Program Files\Microsoft SQL Server\110\Samples\Integration Services\Tutorial\Creating a Simple ETL Package\Sample Data\SampleCurrencyData.txt"> <DTS:FlatFileColumns> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:DataType="4" DTS:TextQualified="True" DTS:ObjectName="AverageRate" DTS:DTSID="{786CE995-4B7C-41CE-BE81-37933271E018}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:MaximumWidth="3" DTS:DataType="130" DTS:TextQualified="True" DTS:ObjectName="CurrencyID" DTS:DTSID="{7E5616A8-02FD-49A7-ADEC-AB276D909B75}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:DataType="133" DTS:TextQualified="True" DTS:ObjectName="CurrencyDate" DTS:DTSID="{AF15821D-08F4-4FAA-B167-262FE448C298}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x000D__x000A_" DTS:DataType="4" DTS:TextQualified="True" DTS:ObjectName="EndOfDayRate" DTS:DTSID="{B9FC32CE-DE09-4626-B539-7E50A8469675}" DTS:CreationName="" /> </DTS:FlatFileColumns> </DTS:ConnectionManager> </DTS:ObjectData> </DTS:ConnectionManager> </DTS:ConnectionManagers> <DTS:Variables /> <DTS:Executables> <DTS:Executable DTS:refId="Package\Extract Sample Currency Data" DTS:ExecutableType="SSIS.Pipeline.3" DTS:TaskContact="Performs high-performance data extraction, transformation and loading;Microsoft Corporation; Microsoft SQL Server; (C) 2007 Microsoft Corporation; All Rights Reserved;http://www.microsoft.com/sql/support/default.asp;1" DTS:LocaleID="-1" DTS:ObjectName="Extract Sample Currency Data" DTS:DTSID="{E967823D-AA98-44CA-8B5D-E29585E4C0F0}" DTS:Description="Data Flow Task" DTS:CreationName="SSIS.Pipeline.3"> <DTS:Variables /> <DTS:ObjectData> <pipeline version="1"> <components> <component refId="Package\Extract Sample Currency Data\Extract Sample Currency Data" name="Extract Sample Currency Data" componentClassID="{D23FD76B-F51D-420F-BBCB-19CBF6AC1AB4}" description="Flat File Source" localeId="1033" usesDispositions="true" version="1" contactInfo="Flat File Source;Microsoft Corporation; Microsoft SQL Server; (C) Microsoft Corporation; All Rights Reserved; http://www.microsoft.com/sql/support;1"> <properties> <property name="RetainNulls" dataType="System.Boolean" description="Specifies whether zero-length columns are treated as null.">false</property> <property name="FileNameColumnName" dataType="System.String" description="Specifies the name of an output column containing the file name. If no name is specified, no output column containing the file name will be generated."></property> </properties> <connections> ...
dtsx 文件的详细结构可以在下面的页面中找到。
3.SSIS_Util Python代码
源代码可以在下面的路径中找到。
https://github.com/DAToolset/SSIS_Util/blob/main/ssis_util.py
3.1.安装必要的包
要运行此代码,您需要安装 lxml 包作为 XML 解析器。
pip 安装 lxml
3.2. SQL 摘录:如何运行 SSIS_Util
python ssis_util.py --in_path <dtdx file path> --out_path <output sql file path>
递归搜索in_path指定路径的整个子路径,找到.dtsx文件,并从文件中提取SQL内容,并将其存储在out_path中。
3.3. SQL提取完整源代码
完整的源代码附在下面。
from lxml import etree import os import datetime import csv import argparse # region XML Tag definition pfx = '{www.microsoft.com/' tag_Executable = pfx + 'SqlServer/Dts}Executable' tag_ObjectName = pfx + 'SqlServer/Dts}ObjectName' atr_refId = pfx + 'SqlServer/Dts}refId' tag_Objectdata = pfx + 'SqlServer/Dts}ObjectData' tag_SqlTaskData = pfx + 'sqlserver/dts/tasks/sqltask}SqlTaskData' atr_ConnectionID = pfx + 'sqlserver/dts/tasks/sqltask}Connection' atr_SqlStatementSource = pfx + 'sqlserver/dts/tasks/sqltask}SqlStatementSource' atr_Disabled = pfx + 'SqlServer/Dts}Disabled' # precedence tag_PrecedenceConstraint = pfx + 'SqlServer/Dts}PrecedenceConstraint' tag_From = pfx + 'SqlServer/Dts}From' tag_To = pfx + 'SqlServer/Dts}To' # connection tag_ConnectionManager = pfx + 'SqlServer/Dts}ConnectionManager' tag_CreationName = pfx + 'SqlServer/Dts}CreationName' atr_ConnectionString = pfx + 'SqlServer/Dts}ConnectionString' atr_DTSID = pfx + 'SqlServer/Dts}DTSID' # variable tag_Variable = pfx + 'SqlServer/Dts}Variable' tag_Expression = pfx + 'SqlServer/Dts}Expression' tag_Namespace = pfx + 'SqlServer/Dts}Namespace' # endregion def get_sql_using_xpath(file_name) -> (str, dict): tree = etree.parse(file_name) root = tree.getroot() con_dic = {} # Key: ConnectionName , Value: ConnectionProperty con_id_dic = {} # Key: ConnectionID , Value: ConnectionName con_cnt = 0 var_dic = {} # Key: "NameSpace::ObjectName", Value: Expression, etc... for ele in root.findall(f'.//{tag_ConnectionManager}[@{atr_DTSID}]'): # extract connection list con_cnt += 1 con_id = ele.attrib[atr_DTSID] con_name = ele.attrib[tag_ObjectName] cre_name = ele.attrib[tag_CreationName] con_str = "" con_ele = ele.find(f'.//{tag_Objectdata}/{tag_ConnectionManager}') if con_ele is not None and atr_ConnectionString in con_ele.attrib: con_str = con_ele.attrib[atr_ConnectionString] con_id_dic[con_id] = con_name con_dic[con_name] = f'{con_cnt}. {con_name}\n - CreationName:{cre_name}\n - ConnectionString:[{con_str}]' con_text = '\n\n'.join(f'{v}' for k, v in con_dic.items()) for ele in root.findall(f'.//{tag_Variable}[@{tag_Expression}]'): # extract variable list var_name = ele.attrib[tag_Namespace] + "::" + ele.attrib[tag_ObjectName] # Key: "NameSpace::ObjectName" var_val = ele.attrib[tag_Expression] var_dic[var_name] = var_val sql_str = "" for ele in root.findall(f'.//{tag_Executable}/{tag_Objectdata}/{tag_SqlTaskData}/../..'): # extract SQL list if atr_refId not in ele.attrib: continue ref_id = ele.attrib[atr_refId] sql_ele = ele.find(f'.//{tag_Objectdata}/{tag_SqlTaskData}[@{atr_SqlStatementSource}]') if is_disabled(ele): # exclude Disabled task(also ancestor Elements) continue tsk_sql_str = "" tsk_con_name = "" tsk_con_str = "" if sql_ele is not None: tsk_con_id = sql_ele.attrib[atr_ConnectionID] tsk_con_name = con_id_dic[tsk_con_id] tsk_con_str = con_dic[tsk_con_name] tsk_sql_str = sql_ele.attrib[atr_SqlStatementSource] sql_str += f'/* [Control Flow(제어 흐름) TaskName: {ref_id}]\n [Connection: {tsk_con_str}]\n*/\n{tsk_sql_str}' sql_str += get_line_separator() data_flow_str = "" for ele in root.findall(f'.//pipeline/components/component[@refId]'): # extract data flow list if is_disabled(ele): # exclude Disabled task(also ancestor Elements) continue ref_id = ele.attrib['refId'] sql_cmd_var_str = ""; sql_cmd_str = ""; open_rowset_str = "" sql_cmd_var_ele = ele.find(f'.//property[@name="SqlCommandVariable"]') if sql_cmd_var_ele is not None: sql_cmd_var_str = sql_cmd_var_ele.text sql_cmd_ele = ele.find(f'.//property[@name="SqlCommand"]') if sql_cmd_ele is not None: sql_cmd_str = sql_cmd_ele.text open_rowset_ele = ele.find(f'.//property[@name="OpenRowset"]') if open_rowset_ele is not None and open_rowset_ele.text is not None: open_rowset_str = "OpenRowset: " + open_rowset_ele.text str_list = [sql_cmd_var_str, sql_cmd_str, open_rowset_str] try: data_flow_val = next(s for s in str_list if s) except: data_flow_val = '' con_str = '' con_ele = ele.find(f'.//connections/connection[@connectionManagerID]') if con_ele is not None: con_str = con_ele.attrib['connectionManagerID'] data_flow_str += f'/* [Data Flow(데이터 흐름) TaskName: {ref_id}]\n [Connection: {con_str}] */\n{data_flow_val}' data_flow_str += get_line_separator() prd_text = "" for ele in root.findall(f'.//{tag_PrecedenceConstraint}[@{tag_From}]'): # make precedence string prd_from = ele.attrib[tag_From] prd_to = ele.attrib[tag_To] prd_text += f'[{prd_from}] --> [{prd_to}]\n' result_str = f"""/* Precedence Constraint(실행 순서) {prd_text}*/ {get_line_separator()} /* Connections(연결 정보) {con_text} */ {get_line_separator()} {data_flow_str} {get_line_separator()} {sql_str}""" return result_str, con_dic def get_current_datetime() -> str: return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") def get_line_separator() -> str: return f'\n\n/*{"*" * 100}*/\n\n' def is_disabled(ele) -> bool: """check if the element has 'Disabled' attribute and it's value is 'True', and check parent element""" if atr_Disabled in ele.attrib and ele.attrib[atr_Disabled].lower() == "true": return True else: pele = ele.find('..') if pele is None: return False else: return is_disabled(pele) def main(): parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--in_path', required=True, type=str, help='input path with dtsx files') parser.add_argument('--out_path', required=True, type=str, help='output path with extracted sql files from dtsx files') args = parser.parse_args() in_path = os.path.abspath(args.in_path) out_path = os.path.abspath(args.out_path) file_list = [] print(f'[{get_current_datetime()}] Start Get File List...') in_abspath = os.path.abspath(in_path) # os.path.abspath('.') + '\\test_files' file_types = ('.dtsx',) for root, dir, files in os.walk(in_abspath): for file in sorted(files): # exclude if file.startswith('~'): continue # include if file.endswith(file_types): file_list.append(root + '\\' + file) print(f'[{get_current_datetime()}] Finish Get File List. ({len(file_list)} files)') print(f'[{get_current_datetime()}] Start Extract File Contents...') con_list = [] for dtsx_file in file_list: print(dtsx_file) sql_file = dtsx_file.replace(in_path, out_path) + ".sql" sql_file_dir = os.path.dirname(sql_file) os.makedirs(name=sql_file_dir, exist_ok=True) result_str, con_dic = get_sql_using_xpath(dtsx_file) tmp_list = [] for k, v in con_dic.items(): tmp_list.append([dtsx_file, sql_file, k, v]) if len(tmp_list) > 0: con_list.append(tmp_list) with open(sql_file, "w", encoding="utf8") as file: file.write(result_str) csv_file = out_path + "\\con_sql.csv" with open(csv_file, 'w', newline='', encoding='ansi') as file: writer = csv.writer(file) writer.writerow(["dtsx_file", "sql_file", "connection name", "connection property"]) # writer.writerow(con_list) for con in con_list: for con2 in con: writer.writerow(con2) print(f'[{get_current_datetime()}] Finish Extract File Contents. ({len(file_list)} files)') if __name__ == '__main__': main() # sample dtsx file: https://github.com/LearningTechStuff/SSIS-Tutorial
- 第 7 行:带有命名空间的 XML 必须在标签和属性中指定命名空间。如果将NameSpace重复写入为字符串,代码可读性会变差,因此提前将其声明为单独的变量。
- 第 38 行:使用 lxml 包解析输入文件。
- 第 46 行:使用 XPath 语法迭代连接信息。
- 第 56 行:将连接信息保存在单独的字典中。键是连接名称,值是连接字符串。此信息用于获取连接字符串作为第 78 行中任务的连接名称。
- 第 70 行和第 85 行:如果任何祖先节点(包括自己的节点)被禁用,则不要提取它们。添加此代码是为了避免提取不必要的信息。要提取完整信息,只需注释掉此代码即可。
- 第 112 行:提取任务前导/跟随者关系。
- 第 117-126 行:构造一个结构来存储全部提取的信息。
- 第 139 行:这是第 70 和 85 行调用的函数。递归调用以检查更高一级的祖先。
- 第 188 行:将第 117 至 126 行创建的提取结果保存为 .sql 文件。
- 第 192-198 行:收集第 56 行中创建的所有连接信息,并将其保存为单独的 csv 文件。
4.SSIS_Util输出结果示例
4.1. SQL提取结果文件示例
/* Precedence Constraint(실행 순서) [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5] */ /******************************************************************************************************/ /* Connections(연결 정보) 1. key 캐시연결 - CreationName:CACHE - ConnectionString:[] 2. 10.x.x.1.admin.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 3. 10.x.x.1.db3.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 4. 10.x.x.1.db3.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;] 5. localhost.db2 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 6. localhost.db1 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] */ /******************************************************************************************************/ /* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Source] [Connection: Package.ConnectionManagers[10.x.x.4.db6.admin]] */ select ... from ... /******************************************************************************************************/ /* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Target] [Connection: Package.ConnectionManagers[localhost.db1]] */ OpenRowset: [dbo].[table1] /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1] [Connection: 6. localhost.db1 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ delete from ... where dt = @dt insert ... (...) select ... from ( select ... from #tempp a inner join ... b on ... where (...) ) t where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] [Connection: 5. localhost.db2 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ if object_id('tempdb.dbo.#tempp') is not null drop table #tempp select ... into #tempp from ... a where dt = @dt group by ... merge ... a using (...) b on ... when not matched then insert(...) values(...); /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] [Connection: 3. localhost.db3 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ update b set ... from ... a inner join ... b on ... where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] [Connection: 4. localhost.db4 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;]] */ update b set ... from ... a inner join ... b on ... where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5] [Connection: 2. localhost.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ if object_id('tempdb.dbo.#tempm') is not null drop table #tempm select ... into #tempm from a where a.dt = @dt merge ... a using (...) b on ... and ... when not matched then insert (...) values (...); /******************************************************************************************************/
4.2. csv 文件示例
csv 文件包含 dtsx_file、sql_file、连接名称和连接属性信息。
预计在使用 SSIS 的环境中需要从整体角度进行分析时,它将很有用。
如果源代码有任何错误或改进,请发表评论。