SSIS_Util:从 SSIS 包文件 (.dtsx) 中提取 SQL 的工具
我们将了解一个从 SSIS 包文件中提取 SQL 的工具,该文件是最近在使用 SSIS(SQL Server 集成服务)时为 SQL 分析而创建的。您可以查看执行方法、源代码、sql文件和csv文件输出结果示例。
1.SSIS和SSIS_Util概述
1.1. SSIS概述
SSIS 代表“SQL Server Integration Services”,在 Microsoft 文档中定义如下。
资源: SQL Server 集成服务 – SQL Server 集成服务 (SSIS) | SQL Server 集成服务微软文档
SQL Server Integration Services 是构建企业级数据集成和数据转换解决方案所需的平台。使用集成服务通过复制或下载文件、加载数据仓库、清理和挖掘数据以及管理 SQL Server 对象和数据来解决复杂的业务问题。
Integration Services 可以从多种来源(例如 XML 数据文件、平面文件和关系数据源)提取和转换数据,然后将其加载到一个或多个目标中。
Integration Services 包括一组丰富的内置操作和转换、用于构建包的图形工具以及用于存储、运行和管理包的 Integration Services 目录数据库。
您可以使用图形集成服务工具来创建解决方案,而无需编写任何代码。您还可以对扩展的 Integration Services 对象模型进行编程,以编程方式创建包并编写自定义任务和其他包对象。
简而言之,SSIS 是一个 ETL 工具,可以从各种来源提取、转换和加载数据。
1.2. SSIS_Util 概述
SSIS_Util 是一个从 SSIS 包文件 (.dtsx) 中提取 SQL 的工具。除了 SQL 之外,还提取工作流信息和 DB ConnectionString。
可以在 Visual Studio 中打开 SSIS 包文件 (.dtsx)。虽然能够直观地检查数据处理过程有一个好处,但要搜索整个SQL或检查整个DB连接信息很麻烦。
我创建它是因为我认为可以通过使用 SSIS_Util 将必要的信息提取到单独的文件中来提高分析效率。
源码已上传至github。
https://github.com/DAToolset/SSIS_Util
2.SSIS包文件(.dtsx)结构
dtsx 文件的内容以 XML 结构定义。公开的dtsx文件中,摘录了以下文件的部分内容。
https://github.com/LearningTechStuff/SSIS-Tutorial/blob/master/Lesson%201.dtsx
<?xml version="1.0"?>
<DTS:Executable
DTS:refId="Package" xmlns:DTS="www.microsoft.com/SqlServer/Dts"
DTS:ExecutableType="SSIS.Package.3"
DTS:CreatorName="REDWOOD\Barry"
DTS:CreatorComputerName="REDWOOD"
DTS:CreationDate="6/9/2012 4:03:57 PM"
DTS:PackageType="5"
DTS:VersionBuild="3"
DTS:VersionGUID="{3A510DCF-C9DB-4039-9817-E408E5D2BACF}"
DTS:LastModifiedProductVersion="11.0.2100.60"
DTS:LocaleID="1033"
DTS:ObjectName="Lesson 1"
DTS:DTSID="{7691D130-E411-4B14-9676-A5DB6B8B4ADE}"
DTS:CreationName="SSIS.Package.3">
<DTS:Property
DTS:Name="PackageFormatVersion">6</DTS:Property>
<DTS:ConnectionManagers>
<DTS:ConnectionManager
DTS:refId="Package.ConnectionManagers[localhost.AdventureWorksDW2012]"
DTS:ObjectName="localhost.AdventureWorksDW2012"
DTS:DTSID="{F094926F-2384-48F7-9515-DB8ACC225303}"
DTS:CreationName="OLEDB">
<DTS:ObjectData>
<DTS:ConnectionManager
DTS:ConnectionString="Data Source=localhost;Initial Catalog=AdventureWorksDW2012;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" />
</DTS:ObjectData>
</DTS:ConnectionManager>
<DTS:ConnectionManager
DTS:refId="Package.ConnectionManagers[Sample Flat File Source Data]"
DTS:ObjectName="Sample Flat File Source Data"
DTS:DTSID="{6DF5747F-57C2-4173-89B6-1A14A807DA3E}"
DTS:CreationName="FLATFILE">
<DTS:ObjectData>
<DTS:ConnectionManager
DTS:Format="Delimited"
DTS:LocaleID="1033"
DTS:HeaderRowDelimiter="_x000D__x000A_"
DTS:RowDelimiter=""
DTS:TextQualifier="_x003C_none_x003E_"
DTS:CodePage="1252"
DTS:ConnectionString="C:\Program Files\Microsoft SQL Server\110\Samples\Integration Services\Tutorial\Creating a Simple ETL Package\Sample Data\SampleCurrencyData.txt">
<DTS:FlatFileColumns>
<DTS:FlatFileColumn
DTS:ColumnType="Delimited"
DTS:ColumnDelimiter="_x0009_"
DTS:DataType="4"
DTS:TextQualified="True"
DTS:ObjectName="AverageRate"
DTS:DTSID="{786CE995-4B7C-41CE-BE81-37933271E018}"
DTS:CreationName="" />
<DTS:FlatFileColumn
DTS:ColumnType="Delimited"
DTS:ColumnDelimiter="_x0009_"
DTS:MaximumWidth="3"
DTS:DataType="130"
DTS:TextQualified="True"
DTS:ObjectName="CurrencyID"
DTS:DTSID="{7E5616A8-02FD-49A7-ADEC-AB276D909B75}"
DTS:CreationName="" />
<DTS:FlatFileColumn
DTS:ColumnType="Delimited"
DTS:ColumnDelimiter="_x0009_"
DTS:DataType="133"
DTS:TextQualified="True"
DTS:ObjectName="CurrencyDate"
DTS:DTSID="{AF15821D-08F4-4FAA-B167-262FE448C298}"
DTS:CreationName="" />
<DTS:FlatFileColumn
DTS:ColumnType="Delimited"
DTS:ColumnDelimiter="_x000D__x000A_"
DTS:DataType="4"
DTS:TextQualified="True"
DTS:ObjectName="EndOfDayRate"
DTS:DTSID="{B9FC32CE-DE09-4626-B539-7E50A8469675}"
DTS:CreationName="" />
</DTS:FlatFileColumns>
</DTS:ConnectionManager>
</DTS:ObjectData>
</DTS:ConnectionManager>
</DTS:ConnectionManagers>
<DTS:Variables />
<DTS:Executables>
<DTS:Executable
DTS:refId="Package\Extract Sample Currency Data"
DTS:ExecutableType="SSIS.Pipeline.3"
DTS:TaskContact="Performs high-performance data extraction, transformation and loading;Microsoft Corporation; Microsoft SQL Server; (C) 2007 Microsoft Corporation; All Rights Reserved;http://www.microsoft.com/sql/support/default.asp;1"
DTS:LocaleID="-1"
DTS:ObjectName="Extract Sample Currency Data"
DTS:DTSID="{E967823D-AA98-44CA-8B5D-E29585E4C0F0}"
DTS:Description="Data Flow Task"
DTS:CreationName="SSIS.Pipeline.3">
<DTS:Variables />
<DTS:ObjectData>
<pipeline
version="1">
<components>
<component
refId="Package\Extract Sample Currency Data\Extract Sample Currency Data"
name="Extract Sample Currency Data"
componentClassID="{D23FD76B-F51D-420F-BBCB-19CBF6AC1AB4}"
description="Flat File Source"
localeId="1033"
usesDispositions="true"
version="1"
contactInfo="Flat File Source;Microsoft Corporation; Microsoft SQL Server; (C) Microsoft Corporation; All Rights Reserved; http://www.microsoft.com/sql/support;1">
<properties>
<property
name="RetainNulls"
dataType="System.Boolean"
description="Specifies whether zero-length columns are treated as null.">false</property>
<property
name="FileNameColumnName"
dataType="System.String"
description="Specifies the name of an output column containing the file name. If no name is specified, no output column containing the file name will be generated."></property>
</properties>
<connections>
...
dtsx 文件的详细结构可以在下面的页面中找到。
3.SSIS_Util Python代码
源代码可以在下面的路径中找到。
https://github.com/DAToolset/SSIS_Util/blob/main/ssis_util.py
3.1.安装必要的包
要运行此代码,您需要安装 lxml 包作为 XML 解析器。
pip 安装 lxml
3.2. SQL 摘录:如何运行 SSIS_Util
python ssis_util.py --in_path <dtdx file path> --out_path <output sql file path>
递归搜索in_path指定路径的整个子路径,找到.dtsx文件,并从文件中提取SQL内容,并将其存储在out_path中。
3.3. SQL提取完整源代码
完整的源代码附在下面。
from lxml import etree
import os
import datetime
import csv
import argparse
# region XML Tag definition
pfx = '{www.microsoft.com/'
tag_Executable = pfx + 'SqlServer/Dts}Executable'
tag_ObjectName = pfx + 'SqlServer/Dts}ObjectName'
atr_refId = pfx + 'SqlServer/Dts}refId'
tag_Objectdata = pfx + 'SqlServer/Dts}ObjectData'
tag_SqlTaskData = pfx + 'sqlserver/dts/tasks/sqltask}SqlTaskData'
atr_ConnectionID = pfx + 'sqlserver/dts/tasks/sqltask}Connection'
atr_SqlStatementSource = pfx + 'sqlserver/dts/tasks/sqltask}SqlStatementSource'
atr_Disabled = pfx + 'SqlServer/Dts}Disabled'
# precedence
tag_PrecedenceConstraint = pfx + 'SqlServer/Dts}PrecedenceConstraint'
tag_From = pfx + 'SqlServer/Dts}From'
tag_To = pfx + 'SqlServer/Dts}To'
# connection
tag_ConnectionManager = pfx + 'SqlServer/Dts}ConnectionManager'
tag_CreationName = pfx + 'SqlServer/Dts}CreationName'
atr_ConnectionString = pfx + 'SqlServer/Dts}ConnectionString'
atr_DTSID = pfx + 'SqlServer/Dts}DTSID'
# variable
tag_Variable = pfx + 'SqlServer/Dts}Variable'
tag_Expression = pfx + 'SqlServer/Dts}Expression'
tag_Namespace = pfx + 'SqlServer/Dts}Namespace'
# endregion
def get_sql_using_xpath(file_name) -> (str, dict):
tree = etree.parse(file_name)
root = tree.getroot()
con_dic = {} # Key: ConnectionName , Value: ConnectionProperty
con_id_dic = {} # Key: ConnectionID , Value: ConnectionName
con_cnt = 0
var_dic = {} # Key: "NameSpace::ObjectName", Value: Expression, etc...
for ele in root.findall(f'.//{tag_ConnectionManager}[@{atr_DTSID}]'): # extract connection list
con_cnt += 1
con_id = ele.attrib[atr_DTSID]
con_name = ele.attrib[tag_ObjectName]
cre_name = ele.attrib[tag_CreationName]
con_str = ""
con_ele = ele.find(f'.//{tag_Objectdata}/{tag_ConnectionManager}')
if con_ele is not None and atr_ConnectionString in con_ele.attrib:
con_str = con_ele.attrib[atr_ConnectionString]
con_id_dic[con_id] = con_name
con_dic[con_name] = f'{con_cnt}. {con_name}\n - CreationName:{cre_name}\n - ConnectionString:[{con_str}]'
con_text = '\n\n'.join(f'{v}' for k, v in con_dic.items())
for ele in root.findall(f'.//{tag_Variable}[@{tag_Expression}]'): # extract variable list
var_name = ele.attrib[tag_Namespace] + "::" + ele.attrib[tag_ObjectName] # Key: "NameSpace::ObjectName"
var_val = ele.attrib[tag_Expression]
var_dic[var_name] = var_val
sql_str = ""
for ele in root.findall(f'.//{tag_Executable}/{tag_Objectdata}/{tag_SqlTaskData}/../..'): # extract SQL list
if atr_refId not in ele.attrib:
continue
ref_id = ele.attrib[atr_refId]
sql_ele = ele.find(f'.//{tag_Objectdata}/{tag_SqlTaskData}[@{atr_SqlStatementSource}]')
if is_disabled(ele): # exclude Disabled task(also ancestor Elements)
continue
tsk_sql_str = ""
tsk_con_name = ""
tsk_con_str = ""
if sql_ele is not None:
tsk_con_id = sql_ele.attrib[atr_ConnectionID]
tsk_con_name = con_id_dic[tsk_con_id]
tsk_con_str = con_dic[tsk_con_name]
tsk_sql_str = sql_ele.attrib[atr_SqlStatementSource]
sql_str += f'/* [Control Flow(제어 흐름) TaskName: {ref_id}]\n [Connection: {tsk_con_str}]\n*/\n{tsk_sql_str}'
sql_str += get_line_separator()
data_flow_str = ""
for ele in root.findall(f'.//pipeline/components/component[@refId]'): # extract data flow list
if is_disabled(ele): # exclude Disabled task(also ancestor Elements)
continue
ref_id = ele.attrib['refId']
sql_cmd_var_str = ""; sql_cmd_str = ""; open_rowset_str = ""
sql_cmd_var_ele = ele.find(f'.//property[@name="SqlCommandVariable"]')
if sql_cmd_var_ele is not None:
sql_cmd_var_str = sql_cmd_var_ele.text
sql_cmd_ele = ele.find(f'.//property[@name="SqlCommand"]')
if sql_cmd_ele is not None:
sql_cmd_str = sql_cmd_ele.text
open_rowset_ele = ele.find(f'.//property[@name="OpenRowset"]')
if open_rowset_ele is not None and open_rowset_ele.text is not None:
open_rowset_str = "OpenRowset: " + open_rowset_ele.text
str_list = [sql_cmd_var_str, sql_cmd_str, open_rowset_str]
try:
data_flow_val = next(s for s in str_list if s)
except:
data_flow_val = ''
con_str = ''
con_ele = ele.find(f'.//connections/connection[@connectionManagerID]')
if con_ele is not None:
con_str = con_ele.attrib['connectionManagerID']
data_flow_str += f'/* [Data Flow(데이터 흐름) TaskName: {ref_id}]\n [Connection: {con_str}] */\n{data_flow_val}'
data_flow_str += get_line_separator()
prd_text = ""
for ele in root.findall(f'.//{tag_PrecedenceConstraint}[@{tag_From}]'): # make precedence string
prd_from = ele.attrib[tag_From]
prd_to = ele.attrib[tag_To]
prd_text += f'[{prd_from}] --> [{prd_to}]\n'
result_str = f"""/* Precedence Constraint(실행 순서)
{prd_text}*/
{get_line_separator()}
/* Connections(연결 정보)
{con_text}
*/
{get_line_separator()}
{data_flow_str}
{get_line_separator()}
{sql_str}"""
return result_str, con_dic
def get_current_datetime() -> str:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
def get_line_separator() -> str:
return f'\n\n/*{"*" * 100}*/\n\n'
def is_disabled(ele) -> bool:
"""check if the element has 'Disabled' attribute and it's value is 'True', and check parent element"""
if atr_Disabled in ele.attrib and ele.attrib[atr_Disabled].lower() == "true":
return True
else:
pele = ele.find('..')
if pele is None:
return False
else:
return is_disabled(pele)
def main():
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--in_path', required=True, type=str,
help='input path with dtsx files')
parser.add_argument('--out_path', required=True, type=str,
help='output path with extracted sql files from dtsx files')
args = parser.parse_args()
in_path = os.path.abspath(args.in_path)
out_path = os.path.abspath(args.out_path)
file_list = []
print(f'[{get_current_datetime()}] Start Get File List...')
in_abspath = os.path.abspath(in_path) # os.path.abspath('.') + '\\test_files'
file_types = ('.dtsx',)
for root, dir, files in os.walk(in_abspath):
for file in sorted(files):
# exclude
if file.startswith('~'):
continue
# include
if file.endswith(file_types):
file_list.append(root + '\\' + file)
print(f'[{get_current_datetime()}] Finish Get File List. ({len(file_list)} files)')
print(f'[{get_current_datetime()}] Start Extract File Contents...')
con_list = []
for dtsx_file in file_list:
print(dtsx_file)
sql_file = dtsx_file.replace(in_path, out_path) + ".sql"
sql_file_dir = os.path.dirname(sql_file)
os.makedirs(name=sql_file_dir, exist_ok=True)
result_str, con_dic = get_sql_using_xpath(dtsx_file)
tmp_list = []
for k, v in con_dic.items():
tmp_list.append([dtsx_file, sql_file, k, v])
if len(tmp_list) > 0:
con_list.append(tmp_list)
with open(sql_file, "w", encoding="utf8") as file:
file.write(result_str)
csv_file = out_path + "\\con_sql.csv"
with open(csv_file, 'w', newline='', encoding='ansi') as file:
writer = csv.writer(file)
writer.writerow(["dtsx_file", "sql_file", "connection name", "connection property"])
# writer.writerow(con_list)
for con in con_list:
for con2 in con:
writer.writerow(con2)
print(f'[{get_current_datetime()}] Finish Extract File Contents. ({len(file_list)} files)')
if __name__ == '__main__':
main()
# sample dtsx file: https://github.com/LearningTechStuff/SSIS-Tutorial
- 第 7 行:带有命名空间的 XML 必须在标签和属性中指定命名空间。如果将NameSpace重复写入为字符串,代码可读性会变差,因此提前将其声明为单独的变量。
- 第 38 行:使用 lxml 包解析输入文件。
- 第 46 行:使用 XPath 语法迭代连接信息。
- 第 56 行:将连接信息保存在单独的字典中。键是连接名称,值是连接字符串。此信息用于获取连接字符串作为第 78 行中任务的连接名称。
- 第 70 行和第 85 行:如果任何祖先节点(包括自己的节点)被禁用,则不要提取它们。添加此代码是为了避免提取不必要的信息。要提取完整信息,只需注释掉此代码即可。
- 第 112 行:提取任务前导/跟随者关系。
- 第 117-126 行:构造一个结构来存储全部提取的信息。
- 第 139 行:这是第 70 和 85 行调用的函数。递归调用以检查更高一级的祖先。
- 第 188 行:将第 117 至 126 行创建的提取结果保存为 .sql 文件。
- 第 192-198 行:收集第 56 行中创建的所有连接信息,并将其保存为单独的 csv 文件。
4.SSIS_Util输出结果示例
4.1. SQL提取结果文件示例
/* Precedence Constraint(실행 순서)
[Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2]
[Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3]
[Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4]
[Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5]
*/
/******************************************************************************************************/
/* Connections(연결 정보)
1. key 캐시연결
- CreationName:CACHE
- ConnectionString:[]
2. 10.x.x.1.admin.admin
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]
3. 10.x.x.1.db3.admin
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]
4. 10.x.x.1.db3.admin
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;]
5. localhost.db2
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]
6. localhost.db1
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]
*/
/******************************************************************************************************/
/* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Source]
[Connection: Package.ConnectionManagers[10.x.x.4.db6.admin]] */
select ...
from ...
/******************************************************************************************************/
/* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Target]
[Connection: Package.ConnectionManagers[localhost.db1]] */
OpenRowset: [dbo].[table1]
/******************************************************************************************************/
/* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1]
[Connection: 6. localhost.db1
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]]
*/
delete from ... where dt = @dt
insert ...
(...)
select
...
from (
select
...
from #tempp a
inner join ... b
on ...
where (...)
) t
where ...
/******************************************************************************************************/
/* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2]
[Connection: 5. localhost.db2
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]]
*/
if object_id('tempdb.dbo.#tempp') is not null
drop table #tempp
select
...
into #tempp
from ... a
where dt = @dt
group by ...
merge ... a
using (...) b
on ...
when not matched then
insert(...)
values(...);
/******************************************************************************************************/
/* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3]
[Connection: 3. localhost.db3
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]]
*/
update b
set ...
from ... a
inner join ... b
on ...
where ...
/******************************************************************************************************/
/* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4]
[Connection: 4. localhost.db4
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;]]
*/
update b
set ...
from ... a
inner join ... b
on ...
where ...
/******************************************************************************************************/
/* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5]
[Connection: 2. localhost.admin
- CreationName:OLEDB
- ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]]
*/
if object_id('tempdb.dbo.#tempm') is not null
drop table #tempm
select
...
into #tempm
from a
where a.dt = @dt
merge ... a
using (...) b
on ...
and ...
when not matched then
insert (...)
values (...);
/******************************************************************************************************/
4.2. csv 文件示例
csv 文件包含 dtsx_file、sql_file、连接名称和连接属性信息。
预计在使用 SSIS 的环境中需要从整体角度进行分析时,它将很有用。
如果源代码有任何错误或改进,请发表评论。

