Erlangと配列

(「ErlangとCRC32」から改題)

CRC32が必要になりそうだったので、Erlangでどうやって実現させるかを考えてみました。zlib:crc32があることが分かりましたが、CRC32と一口に言っても利用する多項式によって様々なバリエーションがありますので、多項式を外から与えることができないとまずいということがあるかもしれません。

CRCは前後のブロックに依存しないので、0〜255の計算結果をあらかじめテーブルに保存しておいて、バイト単位でそのテーブルを索くような処理をすることができます。もし自力で実装するとなるとそのようなテーブルを持つことになるわけですが、Erlangで書く場合、任意のインデックスにアクセスするためのデータ構造(要するに配列ですが)として何を使えばいいのでしょうか?

任意のインデックス(のようなもの)にアクセスができるものとして下記の5つを思いつきました。

  • tuple: element/2でアクセスできます。追加は erlang:append_element/2が使えます。
  • binary: binary:part/3 を使うとバイナリの一部分を持ってくることができます。パターンマッチングでも持ってくることができます。
  • dict: インデックスをキーにしてしまえば使えそうです。
  • ets: dictと同様に使えそうです。
  • プロセス辞書: これもハッシュがわりに使えるのでdict/etsと同様に使えそうです。

今回のように、あらかじめ作成しておいたN個(今回は256個)のテーブルの任意の値を取得するという場合にどれが性能的に有利なのかを調べてみたいと思いました。

(比較用のCソースはTNKソフトウェア - 私的ZIPファイル研究所を参考にさせていただきました)

実行した結果は下記のようになりました。(コードは後ろに載せています)

trial tuple binary dict process_dictionary ets zlib C(参考)
1 5760 8852 14497 5638 12351 427 184
2 5753 8820 14530 5679 12451 430 184
3 5844 8811 14083 5679 12179 430 185
4 5784 8814 14183 5637 12231 431 184
5 5813 8791 14517 5708 12243 433 186
6 5792 8837 14491 5655 12477 429 187
7 5785 8795 14075 5670 12207 430 184
8 5733 8847 14473 5632 12167 439 184
9 5819 8813 14565 5668 12474 433 184
10 5752 8827 14188 5692 12341 430 183

計測値は、各列の先頭に書かれているアルゴリズムのCRC32を10万回計算したときにかかった時間、単位はmsecです。対象データはすべて同じで、0〜FFが1回ずつ順番にあらわれる256バイトのデータとなっています。

試す前の予想としては、zlib→binary→dict→ets→tuple→process dictionary といった感じでした。結果はzlib→tuple≒process dictionary→binary→ets→dict でした。ぜんぜん違いすぎて笑えてきますね。tupleが速いというのもそうですが、プロセス辞書が予想外に速いことに驚きました。そしてetsとdictが予想と逆だったのも意外です。

zlibは当然ですが速いですね。これが使えれば言うことがないんですが....C版は自力で書いたtuple/プロセス辞書版の30倍ほど速いです。

別の環境でも試してみました。こちらもR14B03です。

loop tuple binary dict process_dictionary ets zlib
1 7799 12619 22666 7862 17627 482
2 7721 12495 22806 7846 17471 467
3 7534 12527 22807 7923 17549 530
4 7598 12557 22775 7986 18205 467
5 7736 12666 22978 7940 17596 482
6 7597 12823 23368 8033 17533 483
7 7690 12916 22948 7845 17597 484
8 7643 12433 23291 7972 17769 498
9 7784 12621 23088 7971 17659 483
10 7628 12776 23228 7941 17628 514

順番はMacのものと同じようです。それにしてもこの手の試行は苦手ですね... 本来計測したいものではないものを一生懸命計測しているのではないだろうか、特定の条件に限って著しく不利/有利なことを気付かずにしてたりしないだろうか、といった心配事がたくさんあります。

計測に使ったコードは下記の通りです( crc:bench_all(10,100000) を実行すると上記のような試行を実施します)。各構造ごとにテーブル作ったりCRCを実行したりしているのは、それぞれ make_table, do_crc という関数になります。

-module(crc).
-compile(export_all).

