Agile Cat — in the cloud

Twitter サーチを 3倍速にする新アーキテクチャとは? _3

Posted in Big Data, Twitter by Agile Cat on April 17, 2011

Twitter Search is Now 3x Faster
Wednesday, April 6, 2011
http://engineering.twitter.com/2011/04/twitter-search-is-now-3x-faster_1656.html

ーーーーー

これは、三部作の最終回です。初めての方は、1 と 2 をお先に ど~ぞ。

Twitter サーチを 3倍速にする新アーキテクチャとは? _1
Twitter サーチを 3倍速にする新アーキテクチャとは? _2
Twitter サーチを 3倍速にする新アーキテクチャとは? _3

ーーー __AC Stamp 2

ーーーーー

MULTIPLEXING INCOMING REQUESTS

Because workflows are mapped to Netty pipelines in Blender, we needed to route incoming client requests to the appropriate pipeline. For this, we built a proxy layer that multiplexes and routes client requests to pipelines as follows:

このワークフローは、Brender における Netty パイプラインにマップされるため、Incoming リクエストを、適切なパイプラインに送る必要があった。 そのため、以下のようなマルチプレックスのプロキシー・レイヤを構築し、パイプラインにクライアント・リクエストをルーティングしている:

  • When a remote Thrift client opens a persistent connection to Blender, the proxy layer creates a map of local clients, one for each of the local workflow servers. Note that all local workflow servers are running inside Blender’s JVM process and are instantiated when the Blender process starts.
  • When the request arrives at the socket, the proxy layer reads it, figures out which workflow is requested, and routes it to the appropriate workflow server.
  • Similarly, when the response arrives from the local workflow server, the proxy reads it and writes the response back to the remote client.
  • リモート Thrift クライアントから Blender に、永続的なコネクションをオープンするときに、このプロキシー・レイヤがローカル・クライアントを作成し、それぞれのローカル・ワークフロー・サーバーにマップしていく。 注意すべきことは、すべてのローカル・ワークフロー・サーバーが、Bernder の JVM プロセス内で実行され、また、Brender プロセスがスタートするときにインスタンス化される点である。
  • それらのリクエストがソケットに到着するとき、プロキシー・レイヤによる読み込みが行われ、求められるワークフローの種類が理解され、適切なワークフロー・サーバーへのルーティングが行われる。
  • それと同様に、それらのローカル・ワークフロー・サーバーからレスポンスが戻ったとき、対象となるプロキシーによる読み込みが行われ、リモート・クライアントへのレスポンスの書込みが完了する。

We made use of Netty’s event-driven model to accomplish all the above tasks asynchronously so that no thread waits on I/O.

それら全てのタスクを非同期化するために、Netty のイベント駆動型モデルを利用することで、I/O の待ち時間が排除された。

imageDISPATCHING BACK-END REQUESTS

Once the query arrives at a workflow pipeline, it passes through the sequence of service handlers as defined by the workflow. Each service handler constructs the appropriate back-end request for that query and issues it to the remote server. For example, the real-time service handler constructs a realtime search request and issues it to one or more realtime index servers asynchronously. We are using the twitter commons library (recently open-sourced!) to provide connection-pool management, load-balancing, and dead host detection.

ワークフロー・パイプラインに到着したクエリーは、対象となるワークフローで定義さたとおりに、サービス・ハンドラーのシーケンスを通過していく。 続いて、それぞれのサービス・ハンドラーが、対象となるクエリーに適したバックエンド・リクエストを構成し、リモート・サーバーへ向けて送り出す。 たとえば、リアルタイム・サービス・ハンドラーは、リアルタイム・サーチ・リクエストを構成し、複数のリアルタイム・インデックス・サーバーへ向けて、そのリクエストを非同期で発行する。私たちは、コネクション・プール管理および、ロード・バランシング、デッド・ホスト検出を提供するために、 Twitter コモン・ライブラリ(最近オープンソース化された!)を使用している。

The I/O thread that is processing the query is freed when all the back-end requests have been dispatched. A timer thread checks every few milliseconds to see if any of the back-end responses have returned from remote servers and sets a flag indicating if the request succeeded, timed out, or failed. We maintain one object over the lifetime of the search query to manage this type of data.

