Agile Cat — in the cloud

Observers と ZooKeeper _1

Posted in Cloudera, Hadoop, Hadoop I/O Pipeline, Parallel by Agile Cat on January 10, 2010

Observers: Making ZooKeeper Scale Even Further
December 15th, 2009 by Henry Robinson
http://www.cloudera.com/blog/2009/12/15/observers-making-zookeeper-scale-even-further/

Cloudera

As readers of our previous post on the subject will recall, ZooKeeper is a distributed coordination service suitable for implementing coordination primitives like locks and concurrent queues. One of ZooKeeper’s great strengths is its ability to operate at scale. Clusters of only five or seven machines can often serve the coordination needs of several large applications.

このタイトルに関連する以前のポストについて、読み手が思い出すように、 ZooKeeper とは協調のための基本要素である、ロックやコンカレント・キューの実装に適切な、分配協調サービスのことである。 ZooKeeper における重要な長所の 1つとして、様々なスケールで動作する能力がある。いくつかの大規模アプリケーションで必要とされる調整を、たった5台~7台のマシンからなるクラスタで充たすような状況も稀ではなくなる。

We’ve recently added a major new feature to ZooKeeper to improve its scalability even further – a new type of server called Observers. In this blog post, I want to motivate the need for such a feature, and explain how it might help your deployments scale even better. Scalability means many things to many people – here I mean that a system is scalable if we can increase the workload the system can handle by assigning more resources to the system – a non-scalable system might see no performance improvement, or even a degradation as the workload increases.

最近になってZooKeeper に加えられた、新しい主要な機能である、 Observers と呼ばれる新しいタイプのサーバーにより、そのスケーラビリティがさらに改善される。このブログ・ポストでは、こうした特徴に対する必要性を喚起し、また、どのようにしてディプロイメント・スケールを支援するのか、その点についても説明していきたい。スケーラビリティの意味は、人それぞれであるため、こ こでは以下のように定義する。つまり、より多くのリソースをシステムに割り当てることで、処理すべきワークロードを増大できるなら、そのシステムはスケーラブルである。反対に、スケーラブルでないシステムは、ワークロードの増大にあわせてパフォーマンスを改善できない状況というより、低下させる状況に陥ってしまう。

To understand why Observers have an effect on ZooKeeper’s scalability, we need to understand a little about how the service works. Broadly speaking, every operation on a ZooKeeper cluster is either a read or a write operation. ZooKeeper makes sure that all reads and all writes are observed by every client of the system in exactly the same order, so that there’s no confusion about which operation happened first.

Observers が ZooKeeper のスケーラビリティを向上させる理由を理解するために、どのようにして、このサービスが機能するか、理解する必要がある。概括的に言って、ZooKeeper クラスタ上のすべてのオペレーションは、read あるいは write のオペレーションである。 ZooKeeperは、すべてのreadとwriteがシステム上の全てのクライアントから、完全に同じ順序で観察されることを保障するので、どのオペレーションが最初に行われたのか、混乱が起こることが無くなる。

Along with this strong consistency guarantee, ZooKeeper also promises high availability, which can loosely be interpreted to mean that it can withstand a significant number of machine failures before the service stops being available to clients. ZooKeeper achieves this availability in a traditional way – by replicating the data that is being written and read amongst a small number of machines so that if one fails, there are others ready to take over without the client being any wiser.

こうした強固な一貫性の保証とともに、ZooKeeper は高可用性を約束する。それは、クライアントにとってサービスが利用できなくなる前に、マシンにおける相当数の障害に耐えるレベルを、大まかに解釈できるからである。 ZooKeeper は、以下の伝統的な手法で可用性を達成する。 つまり、read/write されたデータを何台かのマシンにリプリケートし、1つの失敗が生じた場合に、クライアントの能力に依存することなく、処理を達成するための別ルートを用意しておく。

However, these two properties – consistency and availability – are hard to achieve together, as now ZooKeeper must make sure that every replica in its cluster agrees on the series of read and write operations. It does this by using a consensus protocol. Simplifying greatly, this protocol operates by having a designated Leader propose a new operation to all the other servers, all of whom vote and respond back to the Leader. Once the Leader has gathered more than half the votes outstanding from the other servers, it can deduce that the vote has passed, and sends a further message telling the servers to go ahead and commit the operation to their memory.

こうした強固な一貫性の保証とともに、 ZooKeeper は高可用性を約束する。この高可用性とは、大まかに解釈すれば、相当数のマシンに障害が起きるまでは、クライアントからはサービスが利用可能だということである。ZooKeeper は、以下の伝統的な手法で可用性を達成する。 つまり、read/write されたデータを何台かのマシンにリプリケートし、1台に障害が生じた場合に、クライアントの能力に依存することなく、他のマシンが処理を代行できるのである。

This data flow is illustrated, from start to finish as the client sees it, in the diagram below. The client proposes a value to the server it is connected to. The server then relays that to the Leader, which initiates the consensus protocol, and once the original server has heard from the Leader it can relay its answer back to the client.

以下の図におけるデータ・フローは、クライアントから start と finish を見通すイメージを示す。 クライアントは、自身が接続するサーバーに対して value を提示する。続いてサーバーは、コンセンサスのプロトコルを開始する Leader に中継する。また、オリジナル・サーバーは、Leader からの返信を受け取った直後に、クライアントに対する返信を中継することができる。

Observers ZooKeeper 1

