
Firehose

One of the core primitives of the AT Protocol that underlies Bluesky is the firehose. It is an authenticated stream of events used to efficiently sync user updates (posts, likes, follows, handle changes, etc.).

Many of the applications people will want to build on top of atproto and Bluesky start with the firehose, from feed generators and labelers to bots and search engines.

In the atproto ecosystem, many different endpoints serve firehose APIs. Each PDS serves a stream of all of the activity on the repos it is responsible for. From there, relays aggregate the streams of every PDS that asks to be included into a single unified stream.

This makes the job of downstream consumers much easier, as you can get all the data from a single location. The main relay for Bluesky is bsky.network, which we use in the examples below.

To get started, open a WebSocket connection to the com.atproto.sync.subscribeRepos endpoint on your firehose provider of choice:

uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
con, _, err := websocket.DefaultDialer.Dial(uri, http.Header{}) // check err before using con

From there, you need to read each message off the connection as it arrives and decode the CBOR event data. Each frame consists of a DAG-CBOR header followed by a DAG-CBOR body. More details on the event stream format can be found in the atproto specifications.
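As a rough illustration of what those frames look like, here is a minimal sketch that reads raw messages and splits each one into a header and body. It assumes gorilla/websocket for the connection and github.com/fxamacker/cbor/v2 as a general-purpose CBOR decoder; any DAG-CBOR-capable library works similarly.

package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"

	"github.com/fxamacker/cbor/v2"
	"github.com/gorilla/websocket"
)

// frameHeader mirrors the header of each event stream frame:
// "op" is 1 for a message and -1 for an error; "t" names the
// message type, e.g. "#commit".
type frameHeader struct {
	Op int64  `cbor:"op"`
	T  string `cbor:"t"`
}

func main() {
	uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
	con, _, err := websocket.DefaultDialer.Dial(uri, http.Header{})
	if err != nil {
		log.Fatal(err)
	}
	defer con.Close()

	for {
		_, msg, err := con.ReadMessage()
		if err != nil {
			log.Fatal(err)
		}

		// Each frame is a DAG-CBOR header followed by a DAG-CBOR body.
		dec := cbor.NewDecoder(bytes.NewReader(msg))
		var hdr frameHeader
		if err := dec.Decode(&hdr); err != nil {
			log.Fatal(err)
		}
		var body map[string]interface{}
		if err := dec.Decode(&body); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("got frame %s with %d body fields\n", hdr.T, len(body))
	}
}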

Most SDKs have a nice wrapper for this, though. In this example we will simply print each repo operation in every event we receive. These operations are things like "create post", "create like", "delete follow", and so on.

rsc := &events.RepoStreamCallbacks{
	RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
		fmt.Println("Event from ", evt.Repo)
		for _, op := range evt.Ops {
			fmt.Printf(" - %s record %s\n", op.Action, op.Path)
		}
		return nil
	},
}

sched := sequential.NewScheduler("myfirehose", rsc.EventHandler)
events.HandleRepoStream(context.Background(), con, sched)

In this snippet we set up a sequential "scheduler", which handles every event one at a time, in order. Other schedulers run event handling in parallel, or apply limited concurrency keyed on which repo each event is for.
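For example, indigo also ships a parallel scheduler that processes events concurrently while keeping each repo's events in order. The constructor arguments shown here (worker count, queue size, identifier, handler) are an assumption and may differ between indigo versions, so check the events/schedulers/parallel package for the exact signature:

// Sketch: swap in the parallel scheduler for higher throughput.
// Argument order is assumed; verify against your indigo version.
sched := parallel.NewScheduler(
	8,                // assumed: number of concurrent workers
	100,              // assumed: max queued events
	"myfirehose",     // consumer identifier
	rsc.EventHandler, // same handler built from the callbacks above
)
events.HandleRepoStream(context.Background(), con, sched)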

Once we have a scheduler, we call HandleRepoStream, which does the actual decoding of the data coming over the websocket and invokes the event handler we wrote.
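Putting those pieces together, a complete consumer looks roughly like the following sketch. The import paths are assumptions based on the indigo module (github.com/bluesky-social/indigo) and gorilla/websocket, and may shift between versions:

package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/sequential"
	"github.com/gorilla/websocket"
)

func main() {
	// Open the websocket connection to the relay's firehose.
	uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
	con, _, err := websocket.DefaultDialer.Dial(uri, http.Header{})
	if err != nil {
		panic(err)
	}

	// Print every repo operation in every commit event we receive.
	rsc := &events.RepoStreamCallbacks{
		RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
			fmt.Println("Event from ", evt.Repo)
			for _, op := range evt.Ops {
				fmt.Printf(" - %s record %s\n", op.Action, op.Path)
			}
			return nil
		},
	}

	// Decode incoming frames and dispatch them to the handler, in order.
	sched := sequential.NewScheduler("myfirehose", rsc.EventHandler)
	events.HandleRepoStream(context.Background(), con, sched)
}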

If you want a simplified firehose where events are served as JSON, you can connect to a Jetstream websocket:

import asyncio
import websockets

uri = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post"

async def listen_to_websocket():
    async with websockets.connect(uri) as websocket:
        while True:
            try:
                # Each Jetstream message is a JSON-encoded event.
                message = await websocket.recv()
                print(message)
            except websockets.ConnectionClosed as e:
                print(f"Connection closed: {e}")
                break
            except Exception as e:
                print(f"Error: {e}")

asyncio.run(listen_to_websocket())