Word Extraction Tool (5): Word Extraction Tool Source Code Description (2)

Continuing from the previous article, we look at the source code of the word extraction tool implemented in Python.

The previous article is:

Word Extraction Tool (4): Word Extraction Tool Source Code Description (1)

4. Word extraction tool source code

4.3. get_file_text function

def get_file_text(file_name) -> DataFrame:
    """
    Extract text from an MS Word, PowerPoint, text, or DB comment (Excel) file
    :param file_name: file name
    :return: text extracted from the file (DataFrame type)
    """
    df_text = DataFrame()
    if file_name.endswith(('.doc', '.docx')):
        df_text = get_doc_text(file_name)
    elif file_name.endswith(('.ppt', '.pptx')):
        df_text = get_ppt_text(file_name)
    elif file_name.endswith('.txt'):
        df_text = get_txt_text(file_name)
    elif file_name.endswith(('.xls', '.xlsx', '.xlsb')):
        df_text = get_db_comment_text(file_name)
    return df_text
  • Lines 357~365: Execute the function appropriate to the file extension and return the result as df_text.
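A minimal usage sketch (the file names below are assumed examples, not from the original tool): several files are dispatched through get_file_text and the results are stacked into one DataFrame.

import pandas as pd

# Assumed example inputs; each one goes through the extension-based dispatch above.
files = ['spec.docx', 'design.pptx', 'glossary.txt', 'db_comment.xlsx']
df_all = pd.concat((get_file_text(f) for f in files), ignore_index=True)
print(df_all[['FileName', 'FileType', 'Page']].head())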

The description of each function executed according to the file extension is as follows.

4.3.1. get_doc_text function

def get_doc_text(file_name) -> DataFrame:
    """
    Extract text from a doc file and return it as a DataFrame
    :param file_name: input file name (str type)
    :return: text extracted from the input file
    """
    # :return: DataFrame of nouns extracted from the text by a morphological analyzer
    start_time = time.time()
    print('\r\nget_doc_text: %s' % file_name)
    word_app = win32com.client.Dispatch("Word.Application")
    word_file = word_app.Documents.Open(file_name, True)
    # result = []
    df_text = pd.DataFrame()
    page = 0
    for paragraph in word_file.Paragraphs:
        text = paragraph.Range.Text
        page = paragraph.Range.Information(3)  # 3: wdActiveEndPageNumber (page number of the text)
        if text.strip() != '':
            sr_text = Series([file_name, 'doc', page, text, f'{file_name}:{page}:{text}'],
                             index=['FileName', 'FileType', 'Page', 'Text', 'Source'])
            df_text = df_text.append(sr_text, ignore_index=True)

    word_file.Close()
    print('text count: %s' % str(df_text.shape[0]))
    print('page count: %d' % page)
    end_time = time.time()
    # elapsed_time = end_time - start_time
    elapsed_time = str(datetime.timedelta(seconds=end_time - start_time))
    print('[pid:%d] get_doc_text elapsed time: %s' % (os.getpid(), elapsed_time))
    # return get_word_list(df_text)
    return df_text
  • Line 193: Create an instance of the MS Word application with the win32com package. If MS Word is not running, this call launches it.
  • Line 194: Open the .doc or .docx file in the MS Word instance created above.
  • Line 198: Iterate through the paragraphs in the file contents.
  • Line 199: Extract the text of the current paragraph.
  • Line 200: Extract the page number of the current paragraph. In Range.Information(3), "3" corresponds to the wdActiveEndPageNumber constant. For details, see WdInformation enumeration (Word) | Microsoft Docs.
  • Lines 202~204: Make the extracted text into a Series object and append it as a row of the df_text DataFrame.
  • Line 214: Return df_text containing the text extracted from the file.
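As a side note, here is a hedged sketch (an assumption, not part of the original tool): pywin32 can generate the Word type library wrapper with EnsureDispatch, after which named enumeration values can replace magic numbers such as the 3 passed to Range.Information.

import win32com.client
from win32com.client import constants

# EnsureDispatch generates the COM type library wrapper, exposing enum names.
word_app = win32com.client.gencache.EnsureDispatch('Word.Application')
print(constants.wdActiveEndPageNumber)  # equals 3
# Inside the paragraph loop, the lookup would then read:
# page = paragraph.Range.Information(constants.wdActiveEndPageNumber)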

4.3.2. get_ppt_text function

def get_ppt_text(file_name) -> DataFrame:
    """
    Extract text from a ppt file and return it as a DataFrame
    :param file_name: input file name (str type)
    :return: text extracted from the input file
    """
    # :return: DataFrame of nouns extracted from the text by a morphological analyzer
    start_time = time.time()
    print('\r\nget_ppt_text: %s' % file_name)
    ppt_app = win32com.client.Dispatch('PowerPoint.Application')
    ppt_file = ppt_app.Presentations.Open(file_name, True)
    # result = []
    df_text = pd.DataFrame()
    page_count = 0
    for slide in ppt_file.Slides:
        slide_number = slide.SlideNumber
        page_count += 1
        for shape in slide.Shapes:
            shape_text = []
            text = ''
            if shape.HasTable:
                col_cnt = shape.Table.Columns.Count
                row_cnt = shape.Table.Rows.Count
                for row_idx in range(1, row_cnt + 1):
                    for col_idx in range(1, col_cnt + 1):
                        text = shape.Table.Cell(row_idx, col_idx).Shape.TextFrame.TextRange.Text
                        if text != '':
                            text = text.replace('\r', ' ')
                            shape_text.append(text)
            elif shape.HasTextFrame:
                for paragraph in shape.TextFrame.TextRange.Paragraphs():
                    text = paragraph.Text
                    if text != '':
                        shape_text.append(text)
            for text in shape_text:
                if text.strip() != '':
                    sr_text = Series([file_name, 'ppt', slide_number, text, f'{file_name}:{slide_number}:{text}'],
                                     index=['FileName', 'FileType', 'Page', 'Text', 'Source'])
                    df_text = df_text.append(sr_text, ignore_index=True)
    # print(result)
    ppt_file.Close()
    # print(df_result)
    print('text count: %s' % str(df_text.shape[0]))
    print('page count: %d' % page_count)
    # print(df_text.head(10))
    # print(df_result.Paragraph)
    # return df_result
    end_time = time.time()
    # elapsed_time = end_time - start_time
    elapsed_time = str(datetime.timedelta(seconds=end_time - start_time))
    print('[pid:%d] get_ppt_text elapsed time: %s' % (os.getpid(), elapsed_time))
    # return get_word_list(df_text)
    return df_text
  • Line 138: Create an instance of the MS PowerPoint application with the win32com package. If MS PowerPoint is not running, this call launches it.
  • Line 139: Open the .ppt or .pptx file in the MS PowerPoint instance created above.
  • Line 143: Iterate through the slides in the file.
  • Line 146: Iterate through the shapes of each slide.
  • Lines 149~157: If the shape is a table, extract the text from each cell of the table.
  • Lines 158~162: If the shape is not a table but has a text frame, extract the text.
  • Lines 163~167: Make the extracted text into a Series object and append it as a row of the df_text DataFrame.
  • Line 181: Return df_text containing the text extracted from the file.
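For reference, a small hedged sketch (the file path is an assumed example): PowerPoint's COM collections such as Slides, Shapes, and table cells are 1-based, which is why the loops above start at index 1.

import win32com.client

ppt_app = win32com.client.Dispatch('PowerPoint.Application')
ppt_file = ppt_app.Presentations.Open(r'C:\work\design.pptx', True)  # assumed path
first_shape = ppt_file.Slides(1).Shapes(1)  # 1-based indexing, not 0-based
if first_shape.HasTextFrame:
    print(first_shape.TextFrame.TextRange.Text)
ppt_file.Close()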

4.3.3. get_txt_text function

def get_txt_text(file_name) -> DataFrame:
    """
    Extract text from a txt file and return it as a DataFrame
    :param file_name: input file name (str type)
    :return: text extracted from the input file
    """
    # :return: DataFrame of nouns extracted from the text by a morphological analyzer
    start_time = time.time()
    print('\r\nget_txt_text: ' + file_name)
    df_text = pd.DataFrame()
    line_number = 0
    with open(file_name, 'rt', encoding='UTF8') as file:
        for text in file:
            line_number += 1
            if text.strip() != '':
                sr_text = Series([file_name, 'txt', line_number, text, f'{file_name}:{line_number}:{text}'],
                                 index=['FileName', 'FileType', 'Page', 'Text', 'Source'])
                df_text = df_text.append(sr_text, ignore_index=True)
    print('text count: %d' % df_text.shape[0])
    print('line count: %d' % line_number)
    end_time = time.time()
    # elapsed_time = end_time - start_time
    elapsed_time = str(datetime.timedelta(seconds=end_time - start_time))
    print('[pid:%d] get_txt_text elapsed time: %s' % (os.getpid(), elapsed_time))
    # return get_word_list(df_text)
    return df_text
  • Line 228: Open the file in read-only text mode with UTF-8 encoding (mode='rt').
  • Line 229: Iterate through the lines of the file.
  • Lines 231~234: Make each line's text into a Series object and append it as a row of the df_text DataFrame.
  • Line 242: Return df_text containing the text extracted from the file.
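A hedged variant of this loop (an assumption, not the author's code): enumerate(start=1) replaces the manual line counter, and errors='replace' tolerates files that are not valid UTF-8 instead of raising an exception.

def iter_nonblank_lines(file_name):
    """Yield (line_number, text) for each non-blank line of a text file."""
    with open(file_name, 'rt', encoding='UTF8', errors='replace') as file:
        for line_number, text in enumerate(file, start=1):
            if text.strip() != '':
                yield line_number, text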

4.3.4. get_db_comment_text function

def get_db_comment_text(file_name) -> DataFrame:
    """
    Extract text from a db_comment file and return it as a DataFrame
    :param file_name: input file name (str type)
    :return: text extracted from the input file
    """
    # :return: DataFrame of nouns extracted from the text by a morphological analyzer
    start_time = time.time()
    print('\r\nget_db_comment_text: %s' % file_name)
    excel_app = win32com.client.Dispatch('Excel.Application')
    full_path_file_name = os.path.abspath(file_name)
    excel_file = excel_app.Workbooks.Open(full_path_file_name, True)

    # region Table comment
    table_comment_sheet = excel_file.Worksheets(1)
    last_row = table_comment_sheet.Range("A1").End(-4121).Row  # -4121: xlDown
    table_comment_range = 'A2:D%s' % (str(last_row))
    print('table_comment_range : %s (%d rows)' % (table_comment_range, last_row - 1))
    table_comments = table_comment_sheet.Range(table_comment_range).Value2
    df_table = pd.DataFrame(list(table_comments),
                            columns=['DB', 'Schema', 'Table', 'Text'])
    df_table['FileName'] = full_path_file_name
    df_table['FileType'] = 'table'
    df_table['Page'] = 0
    df_table = df_table[df_table.Text.notnull()]  # drop rows with no Text value
    df_table['Source'] = df_table['DB'] + '.' + df_table['Schema'] + '.' + df_table['Table'] \
                         + '(' + df_table['Text'].astype(str) + ')'
    # print(df_table)
    # endregion

    # region Column comment
    column_comment_sheet = excel_file.Worksheets(2)
    last_row = column_comment_sheet.Range("A1").End(-4121).Row  # -4121: xlDown
    column_comment_range = 'A2:E%s' % (str(last_row))
    print('column_comment_range : %s (%d rows)' % (column_comment_range, last_row - 1))
    column_comments = column_comment_sheet.Range(column_comment_range).Value2
    df_column = pd.DataFrame(list(column_comments),
                             columns=['DB', 'Schema', 'Table', 'Column', 'Text'])
    df_column['FileName'] = full_path_file_name
    df_column['FileType'] = 'column'
    df_column['Page'] = 0
    df_column = df_column[df_column.Text.notnull()]  # drop rows with no Text value
    df_column['Source'] = df_column['DB'] + '.' + df_column['Schema'] + '.' + df_column['Table'] \
                          + '.' + df_column['Column'] + '(' + df_column['Text'].astype(str) + ')'
    # print(df_column)
    # endregion

    excel_file.Close()
    df_text = df_column.append(df_table, ignore_index=True)
    # print(df_text)
    end_time = time.time()
    # elapsed_time = end_time - start_time
    elapsed_time = str(datetime.timedelta(seconds=end_time - start_time))
    print('[pid:%d] get_db_comment_text elapsed time: %s' % (os.getpid(), elapsed_time))
    print('text count: %s' % str(df_text.shape[0]))
    # return get_word_list(df_text)
    return df_text
  • Line 300: Create an instance of the MS Excel application with the win32com package. If MS Excel is not running, this call launches it.
  • Line 302: Open the .xls or .xlsx file in the MS Excel instance created above.
  • Lines 305~317: Extract text from the first sheet of the Excel file, where the table comments are stored.
  • Line 306: Get the last row number of the table comment sheet. In Range("A1").End(-4121).Row, "-4121" is the xlDown constant. For details, see XlDirection enumeration (Excel) | Microsoft Docs.
  • Line 309: Read the contents of the table comment sheet into the table_comments variable. This reads the whole range into memory in a single call instead of looping over cells; see VBA Coding Pattern: Range Loop - Read. That article is written for Excel VBA, but the same pattern applies almost unchanged when using OLE Automation from Python. A sketch of this bulk-read pattern follows this list.
  • Lines 310~317: Convert table_comments to the DataFrame df_table and add the 'FileName', 'FileType', 'Page', and 'Source' columns.
  • Lines 322~334: Extract text from the second sheet of the Excel file, where the column comments are stored.
  • Line 323: Get the last row number of the column comment sheet, using the same method as in line 306.
  • Line 326: Read the contents of the column comment sheet into the column_comments variable, using the same method as in line 309.
  • Lines 327~334: Convert column_comments to the DataFrame df_column and add the 'FileName', 'FileType', 'Page', and 'Source' columns.
  • Line 339: Combine df_column and df_table to create df_text.
  • Line 347: Return df_text containing the text extracted from the Excel file in which the DB table and column comments are stored.
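Here is the promised sketch of the bulk-read pattern (the file path and range are assumed examples): a single COM call returns the whole range as a tuple of row tuples, avoiding one cross-process call per cell.

import win32com.client

excel_app = win32com.client.Dispatch('Excel.Application')
excel_file = excel_app.Workbooks.Open(r'C:\work\db_comment.xlsx', True)  # assumed path
sheet = excel_file.Worksheets(1)

# One COM round trip for the whole range, instead of one per cell.
values = sheet.Range('A2:D10').Value2  # assumed fixed range for illustration
for db, schema, table, comment in values:
    print(db, schema, table, comment)

excel_file.Close()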

4.3.5. get_hwp_text function

def get_hwp_text(file_name) -> DataFrame:
    pass

This function is currently not implemented. It will be implemented in the future if necessary.

4.3.6. get_pdf_text function

def get_pdf_text(file_name) -> DataFrame:
    pass

This function is currently not implemented. It will be implemented in the future if necessary.
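If it becomes necessary, one possible shape for get_pdf_text is sketched below (an assumption, not the author's implementation; it uses the PyPDF2 package, on which the tool does not currently depend), producing the same DataFrame layout as the other readers.

import pandas as pd
from pandas import DataFrame
from PyPDF2 import PdfReader  # assumed additional dependency

def get_pdf_text_sketch(file_name) -> DataFrame:
    """Extract non-blank lines from a PDF into the same columns as the other readers."""
    rows = []
    reader = PdfReader(file_name)
    for page_no, page in enumerate(reader.pages, start=1):
        for text in (page.extract_text() or '').splitlines():
            if text.strip() != '':
                rows.append({'FileName': file_name, 'FileType': 'pdf', 'Page': page_no,
                             'Text': text, 'Source': f'{file_name}:{page_no}:{text}'})
    return pd.DataFrame(rows)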

4.4. get_word_list function

This is the most important function in the word extraction tool.

def get_word_list(df_text) -> DataFrame:
    """
    Extract nouns from the text-extraction result DataFrame and return the final output as a DataFrame
    :param df_text: text extracted from files (DataFrame type)
    :return: extraction result of nouns and compound words (one or more nouns, or prefix + noun(s) + suffix) (DataFrame type)
    """
    start_time = time.time()
    df_result = DataFrame()

    tagger = Mecab()
    # tagger = Komoran()
    row_idx = 0
    for index, row in df_text.iterrows():
        row_idx += 1
        if row_idx % 100 == 0:  # print current progress every 100 rows
            print('[pid:%d] current: %d, total: %d, progress: %3.2f%%' %
                  (os.getpid(), row_idx, df_text.shape[0], round(row_idx / df_text.shape[0] * 100, 2)))
        file_name = row['FileName']
        file_type = row['FileType']
        page = row['Page']
        text = str(row['Text'])
        source = (row['Source'])
        is_db = row['FileType'] in ('table', 'column')
        is_db_table = row['FileType'] == 'table'
        is_db_column = row['FileType'] == 'column'
        if is_db:
            db = row['DB']
            schema = row['Schema']
            table = row['Table']
            if is_db_column:
                column = row['Column']

        if text is None or text.strip() == '':
            continue
        try:
            # nouns = mecab.nouns(text)
            # [O]ToDo: extract runs including consecutive noun prefixes (XPN) and noun-derived suffixes (XSN)
            # [O]ToDo: when nouns (NNG, NNP) are consecutive, also extract the compound noun joining them
            text_pos = tagger.pos(text)
            words = [pos for pos, tag in text_pos if tag in ['NNG', 'NNP', 'SL']]  # NNG: common noun, NNP: proper noun
            pos_list = [x for (x, y) in text_pos]
            tag_list = [y for (x, y) in text_pos]
            pos_str = '/'.join(pos_list) + '/'
            tag_str = '/'.join(tag_list) + '/'
            iterator = re.finditer('(NNP/|NNG/)+(XSN/)*|(XPN/)+(NNP/|NNG/)+(XSN/)*|(SL/)+', tag_str)
            for mo in iterator:
                x, y = mo.span()
                if x == 0:
                    start_idx = 0
                else:
                    start_idx = tag_str[:x].count('/')
                end_idx = tag_str[:y].count('/')
                sub_pos = ''
                # if end_idx - start_idx > 1 and not (start_idx == 0 and end_idx == len(tag_list)):
                if end_idx - start_idx > 1:
                    for i in range(start_idx, end_idx):
                        sub_pos += pos_list[i]
                    # print('%s[sub_pos]' % sub_pos)
                    words.append('%s[복합어]' % sub_pos)  # register the compound word as an additional token

            if len(words) >= 1:
                # print(nouns, text)
                for word in words:
                    # print(noun, '\t', text)
                    if not is_db:
                        # sr_text = Series([file_name, file_type, page, text, word],
                        #                  index=['FileName', 'FileType', 'Page', 'Text', 'Word'])
                        df_word = DataFrame(
                            {'FileName': [file_name], 'FileType': [file_type], 'Page': [page], 'Text': [text],
                             'Word': [word], 'Source': [source]})
                    elif is_db_table:
                        # sr_text = Series([file_name, file_type, page, text, word, db, schema, table],
                        #                  index=['FileName', 'FileType', 'Page', 'Text', 'Word', 'DB', 'Schema', 'Table'])
                        df_word = DataFrame(
                            {'FileName': [file_name], 'FileType': [file_type], 'Page': [page], 'Text': [text],
                             'Word': [word], 'DB': [db], 'Schema': [schema], 'Table': [table],
                             'Source': [source]})
                    elif is_db_column:
                        # sr_text = Series([file_name, file_type, page, text, word, db, schema, table, column],
                        #                  index=['FileName', 'FileType', 'Page', 'Text', 'Word', 'DB', 'Schema', 'Table', 'Column'])
                        df_word = DataFrame(
                            {'FileName': [file_name], 'FileType': [file_type], 'Page': [page], 'Text': [text],
                             'Word': [word], 'DB': [db], 'Schema': [schema], 'Table': [table],
                             'Column': [column], 'Source': [source]})
                    # df_result = df_result.append(sr_text, ignore_index=True)
                    # ToDo: replace append with concat
                    df_result = pd.concat([df_result, df_word], ignore_index=True)
        except Exception as ex:
            print('[pid:%d] Exception has raised for text: %s' % (os.getpid(), text))
            print(ex)
    print('[pid:%d] input text count: %d, extracted word count: %d' %
          (os.getpid(), df_text.shape[0], df_result.shape[0]))
    end_time = time.time()
    # elapsed_time = end_time - start_time
    elapsed_time = str(datetime.timedelta(seconds=end_time - start_time))
    print('[pid:%d] get_word_list finished. total: %d, elapsed time: %s' %
          (os.getpid(), df_text.shape[0], elapsed_time))
    return df_result
  • Line 35: Create the natural language morpheme analyzer Mecab object. To use a tagger other than Mecab, change the package name here.
  • Line 38: Iterate through the rows of DataFrame df_text.
  • Line 64: Execute part-of-speech tagging with the morphological analyzer's pos function. The details of part-of-speech tagging will be covered separately.
    • The part-of-speech tagging function pos decomposes the input string into morpheme units and returns each unit tagged with its part of speech.
    • For example, for the text '사용자는 기능적 요구사항과 비기능적 요구사항을 정의해야 한다.' ("Users must define functional and non-functional requirements."), the pos function returns [('사용', 'NNG'), ('자', 'XSN'), ('는', 'JX'), ('기능', 'NNG'), ('적', 'XSN'), ('요구', 'NNG'), ('사항', 'NNG'), ('과', 'JC'), ('비', 'XPN'), ('기능', 'NNG'), ('적', 'XSN'), ('요구', 'NNG'), ('사항', 'NNG'), ('을', 'JKO'), ('정의', 'NNG'), ('해야', 'XSV+EF'), ('.', 'SF')].
    • Among the tags in the example above, 'NNG' is a common noun, 'XSN' is a noun-derived suffix, 'JX' is a topic particle, 'JC' is a conjunctive particle, 'XPN' is a noun prefix, 'JKO' is an object particle, 'XSV+EF' is a verb-derived suffix + final ending, and 'SF' is a period/question mark/exclamation mark.
  • Line 65: From the part-of-speech tagging result, select common nouns (NNG), proper nouns (NNP), and foreign words (SL), the parts of speech best suited as standard word candidates. Foreign words (SL) are included so that abbreviations written in the alphabet are also extracted as standard word candidates.
  • Line 70: Find patterns in the tag string with the regular expression '(NNP/|NNG/)+(XSN/)*|(XPN/)+(NNP/|NNG/)+(XSN/)*|(SL/)+'.
    • This pattern finds one of three things:
      • (NNP/|NNG/)+(XSN/)*: 1 or more (proper or common) nouns + 0 or more noun-derived suffixes
      • (XPN/)+(NNP/|NNG/)+(XSN/)*: 1 or more prefixes + 1 or more (proper or common) nouns + 0 or more noun-derived suffixes
      • (SL/)+: 1 or more foreign words
  • Lines 71~84: Join the morphemes matched by the regular expression pattern and add them to the extracted word list with the suffix '[복합어]' ('compound word') appended. The suffix is added intentionally so that words extracted as compound words can be identified later, when refining the standard word dictionary. A worked example of this matching follows this list.
  • Lines 86 to 110: Add additional attributes such as source and file format to the extracted words and store them in a DataFrame.
  • Line 122: Return df_result containing the extracted word list. 
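The worked example promised above (the token list is taken from the part-of-speech example earlier in this section, trimmed for brevity): the tag string 'XPN/NNG/XSN/JKO/' matches the second alternative of the pattern, and counting '/' characters maps the matched character span back to token indices.

import re

text_pos = [('비', 'XPN'), ('기능', 'NNG'), ('적', 'XSN'), ('을', 'JKO')]
pos_list = [x for (x, y) in text_pos]
tag_list = [y for (x, y) in text_pos]
tag_str = '/'.join(tag_list) + '/'  # 'XPN/NNG/XSN/JKO/'

pattern = '(NNP/|NNG/)+(XSN/)*|(XPN/)+(NNP/|NNG/)+(XSN/)*|(SL/)+'
for mo in re.finditer(pattern, tag_str):
    x, y = mo.span()                    # character span of the match: (0, 12)
    start_idx = tag_str[:x].count('/')  # token index where the match starts: 0
    end_idx = tag_str[:y].count('/')    # token index just past the match: 3
    print(''.join(pos_list[start_idx:end_idx]) + '[복합어]')  # -> 비기능적[복합어]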

For reference, if you want to extract additional part-of-speech patterns, you can modify lines 65 and 70.

4.5. make_word_cloud function

def make_word_cloud(df_group, now_dt, out_path):
    """
    Draw a word cloud from the DataFrame of noun frequencies
    :param df_group: noun frequency DataFrame
    :param now_dt: current date and time
    :param out_path: output path
    :return: None
    """
    start_time = time.time()
    print('\r\nstart make_word_cloud...')
    from wordcloud import WordCloud
    import matplotlib.pyplot as plt
    # malgun.ttf # NanumSquare.ttf # NanumSquareR.ttf NanumMyeongjo.ttf # NanumBarunpenR.ttf # NanumBarunGothic.ttf
    wc = WordCloud(font_path='.\\font\\NanumBarunGothic.ttf',
                   background_color='white',
                   max_words=500,
                   width=1800,
                   height=1000
                   )

    # print(df_group.head(10))
    words = df_group.to_dict()['Freq']
    # print(words)
    # words = df_group.T.to_dict('list')
    wc.generate_from_frequencies(words)
    wc.to_file('%s\\wordcloud_%s.png' % (out_path, now_dt))
    # plt.axis('off')
    end_time = time.time()
    # elapsed_time = end_time - start_time
    elapsed_time = str(datetime.timedelta(seconds=end_time - start_time))
    print('make_word_cloud elapsed time: %s' % elapsed_time)

This function uses the WordCloud package.

[Figure: WordCloud example]
  • Lines 258~263: Create the WordCloud object.
    • The NanumBarunGothic.ttf font file under the font folder is used. To use another font, copy its font file into the font folder and specify the file name here.
    • background_color, max_words, width, and height can be changed to desired values.
  • Line 266: From the DataFrame df_group, create the dictionary words, with the index (word) as key and 'Freq' (frequency) as value.
  • Line 269: Generate a word cloud image from the word frequencies.
  • Line 270: Save the generated word cloud image to a file.
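A minimal usage sketch (the sample frequencies are assumed; df_group is assumed to be indexed by word with a 'Freq' column, as the function expects):

import datetime
import pandas as pd

df_group = pd.DataFrame({'Freq': {'표준': 12, '단어': 9, '추출': 7}})  # assumed sample data
now_dt = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
make_word_cloud(df_group, now_dt, '.')  # writes .\wordcloud_<timestamp>.png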

This concludes the explanation of the source code. In the next article, we will look at additional notes on the source code and on part-of-speech tagging.

