4. カスタム SQL 分割方式並列処理ケース (DBMS_PARALLEL_EXECUTE)

Oracle DBMS_PARALLEL_EXECUTEを活用して、カスタムSQL分割方式の並列処理の例を見てみましょう。カスタムSQLの作成、テスト環境、ジョブの作成、ジョブ単位の分割、ジョブの実行、ジョブ完了の確認および削除に関する内容です。

前の記事で続く内容だ。

3. NUMBER Column 分割方式並列処理ケース (DBMS_PARALLEL_EXECUTE)

*注Oracleドキュメント: DBMS_PARALLEL_EXECUTE – CREATE_CHUNKS_BY_SQL Procedure (oracle.com)

4. カスタムSQL分割方式並列処理の事例

4.1。カスタムSQL分割方式の概要

カスタムSQLによる分割方式は、次の場合に便利です。

  • ROWID分割方式がサポートされていない場合の分割(例:DB Linkを介したリモートテーブルのROWID分割)
  • NUMBER列以外の列を基準に分割(VARCHAR2、DATEなど)

ここでは、最初のケースのDB Linkを介したROWID分割のケースについて説明します。

CREATE_CHUNKS_BY_ROWID を使用してDB Linkを介したテーブルのROWIDを分割しようとすると、エラーが発生します。

-- 1단계: 작업생성
BEGIN
  DBMS_PARALLEL_EXECUTE.CREATE_TASK(TASK_NAME => 'DPE_TEST(BY ROWID, VIA DBLINK)');
END;
/

-- 2단계: 작업 단위 분할
BEGIN
  DBMS_PARALLEL_EXECUTE
    .CREATE_CHUNKS_BY_ROWID(TASK_NAME   => 'DPE_TEST(BY ROWID, VIA DBLINK)',
                            TABLE_OWNER => USER,
                            -- TABLE _NAME을 “T1@DL_MS949”로 DB Link 지정
                            TABLE_NAME  => 'T1@DL_MS949',
                            BY_ROW      => TRUE,
                            CHUNK_SIZE  => 10000);
END;
/

--> 실행 오류 메시지
ORA-29491: 조각에 부적합한 테이블
ORA-06512: "SYS.DBMS_PARALLEL_EXECUTE",  27행
ORA-06512: "SYS.DBMS_PARALLEL_EXECUTE",  121행
ORA-06512:  4행

この場合、DB Link上のテーブルに対するROWIDを分割するSQLを作成し、CREATE_CHUNKS_BY_SQLを通じて適用することができる。

DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL (
   task_name  IN  VARCHAR2,
   sql_stmt   IN  CLOB,
   by_rowid   IN  BOOLEAN);

sql_stmt は CLOB type で長さにほとんど制約なく使用できますが、ここでは SQL を直接記述するより Pipe-lined function を使う方法を提示します。

4.2。カスタムSQLの作成

カスタム型を作成し、この型の結果セットを返すパイプライン関数を次のように生成します。

-- 1. TYPE 생성 (Pipe-Lined function에서 return하기 위함)
CREATE OR REPLACE TYPE TP_ROWID_RANGE AS OBJECT (
    START_ROWID VARCHAR2(50)
   ,END_ROWID   VARCHAR2(50)
);

CREATE OR REPLACE TYPE TL_ROWID_RANGE AS TABLE OF TP_ROWID_RANGE;

-- 2. Function 생성
CREATE OR REPLACE FUNCTION FN_SPLIT_BY_ROWID(
    I_OWNER IN VARCHAR2, I_TABLE_NAME IN VARCHAR2, I_CHUNKS IN NUMBER)
RETURN TL_ROWID_RANGE
PIPELINED
AS
  CURSOR C_ROWID_RANGE (CP_OWNER VARCHAR2, CP_TABLE_NAME VARCHAR2, CP_CHUNKS NUMBER)
  IS
    SELECT GRP,
           DBMS_ROWID.ROWID_CREATE( 1, DATA_OBJECT_ID, LO_FNO, LO_BLOCK, 0 ) MIN_RID,
           DBMS_ROWID.ROWID_CREATE( 1, DATA_OBJECT_ID, HI_FNO, HI_BLOCK, 10000 ) MAX_RID
      FROM (
            SELECT DISTINCT GRP,
                   FIRST_VALUE(RELATIVE_FNO) 
                    OVER (PARTITION BY GRP ORDER BY RELATIVE_FNO, BLOCK_ID
                          ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) LO_FNO,
                   FIRST_VALUE(BLOCK_ID) 
                   OVER (PARTITION BY GRP ORDER BY RELATIVE_FNO, BLOCK_ID
                        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) LO_BLOCK,
                   LAST_VALUE(RELATIVE_FNO) 
                    OVER (PARTITION BY GRP ORDER BY RELATIVE_FNO, BLOCK_ID
                    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) HI_FNO,
                   LAST_VALUE(BLOCK_ID+BLOCKS-1) 
                    OVER (PARTITION BY GRP ORDER BY RELATIVE_FNO, BLOCK_ID
                     ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) HI_BLOCK,
                   SUM(BLOCKS) OVER (PARTITION BY GRP) SUM_BLOCKS
              FROM (
                    SELECT RELATIVE_FNO, BLOCK_ID, BLOCKS,
                           TRUNC( (SUM(BLOCKS) OVER (ORDER BY RELATIVE_FNO, BLOCK_ID)-0.01) /
                                  (SUM(BLOCKS) OVER ()/ CP_CHUNKS) ) GRP
                      FROM DBA_EXTENTS@DL_MS949
                     WHERE SEGMENT_NAME = UPPER(CP_TABLE_NAME)
                       AND OWNER = UPPER(CP_OWNER)
                     ORDER BY BLOCK_ID
                   )
           ),
           (SELECT DATA_OBJECT_ID
              FROM DBA_OBJECTS@DL_MS949
             WHERE OWNER = UPPER(CP_OWNER)
               AND OBJECT_NAME = UPPER(CP_TABLE_NAME))
     ORDER BY GRP
    ;
