はじめに

Jupyterノートブックはデータ分析や科学研究で広く使われていますが、大規模なプロジェクトでは、多数のノートブックを効率的に実行・管理することが課題となります。この記事では、Jupyterノートブックを効率的に実行し、その結果を整理するためのPythonスクリプトexecute.pyとexecute_jupyter_notebook_by_papermill.pyを紹介します。Papermillライブラリを使用することで、Jupyterノートブックに引数を与えて実行することが可能となりました。また、複数のJupyterノートブックを並列実行する関数も実装しました。

(スクリプト本体は こちら)

なお、本記事の内容は、ChatGPT4との共同作業により作成されました。 下記の文章は、ChatGPT4により生成された原稿に筆者が若干の手直しを加えることで作成されました。

execute.pyの概要

このスクリプトは、特定のモデルと解析時間に基づいて複数のJupyterノートブックを実行する機能を提供します。パラメータの辞書を作成し、それをparallel_execute関数に渡して、指定されたノートブックを実行します。また、必要に応じてファイルの上書き確認も行います。

execute_jupyter_notebook_by_papermill.pyの概要

このスクリプトは、指定されたパラメータを使用してJupyterノートブックを実行するexecute_notebook関数と、これを並列に実行するparallel_execute関数を含んでいます。また、実行されたノートブックの情報をREADME.mdファイルに自動的に追加するappend_to_readme関数も提供します。

スクリプトの使用方法

execute.py: リポジトリのパスを設定し、実行したいノートブックの名前、モデル、解析時間を指定して実行します。

execute_jupyter_notebook_by_papermill.py: 並列実行の最大ワーカー数を設定し、各ノートブックに渡すパラメータの辞書を作成します。上書き確認機能も含まれています。

関数の詳細

以下に、記事で紹介する主要な関数の詳細を記載します。

1. run_notebooks関数(execute.py内)

目的: 複数のモデルと解析時間に基づいて、指定されたJupyterノートブックを実行します。この関数だけスクリプトが分かれているのは、Jupyterノートブックに与える引数が増えた場合に、run_notebooks関数を修正する必要があるからです。

機能: モデルと解析時間の組み合わせごとにパラメータ辞書を作成し、parallel_execute関数を呼び出してノートブックを実行します。

パラメータ:

notebook_name: 実行するノートブックの名前。

models: 実行するモデルのリスト。

analysis_times: 解析時間(オプション)。指定されていない場合、モデルのみで実行されます。

check_overwrite: 既存のファイルを上書きするかどうかの確認を行うかどうか(ブール値)。

2. execute_notebook関数(execute_jupyter_notebook_by_papermill.py内)

目的: 指定されたパラメータを使用して単一のJupyterノートブックを実行し、結果を指定した名前のファイルに保存します。これにより、実行するノートブックと結果を保存するノートブックの名前を独立して指定でき、より柔軟なファイル管理が可能になります。

機能: Papermillを使用してノートブックを実行し、実行後にappend_to_readme関数を呼び出してREADME.mdに実行情報を記録します。

パラメータ:

notebook_name: 実行するノートブックの名前。

input_dir: 入力ノートブックが格納されているディレクトリ。

output_dir: 出力ノートブックを保存するディレクトリ。

params: ノートブックに渡すパラメータの辞書。

3. parallel_execute関数(execute_jupyter_notebook_by_papermill.py内)

目的: 複数のノートブックを並列に実行します。

機能: 各ノートブックの実行タスクを非同期的に処理し、必要に応じてファイルの上書き確認を行います。

パラメータ:

notebook_name: 実行するノートブックの名前。

input_dir: 入力ノートブックが格納されているディレクトリ。

output_dir: 出力ノートブックを保存するディレクトリ。

params_dict: 各ノートブックに渡すパラメータの辞書。

max_workers: 並列実行の最大ワーカー数。

check_overwrite: 既存のファイルを上書きするかどうかの確認を行うかどうか(ブール値)。

4. append_to_readme関数(execute_jupyter_notebook_by_papermill.py内)

目的: 実行されたノートブックの情報をREADME.mdファイルに追記します。

機能: 実行されたノートブックの名前、パラメータ、出力パスなどの情報をフォーマットしてREADME.mdに追加します。

パラメータ:

notebook_name: 実行されたノートブックの名前。

params: ノートブックに渡されたパラメータ。

output_path: 出力されたノートブックのパス。

