How to make a lightweight `Replay` operator that can be subscribed only once?(如何制作一个只能订阅一次的轻量级`Replay`运算符?)
问题描述
在各种情况下,我都希望RxReplay
操作符能够缓冲传入的通知,在第一次订阅时同步重放其缓冲区,并在第一次订阅之后停止缓冲。这个轻量级Replay
运算符应该只能为一个订阅者提供服务。可以找到这样一个操作符的一个用例here,在第一次订阅之后继续缓冲只是浪费资源。出于演示目的,我将在这里展示一个我希望可以避免的有问题的行为的人为示例:
可观测对象总共生成800,000个值。Replay
机制立即连接到源,并且在其完成之前订阅了一半。
输出:
订阅后内存使用量持续增长。这是意料之中的,因为所有值都被缓冲,并在重放的可观察对象的整个生命周期内保持缓冲。理想的行为是在订阅后内存使用量骤降。在传播缓冲值之后,应该丢弃该缓冲区,因为在订阅之后它就没有用处了。另外,第二个订阅(await observable.Count()
)应该失败,并显示InvalidOperationException
。在可观察对象失去Replay
功能后,我不想再次订阅它。
下面是我试图实现的自定义ReplayOnce
操作符的存根。有谁知道如何实施它吗?
Replay
运算符具有可按需偶尔清空的缓冲区的。我的问题不同,因为我希望在订阅后完全禁用缓冲区,并且不再开始增长。推荐答案
我想出了ReplayOnce
运算符的实现,它基于组播自定义ReplayOnceSubject<T>
。此主题最初由ReplaySubject<T>
支持,在第一次(且仅允许)订阅期间使用正常的Subject<T>
进行切换:
replaySubject.Subscribe(subject)
行确保不仅将缓冲值传播到观察者,而且还将传播任何可能的OnError
/OnCompleted
通知。订阅后,ReplaySubject
不再被引用,并且有资格进行垃圾回收。
这篇关于如何制作一个只能订阅一次的轻量级`Replay`运算符?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!