2007年12月11日

memcached

※本記事は当初HTMLに整形せずに掲載してしまい、ご覧になった方にはご不便をおかけしました。お詫び申し上げます。

consistent hashingの記事でmemcachedが名前だけ出てきたので少し調べてみた。

memcachedは単純なキーと値のペアによる分散キャッシュサーバ。高速で、非常に台数にスケールすることが利点という。ほとんど設定いらずでインストールしてそのまま起動できる。

動的に生成するウェブページのキャッシュ等に用いられる。キャッシュの他HTTPのセッション情報のような消えてしまっても致命的でない情報の保持に使われることもある。検索してみると国内でもmixiやはてな等で広く使われているようだ。各種言語のクライアントライブラリも多い。

目次

libevent

memcachedはlibeventというフレームワークを利用して書かれている。 libeventはネットワークサーバ用のイベント駆動型フレームワーク。後述のクライアント1万台問題も考慮しノンブロッキングI/Oを駆使する高速なネットワークソフトウェアが書けるようになっている。 一般的なselectpollの他、kqueue (*BSD)、epoll (Linux)、/dev/poll (Solaris)といったOS固有のイベント通知機構にも対応している。

libevent利用の基本的な流れは次のようになる。kqueue等とよく似ている。

  1. event_init()を呼び初期化
  2. 捕捉したいイベントを登録
    1. event_set()で監視するファイル記述子(ノンブロッキングモード)や条件、イベントハンドラの情報を構造体にセット
    2. event_add()で登録
  3. event_dispatch()でイベントループに入る

libeventのサイトにあるevent-test.cがごく簡単なサンプルプログラム。そのWindows用コードを取り除いて簡潔にしたものを引用しておく。

/*
 * Compile with:
 * cc -I/usr/local/include -o event-test event-test.c -L/usr/local/lib -levent
 */

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/queue.h>
#include <unistd.h>
#include <sys/time.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

#include <event.h>

void
fifo_read(int fd, short event, void *arg)
{
        char buf[255];
        int len;
        struct event *ev = arg;

        /* Reschedule this event */
        event_add(ev, NULL);

        fprintf(stderr, "fifo_read called with fd: %d, event: %d, arg: %p\n",
                fd, event, arg);
        len = read(fd, buf, sizeof(buf) - 1);

        if (len == -1) {
                perror("read");
                return;
        } else if (len == 0) {
                fprintf(stderr, "Connection closed\n");
                return;
        }

        buf[len] = '\0';
        fprintf(stdout, "Read: %s\n", buf);
}

int
main (int argc, char **argv)
{
        struct event evfifo;
        struct stat st;
        char *fifo = "event.fifo";
        int socket;

        if (lstat (fifo, &st) == 0) {
                if ((st.st_mode & S_IFMT) == S_IFREG) {
                        errno = EEXIST;
                        perror("lstat");
                        exit (1);
                }
        }

        unlink (fifo);
        if (mkfifo (fifo, 0600) == -1) {
                perror("mkfifo");
                exit (1);
        }

        /* Linux pipes are broken, we need O_RDWR instead of O_RDONLY */
#ifdef __linux
        socket = open (fifo, O_RDWR | O_NONBLOCK, 0);
#else
        socket = open (fifo, O_RDONLY | O_NONBLOCK, 0);
#endif

        if (socket == -1) {
                perror("open");
                exit (1);
        }

        fprintf(stderr, "Write data to %s\n", fifo);

        /* Initalize the event library */
        event_init();

        /* Initalize one event */
        event_set(&evfifo, socket, EV_READ, fifo_read, &evfifo);

        /* Add it to the active events, without a timeout */
        event_add(&evfifo, NULL);

        event_dispatch();

        return (0);
}

クライアント1万台問題

クライアント(Client)と1万(10K)から「C10K problem」と書かれる。 十分強力なハードウェアが安価に得られる現在、マシンパワーだけなら数多のクライアントも恐くはなくなったが、同時に無数のクライアントを捌かねばならない時代には別の要因がボトルネックとして浮上しているというもの。

これを日本語で紹介した@ITの記事「web2.0の先にあるC10K問題」ではOSのプロセス番号の最大数等からくる問題とされているが、元ネタのThe C10K problemの問題意識の中心はそれとは異なる。

クライアントごとにスレッドを使うモデルは通用しない