すべてのバックエンド・リクエストがディスパッチされると、そのクエリーを処理している I/O スレッドが解放される。 タイマー・スレッドは、数ミリ秒ごとにチェックを行い、リモート・サーバーから返されたバックエンド・レスポンスを確認し、トリガーとなったリクエストに対して、succeeded/timed out/failed といったフラグをセットしていく。 私たちのシステムでは、この種のデータを管理するために、サーチ・クエリーのライフタイムをカバーするよう、そのオブジェクトを保持している。

Successful responses are aggregated and passed to the next batch of service handlers in the workflow pipeline. When all responses from the first batch have arrived, the second batch of asynchronous requests are made. This process is repeated until we have completed the workflow or the workflow’s timeout has elapsed.

成功したレスポンスはアグリゲートされ、そのワークフロー・パイプライン内におけるサービス・ハンドラーの、次のバッチに受け渡される。 最初のバッチからの、すべてのレスポンスが到着したとき、非同時性のリクエストにおける 2番目のバッチが作成される。 そのワークフローが完了するまで、あるいは、ワークフローのタイムアウトが経過するまで、このプロセスは繰り返される。

As you can see, throughout the execution of a workflow, no thread busy-waits on I/O. This allows us to efficiently use the CPU on our Blender machines and handle a large number of concurrent requests. We also save on latency as we can execute most requests to back-end services in parallel.

ここまでに確認してきたように、ワークフローの実行を通じて、 I/O 上のスレッド・ビジー待ちは発生しない。つまり、Blender マシン上での CPU 利用および、大量のコンカレント・リクエスト処理において、効率化が実現される。 そして、並列化されたバックエンド・サービスで、大半のリクエストを処理するときには、レイテンシーを抑えることができる。

BLENDER DEPLOYMENT AND FUTURE WORK

To ensure a high quality of service while introducing Blender into our system, we are using the old Ruby on Rails front-end servers as proxies for routing thrift requests to our Blender cluster. Using the old front-end servers as proxies allows us to provide a consistent user experience while making significant changes to the underlying technology. In the next phase of our deploy, we will eliminate Ruby on Rails entirely from the search stack, connecting users directly to Blender and potentially reducing latencies even further.

このシステムに Blender を導入するのと並行して、高品質のサービスを保証するために、従来からの Ruby on Rails フロントエンド・サーバーをプロキシーとして利用し、Blender クラスタへ向けて Thtift リクエストを送っている。 フロントエンド・サーバーをプロキシーとして使用することで、基礎をなすテクノロジーに相当量の変更を施す間も、一貫したユーザー・エクスペリエンスの提供が実現される。 私たちの、次のディプロイ・フェーズでは、このサーチ・スタックから Ruby on Rails を完全に取り除き、ユーザーと Blender のダイレクトな接続を実現し、また、さらなるレイテンシーの低減を目指すだろう。

—@twittersearch

ACKNOWLEDGEMENTS

The following Twitter engineers worked on Blender: Abhi Khune, Aneesh Sharma, Brian Larson, Frost Li, Gilad Mishne, Krishna Gade, Michael Busch, Mike Hayes, Patrick Lok, Raghavendra Prabhu, Sam Luckenbill, Tian Wang, Yi Zhuang, Zhenghua Li.

ーーーーー

いやぁ~~~ 面白かったです。 このポストに関しては、いろいろな方から Twitter でコメントいただきました。 おかげさまで、とても多くの方が注目するアーキテクチャなのだと、実感することができました。 ーーー __AC Stamp 2

ーーーーー

<関連>

Twitter サーチを 3倍速にする新アーキテクチャとは? _1
Twitter サーチを 3倍速にする新アーキテクチャとは? _2
Twitter サーチを 3倍速にする新アーキテクチャとは? _3
ーーーーー
Blenderに近いアーキテクチャになっているMessage Pack RPCのJava版の実装
Talk about Scala – SlideShare ( @yasushia さんより)
Twitter における、Ruby から Java への回帰とは?
ーーーーー
Twitter は 毎日、46万ユーザーを増やし、1億 4000万ツイートを発信する
TOPSY の Twitter 分析 API は、5 億クエリー/月 を処理する!
Twitter が、新しいデータセンターへの移行を完了
Happy Birthday Twitter – 5 歳になりました!

Twitter サーチを 3倍速にする新アーキテクチャとは? _2

Posted in Big Data, Twitter by Agile Cat on April 15, 2011

