Pandasを並列動作させて高速化(Modin)

Modin

pythonでTableデータを扱おうと思うとPandasを利用することになると思いますが,Pandasでの操作を遅く感じることが多々あります. どうすれば簡単に早くなるか調べていたところ,Modinというモジュールに出会いました. 自分の備忘録としてModinについて記載します.

ModinはPandasを高速化するためのProjectで,もともとray project配下にあったようですが,今はray projectから独立して開発が続いています.pandasのimport分を変えるだけで高速化できるという謳い文句で宣伝していました.下記のように1行変更するだけで利用できます.

# import pandas as pd
import modin.pandas as pd

こうやってほとんど変更なしに利用できるのはありがたいですね. バックエンドはray, daskの2種類があり,環境変数でどちらをバックエンドにするか選択できます.

export MODIN_ENGINE=ray  # Modin will use Ray
export MODIN_ENGINE=dask  # Modin will use Dask

インストール

バックエンドをどれにするかによって以下の3つのパターンのインストール方法があります.

pip install 'modin[ray]' # Install Modin dependencies and Ray to run on Ray
pip install 'modin[dask]' # Install Modin dependencies and Dask to run on Dask
pip install 'modin[all]' # Install all of the above

詳細はgithubを参照ください.

性能

Modinのページを見ると,csvからDataFrameに変換する操作が4core利用の場合,4倍に高速になると記載があります. read_csv_benchmark.png

実験

軽く自分でも実験してみたところ数MBほどのデータだと,素のPandasの方がread_csvは早く,数GBのcsvだとModin(ray)の方が早かったです.(実行時間はデータの形状にも依存があると思います.) 以下の表はread_csvのpandas, modin(ray)の実行時間の比較です.csvは1.4GB,CPUは"Intel(R) Core(TM) i7-7700K"でread_csvの実行時間を20回測定した平均です.

pandas modin(ray)
read_csvの実行時間 [s] 8.39 3.09

だいぶ高速化できてますね.他の操作の実行時間もどうなっているのか気になるところです. 所感としては大規模のデータのときはModin利用して高速化していこうかなと思っています.

XGBoostでマルチGPUを利用する

XGBoostでもDeepLearning FWのようにマルチGPUを利用することができます。 分散のための下回りはdaskが担当し、daskを利用するためにxgboostのdask用wrapper(dask-xgboost)を利用します。 利用方法としてはPandas DataFrameをdask形式に変換して、dask-xgboostに渡してあげる形になっています。 Pandasをdask形式に変換するタイミングは以下の記述があるので、XGBoostに渡す直前がいいかもしれないです。 docs.dask.org

Note that, despite parallelism, Dask.dataframe may not always be faster than Pandas. We recommend that you stay with Pandas for as long as possible before switching to Dask.dataframe.

dask-cudaだとpandasの操作は早いかもしれないが未確認。

必要モジュールのインストール

pip install xgboost dask-xgboost pandas fsspec

dask-cudaが以下のエラーが発生してしまう関係でバージョン0.10以降を利用する必要があります。

ImportError: cannot import name 'TOTAL_MEMORY'                                                                                     

現時点(2019/11/30)で公開されている最新バージョンはpipだと0.6となっているので,dask-cudaから最新版を直接インストールして解決しました。

サンプルコード

dask-schedulerを走行

pythonコードを走らせる前に、まずはdaskのスケジューラを起動する必要があります。

(venv) ➜  sample ✗ dask-scheduler

distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-j9te68us
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://{IPADRESS}:8786

pytonサンプル

以下がpythonのサンプルです. ノード内マルチにのみ対応してるよう.

