asio 03

はじめに

今回はコードを非同期化する上で肝となる io_service について取り上げる。

うんちく

前回までサンプルコードについて一切解説を入れていなかったが、前回密かに以下のような呼び出しを行っている。

io_service.run();

これを呼び出すと、何が起こるのだろうか。以下のことが起こる。

  1. 呼び出し前までにたまっていた非同期呼び出し処理が動き出す
  2. 非同期処理が終わるまで待つ
    1. イベントループが実行される

なので、非同期処理をいくら呼び出しても、io_service::run が実行されないと何も起こらない。逆に、非同期処理を呼び出す前に io_service::run が実行されてしまうとすぐに処理が完了してしまう。前回のコードから、io_service::run の行をコメントアウト実行してみるとよく分かるはずだ。さらにいうなら、コメントアウトしたすぐ下に数秒間待機するコード(Windows なら Sleep など)を書くとさらに納得できるだろう。

  1. 非同期処理を呼び出した後実際の処理が完了せずにすぐに終了してしまうコード
#define _WIN32_WINDOWS 0x0400
#define _WIN32_WINNT 0x400

#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>

using asio::ip::tcp;

void on_resolve( const asio::error_code& error,
	tcp::resolver::iterator endpoint_iterator )
{
	if ( ! error )
	{
		tcp::resolver::iterator end;
		while ( endpoint_iterator != end )
		{
			tcp::endpoint endpoint = endpoint_iterator->endpoint();
			std::cout << "  capacity: " << endpoint_iterator->endpoint().capacity() << std::endl;
			std::cout << "  data    : " << endpoint_iterator->endpoint().data() << std::endl;
			std::cout << "  port    : " << endpoint_iterator->endpoint().port() << std::endl;
			std::cout << "  size    : " < <endpoint_iterator->endpoint().size() << std::endl;

			// print address info.
			asio::ip::address address = endpoint.address();
			std::cout << "  address : " << address.to_string() << std::endl;
			std::cout << "  is_v4   : " << address.is_v4() << std::endl;
			std::cout << "  is_v6   : " << address.is_v6() << std::endl;

			std::cout << std::endl;

			endpoint_iterator++;
		}
	} else {
		std::cerr << error.message() << std::endl;
	}
}

int main( int argc, char* argv[] )
{

	try
	{
		if ( argc != 3 )
		{
			std::cerr << "Usage: resolver <host> <service>" << std::endl;
			return 1;
		}

		asio::io_service io_service;
		tcp::resolver resolver( io_service );
		tcp::resolver::query query( argv[1], argv[2] );

		// print query info.
		std::cout << query.host_name() << std::endl;
		std::cout << query.service_name() << std::endl;
		std::cout << std::endl;

		// start to resolve.
		resolver.async_resolve( query, on_resolve );
		//io_service.run();
	}
	catch ( std::exception& e )
	{
		std::cerr << e.what() << std::endl;
	}

	return 0;
}
  1. 非同期処理を呼び出してからいくら待っても実際の処理が実行されないコード
#define _WIN32_WINDOWS 0x0400
#define _WIN32_WINNT 0x400

#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>

using asio::ip::tcp;

void on_resolve( const asio::error_code& error,
	tcp::resolver::iterator endpoint_iterator )
{
	if ( ! error )
	{
		tcp::resolver::iterator end;
		while ( endpoint_iterator != end )
		{
			tcp::endpoint endpoint = endpoint_iterator->endpoint();
			std::cout << "  capacity: " << endpoint_iterator->endpoint().capacity() << std::endl;
			std::cout << "  data    : " << endpoint_iterator->endpoint().data() << std::endl;
			std::cout << "  port    : " << endpoint_iterator->endpoint().port() << std::endl;
			std::cout << "  size    : " << endpoint_iterator->endpoint().size() << std::endl;

			// print address info.
			asio::ip::address address = endpoint.address();
			std::cout << "  address : " << address.to_string() << std::endl;
			std::cout << "  is_v4   : " << address.is_v4() << std::endl;
			std::cout << "  is_v6   : " << address.is_v6() << std::endl;

			std::cout << std::endl;

			endpoint_iterator++;
		}
	} else {
		std::cerr << error.message() << std::endl;
	}
}

