Erlangと配列
(「ErlangとCRC32」から改題)
CRC32が必要になりそうだったので、Erlangでどうやって実現させるかを考えてみました。zlib:crc32があることが分かりましたが、CRC32と一口に言っても利用する多項式によって様々なバリエーションがありますので、多項式を外から与えることができないとまずいということがあるかもしれません。
CRCは前後のブロックに依存しないので、0〜255の計算結果をあらかじめテーブルに保存しておいて、バイト単位でそのテーブルを索くような処理をすることができます。もし自力で実装するとなるとそのようなテーブルを持つことになるわけですが、Erlangで書く場合、任意のインデックスにアクセスするためのデータ構造(要するに配列ですが)として何を使えばいいのでしょうか?
任意のインデックス(のようなもの)にアクセスができるものとして下記の5つを思いつきました。
- tuple: element/2でアクセスできます。追加は erlang:append_element/2が使えます。
- binary: binary:part/3 を使うとバイナリの一部分を持ってくることができます。パターンマッチングでも持ってくることができます。
- dict: インデックスをキーにしてしまえば使えそうです。
- ets: dictと同様に使えそうです。
- プロセス辞書: これもハッシュがわりに使えるのでdict/etsと同様に使えそうです。
今回のように、あらかじめ作成しておいたN個(今回は256個)のテーブルの任意の値を取得するという場合にどれが性能的に有利なのかを調べてみたいと思いました。
(比較用のCソースはTNKソフトウェア - 私的ZIPファイル研究所を参考にさせていただきました)
実行した結果は下記のようになりました。(コードは後ろに載せています)
- Erlang R14B03 (erts-5.8.4) [source] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]
- Mac OS X Server 10.6.8
- Processor 2.53GHz Intel Core 2 Duo
- Memory 4GB 1067MHz DDR3
trial | tuple | binary | dict | process_dictionary | ets | zlib | C(参考) | |
1 | 5760 | 8852 | 14497 | 5638 | 12351 | 427 | 184 | |
2 | 5753 | 8820 | 14530 | 5679 | 12451 | 430 | 184 | |
3 | 5844 | 8811 | 14083 | 5679 | 12179 | 430 | 185 | |
4 | 5784 | 8814 | 14183 | 5637 | 12231 | 431 | 184 | |
5 | 5813 | 8791 | 14517 | 5708 | 12243 | 433 | 186 | |
6 | 5792 | 8837 | 14491 | 5655 | 12477 | 429 | 187 | |
7 | 5785 | 8795 | 14075 | 5670 | 12207 | 430 | 184 | |
8 | 5733 | 8847 | 14473 | 5632 | 12167 | 439 | 184 | |
9 | 5819 | 8813 | 14565 | 5668 | 12474 | 433 | 184 | |
10 | 5752 | 8827 | 14188 | 5692 | 12341 | 430 | 183 |
計測値は、各列の先頭に書かれているアルゴリズムのCRC32を10万回計算したときにかかった時間、単位はmsecです。対象データはすべて同じで、0〜FFが1回ずつ順番にあらわれる256バイトのデータとなっています。
試す前の予想としては、zlib→binary→dict→ets→tuple→process dictionary といった感じでした。結果はzlib→tuple≒process dictionary→binary→ets→dict でした。ぜんぜん違いすぎて笑えてきますね。tupleが速いというのもそうですが、プロセス辞書が予想外に速いことに驚きました。そしてetsとdictが予想と逆だったのも意外です。
zlibは当然ですが速いですね。これが使えれば言うことがないんですが....C版は自力で書いたtuple/プロセス辞書版の30倍ほど速いです。
別の環境でも試してみました。こちらもR14B03です。
- Windows 7
- Processor 2.13GHz Intel Core 2 Duo
- Memory 2GB
loop | tuple | binary | dict | process_dictionary | ets | zlib |
1 | 7799 | 12619 | 22666 | 7862 | 17627 | 482 |
2 | 7721 | 12495 | 22806 | 7846 | 17471 | 467 |
3 | 7534 | 12527 | 22807 | 7923 | 17549 | 530 |
4 | 7598 | 12557 | 22775 | 7986 | 18205 | 467 |
5 | 7736 | 12666 | 22978 | 7940 | 17596 | 482 |
6 | 7597 | 12823 | 23368 | 8033 | 17533 | 483 |
7 | 7690 | 12916 | 22948 | 7845 | 17597 | 484 |
8 | 7643 | 12433 | 23291 | 7972 | 17769 | 498 |
9 | 7784 | 12621 | 23088 | 7971 | 17659 | 483 |
10 | 7628 | 12776 | 23228 | 7941 | 17628 | 514 |
順番はMacのものと同じようです。それにしてもこの手の試行は苦手ですね... 本来計測したいものではないものを一生懸命計測しているのではないだろうか、特定の条件に限って著しく不利/有利なことを気付かずにしてたりしないだろうか、といった心配事がたくさんあります。
計測に使ったコードは下記の通りです( crc:bench_all(10,100000) を実行すると上記のような試行を実施します)。各構造ごとにテーブル作ったりCRCを実行したりしているのは、それぞれ make_table, do_crc という関数になります。
-module(crc). -compile(export_all). -define(POLY_IEEE802_3, 16#EDB88320). times(0, _, Res) -> lists:reverse(Res); times(N, F, Res) when N > 0 -> times(N - 1, F, [F(N) | Res]). bench_all(Loop, Times) -> bench_tests(Loop, Times, [tuple, binary, dict, process_dictionary, ets, zlib]). bench_tests(Loop, Times, Tests) -> Data = lists:seq(0,255), Bin = list_to_binary(Data), times(Loop, fun(N) -> lists:reverse( lists:foldl( fun(Type, List) -> [ bench(Times, Type, Bin) div 1000 | List] end, [(Loop - N + 1)], Tests)) end, [[loop|Tests]]). bench(Times, zlib, Bin) when is_binary(Bin) -> Expected = crc(tuple, binary_to_list(Bin)), BBench = now(), do_bench(Times, zlib, void, Bin, Expected), EBench = now(), timer:now_diff(EBench, BBench); bench(Times, Type, Bin) when is_binary(Bin) -> bench(Times, Type, binary_to_list(Bin)); bench(Times, Type, List) when is_list(List) -> Expected = crc(tuple, List), Table = make_table(Type, ?POLY_IEEE802_3), BBench = now(), do_bench(Times, Type, Table, List, Expected), EBench = now(), timer:now_diff(EBench, BBench). do_bench(0, _Type, _Table, _List, _Expected) -> ok; do_bench(Times, Type, Table, List, Expected) -> Expected = do_crc(Type, List, Table), do_bench(Times - 1, Type, Table, List, Expected). calc_table(Poly, I) -> lists:foldl( fun(_J, U) -> case (U band 1) of 1 -> (U bsr 1) bxor Poly; 0 -> U bsr 1 end end, I, lists:seq(0,7)). calc_index(Res, B) -> (B bxor (Res band 16#ff)). calc_crc1(Res, TableValue) -> (Res bsr 8) bxor TableValue. crc(zlib, Bin) when is_binary(Bin) -> do_crc(zlib, Bin, void); crc(Type, Bin) -> crc(Type, Bin, ?POLY_IEEE802_3). crc(Type, Bin, Poly) when is_binary(Bin) -> List = binary_to_list(Bin), crc(Type, List, Poly); crc(Type, List, Poly) when is_list(List) -> Table = make_table(Type, Poly), do_crc(Type, List, Table). make_table(tuple, Poly) -> lists:foldl( fun(I, Tuple) -> erlang:append_element(Tuple, calc_table(Poly, I)) end, {}, lists:seq(0, 255)); make_table(binary, Poly) -> lists:foldl( fun(I, Binary) -> <<Binary/binary, (calc_table(Poly, I)):32>> end, <<>>, lists:seq(0, 255)); make_table(binary2, Poly) -> make_table(binary, Poly); make_table(dict, Poly) -> lists:foldl( fun(I, Dict) -> dict:store(I, calc_table(Poly, I), Dict) end, dict:new(), lists:seq(0, 255)); make_table(process_dictionary, Poly) -> lists:foldl( fun(I, _) -> put(I, calc_table(Poly, I)) end, void, lists:seq(0, 255)); make_table(ets, Poly) -> Table = ets:new(crc, [set]), lists:foreach( fun(I) -> ets:insert(Table, {I, calc_table(Poly, I)}) end, lists:seq(0, 255)), Table; make_table(zlib, _Poly) -> void. do_crc(tuple, List, Table) -> CRC = lists:foldl( fun(B, Res) -> Index = calc_index(Res, B), calc_crc1(Res, element(Index + 1, Table)) end, 16#ffffffff, List), (bnot CRC) band 16#ffffffff; do_crc(binary, List, Table) -> CRC = lists:foldl( fun(B, Res) -> Index = calc_index(Res, B), <<TableValue:32>> = binary:part(Table, Index * 4, 4), calc_crc1(Res, TableValue) end, 16#ffffffff, List), (bnot CRC) band 16#ffffffff; do_crc(binary2, List, Table) -> CRC = lists:foldl( fun(B, Res) -> Index = calc_index(Res, B) * 4, <<_:Index/binary, TableValue:32, _/binary>> = Table, calc_crc1(Res, TableValue) end, 16#ffffffff, List), (bnot CRC) band 16#ffffffff; do_crc(dict, List, Table) -> CRC = lists:foldl( fun(B, Res) -> {ok, TableValue} = dict:find(calc_index(Res, B), Table), calc_crc1(Res, TableValue) end, 16#ffffffff, List), (bnot CRC) band 16#ffffffff; do_crc(process_dictionary, List, _Table) -> CRC = lists:foldl( fun(B, Res) -> TableValue = get(calc_index(Res, B)), calc_crc1(Res, TableValue) end, 16#ffffffff, List), (bnot CRC) band 16#ffffffff; do_crc(ets, List, Table) -> CRC = lists:foldl( fun(B, Res) -> Index = calc_index(Res, B), [{Index, TableValue}] = ets:lookup(Table, Index), calc_crc1(Res, TableValue) end, 16#ffffffff, List), (bnot CRC) band 16#ffffffff; do_crc(zlib, Bin, _Table) -> Z = zlib:open(), Res = zlib:crc32(Z, Bin), zlib:close(Z), Res.
バイナリよりもリストの方が都合がよかったのでリストで書いていたのですが、zlib:crcはバイナリを与えなければいけないので不自然なコードになってしまいました。また、do_crc / make_table のあたりはもうちょっと書きようがありそうにも思います。当初はテーブル作成にかかる時間も計測していたのですが、どれも1msec未満で終わっていたのでとるのをやめました。
この結果を見てみると、配列のように使いたければtupleを使うのが無難ということでしょうか。結局のところこれだけではなんとも言いがたいところがありますが、少なくとも微々たる差というわけではありませんので、そうなっている理由を調べる手がかりにはなるかもしれません。
つづく.... (といいのですが... Efficiency Guideも読み直しておきます。トップページの存在にさっき気付きました)
(余談1) CRC32の計算の最後で bnotしている場所がありますが、ここでマスクを忘れてしまって合わないなあと5分くらい悩んでしまいました。
(余談2) binary2 は binary:part/3 ではなくパターンマッチングで取得するようにしたバージョンです。手元の結果では処理時間が80%ほどになりました。順位には影響がないので上記の表では binary2 は利用していません。 crc:bench_tests(10, 10000, [binary, binary2]). などとすると、これらのアルゴリズムだけを計測することができます。