参照 (情報は少し古め..)

 from dask import dataframe as dd                                                                                                                                                                           
 from dask_cuda import LocalCUDACluster                                                                                                                                                                     
 from dask.distributed import Client, LocalCluster                                                                                                                                                          
 import dask_xgboost as dxgb                                                                                                                                                                                
 import pandas as pd                                                                                                                                                                                        
 import time                                                                                                                                                                                                
                                                                                                                                                                                                            
 if __name__ == '__main__':                                                                                                                                                                                 
     cluster = LocalCUDACluster()                                                                                                                                                                           
     client = Client(cluster)                                                                                                                                                                               
                                                                                                                                                                                                            
     df = pd.read_csv(f"data/train.csv")                                                                                                                                                                    
                                                                                                                                                                                                            
     # Pandasで処理を行う                                                                                                                                                                                   
     label_train = df['meter_reading']                                                                                                                                                                      
     df_train = df.drop(columns=['meter_reading', 'timestamp'])                                                                                                                                             
                                                                                                                                                                                                            
     # Pandasからdaskへの変換                                                                                                                                                                               
     ddf_train = dd.from_pandas(df_train, npartitions=10)                                                                                                                                                   
     dlabel_train = dd.from_pandas(label_train, npartitions=10)                                                                                                                                             
                                                                                                                                                                                                            
                                                                                                                                                                                                            
     # dask_xgboostのtrainを呼び出す(通常のxgboostを利用するのと同様に利用できそう)                                                                                                                         
     start = time.time()                                                                                                                                                                                    
     dxgb.train(client, {"boosting": "gbdt", 'objective': 'reg:squarederror',                                                                                                                               
                         'tree_method': 'gpu_hist',                                                                                                                                                         
                         'num_iterations': 1000, 'learning_rate':0.01},                                                                                                                                   
                data=ddf_train, labels=dlabel_train)                                                                                                                                                        
     end = time.time()                                                                                                                                                                                      
     print("elapsed train time:", round(end -start, 3), '[s]')                                                                                                                                              

実行時間

GPUと2GPUのtrainの実行時間を比較してみました.(上記スクリプトのstartからendまで) どちらも1080tiを利用しています.

1GPU 2GPU
実行時間[s] 1.577 6.063

結果は1GPUの方が早いという結果になってしまいました.純粋にマルチが遅いのか,もしかしたらGPUへのデータ転送時間も含まれてしまっているのかもしれません. cudf形式に変換したりして,事前にGPUにデータ転送完了させとけばよいとかなのかな? もうちょっと詳細に見ていく必要がありそう.

LightGBM: A Highly Efficient Gradient Boosting Decision Tree

論文を自分なりにまとめる。(未完、ほぼ日本語訳かも)

papers.nips.cc

 Abstruct

Gradient Boosting Decision Tree(GBDT)は機械学習で人気のアルゴリズムであり、 XGBoost, pGBRTなどの実装があるが、まだまだ演算効率やスケーラビリティは良くない。 分岐ポイントになる可能性がある箇所で特徴量ごとにすべてのデータをスキャンする必要があり、 その処理に時間がかかってしまうことが原因としてあげられる。そこでこの問題に取り組むために、 Gradient-based One-Side Sampling(GOSS)、Exclusive Feature Bundling(EFB)という2つの方法を提案する。 GOSSは小さいGradientsになるデータインスタンスを取り除いて、残りをInformation gainに利用する。 EFBは互いに独立している特徴を削減するアイディア。特徴を互いに独立していると証明するのはNP困難だが、 貪欲アルゴリズムを利用すると非常に良い予測結果となった。 たくさんの公開されているデータで実験したところ、精度を保ったまま従来のGBDTより20倍も高速となることがわかった。

1. Introduction

機械学習アルゴリズムとして広くGBDTは使われているが、大きなデータを扱うときの実行時間が非常に長いという問題がある。 データのInstanceを減らすと良いが、その方法は自明ではない。データのインスタンスとは特徴量を列としたときの行にあたる。扱うデータ種のようなものと自分は認識している。データをサンプリングする方法として、Weightsに従ってサンプリングする方法があるが、Weightsがわからないと適用できないため、シンプルにGBDTに適用することはできない。 そこで2つの方法を提案する。

Gradient-based One-Side Sampling(GOSS)

GBDTの厳密なWeightはわからないが、Gradientsが違うとInformation gainの計算過程が違うことに気づいた。 特に、大きなGradientsはよりInformation gainに対する寄与が大きい。なのでダウンサンプリングするときにInformation gainの精度を 保つために大きなGradientsをもつデータInstanceを残すようにすればよく、小さいGradientsの中からランダムに削除するようにする。 このようにすることでランダムにサンプリングするより、Information gainの精度を保つことに成功した。

Exclusive Feature Bundling(EFB)

実データを扱うと特徴量が非常に多いことがあり、特徴空間は非常にSparseである。これは特徴量を削除できる可能性を示唆している。 Sparceな特徴空間だと多くの特徴は独立していることが多い。この場合、グラフのカラーリング問題として扱う事ができ、貪欲法で解決することができる。

私達はこの2つのアルゴリズムをLightGBMに実装し、多くの公開されているデータで、従来手法より精度を保ったまま20倍以上高速になることを示した。

以降の章はちょくちょく継ぎ足していくつもり。。

2. Preliminaries

2.1 GBDT and Its Complexity Analysis

Efficient Neural Architecture Search via Parameter Sharing (ENAS)

はじめに

