Documentation

parallel.pool.DataQueue

Class that enables sending and listening for data between client and workers

Description

parallel.pool.DataQueueenables sending data or messages from workers back to the client in a parallel pool while a computation is carried out. For example, you can get intermediate values and an indication of the progress of the computation.

To send data from a parallel pool worker back to the client, first construct aDataQueuein the client. Pass thisDataQueueinto aparfor-loop or other parallel language construct, such asspmd. From the workers, callsendto send data back to the client. At the client, register a function to be called each time data is received by usingafterEach.

  • You can callsendfrom the process that calls the constructor, if required.

  • You can construct the queue on the workers and send it back to the client to enable communication in the reverse direction. However, you cannot send a queue from one worker to another. Usespmd,labSend, orlabReceiveinstead.

  • Unlike all other handle objects,DataQueueinstances do remain connected when they are sent to workers.

Construction

q= parallel.pool.DataQueue

The constructor for aDataQueuetakes no arguments and returns an object that can be used to send or listen for messages (or data) from different workers. You call the constructor only in the process where you want to receive the data. In the usual workflow, the workers should not be calling the constructor, but should be handed an existingDataQueueinstance instead.

Properties

collapse all

依赖属性表明的队列many items of data are waiting to be removed from the queue.

q = parallel.pool.DataQueue;% No messages in queue because nothing has been sent.q.QueueLength
ans = 0
q.send('sending a message')% Now QueueLength = 1 because one message has been sent.q.QueueLength
ans = 1
% Add a callback to process the queue.listener = q.afterEach(@disp);
sending a message
% Now QueueLength = 0 because there are no more pending messages.q.QueueLength
ans = 0
% Data sent now is immediately processed by the callback so that QueueLength remains 0.q.send('sending message 2') q.QueueLength
sending message 2 ans = 0
% Deleting all callback listeners causes messages to build up in the queue again.delete(listener) q.send('sending message 3') q.QueueLength
ans = 1

Methods

A parallel.pool.DataQueue object has the following methods.

afterEach Define a function to call when new data is received on a DataQueue
send Send data from worker to client using a data queue

Copy Semantics

Handle. To learn how handle classes affect copy operations, seeCopying Objects(MATLAB).

Examples

collapse all

Construct aDataQueue, and callafterEach.

q = parallel.pool.DataQueue; afterEach(q, @disp);
Start aparfor-loop, and send a message. The pending message is passed to theafterEachfunction, in this example@disp.

parfori = 1:3 send(q, i);end;
1 2 3

For more details on listening for data using aDataQueue, seeafterEach.

创建一个DataQueue, and useafterEachto specify the function to execute each time the queue receives data. This example calls a subfunction that updates the wait bar.

创建一个parfor-loop to carry out a computationally demanding task in MATLAB®. Usesendto send some dummy data on each iteration of theparfor-loop. When the queue receives the data,afterEachcallsnUpdateWaitbarin the client MATLAB, and you can observe the wait bar progress.

functiona = parforWaitbar D = parallel.pool.DataQueue; h = waitbar(0,'Please wait ...'); afterEach(D, @nUpdateWaitbar); N = 200; p = 1;parfori = 1:N a(i) = max(abs(eig(rand(400)))); send(D, i);endfunctionnUpdateWaitbar(~) waitbar(p/N, h); p = p + 1;endend

If you callafterEachand there are items on the queue waiting to be dispatched, these items are immediately dispatched to the function handle specified byafterEach. CallafterEachbefore sending data to the queue, to ensure that onsend, the function handle@dispis called.

Construct aDataQueueand callafterEach.

q = parallel.pool.DataQueue; afterEach(q, @disp);
If you then send messages to the queue, each message is passed to the function handle specified byafterEachimmediately.

parfori = 1:3 send(q, i);endsend(q, 0);
1 3 2 0

If you send the data to the queue and then callafterEach, each of the pending messages are passed to the function handle specified byafterEach.

q = parallel.pool.DataQueue;parfori = 1:3 send(q, i);endafterEach(q, @disp);
3 1 2

Introduced in R2017a

Was this topic helpful?