Firehoseで受信するデータのフォーマットと内容について

@ioriayane.relog.tech

Firehoseで受信するデータのフォーマットと内容について

続きという程でもないですが、下記も関連してます。

Firehoseとは

凄くざっくりとはAT Protocolのサーバー間でデータを連携するための仕組みです。

具体的には下記の公式を参照してください(本ページの土台になっています)。

フォーマット

com.atproto.sync.subscribeReposエンドポイントにWebSocketで接続するとDAG-CBORでエンコードされたブロック(ヘッダーとペイロードの2つで1セット)が受信できます。

CARファイルのようにDAG-CBORでエンコードされたブロックの外側にサイズ情報はないです。

ヘッダー

  • op(operation) : integer(必須)
    • 1 : 通常(種類はtで示される)
    • -1 : エラー
  • t(type) : string(オプション)
    • #commit : リポジトリの更新。空のコミットも可能で、その場合はリポジトリデータの変更はありませんが、revと署名は更新されます。
    • #identity : アカウントのIDの変更。更新されたハンドル、署名キー、またはpdsホスティングエンドポイントの可能性があります。すべての下流サービスに対して、IDキャッシュをリフレッシュするように指示します。
    • #handle : アカウントのハンドルの更新、あるいは無効な状態への移行。注意: #identityに移行のため非推奨となる予定。
    • #migrate : アカウントのPDSインスタンスの移動。注意 : 未実装。アカウントの移動には#identityが使われます。
    • #tombstone : アカウントの削除。注意: #identityまたは将来の#accountイベントに変わる可能性あり。
    • #info : 不明

サンプル

{
    "op": 1,
    "t": "#commit"
}

補足

  • 不明な値が入っている場合はフレームを無視します。
  • 不明なフィールドを無視します。
  • フレームのデータ自体やDAG-CBORのエンコーディングがおかしい場合は切断します。

ペイロード(エラー)

  • error : string(必須)
    • エラーの型名(名前空間やプレフィックスの#を除く)
  • message : string(オプション)
    • エラーの説明

補足

  • エラーフレームを受信したら切断します。

ペイロード(通常)

t#commitのときだけ書きます。主に興味があるのがこのあたりなので。

  • seq : integer(必須)
    • このメッセージのストリームシーケンス番号。
    • 連番が振られており処理状況の確認などに使用できる。また、再接続したときに以前読み込んだところから再開する場合などに使用できます。
    • 接続時の挙動がcursorと組み合わせて変化します(詳細)。
    • 正数で単調増加ですが、飛ぶかもしれないし、障害などでリセットされることもあります。
  • rebase : boolean(必須)
    • 廃止
  • tooBig : boolean(必須)
    • このコミットに含まれるオペレーションの数が多すぎるか、データサイズが大きすぎることを示します。
    • クライアントは不足データを取得するために別途リクエストを行う必要がありますが、具体的にどうすれば良いのか分かりません。おそらくCARファイルの内容が不足するするのでopspathcidを用いて本来の情報を取得することになります(生レコード以上の情報が欲しければどのみちするのですが)。
  • repo : string(必須)
    • 操作をしたユーザーのDID
  • commit : cid-link(必須)
    • コミットのCID(レコードとは異なる)。
  • prev : cid-link(オプション)
    • 廃止
  • rev : string(必須)
    • 発行されたコミットのrevtooBigイベントでない限り、この情報はblocksに含まれるコミットオブジェクトにも含まれることに注意してください。
  • since : string(必須)
    • 同じユーザー(このリポジトリ)の直前のコミットのrev(存在する場合)。
  • blocks : bytes(必須)
    • 関連する修正を含むCARファイル。以前のリポジトリの状態からの差分。
    • CARファイルの体裁をとっているので修正されるレコードブロックまでのアドレスブロックなども含みます。
  • ops : array(必須)
    • このコミットでのリポジトリの変更操作(レコードの作成、更新、削除など)の#repoOpの配列。
  • blobs : array(必須)
    • このコミットのレコードによって参照される新しいblobのリスト(CID)。
  • time : string(必須)
    • このメッセージが最初にブロードキャストされたときのタイムスタンプ。

#repoOpの内容

  • action : string(必須)
    • 以下のどれか
      • create
      • update
      • delete
  • path : string(必須)
    • リポジトリアクセス用のURI(の一部)(app.bsky.feed.post/<rkey>など)
  • cid : cid-link(必須)
    • レコードのCID

ポストのサンプル

blocksのCARファイルは該当レコードを取り出した情報に変換しています。

{
    "blobs": [],
    "blocks": [
        {
            "cid": "bafyreihj7nokeio7tw4krnuqmuj6htykx7i4fscgcajbvkrmbuqvh22pee",
            "uri": "at://did:plc:mqxsuw5b5rhpwo4lw6iwlid5/app.bsky.feed.post/3ktudvq2ovc2k",
            "value": {
                "$type": "app.bsky.feed.post",
                "createdAt": "2024-06-01T11:33:15.313Z",
                "embed": {
                    "$type": "app.bsky.embed.images",
                    "images": [
                        {
                            "alt": "",
                            "aspectRatio": {
                                "height": 4208,
                                "width": 2368
                            },
                            "image": {
                                "$type": "blob",
                                "mimeType": "image/jpeg",
                                "ref": {
                                    "$link": "bafkreihnhh2amyaxshpdzjeojnjcdpwr3khofyx5cdg6eb64jt3b65k6me"
                                },
                                "size": 844723
                            }
                        }
                    ]
                },
                "text": "firehoseをデコードできたと思われる",
                "via": "Hagoromo"
            }
        }
    ],
    "commit": {
        "$link": "bafyreibj5esksh55hx3gjdaqcsfmdhbc6kcghhlkj7ikquoddnuwzueqde"
    },
    "ops": [
        {
            "action": "create",
            "cid": {
                "$link": "bafyreihj7nokeio7tw4krnuqmuj6htykx7i4fscgcajbvkrmbuqvh22pee"
            },
            "path": "app.bsky.feed.post/3ktudvq2ovc2k"
        }
    ],
    "prev": null,
    "rebase": false,
    "repo": "did:plc:mqxsuw5b5rhpwo4lw6iwlid5",
    "rev": "3ktudvq2uqs2k",
    "seq": 557371054,
    "since": "3ktuarhlcjc2z",
    "time": "2024-06-01T11:33:17.481Z",
    "tooBig": false
}

リストからユーザーを削除するサンプル

{
    "blobs": [],
    "blocks": [],
    "commit": {
        "$link": "bafyreiagwtkie7qgbadsrxgnb65ufb7z5s4fjvfkq2osbnzoes5n2f4fgy"
    },
    "ops": [
        {
            "action": "delete",
            "cid": null,
            "path": "app.bsky.graph.listitem/3ktuh5zlmrc2n"
        }
    ],
    "prev": null,
    "rebase": false,
    "repo": "did:plc:mqxsuw5b5rhpwo4lw6iwlid5",
    "rev": "3ktuhbo5gbs2s",
    "seq": 557579051,
    "since": "3ktuh5zltm22n",
    "time": "2024-06-01T12:33:38.804Z",
    "tooBig": false
}
ioriayane.relog.tech
IoriAYANE

@ioriayane.relog.tech

Blueskyクライアントの羽衣を作ってます
https://hagoromo.relog.tech/ja/
epubを作るソフトのLeMEを作ってます。
https://leme.style
X:@ioriayane
アイコンは X:@keikawagutiさん

Post reaction in Bluesky

*To be shown as a reaction, include article link in the post or add link card

Reactions from everyone (0)