Tapで遊んでみた

@usounds.work

この記事は Bluesky / ATProtocol Advent Calendar 2025 の14日目です。

Tapとは

最近、公式から Tap という仕組みがリリースされました。どういうときに使うの?ということに対して、下記のようなことが示唆されています。

  • Automatic historical backfill when tracking new repositories
  • Native webhook support for serverless architectures
  • Full network backfill and mirroring capabilities for research and analysis
  • Guaranteed delivery with acknowledgement mode
  • For tracking specific subsets of repositories without needing to process the full firehose

ユースケースとしてパッと思いつくのは Full network backfillでしょうか

Full network backfill

ATProtoの公式リレーは特定のタイミングまで通過したデータを全て保持していましたが、特定のタイミングからそれを停止しました。

これにより、ATProtoの世界で過去データを正しく拾うには、PDSに問い合わせして取る必要が出てきました。総量が見えている公式PDSだけでも結構面倒なのに、セルフホストPDSを含めるとさらにハードルが上がります。

ここらへんをいい感じに処理してくれるのがTapということのようです。ですので、新しいコレクションを作ってこれから運営するケースなどの過去のデータを気にしなくていい場合や、そこまでの厳密性を求められない場合においては特に出番はないと思います。

Native webhook support for serverless architectures(サーバーレスアプリケーション向けのWebhook)も若干気になりますが、Tapを動かすサーバーがいずれにせよ必要になるとこから限定的なのではという気もしますが有識者の判断に任せたいと思います。

早速動かしてみた

注意 Tap自体が最近リリースされたものなので、大いに間違っていると思います。この記事はあくまでも取っ掛かりとして活用していただければ幸いです。

公式でも警告されているTAP_FULL_NETWORK=trueで動かしてみました。SQLiteだと即厳しい状態に陥るので、ポスグレにご登場いただきながらdocker-compose.ymlを作りました。なお、TAP_COLLECTION_FILTERSで特定のコレクションに絞った場合は未検証です。

なお、TAP_FULL_NETWORK=trueで動作させる場合に限るとは思いますが、Tapのプロセスは最低でも6GBはあった方が良さそうです。yamlはSQLiteで四苦八苦していた名残で16GBを最大値にしています

それっぽいパラメータチューニングしてそうなコメントがありますが、ChatGPTさんが記載したものなので、このまま使うかどうかは皆さまにて判断いただければと思います。

services:
  postgres:
    image: postgres:16
    container_name: tap-postgres
    restart: unless-stopped
    environment:
      POSTGRES_DB: tap
      POSTGRES_USER: tap
      POSTGRES_PASSWORD: tap
    volumes:
      - ./pg-data:/var/lib/postgresql/data
    shm_size: 1gb
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U tap"]
      interval: 10s
      timeout: 5s
      retries: 5

  tap:
    image: ghcr.io/bluesky-social/indigo/tap:latest
    container_name: tap
    restart: unless-stopped
    depends_on:
      postgres:
        condition: service_healthy
    ports:
      - "2480:2480"
    environment:
      # DB
      TAP_DATABASE_URL: postgres://tap:tap@postgres:5432/tap?sslmode=disable

      # ネットワーク全体を追う(重い)
      TAP_FULL_NETWORK: "true"

      # 並列はデフォルトに戻す(=指定しない)
      # TAP_RESYNC_PARALLELISM: 5 (指定しない)

      # 安定性寄り設定(推奨)
      TAP_CURSOR_SAVE_INTERVAL: 2s
      TAP_OUTBOX_CAPACITY: 200000
      TAP_LOG_LEVEL: info

      # メモリ節約系(地味に効く)
      TAP_IDENT_CACHE_SIZE: 500000
    mem_limit: 16g

これで全力でデータを取りに行きます。コマンドで下記を実行します

docker compose up -d

ログを見るとデフォルトで https://relay1.us-east.bsky.network に繋ぎに行っているので、これからアカウントを割り出しながらPDSにデータをとりに行っているのではないかなという気がします。

データを取得してみた

起動しただけでは面白くないので、これを使って拙宅 ATProto Dashboard のバックフィルを試してみます。このダッシュボード、運営を開始した2025年1月17日以前のデータを取得していないので、ちょうどいいです。

公式が @atproto/tap というライブラリを出しているので、これに全力で甘えることとします。

なお、ここでのポスグレは、ATProto Dashboardのデータを溜めているポスグレを指し示します。要するに、Tapのためのポスグレとは直結する必要はなく、WebSocket経由でやり取りをします。

RecordEventの型定義を見るに、時間を示すものはありません。PDSのレコードを漁るために、データを作った時間はそのrecordがcreatedAtを持っていない限りは判定できないし、それも詐称される可能性を考慮し、バックフィルについては一律1900/1/1に発生したことにしました。

import { Tap, SimpleIndexer } from '@atproto/tap'
import pg from 'pg';
import * as dotenv from 'dotenv';
import logger from './logger';
import PQueue from 'p-queue';

const queue = new PQueue({ concurrency: 1 });

dotenv.config();

const { Pool } = pg;

const pool = new Pool({
    user: process.env.PG_USER,
    host: process.env.PG_HOST,
    database: process.env.PG_DATABASE,
    password: process.env.PG_PASSWORD,
    port: process.env.PG_PORT,
});

const client = await pool.connect();

async function main() {
    const tap = new Tap('http://localhost:2480')
    const indexer = new SimpleIndexer()

    const BACKFILL_TIMEOUT = 30 * 60 * 1000 // 30分
    const LOG_INTERVAL = 60 * 1000 // 1分

    let backfillTimer: NodeJS.Timeout | null = null
    let logInterval: NodeJS.Timeout | null = null
    let lastBackfillEventTime = Date.now()

    // タイマーリセット関数(live: false イベントでのみリセット)
    const resetBackfillTimer = () => {
        lastBackfillEventTime = Date.now()
        if (backfillTimer) clearTimeout(backfillTimer)
        backfillTimer = setTimeout(async () => {
            logger.info('No live=false events for 30 minutes. Exiting...')
            await queue.onIdle()
            process.exit(0)
        }, BACKFILL_TIMEOUT)
    }

    // 1分おきにバックフィル待機ログ(live: false イベントが対象)
    logInterval = setInterval(() => {
        const elapsed = Date.now() - lastBackfillEventTime
        if (elapsed >= LOG_INTERVAL) {
            logger.info('Waiting for live=false events...')
        }
    }, LOG_INTERVAL)

    indexer.record(async (evt) => {
        const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`

        // live: false イベントならタイマーリセット
        if (!evt.live) {
            resetBackfillTimer()
        }

        // バックフィル対象は 3rd party コレクションのみ
        if (
            evt.action === 'create' &&
            !evt.live &&
            !(evt.collection.startsWith('app.bsky') || evt.collection.startsWith('chat.bsky'))
        ) {
            logger.info(`[BACKFILL] Detect Event ${evt.action} ${uri}`)

            const createdAt = new Date('1900-01-01T00:00:00Z');

            queue.add(async () => {
                const checkQuery = `
                    SELECT 1 
                    FROM public.collection 
                    WHERE did = $1 AND collection = $2 AND rkey = $3
                    LIMIT 1;
                `;
                const res = await client.query(checkQuery, [
                    evt.did,
                    evt.collection,
                    evt.rkey,
                ]);

                if (res.rowCount === 0) {
                    const insertQuery = `
                        INSERT INTO public.collection (did, collection, rkey, "createdAt")
                        VALUES ($1, $2, $3, $4);
                    `;
                    await client.query(insertQuery, [
                        evt.did,
                        evt.collection,
                        evt.rkey,
                        createdAt,
                    ]);
                    logger.info(`Insert successed: ${uri}`);
                } else {
                    logger.info(`Skipped existing: ${uri}`);
                }
            });
        }
    })

    indexer.error((err) => {
        logger.error(err)
    })

    const channel = tap.channel(indexer)
    logger.info('Connecting to Tap...')
    await channel.start()
}



main().catch(console.error)

なお、現在、DID単位で3,540,150件のバックフィルを終わらせましたが、Tapのポスグレは11GBを突破しました。

おそらくはTap自体にはrecord自体を溜め込む機能はないでしょうから、recordを溜め込む場合はその分のデータ容量が別途必要になると思います。

終わりに

ここまでお読みいただきありがとうございました。明日は marilさん がご担当となります。アニメ視聴履歴を管理できるAniBlueやカスタム絵文字が使えるStellarを運営されていました。明日の記事もどうぞご覧ください!

usounds.work
ゆー

@usounds.work

#FF14 (Yu Sounds@Masamune) #F1jp #高校野球 #ぼざろ

ヒカセンですが、ヒカセンっぽい投稿はほぼありません。

Developer of
- Skyblur
- Rito
- FF14ヒカセンラベリング
- ATP Dashboard
https://linkat.blue/usounds.work

Post reaction in Bluesky

*To be shown as a reaction, include article link in the post or add link card

Reactions from everyone (0)