Figure 1: Simplified Write Request Flow

The need for Observers arises from the observation (no pun intended!) that ZooKeeper servers are playing two roles in this protocol. They accept connections and operation requests from clients, and also vote upon the result of these operations. These two responsibilities stand in opposition to each other when it comes to scaling ZooKeeper. If we wish to increase the number of clients attached to a ZooKeeper cluster (and we are often considering the case with 10000 or more clients), then we have to increase the number of servers available to support those clients. However, we can see from the description of the consensus protocol, that increasing the number of servers can place pressure on the performance of the voting part of the protocol. The Leader has to wait for at least half of the machines in the cluster to respond with a vote. The chance of one of these machines running slowly and holding up the entire vote process therefore gets bigger, and the performance of the voting step can decrease commensurately. This is something that we have seen in practice – as the size of the ZooKeeper cluster gets bigger, throughput of voting operations goes down.

Observers の必要性は、このプロトコルにおいて、ZooKeeper サーバーが 2つの役割を担うという観察結果(observation というダジャレではない!)から生じている。それらは、クライアントからの接続と処理のリクエストを受け入れ、それらの処理結果について vote を行う。 この 2つの責任は、ZooKeeper をスケーリングする際にも、相互に代役を務める。 もし、ZooKeeper クラスタ(10,000 以上のクライアント数を想定)にアタッチされるクライアント数を増やすなら、それらのクライアントをサポートするために、利用することが可能なサーバー数を増やす必要がある。しかし、サーバー数を増やすことが、このプロトコルの vote 部分のパフォーマンスに圧力をかける可能性について、コンセンサスのプロトコル記述に基づいて確認できる。少なくとも、クラスタ内の半分のマシンが vote に反応するまで、Leader は待たなければならない。したがって、これらのマシンの 1つに速度の低下が見られ、また、すべての vote プロセスを渋滞させる可能性が高まり、そのサイズも大きくなると、vote ステップのパフォーマンスは相応に低下する。 つまり、ZooKeeper クラスタ・サイズが大きくなるにつれて、vote オペレーションのスループットが低下していくことが、私たちのプラクティスで確認されている。

So there is a tension between our desire to scale the number of clients, and our desire to keep performance reasonable in terms of throughput. To decouple this tension, we introduced non-voting servers called Observers to the cluster. Observers can accept client connections, and will forward write requests to the Leader. However, the Leader knows not to ask the Observers to vote. Instead, Observers takes no part in the voting process, but instead are informed about the result of the vote in Step 3 along with all the other servers.

そのため、 クライアント数のスケールを調整したいいう望みと、スループットに関して適切なパフォーマンスを維持したいという望みの間に、張力が生じている。この張力を吸収するために、Observers と呼ばれる non-voting サーバーをクラスタに導入した。 Observers はクライアント接続の受け入れに対応し、write リクエストを Leader に転送することになる。 しかし、Leader は Observers に対して、vote を依頼しないことを知っている。Observers は vote のプロセスに参加しないが、それに代えて、すべての他のサーバーとともに、Step 3 の vote 結果を通知される。

This simple extension opens up new vistas of scalability to ZooKeeper. We may now add as many Observers as we like to the cluster without dramatically affecting write throughput. The scaling is not absolutely perfect – there is one step in the protocol (the ‘inform’ step) that is linear in the number of servers to inform, but the serial overhead of this step is extremely low. We would expect to hit other bottlenecks in the system before the cost of sending an inform packet to every server dominates the throughput performance of a ZooKeeper cluster.

このシンプルな拡張により、ZooKeeper のスケーラビリティについて、新しい視界が開ける。つまり、write スループットに大幅な影響を与えることなく、必要とされるクラスタの分だけ、Observers を追加できるだろう。このスケーリングは、絶対的な完ぺきさをもたらすものではない。 つまり、通知を行うサーバーの数に対して、リニアに対応するプロトコルのワン・ステップ(inform ステップ)になるが、このステップの連続的なオーバーヘッドは極めて低くなる。 すべてのサーバーに対する通知パケットを送信するためのコストが、ZooKeeper クラスタにおけるスループットのパフォーマンスを独占する前に、システム上の他のボトルネックに遭遇すると予期したい。

Observers ZooKeeper 2 Figure 2: Observers Write Throughput Benchmark

Figure 2 shows the results of one microbenchmark. The vertical axis measures the number of synchronous write operations per second that I was able to issue from a single client (a fully tuned ZooKeeper installation can significantly more operations per second – it’s the relative size of the bars we’re interested in here) . The horizontal axis denotes the size of the ZooKeeper cluster used. The blue bars are ZooKeeper clusters where every server is a voting server, where the green bars are ZooKeeper clusters where all but three servers are Observers. The chart shows that write performance stays approximately constant as we scale out the number of Observers, but falls off dramatically if we expand the size of the voting cluster. This is a win for Observers!

Figure 2 が示すのは、簡単なベンチマークの結果である。タテ軸は、シングル・クライアントから発行が可能な、毎秒あたりの同期 write 数の測定値である(ZooKeeper のインストレーションを完全に調整すると、毎秒あたりの処理量が大幅に増大する。つまり、ここでのバーの長さに影響することになる)。ヨコ軸は、使用された ZooKeeper クラスタ・サイズを示す。青いバーは、すべてのサーバーが vote サーバーの場合の ZooKeeper クラスタであり、緑のバーは、3つのサーバーに Observers を加えた場合の ZooKeeper クラスタを示す。 このチャートからは、Observers 数をスケールアウトするときに write パフォーマンスが安定し、また、voting クラスタのサイズを拡大するときには write パフォーマンスが大幅に低下する状況が読み取れる。 この点が、Observer にとっての勝利である!