int main( int argc, char* argv[] )
{

	try
	{
		if ( argc != 3 )
		{
			std::cerr << "Usage: resolver <host> <service>" << std::endl;
			return 1;
		}

		asio::io_service io_service;
		tcp::resolver resolver( io_service );
		tcp::resolver::query query( argv[1], argv[2] );

		// print query info.
		std::cout << query.host_name() << std::endl;
		std::cout << query.service_name() << std::endl;
		std::cout << std::endl;

		// start to resolve.
		resolver.async_resolve( query, on_resolve );
		//io_service.run();
		Sleep( 60 * 1000 );
	}
	catch ( std::exception& e )
	{
		std::cerr << e.what() << std::endl;
	}

	return 0;
}

そこで問題になるのが、マルチスレッドアプリケーションなどで io_service::run が実行される後にアプリケーションが非同期処理を行うにはどうするか、ということだ。これは、io_service::work というクラスを使うことで対応できる。

  1. 非同期処理を呼び出さなくてもずっと io_service::run がずっと待機してくれるコード
#define _WIN32_WINDOWS 0x0400
#define _WIN32_WINNT 0x400

#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>

using asio::ip::tcp;

void on_resolve( const asio::error_code& error,
	tcp::resolver::iterator endpoint_iterator )
{
	if ( ! error )
	{
		tcp::resolver::iterator end;
		while ( endpoint_iterator != end )
		{
			tcp::endpoint endpoint = endpoint_iterator->endpoint();
			std::cout << "  capacity: " << endpoint_iterator->endpoint().capacity() << std::endl;
			std::cout << "  data    : " << endpoint_iterator->endpoint().data() << std::endl;
			std::cout << "  port    : " << endpoint_iterator->endpoint().port() << std::endl;
			std::cout << "  size    : " << endpoint_iterator->endpoint().size() << std::endl;

			// print address info.
			asio::ip::address address = endpoint.address();
			std::cout << "  address : " << address.to_string() << std::endl;
			std::cout << "  is_v4   : " << address.is_v4() << std::endl;
			std::cout << "  is_v6   : " << address.is_v6() << std::endl;

			std::cout << std::endl;

			endpoint_iterator++;
		}
	} else {
		std::cerr << error.message() << std::endl;
	}
}

int main( int argc, char* argv[] )
{

	try
	{
		if ( argc != 3 )
		{
			std::cerr << "Usage: resolver <host> <service>" << std::endl;
			return 1;
		}

		asio::io_service io_service;
		asio::io_service::work work( io_service );
		tcp::resolver resolver( io_service );
		tcp::resolver::query query( argv[1], argv[2] );

		// print query info.
		std::cout << query.host_name() << std::endl;
		std::cout << query.service_name() << std::endl;
		std::cout << std::endl;

		// start to resolve.
		//resolver.async_resolve( query, on_resolve );
		io_service.run();
	}
	catch ( std::exception& e )
	{
		std::cerr << e.what() << std::endl;
	}

	return 0;
}

このコードでなぜ io_service::run がずっと待機してくれるのかというと、関連づけられた io_service::work が破棄されるまで待機してくれるという作りになっているからだ。これで、アプリケーションが非同期処理を呼び出すタイミングに関わらずに待機することができた。そうすると、今度は非同期処理が完了しても待機が終了しない、という状態になる。

  1. 非同期処理が完了しても io_service::work によっていつまでたっても終了しないコード
#define _WIN32_WINDOWS 0x0400
#define _WIN32_WINNT 0x400

#include <iostream>
#include <boost/array.hpp>
#include <asio.hpp>

using asio::ip::tcp;

