2019-05-10-公開, 2019-09-26-公開
前提
<2019-09-26追記>
forループはできるだけ避けること。groupbyを使って同様の実装を行うだけで想像以上に高速になる。
- pandasのgroupbyを使えば、2つ以上のDataFrameを結合した後の行数のカウントや集計も可能
- 処理速度は圧倒的に速く、並列化も不要になる
- forを使わない方法を頭と時間をかけて考えてもお釣りがくる
- どうしても思いつかない場合の救済手段として、以下の内容を参考にforループでの実装と高速化を行う
やりたいこと
- forループで書いている処理を高速化したい
- Pythonのpandasデータフレームの各行について、他のデータフレームを集約しつつ値を変更する処理
- こんなの ↓ を高速化したい
for i in range(df_user.shape[0]): # df_userの対象行のuserIDを取得 user_id = df_user["userID"].iloc[i] # df_userの対象行のuserIDと同じか否かのTrue, Falseを作成 user_in_log = df_log["userID"]==user_id # df_userの対象行のユーザーかつ対象行のtrigger以前に起こったeventの数を代入 df_user["event_cnt"].iloc[i] = df_log[(user_in_log) & (df_log["event_dt"]<df_user["trigger_dt"].iloc[i])].shape[0]
状況
- 使用データフレームは2つ
- df_user:userIDとトリガー日時
- df_log:userIDとイベント日時
- 各userIDのトリガー日時までに発生したイベント数をdf_userの各行に追加
- dfが2つあって、df_userの各行についてdf_logから複数行抽出して集約する。mapにするの大変
- forループ回してしまえ(雑魚並の感想)
- forループは良くない、分かってはいるんだよ…
- forループ回してしまえ(雑魚並の感想)
困りごと
- forループ遅い…
- 仕事で扱っているデータは行数が多くて最初は処理に300時間くらいかかる計算だった。つらい
- 高速化しなければやってられない
解決策
- 結論:処理を関数化してmultiprocessing.Pool()で実行
- 並列処理(リソースで殴るの)は強い
- マシンのコア数だけプロセスを並列化できるため、Google Cloud Platform(GCP)で16コアのインスタンスを立てて15プロセスの並列実行
- とても速い(小並感)
- 以下、デモスクリプトを紹介
まずはライブラリを読み込む
# ライブラリの読み込み import numpy as np import pandas as pd import random import pickle import multiprocessing from datetime import datetime from datetime import timedelta from time import time
デモデータを作成する
# デモデータ作成 # ランダムなIDと日時 # df_userは10,000行 # df_logは100,000行 random.seed(256) # ランダムID r1 = [random.randint(1, 10000) for i in range(10000)] r1.sort() r2 = [random.randint(1, 10000) for i in range(100000)] r2.sort() # ランダム日時 now = datetime.now() datetime1 = [now-timedelta(random.randint(1, 1000)) for i in range(10000)] datetime2 = [now-timedelta(random.randint(1, 1000)) for i in range(100000)] # user df_user = pd.DataFrame({ "userID": r1, "trigger_dt": datetime1 }) # log df_log = pd.DataFrame({ "userID": r2, "event_dt": datetime2 })
multiprocessing実行
# 結果を代入する列を作成 df_user["event_cnt"] = np.nan # 前もってSeriesを抽出しておく s_user_userID = df_user["userID"] s_user_dt = df_user["trigger_dt"] s_log_userID = df_log["userID"] # multiprocessing用にindexをd分割したリストを作成 d = 15 x = int(np.linspace(0, df_user.shape[0], d)[1]) id_lists = [[i*x, (i+1)*x] for i in range(d-1)] id_lists.append([id_lists[-1][-1], df_user.shape[0]]) # メイン処理 # multiprocessing用に処理を関数化 def event_cnt(id_list): t1 =time() for i in range(id_list[0], id_list[1]): # df_userの対象行のuserIDを取得 user_id = s_user_userID.iloc[i] # df_userの対象行のuserIDと同じか否かのTrue, Falseを作成 user_in_log = s_log_userID==user_id # df_logから対象userの行だけ抽出しておく df_1 = df_log[user_in_log] # 対象userの行がdf_logに存在する場合のみ代入を行う if df_1.shape[0]>0: # df_userの対象行のユーザーかつ対象行のtrigger以前に起こったeventの数を代入 df_user.at[i, "event_cnt"] = len(df_1[df_1["event_dt"]<s_user_dt.iloc[i]]) # 進捗を表示 if i%1000 == 0: print("process_{}-{}: {}...time: {}".format(id_list[0], id_list[1], i, time()-t1)) # かかった時間を表示 print("process_{}-{}_finish: {}".format(id_list[0], id_list[1], time()-t1)) # 結果を保存 with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='wb') as f: pickle.dump(df_user, f) # 並列処理実行 with multiprocessing.Pool(d) as pool: pool.map(event_cnt, id_lists)
- 普通にforループを回す際の
for i in range(df1.shape[0]):
をfor i in (id_list[0], id_list[1]):
に変更 - df_userの行数をd分割して実行できるように関数化している
- ちなみに各プロセス内でdf_userの値を書き換えても、並列処理実行後もdf_userの値は元のまま
- プロセス内の結果は実行後に破棄された模様
- "仮想メモリ空間が異なる"という指摘あり
- 処理の最後で代入後のdf_userをpickleに書き出しておく
処理完了後に15個のpickleをロード・統合する
# multiprocessing用にindexをd分割したリストを作成 d = 15 x = int(np.linspace(0, df_user.shape[0], d)[1]) id_lists = [[i*x, (i+1)*x] for i in range(d-1)] id_lists.append([id_lists[-1][-1], df_user.shape[0]]) # 読み込み・統合 # 大元となる1つ目のpickleを読み込み id_list = id_lists[0] with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='rb') as f: df_user = pickle.load(f) # 2つ目以降のpickleを順次読み込み・大元となるdf_userに代入 for id_list in id_lists[1:]: with open('result_{}-{}.pickle'.format(id_list[0], id_list[1]), mode='rb') as f: df_user_tmp = pickle.load(f) # 代入 df_user["event_cnt"].iloc[id_list[0]:id_list[1]] = df_user_tmp["event_cnt"].iloc[id_list[0]:id_list[1]]
結果
- 結果は以下のjupyter notebookに記載
解決までの道のり
- ループする処理は独立して実行できる(i同士で依存関係はない)
- 並列処理すればよくない?
- 最初に並列処理しようとしたときには
joblib
を使用したがRuntimeError
が出た。謎
リファクタリングによる高速化
joblib
による並列処理を諦め、コードのリファクタリングで速くならないかなと試行錯誤。
- 以下の記事を参考にリファクタリング
- うわっ…私のpandas、遅すぎ…?って時にやるべきこと(先人の知恵より) - Lean Baseball
- pandas.DataFrame のforループをゆるふわ△改良して300倍高速化する - くないらぼ
- ループ内での列の抽出をループ外に出してちょっと速くなった
- map関数化できず、この程度の変更では焼け石に水
- df_log(行数が多い方のdf)の条件判定に無駄があるのではと考える
- 行の抽出時、
[(df_userのuserIDと一致するdf_logの行) & (df_userのtrigger日時より前のdf_logの行)]
という条件判定を行っていた - これでは2回もdf_logを総検索しているのでは
- 行数がやたら多いdf_logで総検索2回は無駄すぎる
- まず
df_userのuserIDと一致するdf_logの行
を作成し、これに対してdf_userのtrigger日時より前のdf_logの行
の抽出を行った - ちょっと速くなった
- 行の抽出時、
- df_userの値の変更に時間がかかっていると考える
- よくよく見ていると、df_logの集約結果をdf_userの新しい列に代入する処理が遅そう
- そもそも
df_userのuserIDと一致するdf_logの行
が存在しないことも多い df_userのuserIDと一致するdf_logの行
が存在しない場合は集約・代入処理をif文でスキップ- 結果、そこそこ速くなった
以上の修正で実際のコードでは300時間(推計) -> 100時間(推計)くらいまで減った。
GitHubに上げたデモスクリプトだと565[s]。
しかしまだ時間かかる。抜本的な改善にはやはり並列処理だということで、再度並列処理に挑戦。
multiprocessingによる高速化
joblib
ではなくmultiprocessing
を使用- めっちゃ速くなった
- 実際のコードでは100時間(推計) -> 3時間 *1
- GitHubに上げたデモスクリプトだと1.6[s] *2
- めっちゃ速い(2回目)
リファクタリングはなんだったんだ
ちなみにmultiprocessingのPoolはjupyter上でも実行可能だった
「if __name__ == '__main__':
内で実行すること」という指摘もあるが、jupyterで実行したところ、if __name__ == '__main__':
はなくても大丈夫だった
参考
- Python で並列処理(初めての人向け)
- Pythonの並列処理・並行処理のための標準モジュールの比較 - minus9d's diary
- Python: マルチプロセスで並列処理をさせるには multiprocessing.Pool が超便利 - 子育てしながらエンジニアしたい
- うわっ…私のpandas、遅すぎ…?って時にやるべきこと(先人の知恵より) - Lean Baseball
- pandas.DataFrame のforループをゆるふわ△改良して300倍高速化する - くないらぼ
感想
- 依存関係の無いループ処理ならば、クラウド上で強いインスタンスを立てればいくらでも速くなる(言い過ぎ)
- プロセス数に比例せず、なぜか想像以上に速くなる
- GCPで96コアのマシン使えばすさまじいことになりそう
- multiprocessingは強いマシンを必要に応じて使えるクラウド時代ならでは。富豪的解決策
- データ分析の前処理では本来ならば難しいこと考えなくても高速化が実現されるべき。マシンを強くして速くなるなら強いマシンを使えばいいのでは
- 高速化はとてもテンションが上がる楽しい作業だが、データ分析の前処理において高速化で仕事をした気になってはいけない
- 処理の高速化はデータ分析における本質的な作業ではない
- 誰もが本質的な課題に時間と労力を割けて生産性が高まるのが良い
※map化の方法など、ご指摘やコメント大歓迎です。

- 作者: Bill Lubanovic,斎藤康毅,長尾高弘
- 出版社/メーカー: オライリージャパン
- 発売日: 2015/12/01
- メディア: 単行本(ソフトカバー)
- この商品を含むブログ (3件) を見る