Finagle future and ...

Twitter seems to have moved their infrastructure to NodeJS from Scala.

Today we moved all of Twitter’s mobile web traffic (that’s like, a lot) to our new web stack – Node.js, Express, React PWA.

I’m wondering they will keep supporting finagle in future…

gimg - Terminal Image Viewer

At the beginning of this year, I decided to learn some new programming languages. For now, these are Go and Haskell. Go is obtaining popularity among system programming and development tools like Docker. Haskell seems to be good for learning another programming paradigm, pure functional programming.

Anyway the best way to learn a new programming language is reading and writing. So I created a tool in Go lang.

This is a terminal image viewer. You can show image file in console without any specific softwares.

For example, you can show the original image

twitter_icon

$ gimg some.jpg

in console.

sample

It shows one pixel in one character space as default. If the image size is larger than your terminal window size, you can specify the width.

$ gimg -size 10 some.jpg

resized

BTW, the frog image is my Twitter icon.

bzip2 and MAPREDUCE-13270

少し前だけれど、HADOOP-13270について書こうと思う。

Hadoop MapReduceフレームワークはInputFormatというインターフェースを通じて様々なフォーマットのファイルを読むことができる。単純なテキストファイルを一行ずつ読んだり、Avroなどのシリアライゼーションフォーマットやgzipなどの圧縮フォーマットもサポートしている。

InputFormatはgetSplitsというメソッドで一つのファイルを複数のSplitという単位に分割することが要求される。

  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

1つのMapperは1つのSplitを読んで処理を行うため、このSplitを作る方法をInputFormatが知らないと巨大なファイルを複数のMapperで分散して処理できないのでSplitの作り方はMapReduceを効率的に走らせるために鍵ともいえる。 通常はデータのローカリティを活かすために、1つのSplitはHDFSのBlockに対応づけられる。つまりブロックサイズが128MBだとするとこの大きさで各Splitが作られることになる。このとき大事なことが各Splitから読み出されるデータはそれだけで読めるようになっていないといけない。どういうことかというと

record(N)
record(N+1)
record(N+2)
---- 128MB ----
record(N+3)
record(N+4)
record(N+5)

上記のように各レコードが1行ずつ書かれたファイルがあったとして、ファイルの先頭から読んでいってrecord(N+2)とrecord(N+3)の間がちょうど128MBになったとする。この場合1つめのSplitの終わりはrecord(N+2)で次のSplitのはじめのレコードはrecord(N+3)になる。ただレコードの区切りがちょうどBlockの区切りになっているとは限らないので、もし下記のようになっていたとすると

record(N)
record(N+1)
reco
---- 128MB ----
rd(N+2)
record(N+3)
record(N+4)
record(N+5)

単純にBlockサイズ区切りにすると読めないレコードがでてくる。Split同士は全く別のMapper(おそらく別のマシンで動く)で読まれるのでデータが正しく読めていないのかどうかもMapperは知らない。 これではまずいのでInputFormatは正しくレコードが区切られるように微調整をしてSplitを作ってくれる。

bzip2

ところがこの様にSplitを作ろうとすると圧縮フォーマットの場合には問題がでてくる。gzipなどの圧縮フォーマットの場合、1つの圧縮されたファイルを読めるように解凍するためにはファイルがまるまる必要になる。これはInputFormatのSplitと相性が悪い。なぜならそれだけで読める部分に分割するということが原理的にできないからだ。gzipで圧縮されたフォーマットをMapperで読めるようにするためには1ファイルすべてを読む必要がある。例えば1.2GBくらいのファイルがあったとすると10blockになり、10Mapperくらいで読んでほしいが、1MapperはすべてのBlockを読まないといけない。これではデータのローカリティを犠牲になっており、大きなファイルをだととても効率が悪くなる。こういったフォーマットをUnsplittableという。