Twitter Search is Now 3x Faster
Wednesday, April 6, 2011
http://engineering.twitter.com/2011/04/twitter-search-is-now-3x-faster_1656.html

ーーーーー

初めての方は、1 をお先に ど~ぞ。三部作になっています。

Twitter サーチを 3倍速にする新アーキテクチャとは? _1
Twitter サーチを 3倍速にする新アーキテクチャとは? _2
Twitter サーチを 3倍速にする新アーキテクチャとは? _3

ーーー __AC Stamp 2

ーーーーー

BLENDER OVERVIEW

Blender is a Thrift and HTTP service built on Netty, a highly-scalable NIO client server library written in Java that enables the development of a variety of protocol servers and clients quickly and easily. We chose Netty over some of its other competitors, like Mina and Jetty, because it has a cleaner API, better documentation and, more importantly, because several other projects at Twitter are using this framework. To make Netty work with Thrift, we wrote a simple Thrift codec that decodes the incoming Thrift request from Netty’s channel buffer, when it is read from the socket and encodes the outgoing Thrift response, when it is written to the socket.

Blender とは、Netty 上に構築された Thrift と HTTP のサービスであり、Java で記述されたスケーラブルな NIO クライアント・サーバライブラリにより、多様なプロトコルを用いるサーバーとクライアントを、迅速かつ容易に開発していく。たとえば、Mina や Jetty なども提供されるコンペティションの中から Netty を選んだ理由としては、整理された API や、洗練されたドキュメントなどが挙げられるが、それらよりも重要なのは、Twitter ににおける他のプロジェクトが、このフレームワークを用いている点にある。 Netty と Thrift を組み合わせるために、Netty のチャネル・バッファからの incoming Thrift のリクエストをデコードする、シンプルな Thrift コーデックを記述した。それは、ソケットからの読み込みと、outgoing Thrift レスポンスをエンコードするとき、また、ソケットへの書き込みを行うときに機能する。

Netty defines a key abstraction, called a Channel, to encapsulate a connection to a network socket that provides an interface to do a set of I/O operations like read, write, connect, and bind. All channel I/O operations are asynchronous in nature. This means any I/O call returns immediately with a ChannelFuture instance that notifies whether the requested I/O operations succeed, fail, or are canceled.

Netty は、Channel と呼ばれる重要な抽象概念を定義する。それにより、read/write/connect/bind といった I/O オペレーションを設定する、インターフェイスのためのネットワーク・ソケットへの接続がカプセル化される。 すべての Channel I/O オペレーションは、本質的に非同期である。つまり、あらゆる I/O コールが、要求された I/O オペレーションが succeed/fail/canceled されたことを通知する、ChannelFuture インスタンスを直ちに返すことになる。

When a Netty server accepts a new connection, it creates a new channel pipeline to process it. A channel pipeline is nothing but a sequence of channel handlers that implements the business logic needed to process the request. In the next section, we show how Blender maps these pipelines to query processing workflows.

Netty サーバーが新しいコネクションを受け入れるとき、その処理のためお、新しいチャンネル・パイプラインが作成される。 このチャネル・パイプラインは、リクエストを処理するために必要なビジネス・ロジックを実装する、チャネル・ハンドラーのシーケンスに過ぎない。次のセクションでは、それらのパイプラインと、クエリー処理ワークフローを、Blender がマップする方式を説明する。

WORKFLOW FRAMEWORK

In Blender, a workflow is a set of back-end services with dependencies between them, which must be processed to serve an incoming request. Blender automatically resolves dependencies between services, for example, if service A depends on service B, A is queried first and its results are passed to B. It is convenient to represent workflows as directed acyclic graphs (see below).

Blender において、ワークフローとは総合に依存するバックエンド・サービスのセットのことであり、incoming リクエストを取り扱うために処理されるものとなる。 そして Blender は、それらのサービス間の依存性を自動的に解決する。 たとえば、Service A が Service B に依存している場合には、最初に A がクエリーされ、その結果が B に手渡される。このワークフローを、非環式の有向グラフを用いて表現すると解りやすい(以下を参照)。

Sample Blender Workflow with 6 Back-end Services

In the sample workflow above, we have 6 services {s1, s2, s3, s4, s5, s6} with dependencies between them. The directed edge from s3 to s1 means that s3 must be called before calling s1 because s1 needs the results from s3. Given such a workflow, the Blender framework performs a topological sort on the DAG to determine the total ordering of services, which is the order in which they must be called. The execution order of the above workflow would be {(s3, s4), (s1, s5, s6), (s2)}. This means s3 and s4 can be called in parallel in the first batch, and once their responses are returned, s1, s5, and s6 can be called in parallel in the next batch, before finally calling s2.

