pandasデータフレームのforループ処理高速化ーPythonのmultiprocessingによる並列処理

2019-05-10-公開, 2019-09-26-公開

前提

<2019-09-26追記>

forループはできるだけ避けること。groupbyを使って同様の実装を行うだけで想像以上に高速になる。

  • pandasのgroupbyを使えば、2つ以上のDataFrameを結合した後の行数のカウントや集計も可能
  • 処理速度は圧倒的に速く、並列化も不要になる
  • forを使わない方法を頭と時間をかけて考えてもお釣りがくる
  • どうしても思いつかない場合の救済手段として、以下の内容を参考にforループでの実装と高速化を行う

やりたいこと

  • forループで書いている処理を高速化したい
    • Pythonのpandasデータフレームの各行について、他のデータフレームを集約しつつ値を変更する処理
    • こんなの ↓ を高速化したい
for i in range(df_user.shape[0]):
    # df_userの対象行のuserIDを取得
    user_id = df_user["userID"].iloc[i]
    # df_userの対象行のuserIDと同じか否かのTrue, Falseを作成
    user_in_log = df_log["userID"]==user_id
    # df_userの対象行のユーザーかつ対象行のtrigger以前に起こったeventの数を代入
    df_user["event_cnt"].iloc[i] = df_log[(user_in_log) & (df_log["event_dt"]<df_user["trigger_dt"].iloc[i])].shape[0]

状況

  • 使用データフレームは2つ
    • df_user:userIDとトリガー日時
    • df_log:userIDとイベント日時
  • 各userIDのトリガー日時までに発生したイベント数をdf_userの各行に追加
  • dfが2つあって、df_userの各行についてdf_logから複数行抽出して集約する。mapにするの大変
    • forループ回してしまえ(雑魚並の感想)
      • forループは良くない、分かってはいるんだよ…

困りごと

  • forループ遅い…
    • 仕事で扱っているデータは行数が多くて最初は処理に300時間くらいかかる計算だった。つらい
    • 高速化しなければやってられない

解決策

  • 結論:処理を関数化してmultiprocessing.Pool()で実行
    • 並列処理(リソースで殴るの)は強い
    • マシンのコア数だけプロセスを並列化できるため、Google Cloud Platform(GCP)で16コアのインスタンスを立てて15プロセスの並列実行
    • とても速い(小並感)
    • 以下、デモスクリプトを紹介

まずはライブラリを読み込む

# ライブラリの読み込み
import numpy as np
import pandas as pd
import random
import pickle
import multiprocessing
from datetime import datetime
from datetime import timedelta
from time import time

デモデータを作成する

# デモデータ作成
# ランダムなIDと日時
# df_userは10,000行
# df_logは100,000行

random.seed(256)

# ランダムID
r1 = [random.randint(1, 10000) for i in range(10000)]
r1.sort()
r2 = [random.randint(1, 10000) for i in range(100000)]
r2.sort()

# ランダム日時
now = datetime.now()
datetime1 = [now-timedelta(random.randint(1, 1000)) for i in range(10000)]
datetime2 = [now-timedelta(random.randint(1, 1000)) for i in range(100000)]

# user
df_user = pd.DataFrame({
    "userID": r1,
    "trigger_dt": datetime1
})

# log
df_log = pd.DataFrame({
    "userID": r2,
    "event_dt": datetime2
})

multiprocessing実行

# 結果を代入する列を作成
df_user["event_cnt"] = np.nan

# 前もってSeriesを抽出しておく
s_user_userID = df_user["userID"]
s_user_dt = df_user["trigger_dt"]
s_log_userID = df_log["userID"]

# multiprocessing用にindexをd分割したリストを作成
d = 15
x = int(np.linspace(0, df_user.shape[0], d)[1])
id_lists = [[i*x, (i+1)*x] for i in range(d-1)]
id_lists.append([id_lists[-1][-1], df_user.shape[0]])

# メイン処理
# multiprocessing用に処理を関数化
def event_cnt(id_list):
    t1 =time()
    for i in range(id_list[0], id_list[1]):
        # df_userの対象行のuserIDを取得
        user_id = s_user_userID.iloc[i]
        # df_userの対象行のuserIDと同じか否かのTrue, Falseを作成
        user_in_log = s_log_userID==user_id
        # df_logから対象userの行だけ抽出しておく
        df_1 = df_log[user_in_log]
        # 対象userの行がdf_logに存在する場合のみ代入を行う
        if df_1.shape[0]>0:
            # df_userの対象行のユーザーかつ対象行のtrigger以前に起こったeventの数を代入
            df_user.at[i, "event_cnt"] = len(df_1[df_1["event_dt"]<s_user_dt.iloc[i]])
            
        # 進捗を表示
        if i%1000 == 0:
            print("process_{}-{}: {}...time: {}".format(id_list[0], id_list[1], i, time()-t1))

    # かかった時間を表示
    print("process_{}-{}_finish: {}".format(id_list[0], id_list[1], time()-t1))

    # 結果を保存
    with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='wb') as f:
        pickle.dump(df_user, f)