BEGIN
  FOR ROWID_RANGE IN C_ROWID_RANGE(I_OWNER, I_TABLE_NAME, I_CHUNKS) LOOP
      PIPE ROW(TP_ROWID_RANGE(ROWID_RANGE.MIN_RID, ROWID_RANGE.MAX_RID));
  END LOOP;
  RETURN;
END;
/

上記のFunctionで使用されるSQLは、DBA_EXTENTSを基準としたブロック単位のROWID分割であり、Thomas Kyteが提示した技術を参照してDB Linkを使用するように若干変形した。

*参照URL: https://asktom.oracle.com/pls/asktom/f?p=100:11:0::::P11_QUESTION_ID:10498431232211

DB LinkはDL_MS949を指定して使用しましたが、もしDB Linkも動的に指定するには、上記のfunctionのcursor SQLをdynamic SQLに変換して使用すれば可能です。

この関数を使用してLEG.SUB_MON_STATテーブル(総件数7,426)をROWIDに分割すると、次のようになります。

-- DL_MS949 DB Link상의 LEG owner, SUB_MON_STAT table에 대해 4개의 Chunk로 ROWID 분할
SELECT ROWNUM RNO, START_ROWID, END_ROWID
  FROM TABLE(FN_SPLIT_BY_ROWID('LEG', 'SUB_MON_STAT', 4))
;
Row#START_ROWIDEND_ROWID
1AAAQXFAAEAAAACIAAAAAAQXFAAEAAAAC3CcQ
2AAAQXFAAEAAAAC4AAAAAAQXFAAEAAAADHCcQ
3AAAQXFAAEAAAADIAAAAAAQXFAAEAAAADXCcQ
4AAAQXFAAEAAAADYAAAAAAQXFAAEAAAADnCcQ

ここで生成されたROWIDにデータを分割する際に、データ全体に欠落がないか、以下のSQLで確認してみましょう。 

SELECT R.RNO, COUNT(*) CNT
  FROM SUB_MON_STAT S
      ,(
        SELECT 1 RNO, 'AAAQXFAAEAAAACIAAA' START_ROWID, 'AAAQXFAAEAAAAC3CcQ' END_ROWID FROM DUAL
        UNION ALL
        SELECT 2 RNO, 'AAAQXFAAEAAAAC4AAA' START_ROWID, 'AAAQXFAAEAAAADHCcQ' END_ROWID FROM DUAL
        UNION ALL
        SELECT 3 RNO, 'AAAQXFAAEAAAADIAAA' START_ROWID, 'AAAQXFAAEAAAADXCcQ' END_ROWID FROM DUAL
        UNION ALL
        SELECT 4 RNO, 'AAAQXFAAEAAAADYAAA' START_ROWID, 'AAAQXFAAEAAAADnCcQ' END_ROWID FROM DUAL
       ) R
 WHERE S.ROWID BETWEEN R.START_ROWID AND END_ROWID
 GROUP BY R.RNO
 ORDER BY R.RNO
;

実行結果は次のとおりです。 (テスト環境ごとにCNTは異なる場合があります。)

RNO(チャンクノ)CNT
11,790
22,206
32,209
41,221

CNTの合計は7,426で、テーブル全体のRow数と等しく、欠落がないことを確認できます。ここで、各RNOに分割されたRow数は、それぞれ1790、2206、2209、1221に均等ではない。これは、ROWID分割方法で説明した均等に分割されない理由と同じです。

4.3。テスト環境とテストテーブルの作成

Target DBから

DB Linkを利用したSource DBのtableにインポートするテストシナリオを想定し、次の環境構成に進む。

사용자 정의 SQL 분할 방식 병렬 처리 테스트 환경 개념도
カスタムSQL分割方式並列処理テスト環境概念図