上記のサンプル・ワークフローには、相互に依存する 6つの Service  {s1, s2, s3, s4, s5, s6} がある。s3 から s1 へ向けられた方向性を持つエッジは、s1 をコールする前に s3 をホールする必要性を示している。 その理由は、s1 が s3 の結果を要求する点にある。 このようなワークフローを前提として、Blender フレームワークは、それぞれの Service がコールされる順番を決定するために、DAG 上でのトポロジカル・ソートを行なう。 上記のワークフローにおける実行の順序は、{(s3, s4), (s1, s5, s6), (s2)} となるはずだ。つまり、最初のバッチにより、s3 と s4 をパラレルにコールできる。続いて、それらからのリターンの後に、s1 と s5 と s6 をパラレルにバッチ処理し、最後に s2 をコールする。

Once Blender determines the execution order of a workflow, it is mapped to a Netty pipeline. This pipeline is a sequence of handlers that the request needs to pass through for processing.

Blender がワークフローの実行順序を決定した後に、Netty パイプラインとのマップが行われる。 このパイプラインは、全体的な処理を通じて、受け渡しが必要なリクエストを、処理するためのシーケンスとなる。

ーーーーー

昨日の、okachimachiorz1 ジェイムズ・ブッカー先生によりますれば、『 Blenderすか。非同期+DAG+Scalaですか。まさにクラウド型のアーキテクチャです。 要チェックですな 』 とのことです。 なる! ーーー __AC Stamp 2

ーーーーー

<関連>

Twitter サーチを 3倍速にする新アーキテクチャとは? _1
Twitter サーチを 3倍速にする新アーキテクチャとは? _2
Twitter サーチを 3倍速にする新アーキテクチャとは? _3
ーーーーー
Blenderに近いアーキテクチャになっているMessage Pack RPCのJava版の実装
Talk about Scala – SlideShare ( @yasushia さんより)
Twitter における、Ruby から Java への回帰とは?
ーーーーー
Twitter は 毎日、46万ユーザーを増やし、1億 4000万ツイートを発信する
TOPSY の Twitter 分析 API は、5 億クエリー/月 を処理する!
10億人のユーザーを目指す、Twitter の 6つの戦略とは?
日本の大震災で証明された Twitter と Facebook の SOS パワー
Twitter が、新しいデータセンターへの移行を完了
Happy Birthday Twitter – 5 歳になりました!

Twitter サーチを 3倍速にする新アーキテクチャとは? _1

Posted in .Selected, Big Data, Twitter by Agile Cat on April 14, 2011

Twitter Search is Now 3x Faster
Wednesday, April 6, 2011
http://engineering.twitter.com/2011/04/twitter-search-is-now-3x-faster_1656.html

clip_image001

ーーーーー

三部作になっています。続編も ど~ぞ。

Twitter サーチを 3倍速にする新アーキテクチャとは? _1
Twitter サーチを 3倍速にする新アーキテクチャとは? _2
Twitter サーチを 3倍速にする新アーキテクチャとは? _3

ーーー __AC Stamp 2

ーーーーー

In the spring of 2010, the search team at Twitter started to rewrite our search engine in order to serve our ever-growing traffic, improve the end-user latency and availability of our service, and enable rapid development of new search features. As part of the effort, we launched a new real-time search engine, changing our back-end from MySQL to a real-time version of Lucene. Last week, we launched a replacement for our Ruby-on-Rails front-end: a Java server we call Blender. We are pleased to announce that this change has produced a 3x drop in search latencies and will enable us to rapidly iterate on search features in the coming months.

2010年の春のことだが、 Twitter におけるサーチ・チームは、そのサーチ・エンジンを書き直し始めた。 その理由は、永遠に増大していくトラフィックに対処し、エンドユーザーにおけるレイテンシーとサービスの可用性を改善し、新しいサーチ機能の迅速な開発を可能にする点にあった。 その作業の一部として、Twitter のバックエンドを、MySQL  から Lucene のリアルタイム・バージョンに変更することで新しいリアルタイム・サーチ・エンジンを立ち上げている。 そして先週、私たちは Ruby-on-Rails のフロントエンドを、Blender という Java サーバーに置き換えた。 この変更により、サーチ・レイテンシーを 1/3 に低減し、今後の数ヶ月という短い期間において、いくつかのサーチ機能を改善していけるようになった。 このような発表ができて、とても嬉しく思う。

