SSIS_Util: Herramienta para extraer SQL del archivo del paquete SSIS (.dtsx)
En el trabajo reciente que usa SSIS (Servicios de integración de SQL Server), observamos una herramienta que extrae SQL de un archivo de paquete SSIS simple para el análisis de SQL. Puede verificar el método de ejecución, el código fuente, el archivo sql y ejemplos de resultados de salida del archivo csv.
1. Descripción general de SSIS y SSIS_Util
1.1. Descripción general de SSIS
SSIS significa "Servicios de integración de SQL Server" y se define en la documentación de Microsoft de la siguiente manera.
SQL Server Integration Services es la plataforma para crear soluciones de integración y transformación de datos a nivel empresarial. Utilice Integration Services para resolver problemas comerciales complejos al copiar o descargar archivos, cargar almacenes de datos, limpiar y extraer datos y administrar objetos y datos de SQL Server.
Integration Services puede extraer y transformar datos de una variedad de orígenes, como archivos de datos XML, archivos sin formato y orígenes de datos relacionales, y luego cargarlos en uno o más destinos.
Integration Services incluye un amplio conjunto de tareas y transformaciones integradas, herramientas gráficas para crear paquetes y una base de datos de catálogo de Integration Services para almacenar, ejecutar y administrar paquetes.
Puede usar las herramientas gráficas de Integration Services para crear soluciones sin escribir una sola línea de código. Puede crear paquetes mediante programación programando el amplio modelo de objetos de Integration Services, y también puede codificar tareas personalizadas y otros objetos de paquetes.
En resumen, SSIS es una herramienta ETL que extrae, transforma y carga datos de varias fuentes.
1.2. Descripción general de SSIS_Util
SSIS_Util es una herramienta para extraer SQL del archivo de paquete SSIS (.dtsx). Además de SQL, también se extraen información de flujo de trabajo y DB ConnectionString.
El archivo del paquete SSIS (.dtsx) se puede abrir en Visual Studio. Tiene la ventaja de verificar visualmente el procedimiento de procesamiento de datos, pero es engorroso buscar en todo el SQL o verificar toda la información de conexión de la base de datos.
Lo creé porque pensé que la eficiencia del análisis podría aumentar extrayendo la información necesaria en un archivo separado con SSIS_Util.
El código fuente ha sido subido a github.
https://github.com/DAToolset/SSIS_Util
2. Estructura del archivo del paquete SSIS (.dtsx)
El contenido del archivo dtsx se define en la estructura XML. Entre los archivos dtsx abiertos, se extraen algunos de los contenidos de los siguientes archivos.
https://github.com/LearningTechStuff/SSIS-Tutorial/blob/master/Lesson%201.dtsx
<?xml version="1.0"?> <DTS:Executable DTS:refId="Package" xmlns:DTS="www.microsoft.com/SqlServer/Dts" DTS:ExecutableType="SSIS.Package.3" DTS:CreatorName="REDWOOD\Barry" DTS:CreatorComputerName="REDWOOD" DTS:CreationDate="6/9/2012 4:03:57 PM" DTS:PackageType="5" DTS:VersionBuild="3" DTS:VersionGUID="{3A510DCF-C9DB-4039-9817-E408E5D2BACF}" DTS:LastModifiedProductVersion="11.0.2100.60" DTS:LocaleID="1033" DTS:ObjectName="Lesson 1" DTS:DTSID="{7691D130-E411-4B14-9676-A5DB6B8B4ADE}" DTS:CreationName="SSIS.Package.3"> <DTS:Property DTS:Name="PackageFormatVersion">6</DTS:Property> <DTS:ConnectionManagers> <DTS:ConnectionManager DTS:refId="Package.ConnectionManagers[localhost.AdventureWorksDW2012]" DTS:ObjectName="localhost.AdventureWorksDW2012" DTS:DTSID="{F094926F-2384-48F7-9515-DB8ACC225303}" DTS:CreationName="OLEDB"> <DTS:ObjectData> <DTS:ConnectionManager DTS:ConnectionString="Data Source=localhost;Initial Catalog=AdventureWorksDW2012;Provider=SQLNCLI11.1;Integrated Security=SSPI;Auto Translate=False;" /> </DTS:ObjectData> </DTS:ConnectionManager> <DTS:ConnectionManager DTS:refId="Package.ConnectionManagers[Sample Flat File Source Data]" DTS:ObjectName="Sample Flat File Source Data" DTS:DTSID="{6DF5747F-57C2-4173-89B6-1A14A807DA3E}" DTS:CreationName="FLATFILE"> <DTS:ObjectData> <DTS:ConnectionManager DTS:Format="Delimited" DTS:LocaleID="1033" DTS:HeaderRowDelimiter="_x000D__x000A_" DTS:RowDelimiter="" DTS:TextQualifier="_x003C_none_x003E_" DTS:CodePage="1252" DTS:ConnectionString="C:\Program Files\Microsoft SQL Server\110\Samples\Integration Services\Tutorial\Creating a Simple ETL Package\Sample Data\SampleCurrencyData.txt"> <DTS:FlatFileColumns> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:DataType="4" DTS:TextQualified="True" DTS:ObjectName="AverageRate" DTS:DTSID="{786CE995-4B7C-41CE-BE81-37933271E018}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:MaximumWidth="3" DTS:DataType="130" DTS:TextQualified="True" DTS:ObjectName="CurrencyID" DTS:DTSID="{7E5616A8-02FD-49A7-ADEC-AB276D909B75}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x0009_" DTS:DataType="133" DTS:TextQualified="True" DTS:ObjectName="CurrencyDate" DTS:DTSID="{AF15821D-08F4-4FAA-B167-262FE448C298}" DTS:CreationName="" /> <DTS:FlatFileColumn DTS:ColumnType="Delimited" DTS:ColumnDelimiter="_x000D__x000A_" DTS:DataType="4" DTS:TextQualified="True" DTS:ObjectName="EndOfDayRate" DTS:DTSID="{B9FC32CE-DE09-4626-B539-7E50A8469675}" DTS:CreationName="" /> </DTS:FlatFileColumns> </DTS:ConnectionManager> </DTS:ObjectData> </DTS:ConnectionManager> </DTS:ConnectionManagers> <DTS:Variables /> <DTS:Executables> <DTS:Executable DTS:refId="Package\Extract Sample Currency Data" DTS:ExecutableType="SSIS.Pipeline.3" DTS:TaskContact="Performs high-performance data extraction, transformation and loading;Microsoft Corporation; Microsoft SQL Server; (C) 2007 Microsoft Corporation; All Rights Reserved;http://www.microsoft.com/sql/support/default.asp;1" DTS:LocaleID="-1" DTS:ObjectName="Extract Sample Currency Data" DTS:DTSID="{E967823D-AA98-44CA-8B5D-E29585E4C0F0}" DTS:Description="Data Flow Task" DTS:CreationName="SSIS.Pipeline.3"> <DTS:Variables /> <DTS:ObjectData> <pipeline version="1"> <components> <component refId="Package\Extract Sample Currency Data\Extract Sample Currency Data" name="Extract Sample Currency Data" componentClassID="{D23FD76B-F51D-420F-BBCB-19CBF6AC1AB4}" description="Flat File Source" localeId="1033" usesDispositions="true" version="1" contactInfo="Flat File Source;Microsoft Corporation; Microsoft SQL Server; (C) Microsoft Corporation; All Rights Reserved; http://www.microsoft.com/sql/support;1"> <properties> <property name="RetainNulls" dataType="System.Boolean" description="Specifies whether zero-length columns are treated as null.">false</property> <property name="FileNameColumnName" dataType="System.String" description="Specifies the name of an output column containing the file name. If no name is specified, no output column containing the file name will be generated."></property> </properties> <connections> ...
La estructura detallada del archivo dtsx se puede encontrar en la página siguiente.
3. Código Python SSIS_Util
El código fuente se puede encontrar en la siguiente ruta.
https://github.com/DAToolset/SSIS_Util/blob/main/ssis_util.py
3.1. Instalar los paquetes necesarios
Para ejecutar este código, debe instalar el paquete lxml como un analizador XML.
pip instalar lxml
3.2. Extraer SQL: cómo ejecutar SSIS_Util
python ssis_util.py --in_path <dtdx file path> --out_path <output sql file path>
La ruta completa debajo de la ruta especificada en in_path se busca recursivamente para encontrar el archivo .dtsx, y el contenido de SQL se extrae del archivo y se almacena en out_path.
3.3. Código fuente completo de extracción SQL
El código fuente completo se pega a continuación.
from lxml import etree import os import datetime import csv import argparse # region XML Tag definition pfx = '{www.microsoft.com/' tag_Executable = pfx + 'SqlServer/Dts}Executable' tag_ObjectName = pfx + 'SqlServer/Dts}ObjectName' atr_refId = pfx + 'SqlServer/Dts}refId' tag_Objectdata = pfx + 'SqlServer/Dts}ObjectData' tag_SqlTaskData = pfx + 'sqlserver/dts/tasks/sqltask}SqlTaskData' atr_ConnectionID = pfx + 'sqlserver/dts/tasks/sqltask}Connection' atr_SqlStatementSource = pfx + 'sqlserver/dts/tasks/sqltask}SqlStatementSource' atr_Disabled = pfx + 'SqlServer/Dts}Disabled' # precedence tag_PrecedenceConstraint = pfx + 'SqlServer/Dts}PrecedenceConstraint' tag_From = pfx + 'SqlServer/Dts}From' tag_To = pfx + 'SqlServer/Dts}To' # connection tag_ConnectionManager = pfx + 'SqlServer/Dts}ConnectionManager' tag_CreationName = pfx + 'SqlServer/Dts}CreationName' atr_ConnectionString = pfx + 'SqlServer/Dts}ConnectionString' atr_DTSID = pfx + 'SqlServer/Dts}DTSID' # variable tag_Variable = pfx + 'SqlServer/Dts}Variable' tag_Expression = pfx + 'SqlServer/Dts}Expression' tag_Namespace = pfx + 'SqlServer/Dts}Namespace' # endregion def get_sql_using_xpath(file_name) -> (str, dict): tree = etree.parse(file_name) root = tree.getroot() con_dic = {} # Key: ConnectionName , Value: ConnectionProperty con_id_dic = {} # Key: ConnectionID , Value: ConnectionName con_cnt = 0 var_dic = {} # Key: "NameSpace::ObjectName", Value: Expression, etc... for ele in root.findall(f'.//{tag_ConnectionManager}[@{atr_DTSID}]'): # extract connection list con_cnt += 1 con_id = ele.attrib[atr_DTSID] con_name = ele.attrib[tag_ObjectName] cre_name = ele.attrib[tag_CreationName] con_str = "" con_ele = ele.find(f'.//{tag_Objectdata}/{tag_ConnectionManager}') if con_ele is not None and atr_ConnectionString in con_ele.attrib: con_str = con_ele.attrib[atr_ConnectionString] con_id_dic[con_id] = con_name con_dic[con_name] = f'{con_cnt}. {con_name}\n - CreationName:{cre_name}\n - ConnectionString:[{con_str}]' con_text = '\n\n'.join(f'{v}' for k, v in con_dic.items()) for ele in root.findall(f'.//{tag_Variable}[@{tag_Expression}]'): # extract variable list var_name = ele.attrib[tag_Namespace] + "::" + ele.attrib[tag_ObjectName] # Key: "NameSpace::ObjectName" var_val = ele.attrib[tag_Expression] var_dic[var_name] = var_val sql_str = "" for ele in root.findall(f'.//{tag_Executable}/{tag_Objectdata}/{tag_SqlTaskData}/../..'): # extract SQL list if atr_refId not in ele.attrib: continue ref_id = ele.attrib[atr_refId] sql_ele = ele.find(f'.//{tag_Objectdata}/{tag_SqlTaskData}[@{atr_SqlStatementSource}]') if is_disabled(ele): # exclude Disabled task(also ancestor Elements) continue tsk_sql_str = "" tsk_con_name = "" tsk_con_str = "" if sql_ele is not None: tsk_con_id = sql_ele.attrib[atr_ConnectionID] tsk_con_name = con_id_dic[tsk_con_id] tsk_con_str = con_dic[tsk_con_name] tsk_sql_str = sql_ele.attrib[atr_SqlStatementSource] sql_str += f'/* [Control Flow(제어 흐름) TaskName: {ref_id}]\n [Connection: {tsk_con_str}]\n*/\n{tsk_sql_str}' sql_str += get_line_separator() data_flow_str = "" for ele in root.findall(f'.//pipeline/components/component[@refId]'): # extract data flow list if is_disabled(ele): # exclude Disabled task(also ancestor Elements) continue ref_id = ele.attrib['refId'] sql_cmd_var_str = ""; sql_cmd_str = ""; open_rowset_str = "" sql_cmd_var_ele = ele.find(f'.//property[@name="SqlCommandVariable"]') if sql_cmd_var_ele is not None: sql_cmd_var_str = sql_cmd_var_ele.text sql_cmd_ele = ele.find(f'.//property[@name="SqlCommand"]') if sql_cmd_ele is not None: sql_cmd_str = sql_cmd_ele.text open_rowset_ele = ele.find(f'.//property[@name="OpenRowset"]') if open_rowset_ele is not None and open_rowset_ele.text is not None: open_rowset_str = "OpenRowset: " + open_rowset_ele.text str_list = [sql_cmd_var_str, sql_cmd_str, open_rowset_str] try: data_flow_val = next(s for s in str_list if s) except: data_flow_val = '' con_str = '' con_ele = ele.find(f'.//connections/connection[@connectionManagerID]') if con_ele is not None: con_str = con_ele.attrib['connectionManagerID'] data_flow_str += f'/* [Data Flow(데이터 흐름) TaskName: {ref_id}]\n [Connection: {con_str}] */\n{data_flow_val}' data_flow_str += get_line_separator() prd_text = "" for ele in root.findall(f'.//{tag_PrecedenceConstraint}[@{tag_From}]'): # make precedence string prd_from = ele.attrib[tag_From] prd_to = ele.attrib[tag_To] prd_text += f'[{prd_from}] --> [{prd_to}]\n' result_str = f"""/* Precedence Constraint(실행 순서) {prd_text}*/ {get_line_separator()} /* Connections(연결 정보) {con_text} */ {get_line_separator()} {data_flow_str} {get_line_separator()} {sql_str}""" return result_str, con_dic def get_current_datetime() -> str: return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") def get_line_separator() -> str: return f'\n\n/*{"*" * 100}*/\n\n' def is_disabled(ele) -> bool: """check if the element has 'Disabled' attribute and it's value is 'True', and check parent element""" if atr_Disabled in ele.attrib and ele.attrib[atr_Disabled].lower() == "true": return True else: pele = ele.find('..') if pele is None: return False else: return is_disabled(pele) def main(): parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--in_path', required=True, type=str, help='input path with dtsx files') parser.add_argument('--out_path', required=True, type=str, help='output path with extracted sql files from dtsx files') args = parser.parse_args() in_path = os.path.abspath(args.in_path) out_path = os.path.abspath(args.out_path) file_list = [] print(f'[{get_current_datetime()}] Start Get File List...') in_abspath = os.path.abspath(in_path) # os.path.abspath('.') + '\\test_files' file_types = ('.dtsx',) for root, dir, files in os.walk(in_abspath): for file in sorted(files): # exclude if file.startswith('~'): continue # include if file.endswith(file_types): file_list.append(root + '\\' + file) print(f'[{get_current_datetime()}] Finish Get File List. ({len(file_list)} files)') print(f'[{get_current_datetime()}] Start Extract File Contents...') con_list = [] for dtsx_file in file_list: print(dtsx_file) sql_file = dtsx_file.replace(in_path, out_path) + ".sql" sql_file_dir = os.path.dirname(sql_file) os.makedirs(name=sql_file_dir, exist_ok=True) result_str, con_dic = get_sql_using_xpath(dtsx_file) tmp_list = [] for k, v in con_dic.items(): tmp_list.append([dtsx_file, sql_file, k, v]) if len(tmp_list) > 0: con_list.append(tmp_list) with open(sql_file, "w", encoding="utf8") as file: file.write(result_str) csv_file = out_path + "\\con_sql.csv" with open(csv_file, 'w', newline='', encoding='ansi') as file: writer = csv.writer(file) writer.writerow(["dtsx_file", "sql_file", "connection name", "connection property"]) # writer.writerow(con_list) for con in con_list: for con2 in con: writer.writerow(con2) print(f'[{get_current_datetime()}] Finish Extract File Contents. ({len(file_list)} files)') if __name__ == '__main__': main() # sample dtsx file: https://github.com/LearningTechStuff/SSIS-Tutorial
- Línea 7: XML con NameSpace debe especificar el NameSpace correspondiente en etiquetas y atributos. Si el espacio de nombres se escribe repetidamente como una cadena, la legibilidad del código se deteriora, por lo que debe declararlo como una variable separada de antemano.
- Línea 38: Analizando el archivo de entrada con el paquete lxml.
- Línea 46: Iterar a través de la información de conexión con la sintaxis XPath.
- Línea 56: almacene la información de conexión en un diccionario separado. La clave es el nombre de la conexión y el valor es la cadena de conexión. Esta información se utiliza para obtener la cadena de conexión como nombre de conexión de la tarea en la línea 78.
- Líneas 70 y 85: si alguno de los nodos antepasados, incluido su propio nodo, está deshabilitado, no los extraiga. Este código se agregó para evitar extraer información innecesaria. Puede comentar este código para extraer la información completa.
- Línea 112: extraiga la relación predecesor/secuencia de la tarea.
- Líneas 117~126: Se compone una estructura para almacenar toda la información extraída.
- Línea 139: Esta es la función llamada en las líneas 70 y 85. Se llama recursivamente para determinar el antepasado un nivel más arriba.
- Línea 188: guarde el resultado de la extracción creado en las líneas 117 a 126 como un archivo .sql.
- Líneas 192~198: recopile toda la información de conexión creada en la línea 56 y guárdela como un archivo csv separado.
4. Ejemplo de salida de SSIS_Util
4.1. Ejemplo de archivo de resultados de extracción de SQL
/* Precedence Constraint(실행 순서) [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] --> [Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5] */ /******************************************************************************************************/ /* Connections(연결 정보) 1. key 캐시연결 - CreationName:CACHE - ConnectionString:[] 2. 10.x.x.1.admin.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 3. 10.x.x.1.db3.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 4. 10.x.x.1.db3.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;] 5. localhost.db2 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] 6. localhost.db1 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;] */ /******************************************************************************************************/ /* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Source] [Connection: Package.ConnectionManagers[10.x.x.4.db6.admin]] */ select ... from ... /******************************************************************************************************/ /* [Data Flow(데이터 흐름) TaskName: Package\update\task4 Target] [Connection: Package.ConnectionManagers[localhost.db1]] */ OpenRowset: [dbo].[table1] /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task1] [Connection: 6. localhost.db1 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.3;User ID=admin;Initial Catalog=db1;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ delete from ... where dt = @dt insert ... (...) select ... from ( select ... from #tempp a inner join ... b on ... where (...) ) t where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task2] [Connection: 5. localhost.db2 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.2;User ID=admin;Initial Catalog=db2;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ if object_id('tempdb.dbo.#tempp') is not null drop table #tempp select ... into #tempp from ... a where dt = @dt group by ... merge ... a using (...) b on ... when not matched then insert(...) values(...); /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task3] [Connection: 3. localhost.db3 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=db3;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ update b set ... from ... a inner join ... b on ... where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task4] [Connection: 4. localhost.db4 - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;Initial Catalog=db4;Provider=SQLNCLI11;Integrated Security=SSPI;]] */ update b set ... from ... a inner join ... b on ... where ... /******************************************************************************************************/ /* [Control Flow(제어 흐름) TaskName: Package\Foreach 루프 컨테이너\시퀀스 컨테이너\task5] [Connection: 2. localhost.admin - CreationName:OLEDB - ConnectionString:[Data Source=10.x.x.1;User ID=admin;Initial Catalog=admin;Provider=SQLNCLI11;Persist Security Info=True;Auto Translate=False;]] */ if object_id('tempdb.dbo.#tempm') is not null drop table #tempm select ... into #tempm from a where a.dt = @dt merge ... a using (...) b on ... and ... when not matched then insert (...) values (...); /******************************************************************************************************/
4.2. ejemplo de archivo csv
El archivo csv incluye dtsx_file, sql_file, nombre de conexión e información de propiedades de conexión.
Se espera que sea útil cuando se requiere un análisis desde una perspectiva general en un entorno donde se utiliza SSIS.
Si tiene algún error o mejora en el código fuente, déjelo en los comentarios.