readme_path: READMEファイルのパス。

まとめ

これらの関数を使用することで、大規模なデータ分析プロジェクトにおいて、Jupyterノートブックの実行と結果の管理を効率的に行うことができます。特に、複数のノートブックを並列に実行することで、時間を節約し、プロジェクトの進捗を加速することが可能です。また、README.mdへの自動的な記録により、実行されたノートブックの概要を簡単に確認できます。

補足

スクリプトの使用にはPapermillライブラリが必要です。インストールについては、Papermill公式ドキュメントを参照してください。

スクリプト

import os
import datetime
import papermill as pm
from concurrent.futures import ThreadPoolExecutor

def execute_notebook(notebook_name, input_dir, output_name, output_dir, params):
    """
    Execute a Jupyter notebook with specified parameters.

    :param notebook_name: Name of the notebook to execute.
    :param input_dir: Directory containing the input notebook.
    :param output_name: Name for the output notebook.
    :param output_dir: Directory to save the output notebook.
    :param params: Dictionary of parameters to pass to the notebook.
    """
    input_notebook = f"{input_dir}/{notebook_name}.ipynb"
    output_notebook = f"{output_dir}/{output_name}.ipynb"

    pm.execute_notebook(
        input_notebook,
        output_notebook,
        parameters=params
    )

    if notebook_name:
            append_to_readme(output_name, params, 
                             output_path=f"./{output_name}.ipynb", 
                             readme_path=f"{output_dir}/README.md")


def parallel_execute(notebook_name, input_dir, output_dir, params_dict, max_workers=4, check_overwrite=True):
    """
    Execute notebooks in parallel based on the provided parameters dictionary.

    :param notebook_name: Name of the notebook to execute.
    :param input_dir: Directory containing the input notebooks.
    :param output_dir: Directory to save the output notebooks.
    :param params_dict: Dictionary of parameters for each execution.
    :param max_workers: Maximum number of parallel workers.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []

        for key, params in params_dict.items():
            output_name = f"{notebook_name}_{key}"
            output_path = f"{output_dir}/{output_name}.ipynb"
            
            if os.path.exists(output_path) and check_overwrite:
                response = input(f"{output_path} already exists. Overwrite? (y/n): ")
                if response.lower() != 'y':
                    print(f"Skipping {output_path}")
                    continue

            print(f"Executing {notebook_name}.ipynb for {key}")
            future = executor.submit(execute_notebook, notebook_name, input_dir, 
                                     output_name, output_dir, params)
            futures.append(future)

        for future in futures:
            future.result()

def append_to_readme(notebook_name, params, output_path, readme_path="README.md"):
    """
    Append executed notebook information to a README.md file.

    :param notebook_name: Name of the executed notebook.
    :param params: Parameters passed to the notebook.
    :param output_path: Path to the output notebook.
    :param readme_path: Path to the README file.
    """
    with open(readme_path, "a") as file:
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        file.write(f"## {timestamp} - {notebook_name}.ipynb \n")
        file.write(f"### Parameters\n")
        for key, value in params.items():
            file.write(f"- **{key}**: {value}\n")
        file.write(f"### Output Notebook\n")
        file.write(f"- [Link]({output_path})\n\n")
from execute_jupyter_notebook_by_papermill import parallel_execute

# Repository and directories setup
path_for_repo = "path_to_your_working_directory"
parent_dir = f"{path_for_repo}/notebooks"
exploratory_dir = f"{parent_dir}/exploratory"
report_dir = f"{parent_dir}/report"


# Function to run notebooks for given models and analysis times
def run_notebooks(notebook_name, models, analysis_times=None, check_overwrite=True):
    params_dict = {}
    if analysis_times:
        for model in models:
            for analysis_time in analysis_times:
                params_dict[f"{model}_{analysis_time}"] = {
                    "model": model,
                    "analysis_time": analysis_time
                }
    else:
        for model in models:
            params_dict[model] = {"model": model}

    parallel_execute(notebook_name, exploratory_dir, report_dir, params_dict, check_overwrite=check_overwrite)

# Models and Analysis Times
models = ["model1", "model2"]
analysis_times = ["annual", "seasonal"]

# Executing Notebooks
run_notebooks("notebook1", models, analysis_times, check_overwrite=False)
run_notebooks("notebook2", models, check_overwrite=True)
广告
将在 10 秒后关闭
bannerAds