非同期処理いろいろ

2008.7.24

去年頃から、シングルスレッドかつイベントドリブンなFlash環境で散らかりがちな非同期処理を、すっきりと書けるようにするための試みが多く見受けられるようになりました。それらの特徴をざっと調べてみたメモになります。先駆者達のやり方を広く知り、あわよくば何か洞察を得たいからであって、各ライブラリの優劣を独断と偏見で決定するような主旨ではありません。それからプログラム勉強する前にまず日本語の勉強しろっていうくらい思いやりのない説明が延々続きますが、万が一誰かの参考になればと思い載せておくことにします。

各ライブラリの特徴をつかむため、以下のポイントについて共通で見ていくことにします。

  1. 合成
    複数の処理を直列化・並列化した複合物を区別なく扱えるか?処理の入れ子はどのように実現されているか?
  2. 並列実行
    複数の処理の同時実行と、完了の待ち合わせが可能か?
  3. 例外処理
    エラーハンドラは追加できるか?エラーからの回復は可能か?
  4. 続行
    エラー発生時に、現在の処理をスキップして、残りの処理を続行することが可能か?
  5. 中止
    処理の中止は可能か?
  6. 値の受け渡し
    処理の結果として得られた値を、後続の処理に受け渡す仕組みはあるか?
  7. 冗長さ
    処理を追加する手順はどうか?クラスを新規に作るのか?クロージャを渡すのか?
  8. 動的な処理の追加
    処理を追加できる有効なタイミングは?実行後にも処理を追加でき、実行キューのように振る舞うか?
  9. 開始トリガ
    処理の構築と実行が切り離されていて明示的に開始するのか?それとも構築と同時に暗黙で開始されるのか?
  10. 再実行
    同じ非同期処理を、2回以上実行可能か?
  11. 進捗通知
    ロードなどの進捗を通知する枠組みがあるか?
  12. イベントへの適合
    完了の通知は、AS3のイベントモデルに準拠して行われるか?
    逆に、イベントを各々の非同期処理モデルに適合させる方法は提供されているか?

flashrod Job

var job:Job = new Sequence(
	new Trace("begin"),
	new Concurrent(new URLGet("http://www.google.com/"),
	               new URLGet("http://www.yahoo.com/"),
	               new URLGet("http://www.apple.com/"),
	               new URLGet("http://www.mozilla.org/")),
	new Sequence(new URLGet("http://www.cnn.com/"),
	             new URLGet("http://del.icio.us/")),
	new Trace("wait for 2 sec."),
	new Wait(2000),
	new Concurrent(new URLGet("http://tumblr.com/"),
	               new URLGet("http://flickr.com/")),
	new Trace("end"));
job.start();

非同期処理の同期化を助ける非常にシンプルな方法です。引数なしのstartメソッドで実行が開始され、完了時にfinishedを送出するものをJobとし、全てのクラスはJobを実装します。たとえ同期処理でも、完了時にfinishedを送出することが求められます。

  1. 合成可。SequenceやConcurrentはJobインターフェースを実装している。
  2. 並列実行可。Concurrentクラスを使う。
  3. エラーについては扱っていない。
  4. 続行可。たとえエラーが発生しても、finishedイベントを送出するようJobを実装すればよい。
  5. 中止は扱っていない。
  6. 値の受け渡しは不可。Job側で結果を保持しておく、などの必要があり。
  7. traceすらもJobの実装クラスになる。クロージャ渡せるようにしたい人は各自でといった感じ。
  8. 非同期処理の構築はコンストラクタでのみ行われ、その後の変更は不可。
  9. Job#start()で明示的な実行を行う。
  10. WaitやURLGetを除いた複合非同期処理は再実行不可。一度完了した頃には配列が空になるため。
  11. 進捗は通知されない。
  12. 完了通知はAS3のイベントモデルに準拠している。イベントからJobを作る汎用メソッドは特にない。

fladdict Command

var ar:Array = [];
ar.push( new Command(null, trace, ["start serial command"]) );
ar.push( new ASyncCommand(myLoader, myLoader.load, [new URLRequest("URL")],
	myLoader.loaderContent, Event.COMPLETE));
ar.push( new TimerCommand(1000));
ar.push( new XMLLoaderCommand({url:"hogehoge", dataScope:this, dataProp:"dataResult"}))

var command:SerialCommand = new SerialCommand( ar );
command.execute();