PERFORMANCE GAINS

Twitter search is one of the most heavily-trafficked search engines in the world, serving over one billion queries per day. The week before we deployed Blender, the #tsunami in Japan contributed to a significant increase in query load and a related spike in search latencies. Following the launch of Blender, our 95th percentile latencies were reduced by 3x from 800ms to 250ms and CPU load on our front-end servers was cut in half. We now have the capacity to serve 10x the number of requests per machine. This means we can support the same number of requests with fewer servers, reducing our front-end service costs.

Twitter サーチは、1日あたり 10億以上のクエリーをサポートするという、最も多くのトラフィックを発生させるサーチ・エンジンの 1つである。 この Blender をディプロイする前の週に、日本における #tsunami により、著しいクエリーが発生し、サーチ・レイテンシーに大きなピークが生じた。 そして、この Blender の立ち上げの後には、私たちの 95 パーセンタイル・レイテンシが、800ms から 250ms へと、約 1/3 に低減された。 さらに、フロントエンド・サーバー上の CPU 負荷は、半分にカットされた。 そして、今では、マシンごとのリクエストに関するキャパシティを、10 倍に引き上げる能力を持つに至った。 それにより、以前と同じだけのリクエスト量を、より少ないサーバー台数で処理することが可能となり、フロントエンド・サーバーに関するコストも低減できるようになった。

image

95th Percentile Search API Latencies Before and After Blender Launch

TWITTER’S IMPROVED SEARCH ARCHITECTURE

In order to understand the performance gains, you must first understand the inefficiencies of our former Ruby-on-Rails front-end servers. The front ends ran a fixed number of single-threaded rails worker processes, each of which did the following:

このパフォーマンス向上の理由を理解するには、以前の Ruby-on-Rails フロントエンド・サーバーの非効率について、最初に理解しなければならない。 このフロントエンドは、固定した数のシングル・スレッド Rails Worker プロセスを実行してており、個々のスレッドは以下の項目を処理していく:

  • parsed queries
  • queried index servers synchronously
  • aggregated and rendered results
  • パーズされたクエリー
  • クエリーされたインデックスのサーバー間同期
  • アグリゲーションとレンダリングの結果

We have long known that the model of synchronous request processing uses our CPUs inefficiently. Over time, we had also accrued significant technical debt in our Ruby code base, making it hard to add features and improve the reliability of our search engine. Blender addresses these issues by:

この同期プロセス処理のモデルが、CPU 非効率に消費していることを、私たちは以前から認識していた。 そして、長い時間を経ることで、この Ruby コードベース上にテクニカルな負債を大量に蓄積しており、サーチ・エンジンにおける信頼性の改善や、新しい機能の追加を難しくしていた。 そして、Blender により、以下の問題に取り組んでいる:

  1. Creating a fully asynchronous aggregation service. No thread waits on network I/O to complete.
  2. Aggregating results from back-end services, for example, the real-time, top tweet, and geo indices.
  3. Elegantly dealing with dependencies between services. Workflows automatically handle transitive dependencies between back-end services.
  1. 完全な、非同期アグリゲーション・サービスの構築。ネットワーク I/O の終了を、スレッドが待たない方式。
  2. バックエンド・サービスからの結果をアグリゲート。 たとえば、リアルタイムや、トップ・ツイート、地理インデックスなど。
  3. サービス間の依存性を、エレガントに取り扱う。 Workflows は、バックエンド・サービス間における、推移的な依存性を自動的に処理する。

The following diagram shows the architecture of Twitter’s search engine. Queries from the website, API, or internal clients at Twitter are issued to Blender via a hardware load balancer. Blender parses the query and then issues it to back-end services, using workflows to handle dependencies between the services. Finally, results from the services are merged and rendered in the appropriate language for the client.

以下のダイアグラムは、Twitter サーチ・エンジンのアーキテクチャを示す。  Twitter における Webサイト/API/内部クライアントからのクエリーは、ハードウェア・ロードバランサーを介して Blender に発行される。 Blender により解析されたクエリーは、サービス間に依存性を処理するワークフローを用いて、バックエンド・サービスへと発行される。最終的に、このサービスからの結果がマージされ、対象となるクライアントに適した言語でレンダリングされる。