# 並列処理実行
with multiprocessing.Pool(d) as pool:
    pool.map(event_cnt, id_lists)
  • 普通にforループを回す際のfor i in range(df1.shape[0]):for i in (id_list[0], id_list[1]):に変更
  • df_userの行数をd分割して実行できるように関数化している
  • ちなみに各プロセス内でdf_userの値を書き換えても、並列処理実行後もdf_userの値は元のまま
  • 処理の最後で代入後のdf_userをpickleに書き出しておく

処理完了後に15個のpickleをロード・統合する

# multiprocessing用にindexをd分割したリストを作成
d = 15
x = int(np.linspace(0, df_user.shape[0], d)[1])
id_lists = [[i*x, (i+1)*x] for i in range(d-1)]
id_lists.append([id_lists[-1][-1], df_user.shape[0]])

# 読み込み・統合
# 大元となる1つ目のpickleを読み込み
id_list = id_lists[0]
with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='rb') as f:
    df_user = pickle.load(f)

# 2つ目以降のpickleを順次読み込み・大元となるdf_userに代入
for id_list in id_lists[1:]:
    with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='rb') as f:
        df_user_tmp = pickle.load(f)
        
    # 代入
    df_user["event_cnt"].iloc[id_list[0]:id_list[1]] = df_user_tmp["event_cnt"].iloc[id_list[0]:id_list[1]]

結果

  • 結果は以下のjupyter notebookに記載

github.com

解決までの道のり

  1. ループする処理は独立して実行できる(i同士で依存関係はない)
    • 並列処理すればよくない?
    • 最初に並列処理しようとしたときにはjoblibを使用したがRuntimeErrorが出た。謎

リファクタリングによる高速化

joblibによる並列処理を諦め、コードのリファクタリングで速くならないかなと試行錯誤。

  1. 以下の記事を参考にリファクタリング
  2. df_log(行数が多い方のdf)の条件判定に無駄があるのではと考える
    • 行の抽出時、[(df_userのuserIDと一致するdf_logの行) & (df_userのtrigger日時より前のdf_logの行)]という条件判定を行っていた
    • これでは2回もdf_logを総検索しているのでは
    • 行数がやたら多いdf_logで総検索2回は無駄すぎる
    • まず df_userのuserIDと一致するdf_logの行 を作成し、これに対して df_userのtrigger日時より前のdf_logの行 の抽出を行った
    • ちょっと速くなった
  3. df_userの値の変更に時間がかかっていると考える
    • よくよく見ていると、df_logの集約結果をdf_userの新しい列に代入する処理が遅そう
    • そもそも df_userのuserIDと一致するdf_logの行 が存在しないことも多い
    • df_userのuserIDと一致するdf_logの行 が存在しない場合は集約・代入処理をif文でスキップ
    • 結果、そこそこ速くなった

以上の修正で実際のコードでは300時間(推計) -> 100時間(推計)くらいまで減った。

GitHubに上げたデモスクリプトだと565[s]

しかしまだ時間かかる。抜本的な改善にはやはり並列処理だということで、再度並列処理に挑戦。

multiprocessingによる高速化

  1. joblibではなくmultiprocessingを使用
    • めっちゃ速くなった
    • 実際のコードでは100時間(推計) -> 3時間 *1
    • GitHubに上げたデモスクリプトだと1.6[s] *2
    • めっちゃ速い(2回目)
    • リファクタリングはなんだったんだ

ちなみにmultiprocessingのPoolはjupyter上でも実行可能だった

if __name__ == '__main__':内で実行すること」という指摘もあるが、jupyterで実行したところ、if __name__ == '__main__':はなくても大丈夫だった

参考

感想

  • 依存関係の無いループ処理ならば、クラウド上で強いインスタンスを立てればいくらでも速くなる(言い過ぎ)
    • プロセス数に比例せず、なぜか想像以上に速くなる
    • GCPで96コアのマシン使えばすさまじいことになりそう
  • multiprocessingは強いマシンを必要に応じて使えるクラウド時代ならでは。富豪的解決策
    • データ分析の前処理では本来ならば難しいこと考えなくても高速化が実現されるべき。マシンを強くして速くなるなら強いマシンを使えばいいのでは
    • 高速化はとてもテンションが上がる楽しい作業だが、データ分析の前処理において高速化で仕事をした気になってはいけない
      • 処理の高速化はデータ分析における本質的な作業ではない
      • 誰もが本質的な課題に時間と労力を割けて生産性が高まるのが良い

※map化の方法など、ご指摘やコメント大歓迎です。

入門 Python 3

入門 Python 3

並列処理についても書かれているが、分かったような分からないような…精進せねば…

関連記事

www.kokokocococo555.com

*1:15プロセス作成しただけなのに計算おかしくない?

*2:15プロセスで350倍高速化…謎