実は圧縮フォーマットの中でもSplittableなものがある。その1つがbzip2だ。bzip2は同期マーカと呼ばれる48ビットの円周率の近似値を用いてファイルを分割可能な単位で圧縮している。

bzip2

上記の図でSplitと書かれている部分は圧縮されているが、それだけでまた解凍が可能だ。つまりこのSplitを読んだMapperは読んだ部分をbzip2のuncopressorにかければ通常の非圧縮ファイルと同じように読むことができる。並列処理を行う上でもデメリットはない。(もちろん各フォーマットで圧縮効率や、圧縮スピードは違うのでアプリケーションにあったフォーマットを選ぶのがよいと思う)

HADOOP-13270

前置きが長くなってしまったけれどHADOOP-13270。これはある特定のサイズのbzip2ファイルを特定のSplitサイズで読もうとするとデータが重複するというバグだった。

Unit test TestTextInputFormat.testSplitableCodecs() failed when the seed is 1313094493. Stacktrace java.lang.AssertionError: Key in multiple partitions. at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertFalse(Assert.java:64) at org.apache.hadoop.mapred.TestTextInputFormat.testSplitableCodecs(TestTextInputFormat.java:223)

bzip2は使ってないけれど、データが欠損したり重複するとデータ分析の意味がなくなるし問題として面白そうなのでなんとかしてみようと思い調べてみた。

問題はBzip2Codecというクラスにあった。このクラスは与えられたoffsetから直近のMarkerを探してそこを始点とするInputStreamを作ってくれる。元々の実装はこうなっていた。

public SplitCompressionInputStream createInputStream(InputStream seekableIn,
    Decompressor decompressor, long start, long end, READ_MODE readMode)
		throws IOException {

  // startから次の場所にあるデータを読める直近のmarkerを探したい

  // ...
  // ファイルの先頭には特別に"BZh9"というマジックが書かれている。これに48bitのMarkerを足すので
  // 合計10bytesの探すべきデータがある。
  final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
		  CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
  long adjStart = 0L;

  // 読むべきMarkerを見つけられる位置までseekして戻る
  adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION));
  ((Seekable)seekableIn).seek(adjStart);

  // 次に読むべきMarker自体はBZip2CompressionInputStreamが見つけてくれる
  SplitCompressionInputStream in =
    new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
  // ...
}

与えられたstartの位置まで(を含んだ圧縮ブロック)は読んでいるので次のMarkerからのデータを読みたい。

alignment

そのMarkerを探す必要があるが、startはbyte単位で計算されているのでstart位置がMarkerの真ん中にある可能性がある。そのままBZip2CompressionInputStreamにseekさせると読みたいMarkerの次のMarkerを見つけてしまうので少し戻す必要がある。これがadjStart.

ajdStart

Markerは通常48bit(=6bytes)だけれど、ファイルの先頭には”BZh9”という文字があり10bytesになっているらしい。FIRST_BZIP2_BLOCK_MARKER_POSITIONは10bytes。ところが”BZh9”がついているのはファイルの先頭だけなのに、常に10bytes戻ってMarkerを探すようになっている。これだと問題がおきるケースが下記。

read-again

startを含んだ圧縮ブロックはすでに読んでいるのにadjStartで戻りすぎたのでまた同じMarkerが見つかってしまう。つまりデータの重複が発生している。本当は6bytesだけ戻らなければいけないところを10bytes戻るので同じMarkerを見つけてしまう。Fix自体は簡単で正しく6bytes戻ればいいだけだった。


final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
	CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
long adjStart = 0L;
if (start != 0) {
	// ファイルの先頭でないなら6bytes (FIRST_BZIP2_BLOCK_MARKER_POSITION - (2bytes + 2bytes))戻る
	// そもそもファイルの先頭だったら戻る必要はなかった
	adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION
			- (HEADER_LEN + SUB_HEADER_LEN)));
}

InputFormat関連のバグはデータの欠損、重複を生み出すので改めて大事な部分だなと思った。