Twitter Search Architecture with Blender

ーーーーー

大好評だった、昨日のポスト 『 Twitter における、Ruby から Java への回帰とは? 』の元ネタは、この Twitter Blog の記事です。 長いので、とりあえず Part_1 としてポストします。なお、Twitter で頂いたコメントは、Facebook Page に整理しています。 ーーー __AC Stamp 2

ーーーーー

<関連>

Twitter サーチを 3倍速にする新アーキテクチャとは? _1
Twitter サーチを 3倍速にする新アーキテクチャとは? _2
Twitter サーチを 3倍速にする新アーキテクチャとは? _3
ーーーーー
Blenderに近いアーキテクチャになっているMessage Pack RPCのJava版の実装
Talk about Scala – SlideShare ( @yasushia さんより)
Twitter における、Ruby から Java への回帰とは?
ーーーーー

Twitter は 毎日、46万ユーザーを増やし、1億 4000万ツイートを発信する
TOPSY の Twitter 分析 API は、5 億クエリー/月 を処理する!
10億人のユーザーを目指す、Twitter の 6つの戦略とは?
日本の大震災で証明された Twitter と Facebook の SOS パワー
Twitter が、新しいデータセンターへの移行を完了
Happy Birthday Twitter – 5 歳になりました!

Dryad が DAG をつかう理由 – Dryad & DryadLINQ Team Blog

Posted in Hadoop, MS-MapReduce by Agile Cat on August 24, 2010

Why does Dryad use a DAG? – Dryad & DryadLINQ Team Blog
http://blogs.msdn.com/b/dryad/archive/2010/07/23/why-does-dryad-use-a-dag.aspx

image

The basic computational model we decided to adopt for Dryad is the directed-acyclic graph (DAG). Each node in the graph is a computation, and each edge in the graph is a stream of data traveling in the direction of the edge. The amount of data on any given edge is assumed to be finite, the computations are assumed to be deterministic, and the inputs are assumed to be immutable. This isn’t by any means a new way of structuring a distributed computation (for example Condor had DAGMan long before Dryad came along), but it seemed like a sweet spot in the design space given our other constraints.

私たちが Dryad に適用することに決めた基本的な計算モデルは、DAG(directed-acyclic graph:有向非巡回グラフ)である。 このグラフにおける個々のノードで計算が行われ、また、グラフにおける個々のエッジが、その方向をデータを送り出すストリームとなる。 あらゆるエッジ上のデータ量は有限と想定され、また、その計算は決定論的なものと想定され、入力は不変であると想定されるべきである。 それは、分散された計算を構成するための新しい方法ではないが(たとえば Dryad が登場するずっと以前に、CondorDAGMan を有している)、他の制約を持つデザイン・スペースにおけるスイート・スポットだと想定される。

So, why is this a sweet spot? A DAG is very convenient because it induces an ordering on the nodes in the graph. That makes it easy to design scheduling policies, since you can define a node to be ready when its inputs are available, and at any time you can choose to schedule as many ready nodes as you like in whatever order you like, and as long as you always have at least one scheduled you will continue to make progress and never deadlock. It also makes fault-tolerance easy, since given our determinism and immutability assumptions you can backtrack as far as you want in the DAG and re-execute as many nodes as you like to regenerate intermediate data that has been lost or is unavailable due to cluster failures.

それは、なぜ、スイート・スポットになるのだろうか? DAG の利便性の高さは、グラフ内にノードの順序を取り込んでいる点にある。 そのため、入力に際してノードが行うべきことを定義できるようになるため、スケジューリング・ポリシーのデザインが容易になる。そして、準備が済んでいる大量のノードを、どんな時にでも希望する順序のとおりに選択して、スケジューリングすることが可能となる。さらに、少なくとも 1つのスケジュールされたノードがある限り、その処理は継続され、また、デッドロックが起こることはないだろう。また、私たちの決定性と普遍性の仮説を前提として、DAG  内の任意の場所をバックトラックできるため、フォールト・トレランスも容易になる。なお、クラスタの失敗などにより、消失あるいは利用不能になってしまった中間データを再生したいする場合には、それに見合うだけのノードを再実行できる。

image

クリックで拡大 ⇒
https://nosqleast.com/2009/slides/yu-dryad.pdf より