さきほどのものに近いアプローチだと思いますが、いくつか実用的な機能が備わっています。引数なしのexecuteメソッドで実行が開始され、完了時にcompleteを送出するものをICommandとし、全てのクラスはICommandを実装します。たとえ同期処理でも、完了時にcompleteを送出することが求められますが、単に同期処理を追加したいだけであればわざわざクラスを作らずとも、Commandというクラスのコンストラクタにスコープと関数と引数を指定してやるだけで済みます。一方の非同期処理はAsyncCommandでラップでき、Commandと同様のコンストラクタ引数に加えて、イベントディスパッチャと完了を通知するイベント名を指定して作成します。

  1. 合成可。SerialCommandやParallelCommandはICommandインターフェースを実装している。
  2. 並列実行可。ParallelCommandクラスを使う。
  3. エラーについては扱っていない。
  4. 続行可。たとえエラーが発生しても、completeイベントを送出するようなCommandを実装すればよい。AsyncCommandを利用する場合、ディスパッチャのエラーイベントについて関知しないことに注意。
  5. ICommand.asを見るに、中止は予定しているように見えるが、未実装と思われる。
  6. 値の受け渡しは不可。Command側で結果を保持しておく、などの必要あり。
  7. 複合非同期処理の基底クラスであるBatchCommandには各種ショートカットメソッドがあり、後続の処理の追加を簡単にしている。
  8. 非同期処理の追加を実行中にもできるが、実行後に追加した処理は開始されないと思われる。
  9. ICommand#execute()で明示的な実行を行う。
  10. 複合非同期処理の進捗状況を保存する_indexというカウンタがリセットされないため、再実行は想定していない操作と思われる。
  11. 進捗は通知されない。
  12. 完了通知はAS3のイベントモデルに準拠している。メソッドとその完了を告げるイベントをワンセットとして非同期処理を表現するAsyncCommandが提供されている。

Mochikit Deferred
JSになってしまいますが、ASへの移植も容易なため紹介します。MochikitのDeferredは以下が大変参考になります。
http://d.hatena.ne.jp/brazil/20080721/1216580402

本家TwistedのDeferredは、スレッドを意識せずプログラムするために生まれたのだと聞きますが、スレッドのないFlashにおいては単にaddEventListenerがアプリケーションコードに直接現れない形で、非同期処理を繋げていく仕組みとして理解することができます。JobCommandに見られる、処理をオブジェクトとしてラップするアプローチに対し、Deferredは関数を直にコールバックチェーンに追加します。そして、callbackの呼び出しにより完了が通知されると、コールバックチェーンの要素を順番に呼び出していきます。呼び出しの結果、戻り値がDeferredであれば非同期処理とみなして残りの実行を後回しに、それ以外であれば同期処理と見なして次のコールバックを呼び出します。

ハンドラの追加は、実行が終わった後になってからも可能で、以下のような微妙な違いがあります。

//Mochikit Deferred
var d = new Deferred();
d.callback();
d.addCallback(function() {alert("hoge");}); //呼ばれる

//JSDeferred
var d = new Deferred();
d.call();
d.next(function() {alert("hoge");}); //呼ばれない
  1. 合成可。コールバックの戻り値がDeferredの場合、チェーンの実行は一時停止され、子チェーンの完了を待機。
  2. 並列実行可。DeferredListを使う。
  3. エラーハンドラはaddErrBackで追加できる。ただし、ErrorEventが送出されたときにerrbackを呼ぶように作っておく必要はある。ハンドラ内で非Errorを返した場合、回復したものとしてコールバックチェーンを再開。
  4. 続行可。addBothで、エラーが起こるにせよ起こらないにせよ実行したい後続の処理を追加すればよい。
  5. 中止は可。何もしないと、現在の処理を終えてからようやく中止されるが、コンストラクタに適切なキャンセル関数を指定してやれば即座に中止するDeferredを作ることも可。
  6. 値はコールバック関数の引数で受け取り、戻り値で後続の処理に引き継ぐ。
  7. クロージャを追加できる。第2引数以降を指定すれば、引数を束縛した関数を追加することもできる。
  8. 非同期処理の追加をいつでもできる。実行後であれば、追加した瞬間実行される。ただし、子チェーンであると判明したDeferredには以後コールバックの追加は許されない(Chained Deferreds can not be re-used)。Mochikitの実装でそれを許すと、処理の流れは2本に分岐してしまうからか?
  9. Deferredを返す関数を呼び出して、非同期処理の実行を開始すると同時に、後追いでチェーンを構築するケースが多いと思う(暗黙的開始)。あるいは事前にチェーンを構築してcallbackを呼び出す(明示的開始)。
  10. Deferredは1回しかcallbackが許されない(AlreadyCalledError)。Deferredを返す関数は毎回フレッシュなDeferredを使うのが原則と思われ、そのように実装している限りにおいて再実行は可能といえなくもないが、オブジェクトは再利用できていない。
  11. 進捗は通知されない。
  12. 完了通知をイベントに戻したければ、誰かにdispatchEventしてもらうことになる。イベントを一度だけリッスンしてDeferredを返す関数を定義しておけば、いろいろと便利になると思われる。

