Firehose
One of the core primitives of the AT Protocol that underlies Bluesky is the firehose. It is a stream of authenticated events (repository commits are signed by each account's signing key, so consumers can verify them) used to efficiently sync user updates (posts, likes, follows, handle changes, etc.).
Many applications built on atproto and Bluesky start with the firehose, from feed generators and labelers to bots and search engines.
In the atproto ecosystem, many different endpoints serve firehose APIs. Each PDS serves a stream of all the activity on the repos it is responsible for. Relays then aggregate the streams of every PDS that asks to be crawled into a single unified stream.
This makes the job of downstream consumers much easier, as you can get all the data from a single location. The main relay for Bluesky is bsky.network, which we use in the examples below.
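Every PDS and every relay serves the same XRPC method, so switching between them only changes the host. As a minimal sketch, assuming a hypothetical helper named `subscribeReposURL` and a placeholder host `pds.example.com`, the URL can be built with Go's `net/url` package. It also shows the optional `cursor` query parameter that `subscribeRepos` accepts for resuming from a previously seen event sequence number.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// subscribeReposURL (hypothetical helper) builds the WebSocket URL for
// com.atproto.sync.subscribeRepos on the given host. A PDS and a relay serve
// the same path; only the host differs. If cursor is non-nil it is sent as the
// "cursor" query parameter so the stream resumes from that sequence number.
func subscribeReposURL(host string, cursor *int64) string {
	u := url.URL{
		Scheme: "wss",
		Host:   host,
		Path:   "/xrpc/com.atproto.sync.subscribeRepos",
	}
	if cursor != nil {
		q := url.Values{}
		q.Set("cursor", strconv.FormatInt(*cursor, 10))
		u.RawQuery = q.Encode()
	}
	return u.String()
}

func main() {
	// Aggregated stream from the relay, starting from live events.
	fmt.Println(subscribeReposURL("bsky.network", nil))

	// Single-PDS stream (placeholder host), resuming after a known sequence number.
	seq := int64(12345)
	fmt.Println(subscribeReposURL("pds.example.com", &seq))
}
```

Persisting the last processed sequence number and passing it back as the cursor is what lets a consumer reconnect without starting over.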
To get started, open a WebSocket connection to the com.atproto.sync.subscribeRepos endpoint of your favorite firehose provider. The Go examples use the gorilla/websocket package:
con, _, err := websocket.DefaultDialer.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{})
if err != nil {
	log.Fatal(err)
}
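From there, you need to read each message as it arrives and decode it. Each message is a binary WebSocket frame holding two DAG-CBOR objects: a header that identifies the message type, followed by the event body. The full schema is defined in the com.atproto.sync.subscribeRepos Lexicon.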
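To make the framing concrete, here is a minimal sketch that hand-decodes just the frame header using only the standard library. It assumes the header is a small CBOR map with an `op` field (1 for a regular message, -1 for an error) and a `t` field naming the message type, such as `#commit`. `readArg` and `decodeHeader` are hypothetical names; this handles only the tiny CBOR subset the header uses, and real code should rely on a proper DAG-CBOR library or an SDK.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readArg reads a CBOR initial byte plus its argument and returns the major
// type, the argument value, and how many bytes were consumed.
func readArg(b []byte) (major byte, arg uint64, n int, err error) {
	if len(b) == 0 {
		return 0, 0, 0, errors.New("unexpected end of input")
	}
	major, info := b[0]>>5, b[0]&0x1f
	switch {
	case info < 24:
		return major, uint64(info), 1, nil
	case info == 24 && len(b) >= 2:
		return major, uint64(b[1]), 2, nil
	case info == 25 && len(b) >= 3:
		return major, uint64(binary.BigEndian.Uint16(b[1:3])), 3, nil
	case info == 26 && len(b) >= 5:
		return major, uint64(binary.BigEndian.Uint32(b[1:5])), 5, nil
	case info == 27 && len(b) >= 9:
		return major, binary.BigEndian.Uint64(b[1:9]), 9, nil
	}
	return 0, 0, 0, errors.New("unsupported or truncated CBOR argument")
}

// decodeHeader parses a header map {"op": int, "t": string} and returns the
// remaining bytes, which hold the still-encoded event body.
func decodeHeader(b []byte) (op int64, t string, rest []byte, err error) {
	major, count, n, err := readArg(b)
	if err != nil || major != 5 { // major type 5 = map
		return 0, "", nil, errors.New("header is not a CBOR map")
	}
	b = b[n:]
	for i := uint64(0); i < count; i++ {
		// Keys are text strings (major type 3).
		major, klen, n, err := readArg(b)
		if err != nil || major != 3 || uint64(len(b)-n) < klen {
			return 0, "", nil, errors.New("bad map key")
		}
		key := string(b[n : n+int(klen)])
		b = b[n+int(klen):]

		major, val, n, err := readArg(b)
		if err != nil {
			return 0, "", nil, err
		}
		switch {
		case key == "op" && major == 0: // unsigned integer
			op = int64(val)
		case key == "op" && major == 1: // negative integer encodes -1 - val
			op = -1 - int64(val)
		case key == "t" && major == 3 && uint64(len(b)-n) >= val:
			t = string(b[n : n+int(val)])
			n += int(val)
		default:
			return 0, "", nil, fmt.Errorf("unexpected header field %q", key)
		}
		b = b[n:]
	}
	return op, t, b, nil
}

func main() {
	// Header {"t": "#commit", "op": 1} followed by an empty map as a stand-in body.
	frame := []byte{0xa2, 0x61, 't', 0x67, '#', 'c', 'o', 'm', 'm', 'i', 't', 0x62, 'o', 'p', 0x01, 0xa0}
	op, t, body, err := decodeHeader(frame)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("op=%d t=%s body=%d bytes\n", op, t, len(body))
}
```

The decoder does not depend on key order, so it accepts the length-first key ordering that DAG-CBOR uses (`t` before `op`).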
Most SDKs provide a wrapper that handles this decoding for you. The example below uses the Go SDK, indigo, and simply prints each repo operation in every commit event it receives. These operations are things like "create post", "create like", "delete follow", and so on.
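Each operation carries an action (`create`, `update` or `delete`) and a path of the form `<collection>/<record key>`, where the collection is an NSID such as `app.bsky.feed.post`. A small sketch, assuming a hypothetical `describeOp` helper and an intentionally incomplete name table, shows how those two fields map onto descriptions like "create post". The record keys in `main` are made-up placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// collectionNames maps a few Bluesky collection NSIDs to short names.
// Illustrative only; it is not an exhaustive list.
var collectionNames = map[string]string{
	"app.bsky.feed.post":    "post",
	"app.bsky.feed.like":    "like",
	"app.bsky.feed.repost":  "repost",
	"app.bsky.graph.follow": "follow",
}

// describeOp (hypothetical helper) turns an op's action and its
// "<collection>/<rkey>" path into a short description such as "create post".
func describeOp(action, path string) (string, error) {
	collection, rkey, ok := strings.Cut(path, "/")
	if !ok || collection == "" || rkey == "" {
		return "", fmt.Errorf("malformed op path %q", path)
	}
	name, known := collectionNames[collection]
	if !known {
		name = collection // fall back to the raw NSID
	}
	return action + " " + name, nil
}

func main() {
	ops := []struct{ action, path string }{
		{"create", "app.bsky.feed.post/3kabc123"},
		{"delete", "app.bsky.graph.follow/3kxyz789"},
		{"create", "com.example.thing/1"},
	}
	for _, op := range ops {
		desc, err := describeOp(op.action, op.path)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(desc)
	}
}
```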
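// Print every repo operation contained in each commit event.
rsc := &events.RepoStreamCallbacks{
	RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
		fmt.Println("Event from", evt.Repo)
		for _, op := range evt.Ops {
			// op.Action is "create", "update" or "delete"; op.Path is "<collection>/<rkey>".
			fmt.Printf(" - %s record %s\n", op.Action, op.Path)
		}
		return nil
	},
}

// Process events one at a time, in the order they arrive.
sched := sequential.NewScheduler("myfirehose", rsc.EventHandler)
if err := events.HandleRepoStream(context.Background(), con, sched); err != nil {
	log.Fatal(err)
}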
In this snippet we set up a sequential "scheduler", which processes events one at a time, in the order they arrive. Other schedulers process events in parallel, or limit concurrency based on which repo (account) each event belongs to.
Once we have a scheduler, we call HandleRepoStream, which decodes the data coming over the WebSocket and calls into the event handler we wrote.
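Conceptually, after a frame's header has been decoded, the event is routed to the callback registered for its type. The sketch below illustrates that dispatch step under stated assumptions; it is not indigo's implementation, and `Header`, `Callbacks` and `dispatch` are hypothetical names. Error frames (`op` of -1) become Go errors, and message types without a registered callback are skipped, which keeps a consumer working if new types appear.

```go
package main

import "fmt"

// Header holds a decoded frame header: Op is 1 for a message and -1 for an
// error, and T names the message type (e.g. "#commit").
type Header struct {
	Op int64
	T  string
}

// Callbacks is a hypothetical, simplified set of handlers; each receives the
// still-encoded event body.
type Callbacks struct {
	Commit   func(body []byte) error
	Identity func(body []byte) error
	Account  func(body []byte) error
}

// dispatch routes one frame to the matching callback.
func dispatch(h Header, body []byte, cb Callbacks) error {
	if h.Op == -1 {
		return fmt.Errorf("error frame from server (%d bytes)", len(body))
	}
	if h.Op != 1 {
		return fmt.Errorf("unknown frame op %d", h.Op)
	}
	var handler func([]byte) error
	switch h.T {
	case "#commit":
		handler = cb.Commit
	case "#identity":
		handler = cb.Identity
	case "#account":
		handler = cb.Account
	}
	if handler == nil {
		return nil // unrecognized type or no callback registered: ignore
	}
	return handler(body)
}

func main() {
	commits := 0
	cb := Callbacks{Commit: func([]byte) error { commits++; return nil }}
	frames := []Header{{1, "#commit"}, {1, "#info"}, {1, "#commit"}, {-1, ""}}
	for _, h := range frames {
		if err := dispatch(h, nil, cb); err != nil {
			fmt.Println("stopping:", err)
			break
		}
	}
	fmt.Println("commits handled:", commits)
}
```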