最近DeepLearningのモデル構築もDeepLearningで行う手法(AutoML)が流行っています。 AutoMLの一つであるNAS(Neural Architechture Search)は、2016年にGoogleから発表されました。強化学習を利用してアーキテクチャ、ハイパラメータの最適化をDeeepLearningでできてしまうとして注目されてましたが、実行時間がネックとなっていました。2018年にNASより計算量を削減したあるENAS(Effective NAS)が発表され(こちらも強化学習を利用),1GPUでも演算可能な計算量とNASと変わらない精度を保っているということで注目されました.今回はこのENAS論文を読んでみます。(会社で論文紹介するのでメモ用です) AutoMLについて知らなかったので,調べてみると他にも複数の探索アルゴリズムがあるようです。

  • MNAS
    モバイル向けの最適モデル探索アルゴリズム。実行時間も目的関数に取り込んで精度、実行時間を考慮したモデル探索ができる。 Googleが公式モデルも公開していて,Google Cloudからも利用できます.
  • Darts
  • FBNet

NASとENASの違い

ENASとNASの大きな違いは、実行時間です。NASと比較してGPUを利用した学習を1000倍以上高速にすることが可能になりました. 高速化の大きな要因として探索するDNNモデル間の重みデータの共有にあります。探索対象の新規モデルを毎回0から訓練するのではなく,探索済みのネットワークの重みデータを利用することで高速化しています。精度もNASと同等です。 ENASの構造的な特徴は有向非循環グラフ(DAG)としてNASの探索空間を表現したことです。各Nodeは局所的な演算(convolutionなどの演算)を表し、エッジは重みデータを表します。下図はすべての探索空間を表している例で,赤いエッジはコントローラによって選ばれた探索空間になります.

f:id:kuroko1t:20190901105309p:plain
The graph represents the entire search space while thered arrows define a model in the search space, which is decided by a controller. Here, node 1 is the input to the model whereas nodes 3 and 6 are the model’s outputs.

RNNモデルの探索の例

ENASのコントローラ(RNN)の役割は2つ

  1. どのエッジをアクティブにするか
  2. DAGの各ノードで実行する演算の種類

下図の左図、DAGとアクティブエッジ(赤矢印)を表す。 下図の右図、DAGに対応するRNNを表します。

f:id:kuroko1t:20190901112448p:plain

単純なRNN探索のENASのメカニズム。

  1. ノード1で利用するActivation関数(例:tanh)を選択
    f:id:kuroko1t:20190901125846p:plain:w300
  2. ノード2では前の接続Node(ノード1)とActivation関数(例:relu)を選択
    f:id:kuroko1t:20190901125850p:plain:w200
  3. ノード3ではノード2とActivation関数(例:tanh)を選択
    f:id:kuroko1t:20190901125902p:plain:w200
  4. ノード4ではノード1とActivation関数(例:tanh)を選択
    f:id:kuroko1t:20190901125905p:plain:w200
  5. 出力ノードを決定する。出力ノードは他のノードへの入力になっていないノードとします。この例ではノード3,4です。リカレントセルは平均である(h3+h4) / 2 を出力として使用する。

各エッジはノード間の情報(重み)を表しています。ENASではこのエッジ情報を探索中のすべての繰り返しセルで共有します。なので、繰り返しで選ぶActivation関数が変わったとしても選んだノードペアが同一なら重みも共有されるということだと。

探索空間は4つのActivation関数(tanh, ReLU, identity, sigmoid)とする場合、4N x Nです。この論文ではN = 12で、約1015の空間です。

ENASのトレーニン

コントローラのネットワークは100個のhidden-unitを持つLSTMで、2つの学習パラメータθ(探索ポリシーの変数?)と子モデルの共有パラメータwがあります。トレーニングはまず、トレーニング全体の学習で子モデルのwをトレーニングします。そして一定のStep数ごとにθをトレーニングします。この論文では2000stepごとにθをトレーニングしている。

子モデルのwのトレーニン

コントローラのポリシーを固定して、SGDを利用してloss関数(下図左)を最小化します。勾配はモンテカルロ推定を利用します。

f:id:kuroko1t:20190901214531p:plain:w350

M=1でうまく機能します。つまりπ(m;θ)からサンプリングした任意の1つのモデルmの勾配を利用してwを更新できます。

θのトレーニン

wとθの報酬の最大化を目的とし強化学習を利用します。報酬は検証用データで計算される。

CNNモデルの探索の例

コントローラーはRNNの時と同様にノードにおけるローカル演算と一つ前に接続するノードを決定する操作をします.1つ前の接続によってはスキップ構造(残差接続)を表現できます. ノードは6つのオペレーションから選択します.

  • convolutions with filter sizes 3x3
  • convolutions with filter sizes 5x5
  • depthwise-separable convolutions 3x3
  • depthwise-separable convolutions 5x5
  • max pooling
  • average pooling

