How to repeat an observable sequence until it#39;s empty?(如何重复一个可观察到的序列,直到它为空?)
问题描述
我有一个IObservable<int>
序列,它在前9次订阅时发出单个项目,在进一步订阅时不发出任何内容并立即完成:
Repeat
运算符:
问题是该查询永远不会完成。Repeat
将一次又一次地订阅source
序列。更糟糕的是,当source
停止生成元素时,查询进入一个无情的死循环,劫持了CPU的一个核心(我的四核机器报告连续的CPU利用率为25%)。以下是上述代码的输出:
Repeat
运算符的一个变体,它在source
停止生成元素时停止重复source
。通过搜索内置的Rx运算符,我可以看到RepeatWhen
运算符,但显然这只能用于更快地启动下一次重复,而不是完全停止重复:
我不能100%确定,因为handler
参数的描述相当模糊,所以我可能遗漏了一些东西:
为每个观察者调用的函数,并获取可观察序列对象。它应该返回任意项的可观测项,响应于从源可观测项接收到完成信号,应该用信号通知该任意项。如果此可观察到的信号为终端事件,则序列将改为使用该信号终止。
我的问题是:如何实现重复source
序列直到它为空的RepeatUntilEmpty
运算符?是否可以基于上述RepeatWhen
运算符实现?如果不是,我是否应该从低级(Observable.Create
)开始重新实现基本的Repeat
功能?或者,我可以使用Materialize
运算符,以某种方式将其与现有的Repeat
结合使用吗?我现在没有主意了。我愿意接受任何一种解决方案,无论是高水平还是低水平。
将我的原始代码中的Repeat
替换为RepeatUntilEmpty
,应该可以在发出9
元素后立即完成查询。
推荐答案
您可以使用Materialize()
/Dematerialize()
根据从Repeat()
语句收到的通知构建您自己的notifications序列。通知序列如下所示:
因此,我们寻找两个连续的OnCompleted
通知。如果没有找到,我们仍然返回收到的OnNext
通知,否则返回OnCompleted
通知。代码可能如下所示:
RepeatUntilEmpty()
方法如下:
这将生成以下输出:
我还没有测试该代码如何处理OnError()
通知,因此您可能需要检查一下。此外,我还遇到了一些问题,即source.Materialize().Repeat()
部分将从原始源读取更多数据,尽管它后来决定停止可观测数据。特别是使用Do().Wait()
语句时,我有时会收到其他输出,如:
这可能也是您的问题,因为Repeat()
部分仍在尝试读取/连接空的可观察对象。
这篇关于如何重复一个可观察到的序列,直到它为空?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!