PostgreSQL の NOTIFY/LISTEN を使用してテーブルの変化を PHP から検知してみた

PostgreSQL 9.0 から NOTIFY/LISTEN という Pub/Sub を行う仕組みが入っていたので、試してみた。

NOTIFY/LISTEN の挙動を確認する

psql を3枚立ち上げる。

NOTIFY/LISTEN

LISTEN <channel>;<channel> の購読を開始する。

NOTIFY <channel>;<channel> を購読しているセッションに配信を行う。

購読を psql 上で待つときは、; [Enter] など、何かしら実行させることで通知が表示される。

トランザクションで囲んだ場合は

トランザクション1

のように、コミット時に配信される。

トランザクション内で同じチャンネルに対して複数回配信を行った場合は

トランザクション2

のように、複数回配信されることはなく、一度だけ配信される。

詳しくはマニュアルに記載されている。

リアルタイム配信システムを作ってみる

チャットのように、発言テーブルに INSERT されたら画面上にリアルタイムに発言が表示されるシステムを作ってみる。

テーブル定義

CREATE TABLE timeline (
  id serial NOT NULL PRIMARY KEY,
  body text NOT NULL,
  created timestamp NOT NULL DEFAULT now()
);

シーケンス、内容、時刻を持つだけの単純なテーブルを作った。 このテーブルに INSERT されたときに自動的に NOTIFY してほしいので、トリガーを作成する。

CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $$
  BEGIN
    PERFORM pg_notify(TG_ARGV[0], NULL);
    RETURN NULL;
  END;
$$ LANGUAGE 'plpgsql';

CREATE TRIGGER timeline_update AFTER INSERT ON timeline EXECUTE PROCEDURE notify_trigger('timeline_update');

pg_notify 関数は、NOTIFY 文が関数になったもので、チャンネル名やペイロード(通知の際に使用できる任意の文字列)が不定である場合に便利とされている。 トリガー関数の戻り値が NULL であるが、これはトリガーが AFTER INSERT であり、戻り値が利用されないため、何でも良い。

なお、トリガーに UPDATEDELETE も指定し、通知のペイロードとして変更のあった id を渡すことで、リアルタイムに変更を監視することも可能なはずである。

(追記) tcnモジュールを利用することでわざわざトリガー関数を作らなくても良いようだ。ただしこのモジュールは行指向のため、今回の用途からすると少しオーバースペックではある。

API

PHP 5.6 で作った。 なお、ここで使用している PDO::pgsqlGetNotifyPHP 5.6 以降で使用可能な関数であるため、 それ以前のバージョンを使うときは PostgreSQL モジュールに用意されている pg_get_notify を使用することになる。 (ただし、pg_get_notifyタイムアウトが指定できず、通知が無い場合は即座に FALSE が返るため、PHP 内でループを回すことになる… ダサい)

WebSocket などのモダンな技術で作っても良いのだが、単純化するために古典的な Long polling スタイルで作った。

ソースコードは少し長いので Gist にアップロードした。

function retrieve($since_id = 0, $limit = 10) {
    global $db;

    $stmt = $db->prepare("SELECT * FROM timeline WHERE id > :since_id ORDER BY id DESC LIMIT :limit");
    $stmt->bindValue(':since_id', $since_id);
    $stmt->bindValue(':limit', $limit);
    $stmt->execute();

    return $stmt->fetchAll();
}


$timeline = retrieve($since_id);

if (empty($timeline)) {
    $db->exec('LISTEN timeline_update');
    $result = $db->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000);
    if ($result === false) {
        output();
    }
    $timeline = retrieve($since_id);
}

Twitter 風に、since_id を指定できるようにした。 API 呼び出し時点で idsince_id より上の発言があればそれを返し、無ければ発言されるまで(INSERT されるまで)待機し、新規発言を返す。 30秒たっても新規発言がなければ、空配列が返る。

retrieve.php?since_id=<max_id> をブラウザで開くと読み込み中のままになり、psql などから timeline テーブルに追加すると即座にブラウザ上にレスポンス (JSON) が表示される。 since_id を用いた取得なので、INSERT INTO timeline (body) VALUES ('foo'), ('bar') のように2行が一度に INSERT されても、ちゃんと2行とも表示される。

フロントエンド

モダンなフレームワークを使うべきだが今回は jQuery

$(function() {
    var retrieve_and_display = (function() {
        var current_id = 0;
        return function() {
            $.post('retrieve.php', { since_id: current_id })
            .done(function(result) {
                if (result.length > 0) {
                    current_id = result[0].id;
                    result.reverse();
                    $.each(result, function() {
                        $('<li />').text(this.body).prependTo($('#timeline'));
                    });
                }

                retrieve_and_display();
            });
        };
    })();

    retrieve_and_display();
});
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8" />
    <script src="//code.jquery.com/jquery-2.1.3.min.js" type="text/javascript"></script>
    <script src="pushtest.js" type="text/javascript"></script>
    <title></title>
</head>
<body>

<p>timeline:</p>

<ul id="timeline">
</ul>

</body>
</html>

retrieve_and_display 関数を呼び出すたびに未取得の発言を最大10件取得し、#timeline の先頭に追加する。 サンプルなのでいろいろツッコミどころがあるが(ajax request が fail したときの処理がない、jQuery で DOM を生成するな、そもそも jQuery を使うな etc.)、そこはサンプルということでひとつ。

ブラウザで開き、最新の10件が表示され、psql 等で timeline テーブルに追加すれば即座にブラウザ側にも反映されることが確認できた。

まとめ

PostgreSQL の NOTIFY/LISTEN を使用することで、Pub/Sub を実現できた。 Publish 側は PostgreSQL に任せることができるため、Subscribe 側のみの実装を行うだけで良いのは楽であった。