cho45 JSDeferred

next(function () {
    alert("1");
}).
next(function () {
    alert("2");
    // child Deferred
    return next(function () {
        alert("3");
    });
}).
next(function () {
    alert("4");
});
alert("0");

Mochikitの実装をもっとシンプルにできそうなので作ったとのライブラリです。こちらもJSですが、AS3へ移植したものもあるようです(並列処理を直列に繋ぐためのショートカットメソッドDeferred#parallelが加えられている点を除き同じかと思わる)。Mochikitとの違いとして、単方向リストを使用したミニマルな実装であること、nextをメソッドチェーンすると最後に追加されたDeferredを指すこと、上の例のように開始は即座でなく同期処理の後に先送りされること(さもないとチェインの構築が間に合わない?)、callやfailによるチェーンの再実行が許可されていること、チェーンの実行後にハンドラを追加しても自動的には実行されないこと、などがあると思います。

  1. 合成可。コールバックの戻り値がDeferredの場合、このようにチェーンを繋ぎなおす。
  2. 並列実行可。Deferred.parallelを使う。
  3. エラーハンドラはerrorで追加できる。ただし、ErrorEventが送出されたときにfailを呼ぶように作っておく必要はある。ハンドラの中で改めてthrowしなければ、回復したものとしてコールバックチェーンを再開。
  4. 続行可。エラーが起こる可能性がある処理の後に、何もしないエラーハンドラを追加すればよい。
  5. 中止は可。ただし、直列化された非同期処理の中止は、現在実行中のDeferredをライブラリ側で関知してくれないため、少々やりにくいと思われる。
  6. 値はコールバック関数の引数で受け取り、戻り値で後続の処理に引き継ぐ。
  7. クロージャを追加できる。引数を束縛させる機能はないので、やりたければ自前でやる。
  8. 非同期処理の追加を実行中にもできるが、実行後に追加した処理は開始されないと思われる。
  9. 先頭のnextがディレイを噛ませた隙にチェーンを構築するケースが多いと思う(暗黙的開始)。あるいは事前にチェーンを構築してcallを呼び出す(明示的開始)。
  10. チェーン先頭のDeferredを保持しておいて必要なときにcallすれば可。
  11. 進捗は通知されない。
  12. 完了通知をイベントに戻したければ、誰かにdispatchEventしてもらうことになる。イベントを一度だけリッスンしてDeferredを返す関数を定義しておけば、いろいろと便利になると思われる。

voidelement Chain

Chain.parallel({
	foo: { time:5, onComplete:"5000ms" },
	bar: {
		text:"http://www.google.co.jp/",
		onStart:"load text: start",
		onComplete:"load text: complete"
	}
}).
next( function():void {
	trace( this.data.bar );
});

JSDeferredのAS実装であるASDeferredをベースに、よく使う処理を簡単に書けるような工夫が施されています。直列処理の追加は、next(function() {...})のような基本的なメソッドに加え、イベントを一度だけ待つlistenや、アニメーションを行うtweenerといった各種ショートカットメソッドを直にメソッドチェーンできるようになっています。並列処理の追加は、辞書を渡すようになっており、それぞれの非同期処理にラベルをつけて結果を後から参照しやすくするついでに、定型的な処理であればそれも辞書で書いてしまおう、といった感じになっています。

  1. 合成可。コールバックの戻り値がChainの場合、このようにチェーンを繋ぎなおす。
  2. 並列実行可。parallelを使う。
  3. エラーハンドラはerrorで追加できる。ライブラリで提供されているChainは、ErrorEventが送出されたときにエラーハンドラが呼ばれるようになっている。ハンドラの中で改めてthrowしなければ、回復したものとしてコールバックチェーンを再開。
  4. 続行可。エラーが起こる可能性がある処理の後に、何もしないエラーハンドラを追加すればよい。
  5. 中止は可。ただし、直列化された非同期処理の中止は、現在実行中のChainをライブラリ側で関知してくれないため、少々やりにくいと思われる。
  6. 値はコールバック関数の引数ではなく、this.dataで受け取り、戻り値で後続の処理に引き継ぐ。戻り値のない処理がチェーンの途中に挟まっても、最後に得たdataを辿れるよう双方向リストになっている。多分。
  7. nextやerrorでクロージャを追加できる他、よく使う処理の追加は専用メソッドで行える。
  8. 非同期処理の追加を実行中にもできるが、実行後に追加した処理は開始されないと思われる。
  9. 先頭のChain.hogehogeがディレイを噛ませた隙にチェーンを構築するケースが多いと思う(暗黙的開始)。あるいは事前にチェーンを構築してexecuteを呼び出す(明示的開始)。
  10. チェーン先頭のChainを保持しておいて必要なときにexecuteすれば可。
  11. 進捗は通知されない。
  12. 完了通知をイベントに戻したければ、誰かにdispatchEventしてもらうことになる。ListenerChainを利用すると、イベントから簡単にChainを作れる。

secondlife Chain
試してみようと思っていたものの、ソースのリンク死んでる模様??

beinteractive Thread

/* 前略 */

// リサイズ監視用スレッド
new ResizeHandlingThread(context).start();
// 検索制御用スレッド
new SearchControlThread(context).start();
// 初期アニメーション等を行うスレッド
new IntroThread(context).start();
// 3D 制御用スレッド
new PapervisionControlThread(context).start();

/* 後略 */

addEventListenerを隠蔽し、非同期処理を同期処理の感覚で書けるライブラリということですが、むしろ状態遷移の多いリアルタイム処理をスマートに書けるようにするためのフレームワークといった感じで、今までのものとはちょっと異質です。個々のThreadは自分の依存するリソースやイベントのことだけ考えればよく、それらの待機は同期処理の感覚で行えるため、時間的しがらみを超えてコントローラを分割できるようになります。また、ロード中やオーバー待ちといった状態を一つのメソッドと対応付けできるので、状態遷移図を素直にコードに落とし込むようにしてプログラムが書けるのもメリットの一つかと思います。

  1. 合成可。SerialExecutorやParallelExecutorはThreadを継承している。
  2. 並列実行可。ParallelExecutorを使う。
  3. エラーハンドラはerrorで追加できる。第一引数には補足するエラーの型を、第二引数にハンドラを指定。ハンドラが設定されていなかったり、ハンドラ中にさらにエラーが起きれば、親スレッドに伝播。ハンドラが無事終了すると回復したものとしてエラーが発生する前に実行されていた実行関数から再開するが、ハンドラ内でnextを呼び出した場合はそこから再開。
  4. 続行可。このようにする。SerialExecutorが途中でこけたとき、無視して続行するにはどうするんだろう?
  5. 中止はinterruptで可。ただし、割り込みハンドラを設定していない状態で待機中のThreadを中止しようするとInterruptedErrorとなる。自作Threadを中止に対応させるにはcheckInterruptedで割り込みされたか調べて行う。
  6. 値の受け渡しというか保持は、Threadごとの実装となる。例えば、URLLoaderThreadはloaderを保持する。それとは別に、遅延リソースの生産者と消費者を結合させないためにMonitorが使える。
  7. スレッドを使うためのスレッドを書かなければいけないケースがほとんどだと思われる。まるでネズミk(ry
  8. だんだん質問に答える意味がよく分からなくなってきたが、SerialExecutorのaddThreadは実行中にもできると思うけど実行後に追加した処理は開始されないと思われる。
  9. Thread#startで開始。
  10. startは一度しか呼び出せないので、新規のスレッドを作って再実行する。
  11. 進捗はIProgressを実装した各種Threadからイベントで通知される。それらのトータルの進捗を通知してほしければ、それぞれをMultiProgressにaddProgressしてやる。ただし、重みを指定する必要があるので、ファイルサイズの比は事前に調べておく。
  12. Threadを使っていろいろと処理した後に、続きの処理はイベントドリブンにしたいとか万が一あれば、EventDispatcherThreadを継承してdispatchEventする感じになると思われる。eventメソッドで特定のイベントが起きるまでThreadを待機させることができる。ただし、nextメソッドによって次の実行関数が設定された場合は待機しない。これをうまく利用して、読みやすい状態マシンを書くこともできるようだ。