L層のネットワークでは,6L x 2 ^ L(L-1)/2の探索空間で,L=12では1.6 x 1029の探索空間になります.

カニズム

f:id:kuroko1t:20190903000017p:plain:w280

  1. ノード1,2は入力ノードなのでなにもしない.
  2. ノード3は2つの入力ノード(ノード2,ノード2)を決定する.入力ノードはそれぞれidentityとseparable_conv_5x5と接続される.
  3. ノード3は2つの入力ノード(ノード3,ノード1)を決定する.入力ノードはそれぞれaverage poolingとsep_conv_3x3と接続される. 4. ノード4はどのノードの入力でもないので,出力として扱われる.

公開モデル

  • pytorchモデル
    2019/9/1現在、CNNには対応しておらずRNNに対応しています。

  • tensorflowモデル
    python2系のみ対応しています。

  • nni
    Microsoftが作成しているAutoML向けのツールでその中にENASもあります. 主要なFWはカバーしていたり,Web UIもあったりしてnniは多機能そうです.

GoでDeepLearning

GoでMnistを使ってDeep Learning

昔(2018年10月27日)Qiitaに書いた記事の転載です

現在、Deep Leanig向けのFWとしてはTensorflow、Pytorchなどが有名です. これらのFWのインターフェースはPython、内部実装はC++になっていて,ユーザーは使いやすくかつ高速にというのを実現しています (演算はGPUがメインですよね). 最近Go言語に興味を持ち、趣味でGoのアプリケーションの実装をしてみたい&Deep Learningの勉強をしたい,Goでの実装はなさそう??ということでGoでやってみました.

概要

Deep Learningの始めの一歩としてまずは、Mnist(手書き文字のデータ)だろう!ということで,Mnistで学習して、自分の書いた数字を推論できるようになることを目標に進めていきます. 現在はまだGo100%です(まだと書いた理由はGPU演算に関してに書きました) Screenshot from 2018-08-10 13-01-35.png

今回実装したもの

実装したプロジェクトです.

以下の順番で実装を行っていきました.

  • Mnistのデータを1次元配列として読み込む
  • 行列演算の実装
  • Goで学習できる枠組みを作る(Forward,Backward演算,重みParameterのUpdate)
  • 学習したデータをSave,Restoreできるような仕組み
  • 保存した学習済みデータを使って自分の書いた文字を推論

Mnistのデータを1次元配列として読み込む(GoMnist)

GoMnistをForkしてfloat64の1次元の配列として読み込めるようにしたり,正規化できるように改造しました. 改造量は多くないです.Deep Learningは演算精度はそんなにいらなく,工夫すれば16bitで学習できたりもします. なのでfloat64もいらなくてfloat32にしようかなと思ったんですが,Goのライブラリはfloat64前提にされているものが多く,対応がめんどくさかったのでfloat64でデータも読み込んでいます.

行列演算の実装(gmat)

ここは愚直にネットワークに必要な行列演算の実装を行っていきました.実装はGPUとCPUに分かれていて, build tagでGPU,CPU使い分けるようにしています.下記に書きましたがGPUの方はまだ不完全です. あとCPU演算のpprofを見たところ圧倒的にdot演算に時間がかかってたのでこの部分だけCPU数分だけ並列処理するようにしたところ,だいーぶ早くなりました.syncやchannelのいい勉強になりました.

GPU演算に関して

GPUでの演算にも取り組みましたが,GoでGPU演算を行うのが少し難しく現状はDot演算のみ実装してあります.CUDAのkernel関数を定義してそれをcgoを使ってリンクすれば,やりたい演算を実現できるのですがそうするとGoでDeep Learningを行うというのに反してしまうと思い,踏みとどまりました. でもそうするとCUDAのライブラリ(cuda runtime, cublas,cudnnなど)を使うしかなく柔軟な計算ができません.そこをどうやって解決するかはまだ答えがでてません.. 一応すべての関数をcuda kernel関数を定義することで実装しました。

Goで学習できる枠組みを作る(gdeep)

Interfaceに,各演算Layer(Dropout,Denseなど)のメソッド(Forward,Backwardなど)を登録します.そしてユーザーの定義した演算レイヤーに対応するForward,Backward演算が実行できるようにしました. ユーザーが定義する演算レイヤーはこんな感じ

layer := []gdeep.LayerInterface{}
gdeep.LayerAdd(&layer, &gdeep.Dense{}, []int{inputSize, hiddenSize})
gdeep.LayerAdd(&layer, &gdeep.Relu{})

