Firehoseで受信するデータのフォーマットと内容について
続きという程でもないですが、下記も関連してます。
Firehoseとは
凄くざっくりとはAT Protocolのサーバー間でデータを連携するための仕組みです。
具体的には下記の公式を参照してください(本ページの土台になっています)。
- Bluesky/Docs/Firehose
- AT Protocol/Docs/Event Stream
- atproto/lexicons/com/atproto/sync /subscribeRepos.json(lexicon)
フォーマット
com.atproto.sync.subscribeRepos
エンドポイントにWebSocketで接続するとDAG-CBORでエンコードされたブロック(ヘッダーとペイロードの2つで1セット)が受信できます。
CARファイルのようにDAG-CBORでエンコードされたブロックの外側にサイズ情報はないです。
ヘッダー
op
(operation) : integer(必須)1
: 通常(種類はtで示される)-1
: エラー
t
(type) : string(オプション)#commit
: リポジトリの更新。空のコミットも可能で、その場合はリポジトリデータの変更はありませんが、revと署名は更新されます。#identity
: アカウントのIDの変更。更新されたハンドル、署名キー、またはpdsホスティングエンドポイントの可能性があります。すべての下流サービスに対して、IDキャッシュをリフレッシュするように指示します。#handle
: アカウントのハンドルの更新、あるいは無効な状態への移行。注意: #identityに移行のため非推奨となる予定。#migrate
: アカウントのPDSインスタンスの移動。注意 : 未実装。アカウントの移動には#identity
が使われます。#tombstone
: アカウントの削除。注意: #identityまたは将来の#accountイベントに変わる可能性あり。#info
: 不明
サンプル
{
"op": 1,
"t": "#commit"
}
補足
- 不明な値が入っている場合はフレームを無視します。
- 不明なフィールドを無視します。
- フレームのデータ自体やDAG-CBORのエンコーディングがおかしい場合は切断します。
ペイロード(エラー)
error
: string(必須)- エラーの型名(名前空間やプレフィックスの
#
を除く)
- エラーの型名(名前空間やプレフィックスの
message
: string(オプション)- エラーの説明
補足
- エラーフレームを受信したら切断します。
ペイロード(通常)
t
が#commit
のときだけ書きます。主に興味があるのがこのあたりなので。
seq
: integer(必須)- このメッセージのストリームシーケンス番号。
- 連番が振られており処理状況の確認などに使用できる。また、再接続したときに以前読み込んだところから再開する場合などに使用できます。
- 接続時の挙動が
cursor
と組み合わせて変化します(詳細)。 - 正数で単調増加ですが、飛ぶかもしれないし、障害などでリセットされることもあります。
rebase
: boolean(必須)- 廃止
tooBig
: boolean(必須)- このコミットに含まれるオペレーションの数が多すぎるか、データサイズが大きすぎることを示します。
- クライアントは不足データを取得するために別途リクエストを行う必要がありますが、具体的にどうすれば良いのか分かりません。おそらくCARファイルの内容が不足するするので
ops
のpath
やcid
を用いて本来の情報を取得することになります(生レコード以上の情報が欲しければどのみちするのですが)。
repo
: string(必須)- 操作をしたユーザーのDID
commit
: cid-link(必須)- コミットのCID(レコードとは異なる)。
prev
: cid-link(オプション)- 廃止
rev
: string(必須)- 発行されたコミットの
rev
。tooBig
イベントでない限り、この情報はblocks
に含まれるコミットオブジェクトにも含まれることに注意してください。
- 発行されたコミットの
since
: string(必須)- 同じユーザー(このリポジトリ)の直前のコミットの
rev
(存在する場合)。
- 同じユーザー(このリポジトリ)の直前のコミットの
blocks
: bytes(必須)- 関連する修正を含むCARファイル。以前のリポジトリの状態からの差分。
- CARファイルの体裁をとっているので修正されるレコードブロックまでのアドレスブロックなども含みます。
ops
: array(必須)- このコミットでのリポジトリの変更操作(レコードの作成、更新、削除など)の
#repoOp
の配列。
- このコミットでのリポジトリの変更操作(レコードの作成、更新、削除など)の
blobs
: array(必須)- このコミットのレコードによって参照される新しいblobのリスト(CID)。
time
: string(必須)- このメッセージが最初にブロードキャストされたときのタイムスタンプ。
#repoOp
の内容
action
: string(必須)- 以下のどれか
create
update
delete
- 以下のどれか
- path : string(必須)
- リポジトリアクセス用のURI(の一部)(app.bsky.feed.post/<rkey>など)
- cid : cid-link(必須)
- レコードのCID
ポストのサンプル
blocks
のCARファイルは該当レコードを取り出した情報に変換しています。
{
"blobs": [],
"blocks": [
{
"cid": "bafyreihj7nokeio7tw4krnuqmuj6htykx7i4fscgcajbvkrmbuqvh22pee",
"uri": "at://did:plc:mqxsuw5b5rhpwo4lw6iwlid5/app.bsky.feed.post/3ktudvq2ovc2k",
"value": {
"$type": "app.bsky.feed.post",
"createdAt": "2024-06-01T11:33:15.313Z",
"embed": {
"$type": "app.bsky.embed.images",
"images": [
{
"alt": "",
"aspectRatio": {
"height": 4208,
"width": 2368
},
"image": {
"$type": "blob",
"mimeType": "image/jpeg",
"ref": {
"$link": "bafkreihnhh2amyaxshpdzjeojnjcdpwr3khofyx5cdg6eb64jt3b65k6me"
},
"size": 844723
}
}
]
},
"text": "firehoseをデコードできたと思われる",
"via": "Hagoromo"
}
}
],
"commit": {
"$link": "bafyreibj5esksh55hx3gjdaqcsfmdhbc6kcghhlkj7ikquoddnuwzueqde"
},
"ops": [
{
"action": "create",
"cid": {
"$link": "bafyreihj7nokeio7tw4krnuqmuj6htykx7i4fscgcajbvkrmbuqvh22pee"
},
"path": "app.bsky.feed.post/3ktudvq2ovc2k"
}
],
"prev": null,
"rebase": false,
"repo": "did:plc:mqxsuw5b5rhpwo4lw6iwlid5",
"rev": "3ktudvq2uqs2k",
"seq": 557371054,
"since": "3ktuarhlcjc2z",
"time": "2024-06-01T11:33:17.481Z",
"tooBig": false
}
リストからユーザーを削除するサンプル
{
"blobs": [],
"blocks": [],
"commit": {
"$link": "bafyreiagwtkie7qgbadsrxgnb65ufb7z5s4fjvfkq2osbnzoes5n2f4fgy"
},
"ops": [
{
"action": "delete",
"cid": null,
"path": "app.bsky.graph.listitem/3ktuh5zlmrc2n"
}
],
"prev": null,
"rebase": false,
"repo": "did:plc:mqxsuw5b5rhpwo4lw6iwlid5",
"rev": "3ktuhbo5gbs2s",
"seq": 557579051,
"since": "3ktuh5zltm22n",
"time": "2024-06-01T12:33:38.804Z",
"tooBig": false
}