Keep blogging is Fun

This blog was kept for 4 years. Of course there are some times when I stop writing. But anyway updating blog posts itself are kept for 4 years. I’m now considering keep blogging is difficult and tough work. But I found an practical article for keep blogging. This post describes the motivation and mentality for keep blogging.

Want to blog? Read this

I totally agree with him. I thought there is no one who cares what I wrote even I kept writing 4 years. So I don’t have any fear to publishing contents. I posted not only useful posts (I thought at least!) but also wasted, rubbish contents. Anyway no one cares. I received responses rarely. No access growing, no criticism. So please don’t care what you are writing if you want to keep blogging. No one cares.

Just Practice

As the author of this article, English is my second language. So there are a lot of grammatical mistakes in my blog. The reason why I’m writing my blog in English is practice. Writing a lot of Engligh documents can contribute my English skill improvement. I’m not sure I could improve my English writing, but I can write articles in English without any mental barrier in short time. That might be the one I obtained after writing in English.

So what I want to write here is don’t care what you are writing!

Presto Driver,Split and Pipeline

Collecting workload metrics of distributed system is important task to improve performance and make it scalable. Presto is not exception. Presto is fast distributed SQL engine mainly developed by Facebook. Recently AWS start using Presto as backend of Amazon Athena. We are using Presto in daily analysis too. Just same as other distributed system like Hadoop, Presto also has its own terminology. In order to measure cluster workload correctly, understanding these notions and components are necessary.

So this post described the difference of Driver, Split and Pipeline which are used in Presto internally. First of all, a Presto query can be divided into 3 units.

  • Query
  • Stage
  • Task

Query is a query you submitted. The query is divided into several stages which represents some collection of operators. Query is scheduled per stages. And upper stream stages should wait downstream stages completion because upper stream stages depend on the data generated by downstream.

Query Overview

Task is a unit for actual data processing. A stage includes several tasks in it. So task is a parallel unit among multiple workers. Tasks in the same stage are doing same thing other than target data and worker instance on which a task is running. So when you have more tasks in a stage, it means more parallelism.

Query Overview

This is a live plan diagram generated by Presto. You can check this from query UI. It shows each stage and operators run by tasks (task is not described here explicitly though) of the query. The component like tablewriter, exchange are operators. These operators do actual data processing. So this is the basic diagram of Presto query internal.

Driver

Let’s introduce Driver first. We can regard Driver as resource of data processing. Large query consumes a lot of Drivers. We can see the query progress by checking the number of finished drivers. A driver contains multiple operators. So a task runs several drivers which includes multiple operators.

Pipeline

Pipeline is DriverFactory. Pipeline keeps operators and configurations to create driver. All drivers created by a pipeline have same operator types and same configurations. In my understanding, there are some operators which are often used with other operators. (e.g. map and reduce. shuffle and exchange etc). So creating drivers from pipeline good way. Pipeline is just a factory to create drivers, not runtime component. So we cannot see any metrics about pipeline.

Split

What’s Split? Split is a task for data scanning from connector, or other workers. It provides the data source processed by each drivers. It is responsible for handling I/O between other external system. If you have any experience of creating Presto connector, you might see split because connector passes the actual data based on the unit of split. Several splits can be included in a driver. If the pipeline starts with tablescan operator, driver is created per split. If the pipeline is an intermediate data processing, it depends on task.concurrency.

Driver Diagram

Workload Measurement

Overall driver is responsible for actual data processing. And driver can be a good indicator of workload of the cluster. If more drivers is running, we can consider that cluster workload is growing. Of course correct load of each driver also depends on the data size and operator types. But it is difficult to measure CPU usage and processed data size per query. So in reality measuring the number of drivers is a good way to know the cluster workload, I think. If you know the good measurement of Presto or general knowledge of metrics of distrbuted system, I would be glad when you let me know.

Thanks.