Chapter 121: Task Parallel Library (TPL) Dataflow Constructs


Section 121.1: ActionBlock<T>

(foreach)

This class can be thought of logically as a buffer for data to be processed combined with tasks for processing that data, with the “dataflow block” managing both. In its most basic usage, we can instantiate an ActionBlock and “post” data to it; the delegate provided at the ActionBlock’s construction will be executed asynchronously for every piece of data posted.

Synchronous Computation

var ab = new ActionBlock<int>(i =>
{
    Compute(i); // runs once for every posted item
});

ab.Post(1);
ab.Post(2);
ab.Post(3);
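The snippet above posts three items but never waits for them to be processed. The following is a minimal runnable sketch. It assumes C# top-level statements and a reference to the System.Threading.Tasks.Dataflow library. Squaring stands in for the hypothetical Compute, and Complete() plus Completion wait until every posted item has been handled:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Thread-safe collection for results (the block runs one item at a time by default,
// but this stays safe if MaxDegreeOfParallelism is raised)
var results = new ConcurrentBag<int>();

// Stand-in for the hypothetical Compute(i): record the square of each input
var ab = new ActionBlock<int>(i => results.Add(i * i));

ab.Post(1);
ab.Post(2);
ab.Post(3);

// Declare that no more data is coming, then wait until every posted item has been processed
ab.Complete();
await ab.Completion;

int total = 0;
foreach (int r in results) total += r;
Console.WriteLine(total); // 14
```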

Throttling Asynchronous Downloads to at Most 5 Concurrently

var downloader = new ActionBlock<string>(async url =>
{
    byte[] imageData = await DownloadAsync(url);
    Process(imageData);
    // MaxDegreeOfParallelism is defined on ExecutionDataflowBlockOptions
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Introduction to TPL Dataflow by Stephen Toub
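The sketch below shows the throttle in action. It assumes C# top-level statements and the System.Threading.Tasks.Dataflow library. Task.Delay stands in for the hypothetical DownloadAsync, and the example.com URLs are made up. The block records the peak number of delegate invocations that run at the same time, which should never exceed 5:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int inFlight = 0;   // downloads running right now
int peak = 0;       // highest value inFlight has reached
int completed = 0;  // downloads finished

var downloader = new ActionBlock<string>(async url =>
{
    int now = Interlocked.Increment(ref inFlight);
    // Raise peak to 'now' if it is higher (lock-free compare-and-swap loop)
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }

    await Task.Delay(50); // stand-in for the real download
    Interlocked.Decrement(ref inFlight);
    Interlocked.Increment(ref completed);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

for (int i = 0; i < 20; i++)
    downloader.Post($"http://example.com/images/{i}");

downloader.Complete();
await downloader.Completion;
Console.WriteLine($"{completed} downloads, peak concurrency {peak}");
```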

Section 121.2: BroadcastBlock<T>

(Copy an item and send the copies to every block that it’s linked to)

Unlike BufferBlock, BroadcastBlock’s mission in life is to enable all targets linked from the block to get a copy of every element published, continually overwriting the “current” value with those propagated to it.

Additionally, unlike BufferBlock, BroadcastBlock doesn’t hold on to data unnecessarily. After a particular datum has been offered to all targets, that element will be overwritten by whatever piece of data is next in line (as with all dataflow blocks, messages are handled in FIFO order). That element will be offered to all targets, and so on.

Saving Images to Disk While Showing Them in the UI

var ui = TaskScheduler.FromCurrentSynchronizationContext();

// The cloning function i => i hands the same instance to every linked target
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

// Run UI updates on the scheduler captured from the UI thread
var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image),
    new ExecutionDataflowBlockOptions { TaskScheduler = ui }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
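The following is a runnable sketch of the same fan-out. It assumes C# top-level statements and the System.Threading.Tasks.Dataflow library. File-name strings replace the hypothetical ImageData type, and plain lists replace the disk and UI work. Each linked target receives every message. PropagateCompletion is added so the program can wait for both targets:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stand-ins for the disk and UI consumers: each just records what it received
var savedToDisk = new List<string>();
var shownInUi = new List<string>();

// Strings are immutable, so the cloning function can return the same instance
var bb = new BroadcastBlock<string>(s => s);
var saveToDiskBlock = new ActionBlock<string>(item => savedToDisk.Add(item));
var showInUiBlock = new ActionBlock<string>(item => shownInUi.Add(item));

// PropagateCompletion lets bb.Complete() flow through to both targets
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
bb.LinkTo(saveToDiskBlock, linkOptions);
bb.LinkTo(showInUiBlock, linkOptions);

foreach (var name in new[] { "a.png", "b.png", "c.png" })
    bb.Post(name);

bb.Complete();
await Task.WhenAll(saveToDiskBlock.Completion, showInUiBlock.Completion);

Console.WriteLine(string.Join(",", savedToDisk)); // a.png,b.png,c.png
Console.WriteLine(string.Join(",", shownInUi));   // a.png,b.png,c.png
```

Both targets here are unbounded, so they accept every offer. A bounded or non-greedy target could decline messages and miss values that BroadcastBlock has already overwritten.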

Exposing Status from an Agent

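public class MyAgent
{
    // BroadcastBlock has no parameterless constructor; strings need no cloning
    private readonly BroadcastBlock<string> _status = new BroadcastBlock<string>(s => s);

    // Consumers can link to Status, but only the agent can post to it
    // (Post is defined for target blocks, not for ISourceBlock<T>)
    public ISourceBlock<string> Status => _status;

    public MyAgent()
    {
        Run();
    }

    private void Run()
    {
        _status.Post("Starting");
        _status.Post("Doing cool stuff");
        _status.Post("Done");
    }
}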

Introduction to TPL Dataflow by Stephen Toub
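The MyAgent constructor calls Run() before any caller can link to Status, so a listener attached afterwards can miss the earlier updates. The following is a minimal sketch that assumes C# top-level statements and the System.Threading.Tasks.Dataflow library. It reduces the agent to a bare BroadcastBlock with a hypothetical listener that links first, so the listener sees every status posted after it:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var status = new BroadcastBlock<string>(s => s);
ISourceBlock<string> exposed = status; // the read-only view an agent would hand out

// Subscribe before any status is published
var log = new List<string>();
var listener = new ActionBlock<string>(msg => log.Add(msg));
exposed.LinkTo(listener, new DataflowLinkOptions { PropagateCompletion = true });

// The agent's work
status.Post("Starting");
status.Post("Doing cool stuff");
status.Post("Done");
status.Complete();

await listener.Completion;
Console.WriteLine(string.Join(" -> ", log)); // Starting -> Doing cool stuff -> Done
```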

Section 121.3: BufferBlock<T>

(FIFO Queue: The data that comes in is the data that goes out)

In short, BufferBlock provides an unbounded or bounded buffer for storing instances of T.

You can “post” instances of T to the block, which causes the data being posted to be stored in first-in-first-out (FIFO) order by the block.

You can “receive” from the block, which allows you to synchronously or asynchronously obtain instances of T previously stored or available in the future (again, FIFO).

Asynchronous Producer/Consumer with a Throttled Producer

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer: SendAsync waits asynchronously while the buffer is full
private static async Task Producer()
{
    while (true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while (true)
    {
        Process(await _Buffer.ReceiveAsync());
    }
}

// Start the Producer and Consumer (both must return Task for Task.WhenAll)
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Introduction to TPL Dataflow by Stephen Toub
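The loops above run forever. The following is a terminating variant that assumes C# top-level statements and the System.Threading.Tasks.Dataflow library. The integers 1 to 100 replace the hypothetical Produce/Process calls. The producer calls Complete() when it is finished, and the consumer drains the buffer with OutputAvailableAsync:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Bounded hand-off: at most 10 items wait in the buffer at any time
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });

async Task Producer()
{
    for (int i = 1; i <= 100; i++)
    {
        // SendAsync completes once the buffer accepts the item,
        // so the producer is throttled whenever 10 items are already waiting
        await buffer.SendAsync(i);
    }
    buffer.Complete(); // no more data will arrive
}

async Task<int> Consumer()
{
    int sum = 0;
    // OutputAvailableAsync returns false once the block is completed and empty
    while (await buffer.OutputAvailableAsync())
    {
        sum += await buffer.ReceiveAsync();
    }
    return sum;
}

var consumerTask = Consumer();
await Task.WhenAll(Producer(), consumerTask);
int total = consumerTask.Result;
Console.WriteLine(total); // 5050
```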

Section 121.4: JoinBlock<T1, T2, …>

(Collects 2-3 inputs and combines them into a Tuple)

Like BatchBlock, JoinBlock<T1, T2, …> is able to group data from multiple data sources. In fact, that’s JoinBlock<T1, T2, …>’s primary purpose.

For example, a JoinBlock<string, double, int> is an ISourceBlock<Tuple<string, double, int>>.

As with BatchBlock, JoinBlock<T1, T2, …> is capable of operating in both greedy and non-greedy mode.

  • In the default greedy mode, all data offered to targets are accepted, even if the other target doesn’t have the necessary data with which to form a tuple.
  • In non-greedy mode, the block’s targets will postpone data until all targets have been offered the necessary data to create a tuple, at which point the block will engage in a two-phase commit protocol to atomically retrieve all necessary items from the sources. This postponement makes it possible for another entity to consume the data in the meantime so as to allow the overall system to make forward progress.

Processing Requests with a Limited Number of Pooled Objects

var throttle = new JoinBlock<ExpensiveObject, Request>();

// Seed the pool with 10 reusable objects; requests are posted to throttle.Target2
for (int i = 0; i < 10; i++)
{
    throttle.Target1.Post(new ExpensiveObject());
}

var processor = new TransformBlock<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    request.ProcessWith(resource);
    return resource;
});

throttle.LinkTo(processor);
// Return each object to the pool once its request has been processed
processor.LinkTo(throttle.Target1);

Introduction to TPL Dataflow by Stephen Toub
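The following is a runnable sketch of the pooling idea. It assumes C# top-level statements and the System.Threading.Tasks.Dataflow library. Int ids stand in for the hypothetical ExpensiveObject, strings stand in for Request, and Task.Delay stands in for ProcessWith. The processor allows unbounded parallelism, yet the observed concurrency stays within the pool size because each request must first be joined with a free pooled object:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int PoolSize = 2;      // number of pooled "expensive objects"
const int RequestCount = 10; // number of requests to serve

// Item1: a pooled object (here just an int id); Item2: a request (here a string)
var throttle = new JoinBlock<int, string>();

// Seed the pool
for (int id = 0; id < PoolSize; id++)
    throttle.Target1.Post(id);

int inFlight = 0, peak = 0, processed = 0;
var allDone = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var processor = new TransformBlock<Tuple<int, string>, int>(async pair =>
{
    int now = Interlocked.Increment(ref inFlight);
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }

    await Task.Delay(20); // stand-in for request.ProcessWith(resource)

    Interlocked.Decrement(ref inFlight);
    if (Interlocked.Increment(ref processed) == RequestCount)
        allDone.TrySetResult(true);
    return pair.Item1; // hand the pooled object back
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1); // recycle objects into the pool

for (int i = 0; i < RequestCount; i++)
    throttle.Target2.Post($"request {i}");

await allDone.Task;
Console.WriteLine($"processed {processed}, peak concurrency {peak}");
```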

Section 121.5: WriteOnceBlock<T>

(Read-only variable: Memorizes its first data item and passes out copies of it as its output. Ignores all other data items)

If BufferBlock is the most fundamental block in TPL Dataflow, WriteOnceBlock is the simplest.

It stores at most one value, and once that value has been set, it will never be replaced or overwritten.

You can think of WriteOnceBlock as being similar to a readonly member variable in C#, except instead of only being settable in a constructor and then being immutable, it’s only settable once and is then immutable.
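The following is a minimal sketch of that write-once behavior, assuming C# top-level statements and the System.Threading.Tasks.Dataflow library. The block accepts the first value and declines the second, and a receive does not remove the stored value, so it can be read repeatedly:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var once = new WriteOnceBlock<string>(s => s);

bool firstAccepted = once.Post("first");
bool secondAccepted = once.Post("second"); // declined: the value is already set

// Receiving does not consume the stored value, so every receiver gets it
string a = await once.ReceiveAsync();
string b = await once.ReceiveAsync();

Console.WriteLine($"{firstAccepted} {secondAccepted} {a} {b}"); // True False first first
```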

Splitting a Task’s Potential Outputs

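// Extension methods must be declared in a static class
public static void SplitIntoBlocks<T>(this Task<T> task,
    out IPropagatorBlock<T, T> result,
    out IPropagatorBlock<Exception, Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    // async methods cannot have out parameters, so the awaiting is done in a helper
    _ = PostOutcomeAsync(task, result, exception);
}

private static async Task PostOutcomeAsync<T>(Task<T> task,
    ITargetBlock<T> result, ITargetBlock<Exception> exception)
{
    try
    {
        result.Post(await task);
    }
    catch (Exception ex)
    {
        exception.Post(ex);
    }
}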

Introduction to TPL Dataflow by Stephen Toub
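The following sketch exercises the split on one successful task and one faulted task. It assumes C# top-level statements and the System.Threading.Tasks.Dataflow library. The split is written as local functions rather than an extension method so that no static class is needed, and the helper name PostOutcomeAsync is illustrative:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Local-function version of SplitIntoBlocks (not an extension method)
void SplitIntoBlocks<T>(Task<T> task,
    out IPropagatorBlock<T, T> result,
    out IPropagatorBlock<Exception, Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);
    _ = PostOutcomeAsync(task, result, exception);
}

// Awaits the task and routes its result or its exception to the matching block
async Task PostOutcomeAsync<T>(Task<T> task, ITargetBlock<T> result, ITargetBlock<Exception> exception)
{
    try { result.Post(await task); }
    catch (Exception ex) { exception.Post(ex); }
}

SplitIntoBlocks(Task.FromResult(42), out var ok, out _);
SplitIntoBlocks(Task.FromException<int>(new InvalidOperationException("boom")), out _, out var failed);

int value = await ok.ReceiveAsync();
Exception error = await failed.ReceiveAsync();
Console.WriteLine(value);         // 42
Console.WriteLine(error.Message); // boom
```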

Section 121.6: BatchedJoinBlock<T1, T2, …>

(Collects a certain number of total items from 2-3 inputs and groups them into a Tuple of collections of data items)

BatchedJoinBlock<T1, T2, …> is in a sense a combination of BatchBlock and JoinBlock<T1, T2, …>.

Whereas JoinBlock<T1, T2, …> is used to aggregate one input from each target into a tuple, and BatchBlock is used to aggregate N inputs into a collection, BatchedJoinBlock<T1, T2, …> is used to gather N inputs from across all of the targets into tuples of collections.

Scatter/Gather

Consider a scatter/gather problem where N operations are launched, some of which may succeed and produce string outputs, and others of which may fail and produce Exceptions.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i = 0; i < 10; i++)
{
    Task.Factory.StartNew(() =>
    {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch (Exception ex) { batchedJoin.Target2.Post(ex); }
    });
}

// One batch of 10 items in total, split into successes (Item1) and failures (Item2)
var results = await batchedJoin.ReceiveAsync();

foreach (string s in results.Item1)
{
    Console.WriteLine(s);
}

foreach (Exception e in results.Item2)
{
    Console.WriteLine(e);
}

Introduction to TPL Dataflow by Stephen Toub
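The following is a deterministic version of the scatter/gather, assuming C# top-level statements and the System.Threading.Tasks.Dataflow library. DoWork here is a hypothetical stand-in that fails for multiples of 3. Of the ten operations, items 0, 3, 6 and 9 fail, so the single batch holds 6 strings and 4 exceptions:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical DoWork: fails for multiples of 3, otherwise returns a string
string DoWork(int n) =>
    n % 3 == 0 ? throw new InvalidOperationException($"item {n} failed") : $"item {n} ok";

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i = 0; i < 10; i++)
{
    int n = i; // copy the loop variable so each closure sees its own value
    _ = Task.Run(() =>
    {
        try { batchedJoin.Target1.Post(DoWork(n)); }
        catch (Exception ex) { batchedJoin.Target2.Post(ex); }
    });
}

// The batch is produced once 10 items in total have arrived across both targets
var results = await batchedJoin.ReceiveAsync();
Console.WriteLine($"{results.Item1.Count} succeeded, {results.Item2.Count} failed"); // 6 succeeded, 4 failed
```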

Section 121.7: TransformBlock<TInput, TOutput>

(Select, one-to-one)

As with ActionBlock, TransformBlock<TInput, TOutput> enables the execution of a delegate to perform some action for each input datum; unlike with ActionBlock, this processing has an output. This delegate can be a Func<TInput, TOutput>, in which case processing of that element is considered completed when the delegate returns, or it can be a Func<TInput, Task<TOutput>>, in which case processing of that element is considered completed not when the delegate returns but when the returned Task completes. For those familiar with LINQ, it’s somewhat similar to Select() in that it takes an input, transforms that input in some manner, and then produces an output.

By default, TransformBlock<TInput, TOutput> processes its data sequentially with a MaxDegreeOfParallelism equal to 1. In addition to buffering its input and processing it, this block buffers all of its processed output as well, so it holds both data that has not yet been processed and data that has been processed.

It has 2 tasks: one to process the data, and one to push data to the next block.

A Concurrent Pipeline

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

// Compression and encryption of different items can now overlap
compressor.LinkTo(encryptor);

Introduction to TPL Dataflow by Stephen Toub
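The following is a runnable sketch of the pipeline, assuming C# top-level statements and the System.Threading.Tasks.Dataflow library. GZip stands in for the hypothetical Compress. A toy XOR stands in for Encrypt, purely as a placeholder and not real cryptography. An ActionBlock collects the results, and PropagateCompletion lets completing the first block flush the whole chain:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

byte[] Compress(byte[] input)
{
    using var output = new MemoryStream();
    using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
        gzip.Write(input, 0, input.Length);
    return output.ToArray(); // ToArray still works after the stream is closed
}

// Toy XOR "encryption": a placeholder only, NOT real cryptography
byte[] Encrypt(byte[] input)
{
    var result = new byte[input.Length];
    for (int i = 0; i < input.Length; i++) result[i] = (byte)(input[i] ^ 0x5A);
    return result;
}

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
var outputs = new List<byte[]>();
var sink = new ActionBlock<byte[]>(data => outputs.Add(data));

var link = new DataflowLinkOptions { PropagateCompletion = true };
compressor.LinkTo(encryptor, link);
encryptor.LinkTo(sink, link);

foreach (var text in new[] { "hello", "dataflow", "pipeline" })
    compressor.Post(Encoding.UTF8.GetBytes(text));

compressor.Complete();
await sink.Completion;
Console.WriteLine(outputs.Count); // 3
```

The blocks preserve input order by default, so outputs[0] corresponds to "hello".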

Section 121.8: TransformManyBlock<TInput, TOutput>

(SelectMany, 1-m: The results of this mapping are “flattened”, just like LINQ’s SelectMany)

TransformManyBlock<TInput, TOutput> is very similar to TransformBlock<TInput, TOutput>.

The key difference is that whereas a TransformBlock<TInput, TOutput> produces one and only one output for each input, TransformManyBlock<TInput, TOutput> produces any number (zero or more) of outputs for each input. As with ActionBlock and TransformBlock<TInput, TOutput>, this processing may be specified using delegates, both for synchronous and asynchronous processing.

A Func<TInput, IEnumerable<TOutput>> is used for synchronous processing, and a Func<TInput, Task<IEnumerable<TOutput>>> is used for asynchronous processing. As with both ActionBlock and TransformBlock<TInput, TOutput>, TransformManyBlock<TInput, TOutput> defaults to sequential processing, but may be configured otherwise.

The mapping delegate returns a collection of items, which are inserted individually into the output buffer.

Asynchronous Web Crawler

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine("Downloading " + url);
    try
    {
        return ParseLinks(await DownloadContents(url));
    }
    catch { } // a failed download simply yields no links

    return Enumerable.Empty<string>();
});

// Feed every discovered link back into the same block
downloader.LinkTo(downloader);
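The crawler above never stops and would revisit pages that link to each other. The following sketch assumes C# top-level statements, the System.Threading.Tasks.Dataflow library, and a hypothetical in-memory "web" in place of DownloadContents and ParseLinks. It adds a visited set to break cycles and a pending counter to detect when the crawl is finished:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical in-memory "web": each page maps to the links it contains
var web = new Dictionary<string, string[]>
{
    ["/"] = new[] { "/a", "/b" },
    ["/a"] = new[] { "/b", "/c" },
    ["/b"] = new[] { "/" },
    ["/c"] = new string[0],
};

var visited = new ConcurrentDictionary<string, bool>();
int pending = 1; // pages queued or being processed; starts with the root
var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var downloader = new TransformManyBlock<string, string>(async url =>
{
    await Task.Delay(10); // stand-in for downloading the page
    // Only pass on links that have never been queued, so cycles terminate
    string[] fresh = web[url].Where(link => visited.TryAdd(link, true)).ToArray();
    Interlocked.Add(ref pending, fresh.Length);
    // This page is finished; when nothing is outstanding the crawl is over
    if (Interlocked.Decrement(ref pending) == 0)
        done.TrySetResult(true);
    return fresh;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

downloader.LinkTo(downloader);

visited.TryAdd("/", true);
downloader.Post("/");
await done.Task;
downloader.Complete();
await downloader.Completion;

Console.WriteLine(visited.Count); // 4
```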

Expanding an Enumerable Into Its Constituent Elements

var expanded = new TransformManyBlock<T[], T>(array => array);
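The following is a concrete instance with T = int, assuming C# top-level statements and the System.Threading.Tasks.Dataflow library. Each posted array is flattened into individual outputs, and an empty array contributes nothing:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var expanded = new TransformManyBlock<int[], int>(array => array);

expanded.Post(new[] { 1, 2, 3 });
expanded.Post(new int[0]); // produces no output at all
expanded.Post(new[] { 4, 5 });
expanded.Complete();

// Drain the flattened output in order
var flat = new List<int>();
while (await expanded.OutputAvailableAsync())
    flat.Add(await expanded.ReceiveAsync());

Console.WriteLine(string.Join(",", flat)); // 1,2,3,4,5
```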

Filtering by Going from 1 to 0 or 1 Elements

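public IPropagatorBlock<T, T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    // Each item maps to a one-element sequence if it passes, or to an empty one if not
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new[] { item } : Enumerable.Empty<T>());
}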

Introduction to TPL Dataflow by Stephen Toub
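The following sketch shows the filter in use, assuming C# top-level statements and the System.Threading.Tasks.Dataflow library. CreateFilteredBuffer is written as a local function, and the block keeps only even numbers:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

IPropagatorBlock<T, T> CreateFilteredBuffer<T>(Predicate<T> filter) =>
    new TransformManyBlock<T, T>(item =>
        filter(item) ? new[] { item } : Enumerable.Empty<T>());

var evens = CreateFilteredBuffer<int>(n => n % 2 == 0);
for (int i = 1; i <= 10; i++) evens.Post(i);
evens.Complete();

// Odd inputs produced zero outputs, so only the evens remain
var kept = new List<int>();
while (await evens.OutputAvailableAsync())
    kept.Add(await evens.ReceiveAsync());

Console.WriteLine(string.Join(",", kept)); // 2,4,6,8,10
```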

Section 121.9: BatchBlock<T>

(Groups a certain number of sequential data items into collections of data items)

BatchBlock combines N single items into one batch item, represented as an array of elements. An instance is created with a specific batch size, and the block then creates a batch as soon as it’s received that number of elements, asynchronously outputting the batch to the output buffer.

BatchBlock is capable of executing in both greedy and non-greedy modes.

  • In the default greedy mode, all messages offered to the block from any number of sources are accepted and buffered to be converted into batches.
  • In non-greedy mode, all messages are postponed from sources until enough sources have offered messages to the block to create a batch. Thus, a BatchBlock can be used to receive 1 element from each of N sources, N elements from 1 source, and a myriad of options in between.

Batching Requests into Groups of 100 to Submit to a Database

var batchRequests = new BatchBlock<Request>(batchSize: 100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));

batchRequests.LinkTo(sendToDb);
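The following sketch shows how the batches come out, assuming C# top-level statements and the System.Threading.Tasks.Dataflow library. Ints stand in for the hypothetical Request type, and recording batch sizes stands in for SubmitToDatabase. Posting 250 items yields two full batches, and completing the block flushes the remaining 50 as a final, smaller batch:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var batchSizes = new List<int>();

var batchRequests = new BatchBlock<int>(batchSize: 100);
// Stand-in for SubmitToDatabase: record how large each batch was
var sendToDb = new ActionBlock<int[]>(reqs => batchSizes.Add(reqs.Length));

batchRequests.LinkTo(sendToDb, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 250; i++)
    batchRequests.Post(i);

// Completing the BatchBlock emits the leftover items as one last batch
batchRequests.Complete();
await sendToDb.Completion;

Console.WriteLine(string.Join(",", batchSizes)); // 100,100,50
```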

Creating a Batch Once a Second

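var batch = new BatchBlock<T>(batchSize: Int32.MaxValue);

// System.Threading.Timer: first fires after 1 s, then every 1 s.
// Keep the reference so the timer is not garbage-collected.
var timer = new Timer(_ => batch.TriggerBatch(), null, 1000, 1000);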

Introduction to TPL Dataflow by Stephen Toub
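The following sketch shows the timer-driven batch end to end, assuming C# top-level statements and the System.Threading.Tasks.Dataflow library. The batch size of int.MaxValue is never reached, so the only batch comes from TriggerBatch. The items are posted before the timer starts, and the program waits up to 5 seconds for the first batch:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A batch size this large is effectively never reached, so batches come from TriggerBatch
var batch = new BatchBlock<string>(batchSize: int.MaxValue);

batch.Post("a");
batch.Post("b");
batch.Post("c");

// Hold the timer in a variable so it is not garbage-collected while needed;
// it first fires after 1 second and then every second
using var timer = new Timer(_ => batch.TriggerBatch(), null, 1000, 1000);

// Wait (up to 5 seconds) for the first timer-triggered batch
string[] first = await batch.ReceiveAsync(TimeSpan.FromSeconds(5));
Console.WriteLine(string.Join(",", first)); // a,b,c
```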