OSのマルチスレッドが一般化してからの一頃は1クライアントに1スレッドを割り当てるサーバも多く書かれたが、C10Kの状況ではこのモデルは破綻する。大抵のOSは少なくとも以前はそのような非常に多数のスレッドを扱う状況をあまり想定していなかったことや、スレッドの数だけスタックフレーム(例えば2MB)も必要なためにプロセスのメモリ空間の大きさからスレッド数も制約されてしまう(例えば512個)という理由による。

スレッドの代わりにクライアントごとにプロセスを割り当てる方式だとそれこそ@ITのプロセス数の制約の議論になる。

一スレッドで多数のクライアントを同時に処理

必然的に一つのスレッド(またはプロセス)で多数のクライアントを同時に処理する必要があり、The C10K problemでは

  • ノンブロッキングI/Oと状態変化通知機構を利用する
  • 非同期I/Oを使う

この2つに特に焦点を当てている。

状態変化通知

状態変化通知(readiness change notification)とは、データが到着して入力が可能になった場合、または先の出力が完了してバッファが空き、次の出力が可能になった場合の変化をユーザプロセスに通知する方式。*BSDのkqueue、Linuxのepoll等がこれに当たる。

従来からUNIXで使われてきたselectpollは、プログラムが調査するたびにそのとき読み込み可能・出力可能なファイル記述子を報告するもの。selectpollの大きな問題として、selectには調査するファイル記述子の数がFD_SETSIZEに制約されることが挙げられる。pollにはそのような制約はないものの、C10K環境では毎回長大なファイル記述子のリストを全てスキャンすることになり、多くの場合ほとんどのファイル記述子は準備状態にないだろうことから非常に無駄が多い。

Solarisの/dev/pollpollを代替するもので、監視するファイル記述子のリストを一回だけ登録し、毎回渡す必要をなくしてパフォーマンスを改善したもの。

ノンブロッキングI/O

ノンブロッキングI/Oは文字通りブロックしない入出力方法。通常の入出力では指定したデータの出力が完了するまで、または指定したバイト数のデータを読み込むまでシステムコールから復帰せず、その間プロセスの実行は停止する。ノンブロッキングI/Oはそのとき到着していたデータだけを読み込み、またはそのときカーネルのバッファに書き込める分量だけを出力して即座に復帰する。1バイトも処理できない場合にはブロックしないでエラー(EWOULDBLOCK)を返す。

この場合にボトルネックはどこからくるかというと、ディスクアクセスで発生する(ディスクアクセスに対してノンブロッキングモードの指定は無効)。mmapされたファイルへのアクセスでも、sendfileでも同様。とにかく読み込むデータがメモリに載っていたかったが最後、ブロックは生じてしまう、すなわち無数のクライアントの処理が停止する。解決策は非同期I/Oを用いるかワーカースレッドに処理を任せてさっさと次のクライアントの処理に回ること。なおFreeBSDにはsendfileにブロックを避けるオプションがある。

非同期I/O

非同期I/Oはaio_*(aio_read, aio_write等)というAPIで提供されているが、比較的新しいためあまり普及していないらしい。

非同期I/Oではデータの読み込み・書き出しの指示だけをカーネルに与えると即座に復帰する。カーネルが入出力を完了すると、ユーザプロセスはシグナルによってその通知を受け取る。

O'Reillyの本POSIX.4: Programming for the Real Worldに非同期I/Oのよい紹介があるとされている。

WindowsにもI/O Completion Portという非同期I/Oと完了通知の枠組みがあるという。

総じて

The C10K problemでは、いかに不要なブロッキングが発生してしまうのを回避し、マシンパワーにスケールした性能を出せるプログラムを構成するかが命題となっている。

読んでいるうちに「ザ・ゴール」(制約条件の理論を題材にした小説。面白い)を思い出すような話だった。

memcachedの概要

話をmemcachedに戻す。

memcachedは次のように起動する。オプションにはlistenするアドレス、ポート番号、使用するメモリサイズを指定する。

# ./memcached -d -m 2048 -l 10.0.0.40 -p 11211

クライアント1万台問題でも出てきたように、ディスクへのアクセスが入るとパフォーマンスが下がるのでオンメモリを保つことが重要なポイントとなる。そのためmlockallを用いて物理ページを占有するオプションが用意されている。

基本的な使い方は次のようなものになる。

  1. データベースへ問い合わせる前にキャッシュに載っていないかを調べる。載っていればそれを返す。
  2. 載っていない場合はデータベースに問い合わせる。得られた結果はキャッシュに登録する。

キーは250文字までの空白を含まないテキスト、値は最大1MBまでの任意のデータ。

memcachedプロトコルには次のような操作が用意されている。

  • set: オブジェクトを登録
  • add: 未登録の場合のみオブジェクトを登録
  • replace: 登録済の場合のみオブジェクトを変更
  • append, prepend: オブジェクトに追加
  • cas: 以前アクセスした時点から変更されていない場合のみオブジェクトを変更
  • incr, decr: オブジェクトを64ビット整数として更新
  • get: オブジェクトを得る
  • delete: オブジェクトを抹消
  • flush_all: 全て抹消
  • stats: キャッシュサーバの状態を報告
  • quit: キャッシュサーバを終了

consistent hashingとの関連は?

memcachedそのもの(や、クライアントのほとんど)は全く関係ない。

memcachedそのものはキャッシュサーバ単体としての責任のみを担当していて、キャッシュサーバの選択方法、どれかのキャッシュサーバが落ちたときの対応等の一切はクライアントライブラリ任せとなっている。

そして多くのクライアントはconsistent hashingは実装していないため、FAQにもキャッシュサーバの追加や削除は全てのキャッシュを無効にするのと同じだという警告がある。

consistent hashingを利用した例としてlibketamaというクライアントライブラリが存在する(後述)。

memcachedのソースコード

ソースコードは小さく、*.cファイルだけだと7つ、5000行余りしかない。中心的なモジュールは次の5つ。

  • assoc.c: キーからオブジェクト(厳密には次の節で述べるキャッシュアイテム)へマップするハッシュテーブル。
  • slab.c: キャッシュアイテムに用いるためのメモリ管理。キャッシュアイテムの大きさを十数段階に分け、階級ごとにメモリプールを作る。それぞれのメモリプールには必要に応じて1MBのメモリブロック単位(slab)でメモリを追加する。メモリプールは一定長のアイテムに分割され、要求があると大きさに応じた階級のメモリプールから空いているアイテムを返す。
  • items.c: キャッシュアイテムのモジュール。
  • memcached.c: メイン関数、ネットワークサーバとしての接続処理。
  • stats.c: 統計情報。
items.c

キャッシュアイテムは次のような形式で一括してメモリ上に作られる。

+------------------------------------+
| ヘッダ                             |
+------------------------------------+
| キー(NULL終端文字列) (+パディング) |
+------------------------------------+
| サフィクス                         |
+------------------------------------+
| データ                             |
+------------------------------------+

サフィクスはデータの長さやフラグを示す1行の文字列で、memcachedがクライアントの要求に対してデータ本体の前に通知するプレフィクスそのものになっている。

このアイテムは2つのデータ構造の要素になる。その一つはassoc.cによるハッシュテーブル、もう一つはitems.c内のリスト。items.cではメモリブロックの大きさ別にアイテムのリストを作り、キャッシュ管理(LRUアルゴリズムによるエクスパイア)を行っている。

またヘッダにはカウンタを持ち、参照カウント方式で管理もしている。

memcached.c

ネットワークサーバとしての本体部分。その大きさは約2700行と全体の半分を占める。

メイン関数は各種初期化をしlistenするソケットを作った後、libeventのイベントループに突入する。

conn (構造体)

次の構造体がヘッダmemcached.hで定義されている。 ノンブロッキングI/Oのためのバッファ以外にも要素は盛り込まれており、これを見るだけでも接続の処理は有限状態機械になっていることが感じられる。

typedef struct {
    int    sfd;
    int    state;
    struct event event;
    short  ev_flags;
    short  which;   /* which events were just triggered */

    char   *rbuf;   /* buffer to read commands into */
    char   *rcurr;  /* but if we parsed some already, this is where we stopped */
    int    rsize;   /* total allocated size of rbuf */
    int    rbytes;  /* how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    int    write_and_go; /* which state to go into after finishing current write */
    void   *write_and_free; /* free this memory after finishing writing */

    char   *ritem;  /* when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /*
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */
    int    item_comm; /* which one is it: set/add/replace */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

    /* data for UDP clients */
    bool   udp;       /* is this is a UDP "connection" */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr request_addr; /* Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */

    int    binary;    /* are we in binary mode */
    int    bucket;    /* bucket number for the next command, if running as
                         a managed instance. -1 (_not_ 0) means invalid. */
    int    gen;       /* generation requested for the bucket */
} conn;
conn_new()

ファイル記述子のイベント処理をlibeventに登録する。イベントハンドラはevent_handler()関数で、これはほとんどそのままdrive_machine()関数を呼び出すだけのもの。

drive_machine()

入力(または出力)が可能になるたびに呼ばれ、名前の通り、状態機械を駆動する。 状態によって分岐し、stopフラグがセットされるまでループする。このフラグはブロックせずにできる入出力を処理し終えたり、接続をクローズすることになった場合にセットされる。言い方を変えると何かしらやることがある間はセットされない。

状態の種類はmemcached.hで定義されている。

enum conn_states {
    conn_listening,  /* the socket which listens for connections */
    conn_read,       /* reading in a command line */
    conn_write,      /* writing out a simple response */
    conn_nread,      /* reading in a fixed number of bytes */
    conn_swallow,    /* swallowing unnecessary bytes w/o storing */
    conn_closing,    /* closing this connection */
    conn_mwrite      /* writing out many items sequentially */
};

各状態で行われる入出力は大体どれも次のような形になっている。

  1. 入力ならバッファの大きさや空きのチェック
  2. res = read(...)
  3. if (res > 0): まだ入力が残っているかも知れないのでループの繰り返しへ
  4. else if (res == 0): EOF。conn_closingへ状態を遷移
  5. else: errnoがEAGAINかEWOULDBLOCKならブロックせずに読めるデータがないのでstopフラグを立てる
  6. libeventに登録している監視条件が状態に合致しているか確認し必要なら変更。状態遷移ではイベント監視条件の変更までは行わないため。

主な状態を以下に挙げる。

conn_listening

新たにクライアントが接続してきたのでacceptして新しいファイル記述子のconnを作成。初期状態はconn_read。

conn_read

入力データを読み込んでバッファに追加。既に1行の読み込みが完了していれば(memcachedのプロトコルでは、最初の1行でコマンドを通知する。add等の場合はその後にオブジェクトのデータが続く)、コマンドを解析してそれぞれのコマンドの処理へ分岐する。

コマンドがadd等の場合、後続するデータを読み込むためにconn_nread状態に遷移する。

conn_nread

データ本体の前に通知されたバイト数まで入力を読み込む。所定バイト数の読み込みを完了すると、completion_nread()経由でstore_item()が呼ばれる。

コマンドの結果("STORED", "NOT STORED", "CLIENT_ERROR ...")をクライアントに返すため、状態はconn_writeに遷移する。

conn_write, conn_mwrite

sendmsgを用いてデータを送出。全データの出力が完了すると状態はconn_readに戻る。

クライアントの例: Ruby-MemCache

RubyのmemcachedクライアントライブラリであるRuby-MemCacheを覗いてみた。

APIのドキュメントを見ると主役はMemCacheクラスとServerクラスだけだ。

MemCacheクラス

c_thresholdやcompressionといったメンバがあり、キャッシュに載せるデータがある程度大きい場合は自動圧縮・伸長できるようになっている。

サーバリストの設定方法 (MemCache#servers=(servers))

引数にサーバ("hostname:port""hostname:port:weight")のリストを一括して与えるようになっている。

サーバの選択 (MemCache#get_server(key))

一見consistent hashingをしているのかと思ったが台数に依存する剰余をとっているので そうではない。

サーバごとにメモリサイズに応じたウェイトをつけ、サーバが選択される可能性がウェイトに比例するようにしている。

ただしこのコードでは、あるサーバが死んでいるとその代替先はリスト中の次のサーバ一択となり負荷が集中してしまう上、ウェイトの値の大きさによっては代替先を選ぶコードが全く役に立たない。代替先の選択方法を変えるか、@bucketsを生成後にシャッフルすることが必要だろう(シャッフルは全クライアントで同じ結果になるようにする)。

def get_server( key )
    svr = nil

    @mutex.synchronize( Sync::SH ) {
        if @servers.length == 1
            self.debug_msg( "Only one server: using %p", @servers.first )
            svr = @servers.first
        else

            # If the key is an integer, it's assumed to be a precomputed hash
            # key so don't bother hashing it. Otherwise use the hashing function
            # to come up with a hash of the key to determine which server to
            # talk to
            hkey = nil
            if key.is_a?( Integer )
                hkey = key
            else
                hkey = @hashfunc.call( key )
            end

            # Set up buckets if they haven't been already
            unless @buckets
                @mutex.synchronize( Sync::EX ) {
                    # Check again after switching to an exclusive lock
                    unless @buckets
                        @buckets = []
                        @servers.each do |svr|
                            self.debug_msg( "Adding %d buckets for %p", svr.weight, svr )
                            svr.weight.times { @buckets.push(svr) }
                        end
                    end
                }
            end

            # Fetch a server for the given key, retrying if that server is
            # offline
            20.times do |tries|
                svr = @buckets[ (hkey + tries) % @buckets.nitems ]
                break if svr.alive?
                self.debug_msg( "Skipping dead server %p", svr )
                svr = nil
            end
        end
    }

    raise MemCacheError, "No servers available" if 
        svr.nil? || !svr.alive?

    return svr
end
サーバの生死判定 (Server#alive?)
def alive?
    return !self.socket.nil?
end

クライアントは全てのキャッシュサーバと接続を張りっぱなしにしておく。

Server#socket

サーバとの接続を返す。まだ接続していなかった場合は接続を張る(応答がなさそうならタイムアウト)。サーバの死亡が検出されると一定時間はいちいち再接続を試みないようにしている。

def socket

    # Connect if not already connected
    unless @sock || (!@sock.nil? && @sock.closed?)

        # If the host was dead, don't retry for a while
        if @retry
            return nil if @retry > Time::now
        end

        # Attempt to connect, 
        begin
            @sock = timeout( @connect_timeout ) {
                TCPSocket::new( @host, @port )
            }
            @status = "connected"
        rescue SystemCallError, IOError, TimeoutError => err
            # $deferr.puts "Error while connecting to %s:%d: %s" %
            #  [ @host, @port, err.message ]
            self.mark_dead( err.message )
        end
    end

    return @sock
end

libketama

libketamaはconsistent hashingを利用したライブラリ。他のmemcachedクライアントライブラリを完全に置き換えるものではなく、機能はキャッシュサーバ選択に限定されている。

このページにはSubversionレポジトリのURLも書かれてはいるが接続できなかったので、ketama-0.1.1.tar.bz2をダウンロードした。

ketamaではハッシュの値域をcontinuumと呼び、0から232までとしている。 この空間に、サーバごとに100〜200個の点をばらまく。 キャッシュサーバの選択はキーのハッシュ値以上で一番近い点のものを選ぶ。

サーバのリストは次のようなファイルで与える。サーバごとに、IPアドレス・ポート番号・メモリサイズの組で指定する。メモリサイズはcontinuumにばらまく点の数を増減するウェイトになる。

重要な構造体はmcsとcontinuumの2つ。

mcs

continuum上のサーバ点を表す構造体。点がとる値とサーバのアドレス(ポート番号も含む、以下同様)からなる。

typedef struct
{
        unsigned int point;  // point on circle
        char ip[22];
} mcs;
continuum

continuumを表す。ばらまいた点の数、サーバリストファイルの更新時刻、点の配列からなる。

typedef struct
{
        int numpoints;
        void* modtime;
        void* array; //array of mcs structs
} continuum;

typedef continuum* ketama_continuum;
ketama_get_server(char* key, ketama_continuum cont)

キャッシュサーバを選択する。

まず、キーからハッシュを計算する。 次に、cont->array (mcsのソート済配列)を二分探索してサーバを探す。

ketama_create_continuum(key_t key, char* filename)

continuumを作成する。

サーバリストを読み込み、メモリを確保するとサーバに対応する点をcontinuum上に配置する。

点の総数はサーバの数×160で、サーバのメモリサイズに応じて割り振られる。 サーバのアドレスに"-1"等の番号をくっつけたもののMD5をとって乱数とし、一つのMD5を元に4つの点を決める(128ビットのMD5を32ビット×4に分割)。 点を生成し終えると二分探索に備えてソートしておく。

生成したcontinuumは共有メモリに置かれる。引数keyはそのためのキー。


この記事へのトラックバック
この記事へのコメント
コメントを書く
お名前: [必須入力]

メールアドレス:

ホームページアドレス:

コメント: [必須入力]

認証コード: [必須入力]


※画像の中の文字を半角で入力してください。