Target DBのtableは次のDDLで事前に生成されます。

CREATE TABLE SUB_MON_STAT_COPY
AS
SELECT USE_MON, LINE_NUM, SUB_STA_ID, SUB_STA_NM, RIDE_PASGR_NUM, ALIGHT_PASGR_NUM, WORK_DT
   FROM SUB_MON_STAT@DL_MS949
 WHERE 1=2;

4.4。ジョブの作成

4.4.1.作業の作成

-- 1단계: 작업생성
BEGIN
  DBMS_PARALLEL_EXECUTE.CREATE_TASK(TASK_NAME => 'DPE_TEST(BY SQL(ROWID), VIA DBLINK)');
END;
/

-- 작업 생성 확인
SELECT * FROM USER_PARALLEL_EXECUTE_TASKS;
1단계: 사용자 정의 SQL 분할 방식 병렬 처리 작업 생성 확인
ステップ1:カスタムSQL分割方式並列処理ジョブの作成を確認する

4.4.2。作業単位の分割

FN_SPLIT_BY_ROWID関数を使用して、作業単位を4に指定/分割する。

-- 2단계: 작업 단위 분할
BEGIN
  DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
          TASK_NAME => 'DPE_TEST(BY SQL(ROWID), VIA DBLINK)',
          SQL_STMT  => 'SELECT START_ROWID, END_ROWID FROM TABLE(FN_SPLIT_BY_ROWID(''LEG'', ''SUB_MON_STAT'', 4))',
          BY_ROWID  => TRUE);
END;
/

-- 작업 분할 상태 확인
SELECT *
  FROM USER_PARALLEL_EXECUTE_CHUNKS WHERE TASK_NAME = 'DPE_TEST(BY SQL(ROWID), VIA DBLINK)';
2단계: 사용자 정의 SQL 분할 방식 병렬 처리 작업 분할 상태 확인
ステップ2:カスタムSQL分割方式並列処理ジョブ分割状態の確認

4.5。ジョブの実行

ROWID条件をWHERE句に指定してジョブを実行します。ここでは、作業数を作業単位数と同様に4に指定した。

-- 3단계: 작업 실행
DECLARE
  L_SQL_STMT VARCHAR2(32767);
BEGIN
  L_SQL_STMT := 'INSERT INTO SUB_MON_STAT_COPY
                 SELECT USE_MON, LINE_NUM, SUB_STA_ID, SUB_STA_NM,RIDE_PASGR_NUM, ALIGHT_PASGR_NUM, WORK_DT
                   FROM SUB_MON_STAT@DL_MS949
                  WHERE ROWID BETWEEN :START_ID AND :END_ID';

  DBMS_PARALLEL_EXECUTE.RUN_TASK(TASK_NAME      => 'DPE_TEST(BY SQL(ROWID), VIA DBLINK)',
                                 SQL_STMT       => L_SQL_STMT,
                                 LANGUAGE_FLAG  => DBMS_SQL.NATIVE,
                                 PARALLEL_LEVEL => 4);
END;
/

-- 작업 실행상황, 오류코드/메시지 확인
SELECT  *
  FROM  USER_PARALLEL_EXECUTE_CHUNKS
 WHERE  TASK_NAME = 'DPE_TEST(BY SQL(ROWID), VIA DBLINK)';
3단계: 작업 실행 상황 확인
ステップ3:実行状況を確認する

4.6。ジョブ完了の確認と削除

次のSQLで作業完了を確認できます。

-- 작업 완료 확인
SELECT *
  FROM USER_PARALLEL_EXECUTE_TASKS;
작업 완료 확인
ジョブ完了確認

DROP_TASK( )でジョブを削除します。

-- 4단계: 작업삭제
BEGIN
  DBMS_PARALLEL_EXECUTE.DROP_TASK(TASK_NAME => 'DPE_TEST(BY SQL(ROWID), VIA DBLINK)');
END;
/

5. 考慮事項

  • Target tableがpartitioned tableでDMLがINSERTの場合
    • パーティション数だけ分割し、パーティションキー単位でデータ処理時にDirect Path I/Oが可能だと思う。 (テストはしなかったが、可能だと思う)
    • INSERT 構文に /*+ APPEND */ hint の使用と partition を NOLOGGING として指定する必要がある
  • Target tableがnon-partitioned tableの場合
    • Direct Path I/O が不可能で、conventional I/O のみが可能
    • UNDOの量が大幅に増加する可能性があるため、事前にstorageの空き容量を確保する必要があります
    • チャンクのサイズを小さく設定すると、UNDOのサイズをある程度制限できます。

以上、DBMS_PARALLEL_EXECUTEの活用方法について見てきました。数年前、あるプロジェクトで CLOB カラムを含むテーブルを DB Link を介して並列にロードするために探して知った方法だ。活用したい方に十分よく説明されたことを願う。 

気になる点はコメントで残してほしい。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

ja日本語