This post will describe how to take a source stream and create a new stream for every event the source emits. One application for this is to combine or rather fill-in data from a Realtime database such as Cloud Firestore.
Imagine an online-shop with an article overview that allows to apply some filters. Changes to the filter setting are propagated as a stream and applied to a database-query which return a stream (to receive updates if data encompassed by this query changes). For every change to the filter you need to do two things:
- Cancel the subscription to the old query
- Create a subscription to the new query
But what happens if the user goes to a different page? Now you also need to make sure that cancelling the subscription to the result stream cancels all subscriptions in the chain.
All of the above can be achieved by implementing a custom
StreamTransformer that takes of updating the data stream and makes sure there aren't any dangling subscriptions left behind and can be reused wherever it's needed.
Now that the stage is set, let’s have a look at some code!
First we need class to hold the functionality. This class should extend
StreamTransformerBase so it can be used with
Stream.transform. It also needs a function to handle the transform.
Now we need to implement
StreamTransformerBase. First we need a
StreamController to build our target stream and then two variables to keep track of the source and target subscription:
Next we’ll implement the
StreamController. In the
onListen function we start listening to the source stream. On every source event the old target subscription needs to be cancelled and then recreated using the
handleTransform function we got through the constructor earlier. Events and errors from the target stream are simply passed to the respective controller functions. I've chosen to pass all errors from the source stream to the controller as well but you could also drop or log them instead.
Finally we close the controller when the source stream is done and pass through all actions on the controller stream to both source and and target stream. This ensures that all internal subscriptions are cancelled when the end consumer cancels the subscription.
StreamController looks like this:
As the last step we just need to return the stream from
bind, resulting in this class:
Now this transformer can simply be used. The following example is based on the scenario described at the beginning of the post: each option of a stream of filter options from the UI needs to be turned into a query with these options applied.
And that’s it. Now we have a stream transform that’ll turn every event of a stream into a new stream with a simple transformer function and no dangling subscriptions.
Thanks for reading!