ーーーーー 玉川竜司さんによる改編 : 2010/2/7

ZooKeeper の予備知識もないのに、Observers 関連の抄訳なので、、、 かなりヨロヨロです。 訳語の問題や勘違いのご指摘など、よろしくお願いします。 ーーー A.C.

<続く>

<関連>
Observers と ZooKeeper _2
The Anatomy of Hadoop I/O Pipeline _1

Observers と ZooKeeper _2

Posted in Cloudera, Hadoop, Hadoop I/O Pipeline, Parallel by Agile Cat on January 4, 2010

Observers: Making ZooKeeper Scale Even Further
December 15th, 2009 by Henry Robinson
http://www.cloudera.com/blog/2009/12/15/observers-making-zookeeper-scale-even-further/

Cloudera

Observers scale read performance too

Scaling the number of clients is an important use case for Observers, but in fact there are significant other advantages to having them in your cluster.

クライアント数のスケーリングが、Observers における重要なユースケースであるが、実際にそれらをクラスタ内に持つことで、他の重要なアドバンテージが生じる。

As an optimisation, ZooKeeper servers may serve read requests out of their local data stores, without going through the voting process. This puts read requests at a very slight risk of a ‘time-travel’ read, where an earlier value is read after a later value; but this only happens when a server fails. Indeed, in that case a client may issue a ’sync’ request that ensures the next value it reads is the most up-to-date.

クライアント数のスケーリングが、Observers における重要なユースケースであるが、実際にそれらをクラスタ内に持つことで、他の重要なアドバンテージが生じる。最適化の際に、 ZooKeeper サーバーは voting プロセスを介することなく、ローカルのデータストアにおける read リクエストを配分するだろう。それにより、read する値の順序が逆転する際の、 ‘time-travel’ read のリスクが最小になるとも見積もられるが、そのこと自体は、サーバーが失敗するときに起きるだけである。実際に、このようなケースでは、次に read される値が最新であることを保証する ‘sync’ read リクエストを、クライアントが発行する可能性がある。

Therefore Observers are a big performance improvement for read-heavy workloads. Writes go through the standard voting path, and so, by the same argument as for client scalability, increasing the number of voting servers in order to serve more reads will have a detrimental effect on write performance. Observers allow us to decouple read performance from write performance. This meshes well with many use cases for ZooKeeper, where most clients issues few writes but many reads.

したがって、Obdervers の存在により、負荷の大きな read ワークロードにおける大幅なパフォーマンスの改善がもたらされる。スタンダードな voting パスを介した write を用いた、また、クライアント・スケーラビリティを考慮した同一の引数を用いた、さらに多くの read をサポートするための voting サーバーの増加は、write パフォーマンスに弊害をもたらすだろう。Observers により、write パフォーマンスと read パフォーマンスの分離が実現される。つまり、大半のクライアントが、少量の write と大量の read を発行するという、ZooKeeper に多くみられるユースケースと調和する。

Observers enable WAN configurations

There’s yet more that Observers can do for you. Observers are excellent candidates for connecting clients to ZooKeeper across wide-area networks. There are three main reasons for this. In order to get good read performance, it is necessary to have your clients relatively near to a server so that round-trip latencies aren’t too high. However, splitting a ZooKeeper cluster between two datacenters is a very problematic design, due to the fact that ZooKeeper works best when the voting servers are able to communicate with each other at low latency – otherwise we get the slowdown problem I described earlier.

Observers は、さらに多くのものをもたらす。広域ネットワークをまたいで、ライアントと ZooKeeper を接続する際にも、Observers は有力な候補となる。これには、3 つの主な理由がある。Read のパフォーマンスを向上させるためには、ラウンドトリップのレイテンシーを過大にしないように、サーバーとクライアントを相対的に近づける必要がある。しかし、ZooKeeper クラスタを2つのデータセンター間で分けることは、きわめて問題の多いデザインとなる。なぜなら、voting サーバーが低いレイテンシーで相互に通信できるときに、ZooKeeper は適切に機能するという事実があるからだ。そうしないと、前述のようなスローダウンに遭遇してしまう。

Observers can be placed in every datacenter that needs to access a ZooKeeper cluster. Therefore the voting protocol doesn’t take place across a high-latency intra-datacenter link, and performance is improved. Also, two fewer messages that are sent between Observers and the Leader during the voting process than between a voting server and the Leader. This can help ease bandwidth requirements on write-heavy workloads from remote datacenters.

Observers は、ZooKeeper クラスタにアクセスする必要がある、すべてのデータセンターに配置できる。 したがって voting プロトコルが、データセンター内でレイテンシーの大きなリンクを引き起こすことはなく、また、パフォーマンスも改善されている。さらに、voting プロセスの際に、Oservers と Leader の間で交わされる 2つのメッセージは、voting サーバーと Leader間のメッセージと比べて少ない。それは、リモート・データセンターが要求する、過大な write ワークロードにおける、帯域幅に関する要件を緩和する。