void on_resolve( const asio::error_code& error,
	tcp::resolver::iterator endpoint_iterator )
{
	if ( ! error )
	{
		tcp::resolver::iterator end;
		while ( endpoint_iterator != end )
		{
			tcp::endpoint endpoint = endpoint_iterator->endpoint();
			std::cout << "  capacity: " << endpoint_iterator->endpoint().capacity() << std::endl;
			std::cout << "  data    : " << endpoint_iterator->endpoint().data() << std::endl;
			std::cout << "  port    : " << endpoint_iterator->endpoint().port() << std::endl;
			std::cout << "  size    : " << endpoint_iterator->endpoint().size() << std::endl;

			// print address info.
			asio::ip::address address = endpoint.address();
			std::cout << "  address : " << address.to_string() << std::endl;
			std::cout << "  is_v4   : " << address.is_v4() << std::endl;
			std::cout << "  is_v6   : " << address.is_v6() << std::endl;

			std::cout << std::endl;

			endpoint_iterator++;
		}
	} else {
		std::cerr << error.message() << std::endl;
	}
}

int main( int argc, char* argv[] )
{

	try
	{
		if ( argc != 3 )
		{
			std::cerr << "Usage: resolver <host> <service>" << std::endl;
			return 1;
		}

		asio::io_service io_service;
		asio::io_service::work work( io_service );
		tcp::resolver resolver( io_service );
		tcp::resolver::query query( argv[1], argv[2] );

		// print query info.
		std::cout << query.host_name() << std::endl;
		std::cout << query.service_name() << std::endl;
		std::cout << std::endl;

		// start to resolve.
		resolver.async_resolve( query, on_resolve );
		io_service.run();
	}
	catch ( std::exception& e )
	{
		std::cerr << e.what() << std::endl;
	}

	return 0;
}

これに対処するにはアプリケーションが終了を望んだタイミングで io_service::work を破棄する必要がある。

  1. 10 秒後にユーザがアプリケーションの終了を望んだと仮定して終了するコード
#define _WIN32_WINDOWS 0x0400
#define _WIN32_WINNT 0x400

#include <iostream>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <asio.hpp>

using asio::ip::tcp;

void on_resolve( const asio::error_code& error,
	tcp::resolver::iterator endpoint_iterator )
{
	if ( ! error )
	{
		tcp::resolver::iterator end;
		while ( endpoint_iterator != end )
		{
			tcp::endpoint endpoint = endpoint_iterator->endpoint();
			std::cout << "  capacity: " << endpoint_iterator->endpoint().capacity() << std::endl;
			std::cout << "  data    : " << endpoint_iterator->endpoint().data() << std::endl;
			std::cout << "  port    : " << endpoint_iterator->endpoint().port() << std::endl;
			std::cout << "  size    : " << endpoint_iterator->endpoint().size() << std::endl;

			// print address info.
			asio::ip::address address = endpoint.address();
			std::cout << "  address : " << address.to_string() << std::endl;
			std::cout << "  is_v4   : " << address.is_v4() << std::endl;
			std::cout << "  is_v6   : " << address.is_v6() << std::endl;

			std::cout << std::endl;

			endpoint_iterator++;
		}
	} else {
		std::cerr << error.message() << std::endl;
	}
}

void app_thread( std::auto_ptr<asio::io_service::work>& work )
{
	Sleep( 10 * 1000 );
	work.reset();
}

int main( int argc, char* argv[] )
{

	try
	{
		if ( argc != 3 )
		{
			std::cerr << "Usage: resolver <host> <service>" << std::endl;
			return 1;
		}

		asio::io_service io_service;
		std::auto_ptr<asio::io_service::work> work( new asio::io_service::work( io_service ) );

		// run application thread.
		boost::thread t( boost::bind( app_thread, boost::ref( work ) ) );

		tcp::resolver resolver( io_service );
		tcp::resolver::query query( argv[1], argv[2] );

		// print query info.
		std::cout << query.host_name() << std::endl;
		std::cout << query.service_name() << std::endl;
		std::cout << std::endl;

		// start to resolve.
		resolver.async_resolve( query, on_resolve );
		io_service.run();

		t.join();
	}
	catch ( std::exception& e )
	{
		std::cerr << e.what() << std::endl;
	}

	return 0;
}

