在 “桥接现有 .NET 事件”中,我们已将现有 .NET 事件转换为可观察的序列以订阅它们。 在本主题中,我们将介绍可观测序列作为 IObservable<T> 对象的一类性质,其中泛型 LINQ 运算符由 Rx 程序集提供以操作这些对象。 大多数运算符采用可观测序列并对其执行一些逻辑,并输出另一个可观测序列。 此外,从代码示例中可以看到,甚至可以将源序列上的多个运算符链接在一起,以便根据确切要求调整生成的序列。
使用不同的运算符
在前面的主题中,我们已使用 Create 和 Generate 运算符来创建和返回简单序列。 我们还使用 FromEventPattern 运算符将现有 .NET 事件转换为可观察序列。 在本主题中,我们将使用可观测类型的其他静态 LINQ 运算符,以便筛选、分组和转换数据。 此类运算符将可观测序列 () 作为输入,并生成可观测序列 () 作为输出。
组合不同的序列
在本部分中,我们将检查一些运算符,这些运算符将各种可观测序列组合成单个可观测序列。 请注意,当我们组合序列时,不会转换数据。
在下面的示例中,我们使用 Concat 运算符将两个序列合并为一个序列并订阅该序列。 出于说明目的,我们将使用非常简单的 Range (x, y) 运算符创建一个整数序列,该序列以 x 开头,随后生成 y 序列。
var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();
请注意,生成的序列为 1,2,3,1,2,3
。 这是因为使用 Concat 运算符时,第 2 个序列 (source2) 将不处于活动状态,直到第 1 个序列 (source1
) 完成推送其所有值。 只有在 source1
完成后,才会 source2
开始将值推送到生成的序列。 然后,订阅服务器将从结果序列中获取所有值。
将此与 Merge 运算符进行比较。 如果运行以下示例代码,将获得 1,1,2,2,3,3
。 这是因为这两个序列同时处于活动状态,并且值在源中出现时被推送出去。 仅当最后一个源序列已完成推送值时,生成的序列才会完成。
请注意,若要使 Merge 正常工作,所有源可观测序列都需要属于同一类型的 IObservable<T>。 生成的序列的类型为 IObservable<T>。 如果在 source1
序列中间生成 OnError,则生成的序列将立即完成。
var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();
另一个比较可以使用 Catch 运算符完成。 在这种情况下,如果 source1
完成且没有任何错误,则 source2
不会启动。 因此,如果运行以下示例代码,则只会source2
因为1,2,3
忽略生成4,5,6
) (。
var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();
最后,让我们看看 OnErrorResumeNext。 即使source1
由于错误而无法完成,此运算符也会继续source2
。 在下面的示例中,即使source1
表示使用 Throw 运算符) 以异常 (终止的序列,订阅服务器也会接收由 发布的source2
(1,2,3
) 的值。 因此,如果希望任一源序列产生任何错误,则使用 OnErrorResumeNext 保证订阅者仍会收到一些值是更安全的。
var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
.Subscribe(Console.WriteLine);
Console.ReadLine();
请注意,要使所有这些组合运算符正常工作,所有可观测序列都需要属于同一类型的 T。
投影
Select 运算符可以将可观测序列的每个元素转换为另一种形式。
在下面的示例中,我们分别将一系列整数投影为长度为 n 的字符串。
var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();
在以下示例(我们在桥接现有 .NET 事件主题中看到的 .NET 事件 转换示例的扩展)中,我们使用 Select 运算符将 IEventPattern<MouseEventArgs> 数据类型投影为 Point 类型。 通过这种方式,我们将鼠标移动事件序列转换为可以进一步分析和操作的数据类型,如下一个“筛选”部分所示。
var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);
最后,让我们看一下 SelectMany 运算符。 SelectMany 运算符具有许多重载,其中一个重载采用选择器函数参数。 此选择器函数在源可观测值推送的每个值上调用。 对于其中每个值,选择器将其投影到可观测的微型序列中。 最后,SelectMany 运算符将所有这些微型序列平展为单个结果序列,然后将该序列推送到订阅服务器。
从 SelectMany 返回的可观测项在源序列和选择器生成的所有可观测序列完成后发布 OnCompleted。 当源流中发生错误、选择器函数引发异常或在任何可观察的微型序列中发生错误时,它将触发 OnError。
在下面的示例中,我们首先创建一个源序列,该序列每 5 秒生成一个整数,并决定仅采用使用 Take 运算符) (生成的前 2 个值。 然后,我们使用 SelectMany
通过另一个 序列 {100, 101, 102}
来投影其中每个整数。 通过这样做,将生成两个微型可观测序列: {100, 101, 102}
和 {100, 101, 102}
。 它们最终平展为 的单个整数 {100, 101, 102, 100, 101, 102}
流,并推送到观察者。
var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);
var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
ex => Console.WriteLine("Error : {0}", ex.ToString()),
() => Console.WriteLine("Completed"));
Console.ReadKey();
筛选
在下面的示例中,我们使用 Generate 运算符创建一个简单的可观测数字序列。 Generate 运算符具有多个重载。 在本示例中,示例采用初始状态 (0) ,一个终止 (小于 10 次) 的条件函数,迭代器 (+1) ,结果选择器 (当前值) 的平方函数。 ,并使用 Where 和 Select 运算符仅打印小于 15 的那些。
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
where n < 5
select n;
source.Subscribe(x => {Console.WriteLine(x);}); // output is 0, 1, 4, 9
Console.ReadKey();
以下示例是前面在本主题中看到的投影示例的扩展。 在该示例中,我们已使用 Select 运算符将 IEventPattern<MouseEventArgs> 数据类型投影为 Point 类型。 在下面的示例中,我们使用 Where 和 Select 运算符仅选取我们感兴趣的鼠标移动。 在本例中,我们将筛选鼠标移动到第一个双函数 (,其中 x 和 y 坐标相等) 。
var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
select evt.EventArgs.Location;
var overfirstbisector = from pos in points
where pos.X == pos.Y
select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);
基于时间的操作
可以使用 Buffer 运算符执行基于时间的操作。
缓冲可观察序列意味着可观测序列的值根据指定的时间跨度或计数阈值放入缓冲区中。 当你期望序列推送大量数据,而订阅服务器没有资源来处理这些值时,这尤其有用。 通过基于时间或计数缓冲结果,并且仅在超出条件 (或源序列完成) 时返回值序列,订阅者可以按自己的节奏处理 OnNext 调用。
在下面的示例中,我们首先为每秒创建一个简单的整数序列。 然后,我们使用 Buffer 运算符并指定每个缓冲区将保存序列中的 5 个项。 当缓冲区已满时,将调用 OnNext。 然后,我们使用 Sum 运算符计算缓冲区的总和。 缓冲区会自动刷新,另一个周期开始。 打印输出将是 10, 35, 60…
10=0+1+2+3+4、35=5+6+7+8+9 等。
var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();
还可以创建具有指定时间跨度的缓冲区。 在以下示例中,缓冲区将保存累积了 3 秒的项。 打印输出将为 3、12、21... 其中 3=0+1+2、12=3+4+5 等。
var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));
Console.ReadKey();
请注意,如果使用缓冲区或窗口,则必须在筛选序列之前确保序列不为空。
按类别划分的 LINQ 运算符
按类别划分的 LINQ 运算符主题列出了可观测类型按类别实现的所有主要 LINQ 运算符;具体而言:创建、转换、合并、函数、数学、时间、异常、杂项、选择和基元。