The obvious way to generalize the DAG model would be to allow cycles in the graph, but that makes both scheduling and fault-tolerance more complicated. They are still possible, but we decided to go with the simpler approach and see what we ended up being able to do with it; and leave as a research question the extent to which adding cycles would be useful.

DAG モデルを普及させるには、そのグラフでの巡回を許す方式が明らかに有効だろうが、ケジューリングとフォールト・トレランスがさらに複雑になってしまう。それを実現する可能性も残されているが、私たちの決定においては、より単純なアプローチを選択し、その結果を見届けることになった。つまり、巡回を追加する有用性については、研究課題として残すことに決定した。

On the other hand, there’s no obvious way to restrict the DAG model that makes the underlying system any simpler. Although MapReduce is I think a simpler programming model than Dryad, arguably the system itself is, at least conceptually, more complicated, since the Map nodes and the Reduce nodes have different scheduling and fault-tolerance properties and implementations. In addition when the system wants to optimize some data transfer, e.g. building an aggregation tree or sampling a dataset to find a balanced range partition, the optimization can usually be expressed as a subgraph of a DAG, and doesn’t need to be “hand-rolled” the way it would if you wanted to add it to a system like Hadoop. So the big advantage of adopting a general DAG as the low-level computational abstraction is that there’s just one state machine that handles all computation nodes. Of course, any real implementation of MapReduce or a DAG-driven system like Dryad can get complicated, but the topology of the data flow is not in itself a source of complexity in Dryad.

その一方で、基本的なシステムをシンプルにするために、DAG モデルを制約していく明白な方法は存在しない。 おそらく、MapReduce は Dryad よりも単純なプログラミング・モデルだと思われるが異なるスケジューリングとフォールト・トレランスのプロパティおよび実装を、Map ノードと Reduce ノードが有するため、少なくともシステム自身は、概念的に複雑なものとなる。 それに加えて、たとえば、アグリゲーション・ツリーの構築や、バランスのとれたレンジのパーティションを見つけるためのデータセットのサンプリングといった、データ転送に関する最適化をシステムが要求することがある、そのときに、この最適化は DAG のサブ・グラフとして表現できるが、たとえば Hadoop のようなシステムに加えようとするなら、そこで望まれる “hand-rolled”  の方式を取り入れる必要がなくなってしまう。したがって、一般的な DAG を低レベルの計算抽象概念として採用する大きいアドバンテージは、すべての計算ノードを取り扱う 1つのステート・マシンが、そこに存在するという点に集約される。 もちろん、 現実に実装するとき、MapReduce や、Dryad などの DAG 駆動システムは、複雑なものになり得る。 しかし、データフロー・トポロジーが、Dryad の複雑さ自身の根本に含まれることはない。

One thing we learned fairly early in the Dryad project was that people don’t want to build DAGs by hand: it’s too low-level, and people don’t want to have to learn to think about how to think about their algorithms in terms of dataflow graphs. So this seems like it might be a disadvantage of something like Dryad compared with MapReduce, whose simple programming model is often touted as its big advantage. But with a couple of years of experience, and having talked to a lot of people in academia and industry who have been using Hadoop, I’m not sure that’s really true. In practice, once you get beyond a coursework assignment, most interesting computations cannot be expressed purely as a single Map followed by a single Reduce. Instead, it turns out, you typically need to take the output of the first Reduce and do another round of Map and another round of Reduce, and so on. Often you are trying to write an iterative algorithm (e.g. something like k-means) or very commonly you need to do something like a Join that combines information from two input sets. Join is the basic construct that is used for most graph traversal algorithms (you are combining data at the nodes with an edge map), and it is also used all over the place when there are common subexpressions in a computation: the common result is computed once and then “joined” with other data several times as necessary.