ここまで理解できたら、途中でユーザが終了を望んだときに速やかに終了することができるコードを書くことができる。上のサンプルコードでは、asio 以外に boost::thread、boost::bind、std::auto_ptr などが登場しているが、asio でうまくコードを書くにはどれも欠かすことのできないものだ(使わなくてもできるが困難を極めるだろう)。


なお、他にも io_service::run のイベントループを止める方法はいくつかある。たとえば、io_service::stop というものがあるが、これはイベントループに終了依頼をするだけで、待機処理は含まれていない。これは英語があまり理解できなくてもコードを見ても明らかである。Windows の場合以下に要になっている。

  // Stop the event processing loop.
  void stop()
  {
    if (::InterlockedExchange(&stopped_, 1) == 0)
    {
      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
      {
        DWORD last_error = ::GetLastError();
        asio::system_error e(
            asio::error_code(last_error,
              asio::error::get_system_category()),
            "pqcs");
        boost::throw_exception(e);
      }
    }
  }

ちなみに、io_service::reset というものがあるが、これはイベントループを止めるという目的で使ってはならない。ドキュメントには以下のようにある。

This function must not be called while there are any unfinished calls to the run(), run_one(), poll() or poll_one() functions.

この関数は、run()、run_one()、poll()、poll_one() といった関数の呼び出しが完了していない場合は呼び出してはいけません。

というわけで、イベントループの終了をアプリケーションから要求する場合には、io_service::work を用いるべきだと考える。これは強制されているわけではないが、事実上の標準とみなせる。これを使うことで、イベントループの終了条件を容易かつ安全に複数個にすることができる。なぜこれが安全かというと、例えばユーザがアプリケーションの×ボタンを押してウィンドウが閉じられたという条件が、イベントループを止めるための決定条件にならないことがあるからだ。アプリケーションによっては×ボタンが押された後、サーバに何らかの情報を送信してから終了する、というようなことがある。このような場合には、io_service::stop を呼び出してしまうとその後の非同期処理が実行されなくなってしまう。それはまずいだろう。


では、io_service::reset はいつ使うべきなのか?それは、io_service のインスタンスを使い回して何度かイベントループを実施したいような場合だ。


つまり、io_service::stop は本当に通信を中断しなければならないときは呼ばないようにするべきだと思う(ここまで長々と語っておきながらここにきて「思う」かよ、と思われた方、すみませんがその通りです)。


以上、ここまで述べてきた内容は、asio のリファレンスに詳しく記載されている。

補足

ここまで見てきて「こう書けばこう動くということは理解できても、なぜこうなるのかが理解できない」という方もいると思いますので補足しておきます。まず、io_service とは何なのか、ということですが、これは I/O 完了ポートを使ったイベントループを提供してくれるクラスです。io_service::work は、io_service クラスが終了する条件をインスタンスの生成・破棄という形で記述するためのクラスです。C++ はスコープによるリソース管理がとても美しく書ける言語なので、そこが活かされています。


実際、io_service::work クラスにはこれといった機能がありません。リファレンスを見ても基盤となる io_service を取得できる以外には何も機能がありません。コードを見てみると、基盤となる io_service のとあるカウンタを 1 増やしているだけです。これにより、io_service は io_service::work がすべて破棄されるまで io_service::run が完了しない、という処理を実現しています(もちろん内部では異常が発生した場合や io_service::stop が呼び出された場合には処理を抜けるようになっていますが)。

まとめ

  1. io_service::work を使って io_service::run が完了するタイミングを制御する
  2. io_service の振る舞いを十分に把握する
    1. io_service::stop を呼び出す必要があるか?
    2. io_service::reset を呼び出す必要があるか?

我ながら今回はあまり「まとめ」になっていません。

次回に向けて

ここまで asio を眺めてみると「うまくいけそうだ」と思って自分が持っているコードにも適用したくなってくるころだと思います。しかし、非同期処理を安定動作させるのは至難の業であり、その基盤となるライブラリを提供するのは困難を極めるはずです。つまり「asio は利用できるほどに安定しているのか?」ということを何かしら確認しなければいけません。次回は、asio の信頼性、asio でできないことについてできる限り検討してみます。