Section 121.1: ActionBlock<T>
(foreach)
This class can be thought of logically as a buffer for data to be processed combined with tasks for processing that data, with the “dataflow block” managing both. In its most basic usage, we can instantiate an ActionBlock and “post” data to it; the delegate provided at the ActionBlock’s construction will be executed asynchronously for every piece of data posted.
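Because the delegate runs asynchronously, a caller usually needs some way to know when all posted data has been handled. The following is a minimal sketch of that post-then-drain pattern, assuming the System.Threading.Tasks.Dataflow library is referenced. The class and method names are hypothetical; `Complete()` and `Completion` are members of the block itself.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class ActionBlockCompletionSketch
{
    // Posts 1..count to an ActionBlock and returns the sum computed by its delegate.
    public static async Task<int> SumAsync(int count)
    {
        int sum = 0;
        // With default options the block processes one item at a time,
        // so the unsynchronized += is safe in this sketch.
        var block = new ActionBlock<int>(i => { sum += i; });

        for (int i = 1; i <= count; i++)
        {
            block.Post(i);
        }

        block.Complete();        // signal that no more data will be posted
        await block.Completion;  // completes after every posted item was processed
        return sum;
    }
}
```

Calling `await ActionBlockCompletionSketch.SumAsync(4)` yields 10, since the delegate has run for each of the four posted values before `Completion` finishes.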
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-111.png?resize=598%2C151&ssl=1)
Synchronous Computation
var ab = new ActionBlock<TInput>(i =>
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);
Throttling Asynchronous Downloads to at most 5 concurrently
var downloader = new ActionBlock<string>(async url =>
{
    byte[] imageData = await DownloadAsync(url);
    Process(imageData);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); // MaxDegreeOfParallelism lives on ExecutionDataflowBlockOptions
downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");
Introduction to TPL Dataflow by Stephen Toub
Section 121.2: BroadcastBlock<T>
(Copy an item and send the copies to every block that it’s linked to)
Unlike BufferBlock, BroadcastBlock’s mission in life is to enable all targets linked from the block to get a copy of every element published, continually overwriting the “current” value with those propagated to it.
Additionally, unlike BufferBlock, BroadcastBlock doesn’t hold on to data unnecessarily. After a particular datum has been offered to all targets, that element will be overwritten by whatever piece of data is next in line (as with all dataflow blocks, messages are handled in FIFO order). That element will be offered to all targets, and so on.
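The fan-out behavior described above can be sketched as follows, assuming two unbounded ActionBlock targets that accept every offer. The class, method, and variable names are hypothetical; `PropagateCompletion` is used so that completing the broadcast block also completes both targets.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class BroadcastBlockSketch
{
    // Sends every item to two linked targets and returns what each target saw.
    public static async Task<int[][]> BroadcastToTwoAsync(params int[] items)
    {
        // The cloning function returns the value itself, which is fine for ints.
        var broadcast = new BroadcastBlock<int>(i => i);
        var seenA = new List<int>();
        var seenB = new List<int>();
        var targetA = new ActionBlock<int>(i => seenA.Add(i));
        var targetB = new ActionBlock<int>(i => seenB.Add(i));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(targetA, link);
        broadcast.LinkTo(targetB, link);

        foreach (var item in items)
        {
            broadcast.Post(item);
        }

        broadcast.Complete();
        await Task.WhenAll(targetA.Completion, targetB.Completion);
        return new[] { seenA.ToArray(), seenB.ToArray() };
    }
}
```

Both targets are unbounded here, so neither declines an offer and each sees every item in order. A target with a bounded capacity could miss items, because the block overwrites its current value rather than queueing it for slow targets.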
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-112.png?resize=736%2C155&ssl=1)
Saving Images to Disk and Displaying Them in the UI
// Capture the UI scheduler (this line must run on the UI thread)
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);
var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path));
// ActionBlock takes ExecutionDataflowBlockOptions; run this delegate on the UI scheduler
var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image),
    new ExecutionDataflowBlockOptions { TaskScheduler = ui });
bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
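Exposing Status from an Agent
public class MyAgent
{
    // Keep the block in a private field: ISourceBlock<string> exposes no way to post.
    // Strings are immutable, so the cloning function can return the same instance.
    private readonly BroadcastBlock<string> _status = new BroadcastBlock<string>(s => s);
    public ISourceBlock<string> Status { get { return _status; } }
    public MyAgent()
    {
        Run();
    }
    private void Run()
    {
        _status.Post("Starting");
        _status.Post("Doing cool stuff");
        …
        _status.Post("Done");
    }
}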
Section 121.3: BufferBlock<T>
(FIFO Queue: The data that comes in is the data that goes out)
In short, BufferBlock provides an unbounded or bounded buffer for storing instances of T.
You can “post” instances of T to the block, which causes the data being posted to be stored in first-in-first-out (FIFO) order by the block.
You can “receive” from the block, which allows you to synchronously or asynchronously obtain instances of T previously stored or available in the future (again, FIFO).
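A minimal sketch of posting and receiving, assuming the System.Threading.Tasks.Dataflow library is referenced. The class and method names are hypothetical. The second method illustrates the bounded case: `Post` returns false when the block declines an item because the buffer is full.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class BufferBlockSketch
{
    // Posts three values and receives them back in FIFO order.
    public static int[] PostThenReceive()
    {
        var buffer = new BufferBlock<int>();   // unbounded by default
        buffer.Post(1);
        buffer.Post(2);
        buffer.Post(3);
        return new[] { buffer.Receive(), buffer.Receive(), buffer.Receive() };
    }

    // Returns the result of posting to a bounded buffer that is already full.
    public static bool PostToFullBuffer()
    {
        var bounded = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });
        bounded.Post(1);         // accepted, the buffer is now full
        return bounded.Post(2);  // declined, so Post returns false
    }
}
```

When the producer should wait for space rather than drop the item, `SendAsync` is the usual alternative to `Post`.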
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-113.png?resize=656%2C160&ssl=1)
Asynchronous Producer/Consumer with a Throttled Producer
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });
// Producer (returns Task so that it can be awaited by Run)
private static async Task Producer()
{
    while (true)
    {
        // SendAsync waits asynchronously while the buffer is full
        await _Buffer.SendAsync(Produce());
    }
}
// Consumer
private static async Task Consumer()
{
    while (true)
    {
        Process(await _Buffer.ReceiveAsync());
    }
}
// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}
Section 121.4: JoinBlock<T1, T2, …>
(Collects 2-3 inputs and combines them into a Tuple)
Like BatchBlock, JoinBlock<T1, T2, …> is able to group data from multiple data sources. In fact, that’s JoinBlock<T1, T2, …>’s primary purpose.
For example, a JoinBlock<string, double, int> is an ISourceBlock<Tuple<string, double, int>>.
As with BatchBlock, JoinBlock<T1, T2, …> is capable of operating in both greedy and non-greedy mode.
- In the default greedy mode, all data offered to targets are accepted, even if the other target doesn’t have the necessary data with which to form a tuple.
- In non-greedy mode, the block’s targets will postpone data until all targets have been offered the necessary data to create a tuple, at which point the block will engage in a two-phase commit protocol to atomically retrieve all necessary items from the sources. This postponement makes it possible for another entity to consume the data in the meantime so as to allow the overall system to make forward progress.
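The two modes are selected at construction time. The sketch below is illustrative only and uses hypothetical class and method names: the first method shows greedy pairing, where the first target buffers an extra item while it waits for a partner, and the second shows how non-greedy mode is requested through `GroupingDataflowBlockOptions`.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class JoinBlockSketch
{
    // Greedy (default): Target1 accepts "a" and "b" even though Target2
    // has only one item; the first tuple pairs the oldest item from each.
    public static async Task<Tuple<string, int>> JoinFirstPairAsync()
    {
        var join = new JoinBlock<string, int>();
        join.Target1.Post("a");
        join.Target1.Post("b");
        join.Target2.Post(1);
        return await join.ReceiveAsync();
    }

    // Non-greedy: targets postpone offered data until a full tuple can be formed.
    public static JoinBlock<string, int> CreateNonGreedy()
    {
        return new JoinBlock<string, int>(
            new GroupingDataflowBlockOptions { Greedy = false });
    }
}
```

In the greedy case the returned tuple is ("a", 1), and "b" stays buffered in the first target until the second target receives another value.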
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-114.png?resize=862%2C154&ssl=1)
Processing Requests with a Limited Number of Pooled Objects
var throttle = new JoinBlock<ExpensiveObject, Request>();
// Seed the pool with 10 reusable objects
for (int i = 0; i < 10; i++)
{
    throttle.Target1.Post(new ExpensiveObject());
}
var processor = new TransformBlock<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    request.ProcessWith(resource);
    return resource;
});
throttle.LinkTo(processor);
// Return each object to the pool once its request has been processed
processor.LinkTo(throttle.Target1);
Section 121.5: WriteOnceBlock<T>
(Readonly variable: Memorizes its first data item and passes out copies of it as its output. Ignores all other data items)
If BufferBlock is the most fundamental block in TPL Dataflow, WriteOnceBlock is the simplest.
It stores at most one value, and once that value has been set, it will never be replaced or overwritten.
You can think of WriteOnceBlock as being similar to a readonly member variable in C#, except instead of only being settable in a constructor and then being immutable, it’s only settable once and is then immutable.
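A minimal sketch of the set-once behavior, assuming the System.Threading.Tasks.Dataflow library is referenced. The class and method names are hypothetical. It records whether each `Post` was accepted and reads the stored value twice.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class WriteOnceBlockSketch
{
    // Returns (first post accepted, second post accepted, first read, second read).
    public static Tuple<bool, bool, int, int> Run()
    {
        var block = new WriteOnceBlock<int>(i => i);

        bool firstAccepted = block.Post(1);   // true: the block was empty
        bool secondAccepted = block.Post(2);  // false: the value is already set

        int firstRead = block.Receive();      // 1
        int secondRead = block.Receive();     // still 1, receiving does not remove the value

        return Tuple.Create(firstAccepted, secondAccepted, firstRead, secondRead);
    }
}
```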
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-115.png?resize=433%2C151&ssl=1)
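Splitting a Task’s Potential Outputs
public static void SplitIntoBlocks<T>(this Task<T> task,
    out IPropagatorBlock<T, T> result,
    out IPropagatorBlock<Exception, Exception> exception)
{
    // An async method cannot have out parameters, and a lambda cannot capture them,
    // so create the blocks as locals and forward the task's outcome from a continuation.
    var resultBlock = new WriteOnceBlock<T>(i => i);
    var exceptionBlock = new WriteOnceBlock<Exception>(i => i);
    result = resultBlock;
    exception = exceptionBlock;
    task.ContinueWith(t =>
    {
        if (t.IsFaulted) exceptionBlock.Post(t.Exception.InnerException);
        else if (t.IsCanceled) exceptionBlock.Post(new TaskCanceledException(t));
        else resultBlock.Post(t.Result);
    });
}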
Section 121.6: BatchedJoinBlock<T1, T2, …>
(Collects a certain number of total items from 2-3 inputs and groups them into a Tuple of collections of data items)
BatchedJoinBlock<T1, T2, …> is in a sense a combination of BatchBlock and JoinBlock<T1, T2, …>.
Whereas JoinBlock<T1, T2, …> is used to aggregate one input from each target into a tuple, and BatchBlock is used to aggregate N inputs into a collection, BatchedJoinBlock<T1, T2, …> is used to gather N inputs from across all of the targets into tuples of collections.
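To make "N inputs from across all of the targets" concrete, here is a small sketch with a batch size of 3, where two items arrive on the first target and one on the second. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class BatchedJoinBlockSketch
{
    // Returns how many items each target contributed to the first batch.
    public static async Task<Tuple<int, int>> CountPerTargetAsync()
    {
        // A batch is produced once 3 items in total have arrived, on any mix of targets.
        var block = new BatchedJoinBlock<int, string>(3);

        block.Target1.Post(1);
        block.Target1.Post(2);
        block.Target2.Post("x");

        Tuple<IList<int>, IList<string>> batch = await block.ReceiveAsync();
        return Tuple.Create(batch.Item1.Count, batch.Item2.Count);
    }
}
```

The returned counts are 2 and 1: the batch size limits the total, not the number of items per target.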
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-116.png?resize=806%2C148&ssl=1)
Scatter/Gather
Consider a scatter/gather problem where N operations are launched, some of which may succeed and produce string outputs, and others of which may fail and produce Exceptions.
var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch (Exception ex) { batchedJoin.Target2.Post(ex); }
    });
}
// Completes once 10 results, successes or failures, have arrived
var results = await batchedJoin.ReceiveAsync();
foreach (string s in results.Item1)
{
    Console.WriteLine(s);
}
foreach (Exception e in results.Item2)
{
    Console.WriteLine(e);
}
Section 121.7: TransformBlock<TInput, TOutput>
(Select, one-to-one)
As with ActionBlock, TransformBlock<TInput, TOutput> enables the execution of a delegate to perform some action for each input datum; unlike with ActionBlock, this processing has an output. This delegate can be a Func<TInput, TOutput>, in which case processing of that element is considered completed when the delegate returns, or it can be a Func<TInput, Task<TOutput>>, in which case processing of that element is considered completed not when the delegate returns but when the returned Task completes. For those familiar with LINQ, it’s somewhat similar to Select() in that it takes an input, transforms that input in some manner, and then produces an output.
By default, TransformBlock<TInput, TOutput> processes its data sequentially with a MaxDegreeOfParallelism equal to 1. In addition to receiving buffered input and processing it, this block will take all of its processed output and buffer that as well, so it holds both data that has not yet been processed and data that has been processed.
It has 2 tasks: one to process the data, and one to push data to the next block.
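A minimal sketch of a one-to-one transform that raises the degree of parallelism above the default of 1. The class and method names are hypothetical. With default options the block still emits outputs in the order the inputs arrived, even when several items are processed at once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class TransformBlockSketch
{
    // Squares every input, processing up to four items concurrently.
    public static async Task<int[]> SquareAllAsync(params int[] inputs)
    {
        var square = new TransformBlock<int, int>(
            n => n * n,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        foreach (var n in inputs)
        {
            square.Post(n);
        }
        square.Complete();

        // Drain the output buffer until the block reports no more data.
        var results = new List<int>();
        while (await square.OutputAvailableAsync())
        {
            results.Add(await square.ReceiveAsync());
        }
        return results.ToArray();
    }
}
```

For inputs 1, 2, 3 the result is 1, 4, 9, in that order.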
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/p-6.png?resize=882%2C146&ssl=1)
A Concurrent Pipeline
var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
compressor.LinkTo(encryptor);
Section 121.8: TransformManyBlock<TInput, TOutput>
(SelectMany, 1-m: The results of this mapping are “flattened”, just like LINQ’s SelectMany)
TransformManyBlock<TInput, TOutput> is very similar to TransformBlock<TInput, TOutput>.
The key difference is that whereas a TransformBlock<TInput, TOutput> produces one and only one output for each input, TransformManyBlock<TInput, TOutput> produces any number (zero or more) of outputs for each input. As with ActionBlock and TransformBlock<TInput, TOutput>, this processing may be specified using delegates, both for synchronous and asynchronous processing.
A Func<TInput, IEnumerable<TOutput>> is used for synchronous, and a Func<TInput, Task<IEnumerable<TOutput>>> is used for asynchronous. As with both ActionBlock and TransformBlock<TInput, TOutput>, TransformManyBlock<TInput, TOutput> defaults to sequential processing, but may be configured otherwise.
The mapping delegate returns a collection of items, which are inserted individually into the output buffer.
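The flattening can be sketched with a synchronous delegate that splits lines into words. The class and method names are hypothetical. Each word becomes a separate output, and an empty line contributes no outputs at all.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class TransformManyBlockSketch
{
    // Splits each line into words; the words are emitted individually.
    public static async Task<List<string>> SplitWordsAsync(params string[] lines)
    {
        var splitter = new TransformManyBlock<string, string>(
            line => line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries));

        foreach (var line in lines)
        {
            splitter.Post(line);
        }
        splitter.Complete();

        var words = new List<string>();
        while (await splitter.OutputAvailableAsync())
        {
            words.Add(await splitter.ReceiveAsync());
        }
        return words;
    }
}
```

Posting "a b", "", and "c" produces the three outputs a, b, c.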
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-117.png?resize=884%2C178&ssl=1)
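Asynchronous Web Crawler
var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine("Downloading " + url);
    try
    {
        return ParseLinks(await DownloadContents(url));
    }
    catch { } // a failed download simply yields no links
    return Enumerable.Empty<string>();
});
// Feed discovered links back into the block itself
downloader.LinkTo(downloader);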
Expanding an Enumerable Into Its Constituent Elements
var expanded = new TransformManyBlock<T[], T>(array => array);
Filtering by going from 1 to 0 or 1 elements
public IPropagatorBlock<T, T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new[] { item } : Enumerable.Empty<T>());
}
Section 121.9: BatchBlock<T>
(Groups a certain number of sequential data items into collections of data items)
BatchBlock combines N single items into one batch item, represented as an array of elements. An instance is created with a specific batch size, and the block then creates a batch as soon as it’s received that number of elements, asynchronously outputting the batch to the output buffer.
BatchBlock is capable of executing in both greedy and non-greedy modes.
- In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches.
- In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.
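A minimal sketch of greedy batching, assuming the System.Threading.Tasks.Dataflow library is referenced. The class and method names are hypothetical. It also illustrates what happens to leftover items: when the block is completed, any remaining elements are emitted as a final, smaller batch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper used only for illustration.
public static class BatchBlockSketch
{
    // Posts itemCount integers and returns the size of every batch produced.
    public static async Task<List<int>> BatchSizesAsync(int itemCount, int batchSize)
    {
        var batcher = new BatchBlock<int>(batchSize);

        for (int i = 0; i < itemCount; i++)
        {
            batcher.Post(i);
        }
        batcher.Complete();  // flushes any partial batch that is still buffered

        var sizes = new List<int>();
        while (await batcher.OutputAvailableAsync())
        {
            int[] batch = await batcher.ReceiveAsync();
            sizes.Add(batch.Length);
        }
        return sizes;
    }
}
```

With 7 items and a batch size of 3, the batch sizes are 3, 3, and 1.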
![](https://i0.wp.com/venturesoftwares.in/wp-content/uploads/image-118.png?resize=870%2C156&ssl=1)
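Batching Requests into groups of 100 to Submit to a Database
var batchRequests = new BatchBlock<Request>(batchSize: 100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
batchRequests.LinkTo(sendToDb);
Creating a batch once a second
var batch = new BatchBlock<T>(batchSize: Int32.MaxValue);
// System.Threading.Timer callbacks take a state argument; keep a reference to the
// timer so that it is not garbage collected while batches are still needed
var timer = new Timer(_ => batch.TriggerBatch(), null, 1000, 1000);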