そして各IterationごとにRunしてあげれば学習が進みます.

loss := gdeep.Run(layer, momentum, x, t)

gdeep.Runの中でForward,Backwardを行い,Backwardの演算結果を使って重みParameterのアップデートも行っています. これによって学習が進んでいきます.フルのサンプルはこちらをご参照ください. で実際に上記のサンプルを実行してMnistのTrainデータを使って学習し,Testデータを使ってどれくらいの精度で学習できているか評価を行いました.そうしたところAccuracy:92%となり,ちゃんと学習できていることがわかりました! これでとりあえず学習はできていると一安心です.

学習したデータをSave,Restoreできるような仕組み(gdeep)

gobというライブラリを使って学習したParameterの保存を行いました. こんな風に書けば保存できます.

gdeep.Saver(layer, "./sample.gob")

ライブラリのおかげでわりと簡単にSaveとRestoreは実現できました.決め打ちになってしまっている箇所があるので修正しないとなーと思ってます.

保存した学習済みデータを使って自分の書いた文字を推論

はい.上記保存した学習モデルを使って自分の書いた文字が推論できるか試していきます. 推論用に使うように自分が書いた数字です(笑)

これをMnistと同じ28x28のサイズに加工して,GrayScaleにします.サイズの加工は外部ライブラリを利用して,GrayScaleにする部分は実装しました. これでデータの準備はできたので,推論していきましょう. 書いたサンプルです(現在は決め打ちなんでこんなに短いけど,本当は演算レイヤーの定義を書かないといけなくなる..)

package main                                                                                         
                                                                                                     
import (                                                                                             
    "fmt"                                                                                            
    "github.com/kuroko1t/gdeep"                                                                      
)                                                                                                    
                                                                                                     
func main() {                                                                                        
    img := gdeep.ImageRead2GrayNorm("data/5.jpg")                                                    
    layer := []gdeep.LayerInterface{}                                                                
    gdeep.Restore("./sample.gob", &layer)                                                            
    predictnum := gdeep.Predict(layer, img)                                                          
    fmt.Println("predict:", predictnum)                                                              
}       

実行してみたところ,

predict: 5

ちゃんと,推論できました!!めでたしめでたし.

追記

mpiを利用したデータ並列処理を追加しました.コマンドはこんな感じ

mpirun  -np 2 -H host1:2,host2:1 go run example/mlpMnist_allreduce.go

Todo

  • Convolutionの実装
  • MLPGPU実装

まとめ

現在自分が実装したことに関して書いてみました.アドバイスやコメントなどありましたらよろしくお願いします.

leapmindのblue-oil メモ2

dlk graphのグラフレベルの最適化で行っていることをリストアップしてみた. 中途半端だけど、、、追記するかも

pass_remove_identities

identity node(何もしないnode)を削除する.

pass_transpose

すべてのnodeのshapeをNHWCに変更

pass_propagate_quantization_details_into_conv

Quantizer nodeの情報をconvに伝える.その情報をもとにconvを量子化できる.

pass_compute_thresholds

pass_pack_weights

convolutionを32bitにpackする. op_typeがQTZ_binary_mean_scaling, QTZ_linear_mid_tread_half, QTZ_binary_channel_wise_mean_scalingのとき適用.

pass_quantize_convolutions

packじゃない量子化

pass_propagate_output_type_backward

pass_propagate_datatypes

conv以外の量子化情報保持nodeのinputのdtypeをoutputのdtypeにする.

pass_constant_folding

leapmindのblue-oil メモ1(TFグラフ変換)

何者か知らなかったのでちょっとだけ覗いてみた。

github.com

blueoilにはlmnetとdlkというものがあった。

lmnet

lmnetはTensorflowによる学習や推論をサポートしている。 分散学習するかどうかもconfig_fileに設定できてhorovodを利用している。

dlk

Tensorflowのprotocol buffer or onnix形式を受け取り、独自グラフに変換している
pb->graph_def->dlk的な感じ.(以下TFの場合)
generate_project.pyを実行すると, pb->graph_defの変換をTensorFlowIOにて行う.そしてoptimize_graph_stepが実行されTFのgraph_defからdlkのグラフに変換される. そしてその後,dlkグラフの最適化処理を行っている.

  • グラフ変換(TF)

importerクラスでtf->dlkグラフに変換. Importerメンバー関数のcreate_new_nodeでTFとdlk用のopの変換を行っている. 変換はnode.op_typeを見て判断(1部opはDLK_OPERATOR_MAPで事前に変換(BiasAdd->Addとか))
dlk側のopはoperater.pyに記述