Dryad プロジェクトの早期において、私たちが正しく学んだ 1つに、人々が手作業で DAG を構築したがらないという事があった。 つまり、あまりにも低レベル過ぎるという事であり、データフロー・グラフの観点で、そのアルゴリズムについて学習しようと望まないことであった。 その単純なプログラミング・モデルが、大半のケースでアドバンテージとして推奨される MapReduce との比較において、そのことが Dryad のようなテクノロジーの、ディスアドバンテージになっているのかも知れない。 ただし、この 2年間のエクスペリエンスにおいて、そして Hadoop を使用している学界と業界とのディスカッションにおいて、それが本当に本当であるとは確信していない。 実際のところ、教科書的なレベルを超えると、大半の興味深い処理が、単一の Map と Reduce の組み合わせでは、表現できなくなるだろう。 それに換えて、最初の Reduce の出力を取得し、別の Map と Reduce へと展開していく必要性が導かれる。 大半のケースにおいて、反復性のアルゴリズム(たとえば k-means など)の記述や、きわめて一般的な Join  のようなものを用いた、2つの入力セットに基づく情報の結合などが必要となる。 Join は、大半のグラフ横断型アルゴリズム(データ・ノードにおいてデータとエッジ・マップを結合)において用いられる、基本的な概念である。 そして、計算における共通の副次式が存在する場合にも、園周辺で使用される。つまり、共通の結果は一度だけ計算された後に、必要に応じて何度でも、他のデータと " Join " される。

In other words, MapReduce is really too low level for most programming tasks as well; programmers don’t want to have to break up a complicated algorithm into a set of Maps and Reductions, and manually manage all the types and manually write a script to sequentially execute a series of MapReduce jobs. Good evidence for this comes from the proliferation of higher-level language layers that have been built on MapReduce and Hadoop, including Sawzall, PigLatin, HIVE, Cloudbase, and Yahoo!’s Hadoop SQL.

言い換えれば、大半のプログラミング・タスクに対して、MapReduce はあまりにも低レベルすぎる。つまり、プログラマー Map と Reduuce のセットの中に、複雑なアルゴリズムを分解し、また、それらすべての手作業で管理し、一連の MapReduce ジョブを連続して実行するスクリプトを、手作業を書こうとは思わない。その良い証拠が、MapReduce と Hadoop の上に構築された Sawzall/PigLatin/HIVE/Cloudbase/Yahoo! Hadoop SQL などを含む、高レベルい言語レイヤへの拡散である。

So: if programmers aren’t actually directly writing Map and Reduce functions but instead using a higher-level language; and the MapReduce system isn’t easier to implement than a general DAG model (especially once you have added optimizations for things like sorting and hierarchical aggregation and distribution); and the high-level language you write can generate better-performing execution plans when it has access to a general DAG rather than a limited set of constructs such as Map and Reduce, then why would you choose to implement MapReduce over something like Dryad? We don’t think you would, but then we are biased :)

そうだとすると、もしもプログラマーがMapReduce を書きたがらず代わりに高レベルの言語を使うのであれば、また、MapReduceがDAGモデルよりも簡単に書けるのではなければ(とくにひとたびソートや階層的な集約・分散に関しての最適化を加えてしまったりしたときに)、さらには汎用のDAGにアクセスできる高レベルの言語がMapReduceのような限定された構造よりよい性能の実行プランを作れるのであれば、Dryadのようなものの上にMapReduceを実装しようとするだろうか?そうは思えないが考え方が偏っているのかもしれない。 :)

Michael Isard

ーーーーー

コメント欄にあるように、最後の段落ですが、上田さんに校正していただきました訳文に、差し替えました。 おかげさまで、とても読みやすくなりました。 有難うございます。 ーーー A.C.

ーーーーー

これがウワサの、DAG(directed-acyclic graph)ですね。 日本語にすると、有向非巡回グラフ となるのでしょうか? たしかに、たとえば Hadoop などを活用していくと、その Map/Reduce が多段化してくることになり、そのフローを簡潔かつ確実に制御するための仕組みが必要になると思われます。

そのため、Apache は Zookeeper などに取り組むという構図になっているのでしょうが、あくまでも Hadoop の利用を前提としています。 その点、この記事でいうノードの中身なのですが、そこが、どのように考えられているのか、とても興味がありますね。

それと、訳に自身のないところが多々ありで、なにか変なところなど見つかりましたら、ぜひ、お知らせくださ~~~い!ーーー A.C.

ーーーーー

<関連>
MapReduce in DryadLINQ
Windows Azure MapReduce Demo
Hadoop が Microsoft の教材に?
教育機関への Dryad 提供が始まる
Apache ZooKeeper による分散並列キューの構築
Observers と ZooKeeper
Hadoop による Web 広告システムの構築と運用 :ヨーロッパでの事例
Gridmix3 : Apache Hadoop の実運用負荷をエミュレート
Microsoft readying Hadoop for Windows Azure の対訳

%d bloggers like this: