PostgreSQL の NOTIFY/LISTEN を使用してテーブルの変化を PHP から検知してみた
PostgreSQL 9.0 から NOTIFY/LISTEN という Pub/Sub を行う仕組みが入っていたので、試してみた。
NOTIFY/LISTEN の挙動を確認する
psql を3枚立ち上げる。
LISTEN <channel>;
で <channel>
の購読を開始する。
NOTIFY <channel>;
で <channel>
を購読しているセッションに配信を行う。
購読を psql 上で待つときは、; [Enter]
など、何かしら実行させることで通知が表示される。
トランザクションで囲んだ場合は
のように、コミット時に配信される。
トランザクション内で同じチャンネルに対して複数回配信を行った場合は
のように、複数回配信されることはなく、一度だけ配信される。
詳しくはマニュアルに記載されている。
リアルタイム配信システムを作ってみる
チャットのように、発言テーブルに INSERT
されたら画面上にリアルタイムに発言が表示されるシステムを作ってみる。
テーブル定義
CREATE TABLE timeline ( id serial NOT NULL PRIMARY KEY, body text NOT NULL, created timestamp NOT NULL DEFAULT now() );
シーケンス、内容、時刻を持つだけの単純なテーブルを作った。
このテーブルに INSERT
されたときに自動的に NOTIFY
してほしいので、トリガーを作成する。
CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $$ BEGIN PERFORM pg_notify(TG_ARGV[0], NULL); RETURN NULL; END; $$ LANGUAGE 'plpgsql'; CREATE TRIGGER timeline_update AFTER INSERT ON timeline EXECUTE PROCEDURE notify_trigger('timeline_update');
pg_notify
関数は、NOTIFY
文が関数になったもので、チャンネル名やペイロード(通知の際に使用できる任意の文字列)が不定である場合に便利とされている。
トリガー関数の戻り値が NULL
であるが、これはトリガーが AFTER INSERT
であり、戻り値が利用されないため、何でも良い。
なお、トリガーに UPDATE
や DELETE
も指定し、通知のペイロードとして変更のあった id
を渡すことで、リアルタイムに変更を監視することも可能なはずである。
(追記) tcnモジュールを利用することでわざわざトリガー関数を作らなくても良いようだ。ただしこのモジュールは行指向のため、今回の用途からすると少しオーバースペックではある。
API
PHP 5.6 で作った。
なお、ここで使用している PDO::pgsqlGetNotify
は PHP 5.6 以降で使用可能な関数であるため、
それ以前のバージョンを使うときは PostgreSQL モジュールに用意されている pg_get_notify
を使用することになる。
(ただし、pg_get_notify
はタイムアウトが指定できず、通知が無い場合は即座に FALSE
が返るため、PHP 内でループを回すことになる… ダサい)
WebSocket などのモダンな技術で作っても良いのだが、単純化するために古典的な Long polling スタイルで作った。
全ソースコードは少し長いので Gist にアップロードした。
function retrieve($since_id = 0, $limit = 10) { global $db; $stmt = $db->prepare("SELECT * FROM timeline WHERE id > :since_id ORDER BY id DESC LIMIT :limit"); $stmt->bindValue(':since_id', $since_id); $stmt->bindValue(':limit', $limit); $stmt->execute(); return $stmt->fetchAll(); } $timeline = retrieve($since_id); if (empty($timeline)) { $db->exec('LISTEN timeline_update'); $result = $db->pgsqlGetNotify(PDO::FETCH_ASSOC, 30000); if ($result === false) { output(); } $timeline = retrieve($since_id); }
Twitter 風に、since_id
を指定できるようにした。
API 呼び出し時点で id
が since_id
より上の発言があればそれを返し、無ければ発言されるまで(INSERT
されるまで)待機し、新規発言を返す。
30秒たっても新規発言がなければ、空配列が返る。
retrieve.php?since_id=<max_id>
をブラウザで開くと読み込み中のままになり、psql などから timeline
テーブルに追加すると即座にブラウザ上にレスポンス (JSON) が表示される。
since_id
を用いた取得なので、INSERT INTO timeline (body) VALUES ('foo'), ('bar')
のように2行が一度に INSERT
されても、ちゃんと2行とも表示される。
フロントエンド
$(function() { var retrieve_and_display = (function() { var current_id = 0; return function() { $.post('retrieve.php', { since_id: current_id }) .done(function(result) { if (result.length > 0) { current_id = result[0].id; result.reverse(); $.each(result, function() { $('<li />').text(this.body).prependTo($('#timeline')); }); } retrieve_and_display(); }); }; })(); retrieve_and_display(); });
<!DOCTYPE html> <html lang="ja"> <head> <meta charset="UTF-8" /> <script src="//code.jquery.com/jquery-2.1.3.min.js" type="text/javascript"></script> <script src="pushtest.js" type="text/javascript"></script> <title></title> </head> <body> <p>timeline:</p> <ul id="timeline"> </ul> </body> </html>
retrieve_and_display
関数を呼び出すたびに未取得の発言を最大10件取得し、#timeline
の先頭に追加する。
サンプルなのでいろいろツッコミどころがあるが(ajax request が fail したときの処理がない、jQuery で DOM を生成するな、そもそも jQuery を使うな etc.)、そこはサンプルということでひとつ。
ブラウザで開き、最新の10件が表示され、psql 等で timeline
テーブルに追加すれば即座にブラウザ側にも反映されることが確認できた。
まとめ
PostgreSQL の NOTIFY/LISTEN を使用することで、Pub/Sub を実現できた。 Publish 側は PostgreSQL に任せることができるため、Subscribe 側のみの実装を行うだけで良いのは楽であった。