Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом документе описывается, как отменить связь целевого блока потока данных от его источника.
Примечание.
Библиотека потоков данных TPL (пространство имен System.Threading.Tasks.Dataflow) не распространяется с помощью .NET. Чтобы установить пространство имен System.Threading.Tasks.Dataflow в Visual Studio, откройте проект, выберите Управление пакетами NuGet в меню Project и найдите пакет System.Threading.Tasks.Dataflow
в Интернете. Кроме того, чтобы установить его с помощью cli .NET Core, запустите dotnet add package System.Threading.Tasks.Dataflow
.
Пример
В следующем примере создаются три объекта TransformBlock<TInput,TOutput>, каждый из которых вызывает метод TrySolution
для вычисления значения. В этом примере требуется только результат первого вызова TrySolution
для завершения.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
// Receives the value from the first provided source that has
// a message.
public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
{
// Create a WriteOnceBlock<T> object and link it to each source block.
var writeOnceBlock = new WriteOnceBlock<T>(e => e);
foreach (var source in sources)
{
// Setting MaxMessages to one instructs
// the source block to unlink from the WriteOnceBlock<T> object
// after offering the WriteOnceBlock<T> object one message.
source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
}
// Return the first value that is offered to the WriteOnceBlock object.
return writeOnceBlock.Receive();
}
// Demonstrates a function that takes several seconds to produce a result.
static int TrySolution(int n, CancellationToken ct)
{
// Simulate a lengthy operation that completes within three seconds
// or when the provided CancellationToken object is cancelled.
SpinWait.SpinUntil(() => ct.IsCancellationRequested,
new Random().Next(3000));
// Return a value.
return n + 42;
}
static void Main(string[] args)
{
// Create a shared CancellationTokenSource object to enable the
// TrySolution method to be cancelled.
var cts = new CancellationTokenSource();
// Create three TransformBlock<int, int> objects.
// Each TransformBlock<int, int> object calls the TrySolution method.
Func<int, int> action = n => TrySolution(n, cts.Token);
var trySolution1 = new TransformBlock<int, int>(action);
var trySolution2 = new TransformBlock<int, int>(action);
var trySolution3 = new TransformBlock<int, int>(action);
// Post data to each TransformBlock<int, int> object.
trySolution1.Post(11);
trySolution2.Post(21);
trySolution3.Post(31);
// Call the ReceiveFromAny<T> method to receive the result from the
// first TransformBlock<int, int> object to finish.
int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);
// Cancel all calls to TrySolution that are still active.
cts.Cancel();
// Print the result to the console.
Console.WriteLine($"The solution is {result}.");
cts.Dispose();
}
}
/* Sample output:
The solution is 53.
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
' Receives the value from the first provided source that has
' a message.
Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
' Create a WriteOnceBlock<T> object and link it to each source block.
Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
For Each source In sources
' Setting MaxMessages to one instructs
' the source block to unlink from the WriteOnceBlock<T> object
' after offering the WriteOnceBlock<T> object one message.
source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
Next source
' Return the first value that is offered to the WriteOnceBlock object.
Return writeOnceBlock.Receive()
End Function
' Demonstrates a function that takes several seconds to produce a result.
Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
' Simulate a lengthy operation that completes within three seconds
' or when the provided CancellationToken object is cancelled.
SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))
' Return a value.
Return n + 42
End Function
Shared Sub Main(ByVal args() As String)
' Create a shared CancellationTokenSource object to enable the
' TrySolution method to be cancelled.
Dim cts = New CancellationTokenSource()
' Create three TransformBlock<int, int> objects.
' Each TransformBlock<int, int> object calls the TrySolution method.
Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)
' Post data to each TransformBlock<int, int> object.
trySolution1.Post(11)
trySolution2.Post(21)
trySolution3.Post(31)
' Call the ReceiveFromAny<T> method to receive the result from the
' first TransformBlock<int, int> object to finish.
Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)
' Cancel all calls to TrySolution that are still active.
cts.Cancel()
' Print the result to the console.
Console.WriteLine("The solution is {0}.", result)
cts.Dispose()
End Sub
End Class
' Sample output:
'The solution is 53.
'
Чтобы получить значение от первого объекта TransformBlock<TInput,TOutput>, завершающего работу, в этом примере определяется метод ReceiveFromAny(T)
. Метод ReceiveFromAny(T)
принимает массив объектов ISourceBlock<TOutput> и связывает каждый из этих объектов с объектом WriteOnceBlock<T>. При использовании метода LinkTo для связывания блока потока данных источника с целевым блоком источник передает сообщения в целевой объект по мере того, как данные становятся доступными. Так как класс WriteOnceBlock<T> принимает только первое сообщение, которое оно предлагается, метод ReceiveFromAny(T)
создает результат путем вызова метода Receive. Это создает первое сообщение, предлагаемое объекту WriteOnceBlock<T>. Метод LinkTo имеет перегруженную версию, которая принимает объект DataflowLinkOptions с свойством MaxMessages, которое, если задано значение 1
, указывает исходному блоку отменить связь с целевым объектом после того, как целевой объект получает одно сообщение от источника. Важно, чтобы объект WriteOnceBlock<T> не связывается со своими источниками, так как связь между массивом источников и объектом WriteOnceBlock<T> больше не требуется после того, как объект WriteOnceBlock<T> получает сообщение.
Чтобы разрешить остальным вызовам TrySolution
завершиться после того, как один из них вычисляет значение, метод TrySolution
принимает объект CancellationToken, который будет отменен после того как вызов ReceiveFromAny(T)
завершится. Метод SpinUntil возвращается при отмене этого объекта CancellationToken.