Finally, since Observers can fail without affecting the voting cluster itself, there’s no risk to the availability of the service if the link between datacenters is severed. This is much more likely than the loss of internal rack-to-rack connections, so it is beneficial not to rely on such a link.

究極的には、voting クラスタ自身に影響を与えることなく、Observers を失敗させることができるため、データセンター間のリンクが切断されれも、対象となるサービスの可用性にリスクは生じない。 それは、内部における rack-to-rack 接続のダウンよりも起こり得ることであるため、こうりたリンクに依存しないというメリットがもたらされる。

How to get started with Observers

Observers are not yet part of a ZooKeeper release, so in order to start working with them you will have to download the source code from the Subversion trunk. The following is excerpted from the Observers user guide, found in docs/zooKeeperObservers.html in the source distribution.

現時点において、Observers は ZooKeeper リリースの一部となっていないため、それらを利用する場合には、Subversion トランクからソースコードをダウンロードする必要がある。 以下は、ソース・ディストリビューションの docs/zooKeeperObservers.html における、Observers ユーザーガイドからの抜粋である。

How to use Observers

Note that until ZOOKEEPER-578 is resolved, you must set electionAlg=0 in every server configuration file. Otherwise an exception will be thrown when you try to start your ensemble.

The reason: because Observers do not participate in leader elections, they rely on voting Followers to inform them of changes to the Leader. Currently, only the basic leader election algorithm starts a thread that responds to requests from Observers to identify the current Leader. Work is in progress on other JIRAs to bring this functionality to all leader election protocols.

Setting up a ZooKeeper ensemble that uses Observers is very simple, and requires just two changes to your config files. Firstly, in the config file of every node that is to be an Observer, you must place this line:

peerType=observer

This line tells ZooKeeper that the server is to be an Observer. Secondly, in every server config file, you must add :observer to the server definition line of each Observer. For example:

server.1:localhost:2181:3181:observer

This tells every other server that server.1 is an Observer, and that they should not expect it to vote. This is all the configuration you need to do to add an Observer to your ZooKeeper cluster. Now you can connect to it as though it were an ordinary Follower. Try it out, by running:

bin/zkCli.sh -server localhost:2181

where localhost:2181 is the hostname and port number of the Observer as specified in every config file. You should see a command line prompt through which you can issue commands like ls to query the ZooKeeper service.

Future work

There’s more to be done with the Observers feature. In the short term we are working on making Observers fully compatible with all leader election algorithms that ship with ZooKeeper – we expect this to be finished within the next few days. Longer term, we are hoping to investigate performance optimisations such as batching and off-line reads for Observer-based clusters, to take advantage of the fact that Observers have no strict latency requirement to meet unlike a normal ZooKeeper server.

Observers の機能に関しては、さらに推進すべきものがある。 短期的には、ZooKeeper と共に出荷される、すべての leader election アルゴリズムとの完全な互換性を、Observers に持たせるための作業に注力する。あと数日の作業で、それが完了すると予想している。長期的には、観察者ベースのクラスタにおける、バッチ処理やオフライン read などの、パフォーマンスの最適化について調査したいと考えている。それが、標準的な ZooKeeper サーバーではあり得ない、レイテンシーに関する要件を緩和された、Observers のアドバンテージを証明する。

We hope that Observers will make it into the release of ZooKeeper 3.3.0, due early next year. We would be delighted to hear your feedback, either on the mailing lists, or via direct e-mail. ZooKeeper is always looking for contributors, and we’ve got plenty of interesting problems to solve, so do get in contact if you’d like to get involved and I’d be happy to help you get started.

2010 年の早期に予定されている、ZooKeeper 3.3.0のリリースに Observers を加えたいと考えている。メーリング・リストや、ダイレクトなメールによる、フィードバックを期待している。ZooKeeper は、常にコントリビュータを求めている。そして、解決すべきたくさんの興味深い課題を抱えている。そこに関与してもらえるなら、連絡をして欲しい。 あなたのスタートアップに、私は喜んで手を貸したい。

おわり

<関連>
Observers と ZooKeeper _1
The Anatomy of Hadoop I/O Pipeline _1

 

The Anatomy of Hadoop I/O Pipeline _1

Posted in Big Data, Hadoop I/O Pipeline by Agile Cat on September 25, 2009

 

YDN Hadoop and Distributed Computing at Yahoo!

Hadoop and Distributed Computing at Yahoo!

August 27, 2009

From <http://developer.yahoo.net/blogs/hadoop/>

今日から何回かに分けて、Hadoop I/O Pipeline に関する対訳を掲載していきます。この領域に関するドキュメントは初めてのものらしく、読んでいても解らないことばかりです。 訳について、問題などありましたら、また、こう読むべきみたいなアドバイスがありましたら、ぜひコメントをつけてください。 Windows Azure + Dryad につながる知識が共有できればと思います。 --- A.C.

Introduction

In a typical Hadoop MapReduce job, input files are read from HDFS. Data are usually compressed to reduce the file sizes. After decompression, serialized bytes are transformed into Java objects before being passed to a user-defined map() function. Conversely, output records are serialized, compressed, and eventually pushed back to HDFS. This seemingly simple, two-way process is in fact much more complicated due to a few reasons:

Hadoop MapReduce における一般的なジョブでは、入力ファイルは HDFS からが読み込まれる。 そのファイル・サイズを低減するために、通常ではデータの圧縮と解凍が行われた後に、シリアライズさたバイト列が Java オブジェクトに変換され、ユーザー定義された map () 関数に引き渡される。 その反対に、出力レコードにもシリアライズと圧縮が行われ、最終的に HDFS の中にプッシュバックされる。この双方向のプロセスはシンプルに見えるが、いくつかの理由により、実際には複雑なものとなる:

  • Compression and decompression are typically done through native library code.
  • End-to-end CRC32 checksum is always verified or calculated during reading or writing.
  • Buffer management is complicated due to various interface restrictions.

一般的には、ネイティブのライブラリ・コードを介して実行される圧縮と解凍

一般的には、Read/Write の際に常に検証/計算される End-to-End CRC32 チェックサム

多様な制約事項により、複雑化していくバッファ管理

In this blog post, I attempt to decompose and analyze the Hadoop I/O pipeline in detail, and explore possible optimizations. To keep the discussion concrete, I am going to use the ubiquitous example of reading and writing line records from/to gzip-compressed text files. I will not get into details on the DataNode side of the pipeline, and instead mainly focus on the client-side (the map/reduce task processes). Finally, all descriptions are based on Hadoop 0.21 trunk at the time of this writing, which means you may see things differently if you are using older or newer versions of Hadoop.

このブログ・ポストで試みるのは、Hadoop I/O パイプラインに関する詳細な分解と分析であり、また、実現可能な最適化の追求である。 その説明を具体的にしていくために、gzip 圧縮されたテキスト・ファイルを対象として、ライン・レコードを Read/Write するサンプルを、随所で利用していく。 また、パイプラインの DataNode 側の詳細には入らない代わりに、主としてクライアント側(map/reduce タスク・プロセス)に焦点を合わせる。この執筆は Hadoop 0.21 の主要部分に基づいているため、あなたの利用しているバージョンとの間で、若干の違いが生じるかもしれない。

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

The Anatomy of Hadoop I/O Pipeline _2

Posted in Hadoop I/O Pipeline by Agile Cat on September 24, 2009

From <http://developer.yahoo.net/blogs/hadoop/>

Reading Inputs

Figure 1 illustrates the I/O pipeline when reading line records from a gzipped text file using TextInputFormat. The figure is divided in two sides separated by a thin gap. The left side shows the DataNode process, and the right side the application process (namely, the Map task). From bottom to top, there are three zones where buffers are allocated or manipulated: kernel space, native code space, and JVM space. For the application process, from left to right, there are the software layers that a data block needs to traverse through. Boxes with different colors are buffers of various sorts. An arrow between two boxes represents a data transfer or buffer-copy. The weight of an arrow indicates the amount of data being transferred. The label in each box shows the rough location of the buffer (either the variable that references to the buffer, or the module where the buffer is allocated). If available, the size of a buffer is described in square brackets. If the buffer size is configurable, then both the configuration property and the default size are shown. I tag each data transfer with the numeric step where the transfer happens:

Figure 1 が示すのは、gzip されたテキスト・ファイルから、TextInputFormat を使ってラインレコードを読み込むときの I/O パイプラインである。 この図は、左右の 2つのパートに分割されている。 左側は DataNode プロセスを示し、右側はアプリケーション・プロセス(すなわち Mapタスク)を示す。ボトムらトップへ向けて、Kernel/Native/JVM の 3つのゾーンがあり、それぞれにバッファが割り当てられ操作される。 アプリケーション・プロセスに関しては、左から右へ向けて、データ・ブロックが横断しなくてはならないソフトウェア・レイヤが存在する。それぞれのカラーで色づけされたボックスは、各種のソートに用いるバッファである。ボックス間を結ぶ矢印ラインは、データ転送あるいはバッファコピーを示す。矢印ラインの太さは、それぞれのデータ量を示す。それぞれのボックス内のラベルは、バッファの大まかなロケーション(バッファに参照する変数あるいは、バッファが割り当てられるモジュール)を示す。記載が可能な場合には、バッファ・サイズをカッコ内に記している。バッファ・サイズをコンフィグレーションできる場合には、そのプロパティとデフォルト・サイズの双方を示す。データが転送される際の、それぞれのステップに対して、以下のタグを付けている:

Hadoop IO_1


Figure 1: Reading line records from gzipped text files.

  1. Data transferred from DataNode to MapTask process. DBlk is the file data block; CBlk is the file checksum block. File data are transferred to the client through Java nio transferTo (aka UNIX sendfile syscall). Checksum data are first fetched to DataNode JVM buffer, and then pushed to the client (details are not shown). Both file data and checksum data are bundled in an HDFS packet (typically 64KB) in the format of: {packet header | checksum bytes | data bytes}.
  2. Data received from the socket are buffered in a BufferedInputStream, presumably for the purpose of reducing the number of syscalls to the kernel. This actually involves two buffer-copies: first, data are copied from kernel buffers into a temporary direct buffer in JDK code; second, data are copied from the temporary direct buffer to the byte[] buffer owned by the BufferedInputStream. The size of the byte[] in BufferedInputStream is controlled by configuration property "io.file.buffer.size", and is default to 4K. In our production environment, this parameter is customized to 128K.
  3. Through the BufferedInputStream, the checksum bytes are saved into an internal ByteBuffer (whose size is roughly (PacketSize / 512 * 4) or 512B), and file bytes (compressed data) are deposited into the byte[] buffer supplied by the decompression layer. Since the checksum calculation requires a full 512 byte chunk while a user’s request may not be aligned with a chunk boundary, a 512B byte[] buffer is used to align the input before copying partial chunks into user-supplied byte[] buffer. Also note that data are copied to the buffer in 512-byte pieces (as required by FSInputChecker API). Finally, all checksum bytes are copied to a 4-byte array for FSInputChecker to perform checksum verification. Overall, this step involves an extra buffer-copy.
  4. The decompression layer uses a byte[] buffer to receive data from the DFSClient layer. The DecompressorStream copies the data from the byte[] buffer to a 64K direct buffer, calls the native library code to decompress the data and stores the uncompressed bytes in another 64K direct buffer. This step involves two buffer-copies.
  5. LineReader maintains an internal buffer to absorb data from the downstream. From the buffer, line separators are discovered and line bytes are copied to form Text objects. This step requires two buffer-copies.