-define(POLY_IEEE802_3, 16#EDB88320).

times(0, _, Res) ->
    lists:reverse(Res);
times(N, F, Res) when N > 0 ->
    times(N - 1, F, [F(N) | Res]).

bench_all(Loop, Times) ->
    bench_tests(Loop, Times, [tuple, binary, dict, process_dictionary, ets, zlib]).

bench_tests(Loop, Times, Tests) ->
    Data = lists:seq(0,255),
    Bin = list_to_binary(Data),
    times(Loop,
	  fun(N) ->
		  lists:reverse(
		    lists:foldl(
		      fun(Type, List) ->
			      [ bench(Times, Type, Bin) div 1000 | List]
		      end, [(Loop - N + 1)], Tests))
	  end, [[loop|Tests]]).

bench(Times, zlib, Bin) when is_binary(Bin) ->
    Expected = crc(tuple, binary_to_list(Bin)),
    BBench = now(),
    do_bench(Times, zlib, void, Bin, Expected),
    EBench = now(),
    timer:now_diff(EBench, BBench);
bench(Times, Type, Bin) when is_binary(Bin) -> 
    bench(Times, Type, binary_to_list(Bin));
bench(Times, Type, List) when is_list(List) ->
    Expected = crc(tuple, List),
    Table = make_table(Type, ?POLY_IEEE802_3),
    BBench = now(),
    do_bench(Times, Type, Table, List, Expected),
    EBench = now(),
    timer:now_diff(EBench, BBench).

do_bench(0, _Type, _Table, _List, _Expected) -> ok;
do_bench(Times, Type, Table, List, Expected) ->
    Expected = do_crc(Type, List, Table),
    do_bench(Times - 1, Type, Table, List, Expected).

calc_table(Poly, I) ->
    lists:foldl(
      fun(_J, U) ->
	      case (U band 1) of
		  1 ->
		      (U bsr 1) bxor Poly;
		  0 ->
		      U bsr 1
	      end
      end, I, lists:seq(0,7)).
calc_index(Res, B) ->
    (B bxor (Res band 16#ff)).
calc_crc1(Res, TableValue) ->
    (Res bsr 8) bxor TableValue.

crc(zlib, Bin) when is_binary(Bin) ->
    do_crc(zlib, Bin, void);
crc(Type, Bin) ->
    crc(Type, Bin, ?POLY_IEEE802_3).

crc(Type, Bin, Poly) when is_binary(Bin) ->
    List = binary_to_list(Bin),
    crc(Type, List, Poly);    
crc(Type, List, Poly) when is_list(List) ->
    Table = make_table(Type, Poly),
    do_crc(Type, List, Table).


make_table(tuple, Poly) ->
    lists:foldl(
      fun(I, Tuple) ->
	      erlang:append_element(Tuple, calc_table(Poly, I))
      end, {}, lists:seq(0, 255));
make_table(binary, Poly) ->
    lists:foldl(
      fun(I, Binary) ->
	      <<Binary/binary, (calc_table(Poly, I)):32>>
      end, <<>>, lists:seq(0, 255));
make_table(binary2, Poly) ->
    make_table(binary, Poly);
make_table(dict, Poly) ->
    lists:foldl(
      fun(I, Dict) ->
	      dict:store(I, calc_table(Poly, I), Dict)
      end, dict:new(), lists:seq(0, 255));
make_table(process_dictionary, Poly) ->
    lists:foldl(
      fun(I, _) ->
	      put(I, calc_table(Poly, I))
      end, void, lists:seq(0, 255));
make_table(ets, Poly) ->
    Table = ets:new(crc, [set]),
    lists:foreach(
      fun(I) ->
	      ets:insert(Table, {I, calc_table(Poly, I)})
      end, lists:seq(0, 255)),
    Table;
make_table(zlib, _Poly) ->
    void.

do_crc(tuple, List, Table) ->
    CRC = lists:foldl(
	    fun(B, Res) ->
		    Index = calc_index(Res, B),
		    calc_crc1(Res, element(Index + 1, Table))
	    end, 16#ffffffff, List),
    (bnot CRC) band 16#ffffffff;
do_crc(binary, List, Table) ->
    CRC = lists:foldl(
	    fun(B, Res) ->
		    Index = calc_index(Res, B),
		    <<TableValue:32>> = binary:part(Table, Index * 4, 4),
		    calc_crc1(Res, TableValue)
	    end, 16#ffffffff, List),
    (bnot CRC) band 16#ffffffff;
do_crc(binary2, List, Table) ->
    CRC = lists:foldl(
	    fun(B, Res) ->
		    Index = calc_index(Res, B) * 4,
		    <<_:Index/binary, TableValue:32, _/binary>> = Table,
		    calc_crc1(Res, TableValue)
	    end, 16#ffffffff, List),
    (bnot CRC) band 16#ffffffff;
do_crc(dict, List, Table) ->
    CRC = lists:foldl(
	    fun(B, Res) ->
		    {ok, TableValue} = dict:find(calc_index(Res, B), Table),
		    calc_crc1(Res,  TableValue)
	    end, 16#ffffffff, List),
    (bnot CRC) band 16#ffffffff;
do_crc(process_dictionary, List, _Table) ->
    CRC = lists:foldl(
	    fun(B, Res) ->
		    TableValue = get(calc_index(Res, B)),
		    calc_crc1(Res, TableValue)
	    end, 16#ffffffff, List),
    (bnot CRC) band 16#ffffffff;
do_crc(ets, List, Table) ->
    CRC = lists:foldl(
	    fun(B, Res) ->
		    Index = calc_index(Res, B),
		    [{Index, TableValue}] = ets:lookup(Table, Index),
		    calc_crc1(Res, TableValue)
	    end, 16#ffffffff, List),
    (bnot CRC) band 16#ffffffff;
do_crc(zlib, Bin, _Table) ->
    Z = zlib:open(),
    Res = zlib:crc32(Z, Bin),
    zlib:close(Z),
    Res.

バイナリよりもリストの方が都合がよかったのでリストで書いていたのですが、zlib:crcはバイナリを与えなければいけないので不自然なコードになってしまいました。また、do_crc / make_table のあたりはもうちょっと書きようがありそうにも思います。当初はテーブル作成にかかる時間も計測していたのですが、どれも1msec未満で終わっていたのでとるのをやめました。

この結果を見てみると、配列のように使いたければtupleを使うのが無難ということでしょうか。結局のところこれだけではなんとも言いがたいところがありますが、少なくとも微々たる差というわけではありませんので、そうなっている理由を調べる手がかりにはなるかもしれません。

つづく.... (といいのですが... Efficiency Guideも読み直しておきます。トップページの存在にさっき気付きました)

(余談1) CRC32の計算の最後で bnotしている場所がありますが、ここでマスクを忘れてしまって合わないなあと5分くらい悩んでしまいました。

(余談2) binary2 は binary:part/3 ではなくパターンマッチングで取得するようにしたバージョンです。手元の結果では処理時間が80%ほどになりました。順位には影響がないので上記の表では binary2 は利用していません。 crc:bench_tests(10, 10000, [binary, binary2]). などとすると、これらのアルゴリズムだけを計測することができます。