SSIS_Util:SSISパッケージファイル(.dtsx)からSQLを抽出するツール
最近、SQL Server Integration Services(SSIS)を使用する作業中にSQL分析用に簡単に作成されたSSISパッケージファイルからSQL抽出するツールについて説明します。実行方法とソースコード、sqlファイル、csvファイルの出力結果の例について確認することができる。
1. SSISとSSIS_Utilの概要
1.1。 SSISの概要
SSISは「SQL Server Integration Services」の略であり、Microsoft文書には次のように定義されています。
ソース: SQL Server Integration Services - SQL Server Integration Services(SSIS)| Microsoft Docs
SQL Server Integration Servicesは、エンタープライズレベルのデータ統合とデータ変換ソリューションを構築するために必要なプラットフォームです。 Integration Servicesを使用してファイルをコピーまたはダウンロードし、データウェアハウスをロードし、データをクリーンアップおよびマイニングし、SQL Serverオブジェクトとデータを管理して、複雑なビジネス上の問題を解決します。
Integration Servicesは、XMLデータファイル、フラットファイル、リレーショナルデータソースなど、さまざまなソースからデータを抽出して変換してから、1つ以上のターゲットにロードできます。
Integration Servicesには、豊富な組み込みタスクと変換セット、パッケージを構築するためのグラフィックツールとパッケージを保存、実行、および管理するIntegration Servicesカタログデータベースが含まれています。
グラフィカル Integration Services ツールを使用すると、コードを 1 行も書かずにソリューションを作成できます。幅広いIntegration Servicesオブジェクトモデルをプログラムしてパッケージをプログラム化し、カスタムタスクや他のパッケージオブジェクトをコーディングすることもできます。
簡単にまとめると、SSISはさまざまなソースからデータを抽出(Extraction)、変換(Transformation)し、ターゲット(target)にロード(Load)するETLツールです。
1.2. SSIS_Utilの概要
SSIS_UtilはSSISパッケージファイル(.dtsx)からSQLを抽出するツールです。 SQLのほか、ワークフロー情報、DB ConnectionStringも一緒に抽出する。
SSISパッケージファイル(.dtsx)はVisual Studioで開くことができます。視覚的にデータ処理手順を確認できるという利点はありますが、SQL全体を検索したり、DB接続情報全体を確認するのが面倒です。
SSIS_Utilで必要な情報を別々のファイルに抽出すれば分析の効率性を高めることができるようにした。
ソースコードはgithubにアップロードしておいた。
https://github.com/DAToolset/SSIS_Util
2. SSIS package file(.dtsx) 構造
dtsxファイルの内容はXML構造で定義されています。公開されているdtsxファイルのうち、次のファイルの内容を一部抜粋しておく。
https://github.com/LearningTechStuff/SSIS-Tutorial/blob/master/Lesson%201.dtsx
<?xml version="1.0"?> <DTS:Executable DTS:refId="Package" xmlns:DTS="www.microsoft.com/SqlServer/Dts" DTS:ExecutableType="SSIS.Package.3" DTS:CreatorName="REDWOOD\Barry" DTS:CreatorComputerName="REDWOOD" DTS:CreationDate="6/9/2012 4:03:57 PM" DTS:PackageType="5" DTS:VersionBuild="3" DTS:VersionGUID="{3A510DCF-C9DB-4039-9817-E408E5D2BACF}" DTS:LastModifiedProductVersion="11.0.2100.60" DTS:LocaleID="1033" DTS:ObjectName="Lesson 1" DTS:DTSID="{7691D130-E411-4B14-9676-A5DB6B8B4ADE}" DTS:CreationName="SSIS.Package.3"> <DTS:Property DTS:Name="PackageFormatVersion">6</DTS:Property> <DTS:ConnectionManagers> <DTS:ConnectionManager DTS:refId="Package.ConnectionManagers[localhost.AdventureWorksDW2012]" DTS:ObjectName="localhost.AdventureWorksDW2012" DTS:DTSID="{F094926F-2384-48F7-9515-DB8ACC225303}" DTS:CreationName="OLEDB"> <DTS:ObjectData> <DTS:ConnectionManager DTS:ConnectionString="Data Source=localhost;Initial Catalog=AdventureWorksDW2012;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" /> </DTS:ObjectData> </DTS:ConnectionManager> <DTS:ConnectionManager DTS:refId="Package.ConnectionManagers[Sample Flat File Source Data]" DTS:ObjectName="Sample Flat File Source Data" DTS:DTSID="{6DF5747F-57C2-4173-89B6-1A14A807DA3E}" DTS:CreationName="FLATFILE"> <DTS:ObjectData> <DTS:ConnectionManager DTS:Format="Delimited" DTS:LocaleID="1033" DTS:HeaderRowDelimiter="_x000D__x000A_" DTS:RowDelimiter="" DTS:TextQualifier="_x003C_none_x003E_" DTS:CodePage="1252" DTS:ConnectionString="C:\Program Files\Microsoft SQL Server\110\Samples\Integration Services\Tutorial\Creating a Simple ETL Package\Sample Data\SampleCurrencyData.txt"> <DTS:FlatFileColumns> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:DataType="4" DTS:TextQualified="True" DTS:ObjectName="AverageRate" DTS:DTSID="{786CE995-4B7C-41CE-BE81-37933271E018}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:MaximumWidth="3" DTS:DataType="130" DTS:TextQualified="True" DTS:ObjectName="CurrencyID" DTS:DTSID="{7E5616A8-02FD-49A7-ADEC-AB276D909B75}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:DataType="133" DTS:TextQualified="True" DTS:ObjectName="CurrencyDate" DTS:DTSID="{AF15821D-08F4-4FAA-B167-262FE448C298}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x000D__x000A_" DTS:DataType="4" DTS:TextQualified="True" DTS:ObjectName="EndOfDayRate" DTS:DTSID="{B9FC32CE-DE09-4626-B539-7E50A8469675}" DTS:CreationName="" /> </DTS:FlatFileColumns> </DTS:ConnectionManager> </DTS:ObjectData> </DTS:ConnectionManager> </DTS:ConnectionManagers> <DTS:Variables /> <DTS:Executables> <DTS:Executable DTS:refId="Package\Extract Sample Currency Data" DTS:ExecutableType="SSIS.Pipeline.3" DTS:TaskContact="Performs high-performance data extraction, transformation and loading;Microsoft Corporation; Microsoft SQL Server; (C) 2007 Microsoft Corporation; All Rights Reserved;http://www.microsoft.com/sql/support/default.asp;1" DTS:LocaleID="-1" DTS:ObjectName="Extract Sample Currency Data" DTS:DTSID="{E967823D-AA98-44CA-8B5D-E29585E4C0F0}" DTS:Description="Data Flow Task" DTS:CreationName="SSIS.Pipeline.3"> <DTS:Variables /> <DTS:ObjectData> <pipeline version="1"> <components> <component refId="Package\Extract Sample Currency Data\Extract Sample Currency Data" name="Extract Sample Currency Data" componentClassID="{D23FD76B-F51D-420F-BBCB-19CBF6AC1AB4}" description="Flat File Source" localeId="1033" usesDispositions="true" version="1" contactInfo="Flat File Source;Microsoft Corporation; Microsoft SQL Server; (C) Microsoft Corporation; All Rights Reserved; http://www.microsoft.com/sql/support;1"> <properties> <property name="RetainNulls" dataType="System.Boolean" description="Specifies whether zero-length columns are treated as null.">false</property> <property name="FileNameColumnName" dataType="System.String" description="Specifies the name of an output column containing the file name. If no name is specified, no output column containing the file name will be generated."></property> </properties> <connections> ...
dtsxファイルの詳細構造は以下のページで確認できます。
3. SSIS_Util Python Code
ソースコードは以下のパスで確認できます。
https://github.com/DAToolset/SSIS_Util/blob/main/ssis_util.py
3.1。必要なパッケージのインストール
このコードを実行するには、XMLパーサーでlxmlパッケージをインストールする必要があります。
pip install lxml
3.2。 SQL抽出:SSIS_Utilの実行方法
python ssis_util.py --in_path <dtdx file path> --out_path <output sql file path>
in_pathに指定したパスの下位フルパスを再帰探索して.dtsxファイルを探し、そのファイルからSQLの内容を抽出してout_pathに格納する。
3.3。 SQL抽出フルソースコード
ソースコード全体を下に貼り付けます。
from lxml import etree import os import datetime import csv import argparse # region XML Tag definition pfx = '{www.microsoft.com/' tag_Executable = pfx + 'SqlServer/Dts}Executable' tag_ObjectName = pfx + 'SqlServer/Dts}ObjectName' atr_refId = pfx + 'SqlServer/Dts}refId' tag_Objectdata = pfx + 'SqlServer/Dts}ObjectData' tag_SqlTaskData = pfx + 'sqlserver/dts/tasks/sqltask}SqlTaskData' atr_ConnectionID = pfx + 'sqlserver/dts/tasks/sqltask}Connection' atr_SqlStatementSource = pfx + 'sqlserver/dts/tasks/sqltask}SqlStatementSource' atr_Disabled = pfx + 'SqlServer/Dts}Disabled' # precedence tag_PrecedenceConstraint = pfx + 'SqlServer/Dts}PrecedenceConstraint' tag_From = pfx + 'SqlServer/Dts}From' tag_To = pfx + 'SqlServer/Dts}To' # connection tag_ConnectionManager = pfx + 'SqlServer/Dts}ConnectionManager' tag_CreationName = pfx + 'SqlServer/Dts}CreationName' atr_ConnectionString = pfx + 'SqlServer/Dts}ConnectionString' atr_DTSID = pfx + 'SqlServer/Dts}DTSID' # variable tag_Variable = pfx + 'SqlServer/Dts}Variable' tag_Expression = pfx + 'SqlServer/Dts}Expression' tag_Namespace = pfx + 'SqlServer/Dts}Namespace' # endregion def get_sql_using_xpath(file_name) -> (str, dict): tree = etree.parse(file_name) root = tree.getroot() con_dic = {} # Key: ConnectionName , Value: ConnectionProperty con_id_dic = {} # Key: ConnectionID , Value: ConnectionName con_cnt = 0 var_dic = {} # Key: "NameSpace::ObjectName", Value: Expression, etc... for ele in root.findall(f'.//{tag_ConnectionManager}[@{atr_DTSID}]'): # extract connection list con_cnt += 1 con_id = ele.attrib[atr_DTSID] con_name = ele.attrib[tag_ObjectName] cre_name = ele.attrib[tag_CreationName] con_str = "" con_ele = ele.find(f'.//{tag_Objectdata}/{tag_ConnectionManager}') if con_ele is not None and atr_ConnectionString in con_ele.attrib: con_str = con_ele.attrib[atr_ConnectionString] con_id_dic[con_id] = con_name con_dic[con_name] = f'{con_cnt}. {con_name}\n - CreationName:{cre_name}\n - ConnectionString:[{con_str}]' con_text = '\n\n'.join(f'{v}' for k, v in con_dic.items()) for ele in root.findall(f'.//{tag_Variable}[@{tag_Expression}]'): # extract variable list var_name = ele.attrib[tag_Namespace] + "::" + ele.attrib[tag_ObjectName] # Key: "NameSpace::ObjectName" var_val = ele.attrib[tag_Expression] var_dic[var_name] = var_val sql_str = "" for ele in root.findall(f'.//{tag_Executable}/{tag_Objectdata}/{tag_SqlTaskData}/../..'): # extract SQL list if atr_refId not in ele.attrib: continue ref_id = ele.attrib[atr_refId] sql_ele = ele.find(f'.//{tag_Objectdata}/{tag_SqlTaskData}[@{atr_SqlStatementSource}]') if is_disabled(ele): # exclude Disabled task(also ancestor Elements) continue tsk_sql_str = "" tsk_con_name = "" tsk_con_str = "" if sql_ele is not None: tsk_con_id = sql_ele.attrib[atr_ConnectionID] tsk_con_name = con_id_dic[tsk_con_id] tsk_con_str = con_dic[tsk_con_name] tsk_sql_str = sql_ele.attrib[atr_SqlStatementSource] sql_str += f'/* [Control Flow(제어 흐름) TaskName: {ref_id}]\n [Connection: {tsk_con_str}]\n*/\n{tsk_sql_str}' sql_str += get_line_separator() data_flow_str = "" for ele in root.findall(f'.//pipeline/components/component[@refId]'): # extract data flow list if is_disabled(ele): # exclude Disabled task(also ancestor Elements) continue ref_id = ele.attrib['refId'] sql_cmd_var_str = ""; sql_cmd_str = ""; open_rowset_str = "" sql_cmd_var_ele = ele.find(f'.//property[@name="SqlCommandVariable"]') if sql_cmd_var_ele is not None: sql_cmd_var_str = sql_cmd_var_ele.text sql_cmd_ele = ele.find(f'.//property[@name="SqlCommand"]') if sql_cmd_ele is not None: sql_cmd_str = sql_cmd_ele.text open_rowset_ele = ele.find(f'.//property[@name="OpenRowset"]') if open_rowset_ele is not None and open_rowset_ele.text is not None: open_rowset_str = "OpenRowset: " + open_rowset_ele.text str_list = [sql_cmd_var_str, sql_cmd_str, open_rowset_str] try: data_flow_val = next(s for s in str_list if s) except: data_flow_val = '' con_str = '' con_ele = ele.find(f'.//connections/connection[@connectionManagerID]') if con_ele is not None: con_str = con_ele.attrib['connectionManagerID'] data_flow_str += f'/* [Data Flow(데이터 흐름) TaskName: {ref_id}]\n [Connection: {con_str}] */\n{data_flow_val}' data_flow_str += get_line_separator() prd_text = "" for ele in root.findall(f'.//{tag_PrecedenceConstraint}[@{tag_From}]'): # make precedence string prd_from = ele.attrib[tag_From] prd_to = ele.attrib[tag_To] prd_text += f'[{prd_from}] --> [{prd_to}]\n' result_str = f"""/* Precedence Constraint(실행 순서) {prd_text}*/ {get_line_separator()} /* Connections(연결 정보) {con_text} */ {get_line_separator()} {data_flow_str} {get_line_separator()} {sql_str}""" return result_str, con_dic def get_current_datetime() -> str: return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") def get_line_separator() -> str: return f'\n\n/*{"*" * 100}*/\n\n' def is_disabled(ele) -> bool: """check if the element has 'Disabled' attribute and it's value is 'True', and check parent element""" if atr_Disabled in ele.attrib and ele.attrib[atr_Disabled].lower() == "true": return True else: pele = ele.find('..') if pele is None: return False else: return is_disabled(pele) def main(): parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--in_path', required=True, type=str, help='input path with dtsx files') parser.add_argument('--out_path', required=True, type=str, help='output path with extracted sql files from dtsx files') args = parser.parse_args() in_path = os.path.abspath(args.in_path) out_path = os.path.abspath(args.out_path) file_list = [] print(f'[{get_current_datetime()}] Start Get File List...') in_abspath = os.path.abspath(in_path) # os.path.abspath('.') + '\\test_files' file_types = ('.dtsx',) for root, dir, files in os.walk(in_abspath): for file in sorted(files): # exclude if file.startswith('~'): continue # include if file.endswith(file_types): file_list.append(root + '\\' + file) print(f'[{get_current_datetime()}] Finish Get File List. ({len(file_list)} files)') print(f'[{get_current_datetime()}] Start Extract File Contents...') con_list = [] for dtsx_file in file_list: print(dtsx_file) sql_file = dtsx_file.replace(in_path, out_path) + ".sql" sql_file_dir = os.path.dirname(sql_file) os.makedirs(name=sql_file_dir, exist_ok=True) result_str, con_dic = get_sql_using_xpath(dtsx_file) tmp_list = [] for k, v in con_dic.items(): tmp_list.append([dtsx_file, sql_file, k, v]) if len(tmp_list) > 0: con_list.append(tmp_list) with open(sql_file, "w", encoding="utf8") as file: file.write(result_str) csv_file = out_path + "\\con_sql.csv" with open(csv_file, 'w', newline='', encoding='ansi') as file: writer = csv.writer(file) writer.writerow(["dtsx_file", "sql_file", "connection name", "connection property"]) # writer.writerow(con_list) for con in con_list: for con2 in con: writer.writerow(con2) print(f'[{get_current_datetime()}] Finish Extract File Contents. ({len(file_list)} files)') if __name__ == '__main__': main() # sample dtsx file: https://github.com/LearningTechStuff/SSIS-Tutorial
- 行7:NameSpaceを持つXMLは、タグと属性に対応するNameSpaceを指定する必要があります。 NameSpaceを繰り返し文字列で作成すると、コードの読みやすさが離れて別の変数として事前に宣言しておきます。
- 行38:lxmlパッケージで入力ファイルをparsingします。
- 行46:XPath構文で接続情報を巡回します。
- 行56:接続情報を別の辞書に保存します。 keyはconnection名、valueはconnection文字列である。この情報は、78行目でタスクの接続名で接続文字列を取得するために使用されます。
- 70行目、85行目:自分自身のノードを含む先祖ノードの1つでも無効であれば抽出しないようにする。このコードは不要な情報を抽出しないように追加しました。完全な情報を抽出するには、このコードをコメントアウトするだけです。
- 行112:作業先行/後続関係を抽出します。
- 117~126行目:抽出した全体情報を保存するための構造で構成する。
- 139行目:70行目、85行目で呼び出す関数です。一歩上の祖先を確認するために再帰的に呼び出す。
- 行188:行117〜126で作成された抽出結果を.sqlファイルとして保存します。
- 192~198行目: 56行目で作成した接続情報をすべて集めて別々のcsvファイルとして保存する。
4. SSIS_Util出力結果の例
4.1。 SQL抽出結果ファイルの例
/* Precedence Constraint(실행 순서) [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5] */ /******************************************************************************************************/ /* Connections(연결 정보) 1. key 캐시연결 - CreationName:CACHE - ConnectionString:[] 2. 10.x.x.1.admin.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 3. 10.x.x.1.db3.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 4. 10.x.x.1.db3.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;] 5. localhost.db2 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 6. localhost.db1 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] */ /******************************************************************************************************/ /* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Source] [Connection: Package.ConnectionManagers[10.x.x.4.db6.admin]] */ select ... from ... /******************************************************************************************************/ /* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Target] [Connection: Package.ConnectionManagers[localhost.db1]] */ OpenRowset: [dbo].[table1] /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1] [Connection: 6. localhost.db1 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ delete from ... where dt = @dt insert ... (...) select ... from ( select ... from #tempp a inner join ... b on ... where (...) ) t where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] [Connection: 5. localhost.db2 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ if object_id('tempdb.dbo.#tempp') is not null drop table #tempp select ... into #tempp from ... a where dt = @dt group by ... merge ... a using (...) b on ... when not matched then insert(...) values(...); /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] [Connection: 3. localhost.db3 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ update b set ... from ... a inner join ... b on ... where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] [Connection: 4. localhost.db4 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;]] */ update b set ... from ... a inner join ... b on ... where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5] [Connection: 2. localhost.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ if object_id('tempdb.dbo.#tempm') is not null drop table #tempm select ... into #tempm from a where a.dt = @dt merge ... a using (...) b on ... and ... when not matched then insert (...) values (...); /******************************************************************************************************/
4.2。 csvファイルの例
csvファイルには、dtsx_file、sql_file、connection name、connection property情報が含まれています。
SSISを使用する環境で全体的な視点の分析が必要な場合に有用に使用できると期待する。
ソースコードのバグや改善点があればコメントとして残してほしい。