1: DataNode から MapTask プロセスへ向けた、データの転送。 DBlk はファイルのデータ・ブロックであり、CBlk はファイルのチェックサム・ブロックである。ファイル・データは、Java nio transferTo (UNIX sendfile syscall のこと)を介してクライアントに転送される。最初にチェックサム・データが DataNode JVM バッファにフェッチされ、続いてクライアント(細部は示していない)にプッシュされる。 ファイルとチェックサムのデータが {packet header | checksum bytes | data bytes} のフォーマットにより、HDFS パケット(一般は 64kb)にまとめられる。

2: ソケットから受信されたデータは、おそらく、カーネルに対する syscalls 数を減らすことを目的として、 BufferedInputStream 内のバッファに入れられる。 実際には、2つのバッファを用いたコピーが行われる。データは最初に、JDK コードによるカーネル・バッファから、テンポラリ・バッファへダイレクトにコピーされる。続いて、テンポラリ・バッファ内のデータは、BufferedInputStream が管理する byte [] バッファにコピーされる。 BufferedInputStream の byte [] のサイズは、コンフィグレーション・プロパティである "io.file.buffer.size" により制御されるが、デフォルトは 4K となっている。 私たちの実運用環境では、このパラメータを128K にカスタマイズした。

3: BufferedInputStream を介することで、チェックサム・バイトは(その大まかなサイズは(PacketSize / 512 * 4)あるいは 512 B )内部の ByteBuffer 内に保存される。そして、ファイル・バイト(圧縮されたデータ)は、解凍レイヤが提供する byte [] バッファ内に堆積していく。 ユーザー・リクエストはチャンクの境界線をそろえないかもしれないが、チェックサムの計算ではフルの 512 byte チャンクが必要とされるため、ユーザーが提供する byte [] バッファ内に、部分的なチャンクをコピーしてしまう前に、512 byte [] バッファが使用される。 さらに、(FSInputChecker API が必要とする場合には)512 byte 単位のバッファに、データがコピーされることに注意すべきだ。 最終的に、FSInputChecker によるチェックサム検査を実現するために、すべてのチェックサム・バイトは 4 byte 配列内にコピーされる。 全体的に、このステップでは、余分のバッファ・コピーが生じる。

4: 解凍レイヤでは、DFSClient レイヤからのデータを受信するために、byte [] バッファが使用される。 DecompressorStream では、byte [] バッファから 64K のダイレクト・バッファにデータをコピーし、データを解凍するためにネイティブ・ライブラリ・コードをコールし、解凍されたデータを別の 64K ダイレクト・バッファにストアする。 このステップでは、2つのバッファ・コピーが生じる。

5: LineReader はダウンストリームからのデータを取り入れるために、内部バッファを保持する。 バッファから、ライン・セパレータが発見され、ライン・バイトが Text オブジェクトを形成するためにコピーされる。このステップでは、2つのバッファ・コピーが必要となる。

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

The Anatomy of Hadoop I/O Pipeline _3

Posted in Hadoop I/O Pipeline by Agile Cat on September 23, 2009

From <http://developer.yahoo.net/blogs/hadoop/>

Optimizing Input Pipeline

Adding everything up, including a "copy" for decompressing bytes, the whole read pipeline involves seven buffer-copies to deliver a record to MapTask’s map() function since data are received in the process’s kernel buffer. There are a couple of things that could be improved in the above process:

バイト列を解凍するための ”Copy” も含めて、すべてを合算すると、データがプロセスのカーネル・バッファに受信されてから、MapTask の map () 関数にレコードを受け渡すまでに、Read パイプラインの全体で7つのバッファ・コピーを伴うことになる。 上記のプロセスにおいて、改善が可能な2つのものがある:

  • Many buffer-copies are needed simply to convert between direct buffer and byte[] buffer.
  • Checksum calculation can be done in bulk instead of one chunk at a time.

大量のバッファ・コピーが、ダイレクト・バッファと byte[] バッファ間での、シンプルな変換を必要とする。

チェックサムの計算は、チャンクごとではなく、バルクでの一括処理が可能である。

Hadoop IO_2

Figure 2: Optimizing input pipeline.

Figure 2 shows the post-optimization view where the total number of buffer copies is reduced from seven to three:

Figure 2 が示すのは、バッファ・コピーの回数を 7回から 3回に減らす場合の、ポスト・オプティマイゼーションの様子である:

  1. An input packet is decomposed into the checksum part and the data part, which are scattered into two direct buffers: an internal one for checksum bytes, and the direct buffer owned by the decompression layer to hold compressed bytes. The FSInputChecker accesses both buffers directly.
  2. The decompression layer deflates the uncompressed bytes to a direct buffer owned by the LineReader.
  3. LineReader scans the bytes in the direct buffer, finds the line separators from the buffer, and constructs Text objects.

1: 入力パケットは、チェックサム・バイトのためのインターなものと、圧縮されたバイト列を保持する解凍レイヤが管理するダイレクト・バッファで構成される、2つのダイレクト・バッファの中で、チェックサム部分とデータ部分に分解される。 FSInputChecker は、双方のバッファに対して、ダイレクトにアクセスする。

2: この解凍レイヤは、LineReader が管理するダイレクト・バッファへ向けて、圧縮されていないバイト列を収縮させる。

3: LineReader は、ダイレクト・バッファ内のバイト列をスキャンし、バッファからライン・セパレータを探し出し、Text オブジェクトを構成する。

 

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

The Anatomy of Hadoop I/O Pipeline _4

Posted in Hadoop I/O Pipeline by Agile Cat on September 22, 2009

From <http://developer.yahoo.net/blogs/hadoop/>

Writing Outputs

Now let’s shift gears and look at the write-side of the story. Figure 3 illustrates the I/O pipeline when a ReduceTask writes line records into a gzipped text file using TextOutputFormat. Similar to Figure 1, each data transfer is tagged with the numeric step where the transfer occurs:

ここで視点を変えて、Write 側からのストーリーを追いかけていく。 Figure 3 が示すのは、ReduceTask が TextOutputFormat を用いて、gzip されたテキスト・ファイルにライン・レコードを書き込む様子である。 Figure 1 のように、それぞれのデータ転送について、そのステップ順を示すタグを付けている:

Hadoop IO_3 Figure 3: Writing line records into gzipped text files.

  1. TextOutputFormat’s RecordWriter is unbuffered. When a user emits a line record, the bytes of the Text object are copied straight into a 64KB direct buffer owned by the compression layer. For a very long line, it will be copied to this buffer 64KB at a time for multiple times.
  2. Every time the compression layer receives a line (or part of a very long line), the native compression code is called, and compressed bytes are stored into another 64KB direct buffer. Data are then copied from that direct buffer to an internal byte[] buffer owned by the compression layer before pushing down to the DFSClient layer because the DFSClient layer only accepts byte[] buffer as input. The size of this buffer is again controlled by configuration property "io.file.buffer.size". This step involves two buffer-copies.
  3. FSOutputSummer calculates the CRC32 checksum from the byte[] buffer from the compression layer, and deposits both data bytes and checksum bytes into a byte[] buffer in a Packet object. Again, checksum calculation must be done on whole 512B chunks, and an internal 512B byte[] buffer is used to hold partial chunks that may result from compressed data not aligned with chunk boundaries. Checksums are first calculated and stored in a 4B byte[] buffer before being copied to the packet. This step involves one buffer-copy.
  4. When a packet is full, the packet is pushed to a queue whose length is limited to 80. The size of the packet is controlled by configuration property "dfs.write.packet.size" and is default to 64KB. This step involves no buffer-copy.
  5. A DataStreamer thread waits on the queue and sends the packet to the socket whenever it receives one. The socket is wrapped with a BufferedOutputStream. But the byte[] buffer is very small (no more than 512B) and it is usually bypassed. The data, however, still needs to be copied to a temporary direct buffer owned by JDK code. This step requires two data copies.
  6. Data are sent from the ReduceTask’s kernel buffer to the DataNode’s kernel buffer. Before the data are stored in Block files and checksum files, there are a few buffer-copies in DataNode side. Unlike the case of DFS read, both file data and checksum data will traverse out of kernel, and into JVM land. The details of this process are beyond the discussion here and are not shown in the figure.

1: TextOutputFormat の RecordWriter はバッファリングされていない。 ユーザーがライン・レコードを発行するとき、Text オブジェクトのバイト列は、圧縮レイヤが管理する 64KB ダイレクト・バッファ内に、そのままコピーされる。きわめて長いラインの場合には、何回かに分けて、64KB ずつバッファにコピーされるだろう。

2: 圧縮レイヤがライン(あるいは長いラインの一部)を受信するときには必ず、ネイティブの圧縮コードがコールされ、そして、圧縮されたバイト列は別の 64KB ダイレクト・バッファにストアされる。 DFSClient レイヤは、byte [] バッファを単なる入力として受け入れるため、データが DFSClient レイヤにプッシュされる前に、圧縮レイヤが管理する内部の byte [] バッファへ向けたコピーが実行される。 このバッファのサイズは、コンフィグレーション・プロパティである "io.file.buffer.size" により制御される。このステップは、2つのバッファコピーを伴う。

3: FSOutputSummer は、圧縮レイヤから得られる byte [] バッファに基づいてCRC32 チェックサムを計算し、また、チェックサム・バイトとデータ・バイトの双方を Packet オブジェクト内の byte [] バッファに蓄積する。 チェックサム計算を、512B チャンク全体に対して再実施しななくてはならない。そして、チャンクの境界線が不揃いな圧縮データに起因するかもしれない、部分的なチャンクを保持するために、内部の 512B byte [] バッファが使用される。 チェックサムが最初に計算され、パケットにコピーされる前に、4B byte [] バッファにストアされる。 このステップで、1つのバッファ・コピーが生じる。

4: パケットがあふれるときには、長さが 80 に制限されたキューに、パケット自体がプッシュされる。 パケット・サイズはコンフィグレーション・プロパティ "dfs.write.packet.size” により制御されるが、そのデフォルトは 64KB となる。このステップは、バッファコピーを伴わない。

5: DataStreamer スレッドは対象となるキューの支配下にあり、何らかのデータを受信するときには必ず、ソケットへ向けてパケットを送信する。 このソケットは、BufferedOutputStream に取り込まれている。しかし、byte [] バッファはきわめて小さいため(512B 以上にはならない)、通常ではバイパスされる。 ただ、JDK コードによるい管理されるテンポラリなダイレクト・バッファに、データをコピーする必要が依然として残っている。 このステップでは、2つのデータ・コピーが必要となる。

6: ReduceTask のカーネル・バッファから DataNode のカーネル・バッファまで、データは送信される。 データが Block ファイルとチェックサム・ファイルにストアされる前に、 DataNode サイドで何回かのバッファ・コピーが行われる。 DFS Read のケースとは異なり、ファイル・データとチェックサム・データの双方が、カーネルを横断してJVM にたどり着く。 そのプロセスの詳細については、この図に示していない。

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

The Anatomy of Hadoop I/O Pipeline _5

Posted in Hadoop I/O Pipeline by Agile Cat on September 21, 2009

From <http://developer.yahoo.net/blogs/hadoop/>

Optimizing Output Pipeline

Overall, including the "copy" for compressing bytes, the process described above requires six buffer-copies for an output line record to reach ReduceTask’s kernel buffer. What could we do to optimize the write pipeline?

出力ライン・レコードが ReduceTask のカーネル・バッファにたどり着くためには、バイトを圧縮する "Copy” も含めた全体として、6つのバッファ・コピーが必要となる。この Write パイプラインを最適化するために、何ができるだろうか?

  • We can probably reduce a few buffer-copies.
  • The native compression code may be called less frequently if we call it only after the input buffer is full (block compression codecs like LZO already do this).
  • Checksum calculations can be done in bulk instead of one chunk at a time.

おそらく、いくつかのバッファ・コピーを省略できる。

入力バッファがフルになったときだけに、ネイティブのコードをコールするようにすれば(LZO などの圧縮コーデックでは実現済み)、その頻度を低減できる。

チェックサムの計算は、チャンクごとではなく、バルクでの一括処理が可能である。

Hadoop IO_4

Figure 4: Optimizing output pipeline.

Figure 4 shows how it looks like after these optimizations, where a total of four buffer-copies are necessary:

Figure 4 が示すのは、それらの最適化を行った後の、4回のバッファ・コピーが必要な場合の様子である:

  1. Bytes from a user’s Text object are copied to a direct buffer owned by the TextOutputFormat layer.
  2. Once this buffer is full, native compression code is called and compressed data is deposited to a direct buffer owned by the compression layer.
  3. FSOutputSummer computes the checksum for bytes in the direct buffer from the compression layer and saves both data bytes and checksum bytes into a packet’s direct buffer.
  4. A full packet will be pushed into a queue, and, in background, the DataStreamer thread sends the packet through the socket, which copies the bytes to be copied to kernel buffers.

1: ユーザーの Text オブジェクトから得られるバイト列は、TextOutputFormat レイヤが管理するダイレクト・バッファにコピーされる。

2: このバッファがフルになった後にネイティブの圧縮コードがコールされ、圧縮レイヤが管理するダイレクト・バッファに圧縮されたデータが蓄積される。

3: FSOutputSummer が、圧縮レイヤから得られたダイレクト・バッファ内のバイト列に対して、チェックサムを計算し、データ・バイトとチェックサム・バイトの双方を、パケットのダイレクト・バッファ内に保存する。

4: いっぱいになったパケットはキューにプッシュされ、カーネル・バッファにコピーされるべきバイト列をコピーするために、DataStreamer はバック・グラウンドにおいて、ソケットを介してパケットを送信する。

Conclusion

This blog post came out of an afternoon spent asking ourselves specific questions about Hadoop’s I/O and validating the answers in the code. It turns out, after combing through class after class, that the pipeline is more complex than we originally thought. While each of us is familiar with one or more components, we found the preceding, comprehensive picture of Hadoop I/O elucidating, and we hope other developers and users will, too. Effecting the optimizations outlined above will be a daunting task, and this is the first step toward a more performant Hadoop.

このブログ・ポストは、私たち自身で Hadoop の I/O に関する議論を行い、コードにおける妥当性を検査した結果である。クラスごとに綿密に調査した後に、このパイプ・ラインは、これまでに考えてきたよりも、さらに複雑であることが分かった。私たちの一人一人は、1つあるいは複数のコンポーネントに精通しているが、Hadoop I/O を説明するための包括的な図の重要性を認識した。他のデベロッパーやユーザーも、そう考えてくれると期待している。前述の効果的な最適化の概要は、困難なタスクになるだろう。そして、このことが、効果的な Hadoop へ向けた最初のステップとなる。

– Hong Tang

From <http://developer.yahoo.net/blogs/hadoop/>

The Anatomy of Hadoop I/O Pipeline _1
The Anatomy of Hadoop I/O Pipeline _2
The Anatomy of Hadoop I/O Pipeline _3
The Anatomy of Hadoop I/O Pipeline _4
The Anatomy of Hadoop I/O Pipeline _5

%d bloggers like this: