How to set up data queue handling between App Designer and a parallel thread?

Hello community
I want to use the Parallel Computing Toolbox in App Designer to run a parallel thread and pass information between the two entities.
Imagine the following use case: you want to start, pause, and abort a sequence of measurements (for example) from the GUI.
  • When start is activated, the start button changes to a pause button and the measurement sequence starts in parallel.
  • When execution is paused, the current step of the sequence is allowed to finish.
  • On abort, the measurement shall be stopped immediately and the instruments turned off gracefully.
I tried to represent this in a workflow (shown at the end of the post; the file follows as soon as upload works again). I then set up a simplified .mlapp, first trying to start a parallel thread that queues information while the main code fetches that information and displays it in a text area. In that form, however, I cannot save it as an .mlapp file. From what I've read so far in the forum (see list below), it seems to be an issue with handing the app object over to the parallel process. But I don't understand what I am doing wrong, so I think I missed an important part of the data queue handling. I would be glad for any hint, information, or support to fix this issue and learn from the mistake.
Further information I checked before setting up the .mlapp:
Example code that cannot be saved as an .mlapp file:
classdef ExampleWorkflowAbortAsyn < matlab.apps.AppBase
    % Properties that correspond to app components
    properties (Access = public)
        UIFigure matlab.ui.Figure
        Abort2Button matlab.ui.control.StateButton
        TextArea matlab.ui.control.TextArea
        AbortButton matlab.ui.control.Button
        StartButton matlab.ui.control.Button
    end
    properties (Access = public)
        TCindex % Description
        isPassed = false;
        asycnWorker
        dataQueue
        clientQueue
        helperClientQueue
    end
    methods (Static)
        function receiveDataOnClient(data)
            disp(data);
            drawnow limitrate;
        end
    end
    methods (Access = public)
        function testExecutionMaster(app)
            while strcmp(app.StartButton.Text, 'Pause')
                app.isPassed = false;
                updateUILog(app,sprintf('TC %i', app.TCindex));
                app.TCindex = app.TCindex + 1;
                updateUILog(app,sprintf('\tPrepare measurement...'));
                pause(1)
                updateUILog(app,sprintf('\tExecute measurement...'));
                pause(1)
                updateUILog(app,sprintf('\tAnalyze Measurement...'));
                pause(1)
                updateUILog(app,sprintf('\t Test passed :)'));
                app.isPassed = true;
            end
        end
        function abortMeasurementAction(app)
            % nice to have: pop up with abort error and instructions
            updateUILog(app,'User triggered abort starts');
            updateUILog(app,sprintf('\tAll devices off'));
        end
        function updateUILog(app,message)
            textArray = app.TextArea.Value;
            textArray{end+1} = message;
            app.TextArea.Value = textArray;
            scroll(app.TextArea, 'bottom')
            % createDiaryEntry(app, message)
        end
    end
    % Callbacks that handle component events
    methods (Access = private)
        % Button pushed function: StartButton
        function EnterTestState(app, event)
            switch app.StartButton.Text
                case 'Start'
                    app.StartButton.Text = 'Pause';
                    app.TCindex = 1;
                    asycnWorker = parpool('Threads',1);
                    clientQueue = parallel.pool.DataQueue;
                    afterEach(clientQueue,@(data) app.receiveDataOnClient(data));
                    helperClientQueue = parallel.pool.PollableDataQueue;
                    temp = app.StartButton.Text;
                    wkrF = parallel.FevalFuture;
                    wkrF = parfeval(@testExecutionWorker,0,temp);
                    wkrQueue = poll(helperClientQueue,inf);
                    send(wkrQueue.Queue,"Start generating data");
                case 'Pause'
                    app.StartButton.Text = 'Resume';
                    updateUILog(app,'Test execution enters pause state')
                    while ~app.isPassed
                        pause(0.2)
                    end
                    updateUILog(app,sprintf('Test execution paused at %i', app.TCindex));
                case 'Resume'
                    app.StartButton.Text = 'Pause';
                    updateUILog(app,'Resume test execution');
                    testExecutionMaster(app)
            end
            updateUILog(app,"End of EnterTestStateFunction")
        end
    end
    % Component initialization
    methods (Access = private)
        % Create UIFigure and components
        function createComponents(app)
            % Create UIFigure and hide until all components are created
            app.UIFigure = uifigure('Visible', 'off');
            app.UIFigure.Position = [100 100 640 480];
            app.UIFigure.Name = 'MATLAB App';
            % Create StartButton
            app.StartButton = uibutton(app.UIFigure, 'push');
            app.StartButton.ButtonPushedFcn = createCallbackFcn(app, @EnterTestState, true);
            app.StartButton.Position = [422 321 138 45];
            app.StartButton.Text = 'Start';
            % Create AbortButton
            app.AbortButton = uibutton(app.UIFigure, 'push');
            app.AbortButton.Position = [422 249 138 41];
            app.AbortButton.Text = 'Abort';
            % Create TextArea
            app.TextArea = uitextarea(app.UIFigure);
            app.TextArea.Editable = 'off';
            app.TextArea.Position = [60 28 475 168];
            % Create Abort2Button
            app.Abort2Button = uibutton(app.UIFigure, 'state');
            app.Abort2Button.Text = 'Abort2';
            app.Abort2Button.Position = [129 289 186 54];
            % Show the figure after all components are created
            app.UIFigure.Visible = 'on';
        end
    end
    % App creation and deletion
    methods (Access = public)
        % Construct app
        function app = ExampleWorkflowAbortAsyn
            % Create UIFigure and components
            createComponents(app)
            % Register the app with App Designer
            registerApp(app, app.UIFigure)
            % Execute the startup function
            runStartupFcn(app, @startupFcn)
            if nargout == 0
                clear app
            end
        end
        % Code that executes before app deletion
        function delete(app)
            % Delete UIFigure when app is deleted
            delete(app.UIFigure)
        end
    end
end

Answers (1)

Rick Amos on 28 May 2025
Edited: Rick Amos on 3 Jun 2025
Your suspicion is spot on: passing the app object to the worker will not work, because doing so sends a copy (or in this case, an invalid copy). Instead, you need to make your testExecutionWorker static and pass in the arguments directly instead of the app:
classdef ExampleWorkflowAbortAsyn < matlab.apps.AppBase
    % Static is important here as you want this to be callable without the app object.
    methods (Access = public, Static)
        function testExecutionWorker(dataQueue, n, myIn1, myIn2, ..)
            for i = 1:n
                % Do stuff then
                send(dataQueue, someResult);
            end
        end
    end
    % In the start button callback:
    methods (Access = private)
        function EnterTestState(app, event)
            switch app.StartButton.Text
                case 'Start'
                    ..
                    myIn1 = app.SomeProperty;
                    myIn2 = app.SomeOtherProperty;
                    n = app.N;
                    dataQueue = parallel.pool.PollableDataQueue;
                    % Have to use the classname here instead of app as this
                    % function is static.
                    fut = parfeval(@ExampleWorkflowAbortAsyn.testExecutionWorker, 0, dataQueue, n, myIn1, myIn2, ..);
                    % If for any reason testExecutionWorker fails, we only
                    % find out by checking fut. One way to deal with this
                    % is to check fut periodically with ~isempty(fut.Error)
                    % Another way is to do the following, which will add a
                    % callback once fut finishes:
                    afterAll(fut, @app.checkIfFutureFailed, 0, PassFuture=true);
                    ..
                    % Rest of code is same
            end
        end
        function checkIfFutureFailed(app, fut)
            if ~isempty(fut.Error)
                % errordlg takes the message first, then the dialog title
                errordlg(fut.Error.getReport(), "Something went wrong!");
            end
        end
    end
end
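The failure check can be tried outside an app. The following is a minimal sketch, assuming R2021b or later so that backgroundPool is available. The names failingTask and reportIfFailed are hypothetical, and warning stands in for the error dialog.

```matlab
% Sketch: detect a failed background task through its future.
% failingTask and reportIfFailed are hypothetical names for illustration.
fut = parfeval(backgroundPool, @failingTask, 0);

% PassFuture=true hands the future itself to the callback, so the
% callback can inspect fut.Error instead of receiving task outputs.
afterAll(fut, @reportIfFailed, 0, PassFuture=true);

wait(fut);     % only for this demo; an app would return to its event loop
pause(0.5);    % give the client a chance to run the callback

function failingTask()
    % Runs on the worker and always fails.
    error("demo:fail", "Simulated measurement failure");
end

function reportIfFailed(fut)
    % Runs on the client once the future has finished.
    if ~isempty(fut.Error)
        warning("Background task failed: %s", fut.Error.message);
    end
end
```

In an app, the callback would be a method such as checkIfFutureFailed, and the warning would be replaced by a dialog.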
Several other tips along the way:
  1. For your use case here, if you're using a recent version of MATLAB (R2021b or later), you can use backgroundPool instead of parpool("Threads", 1). It's functionally the same, except that it is available in base MATLAB and doesn't require Parallel Computing Toolbox (just do "parfeval(backgroundPool,..)" in your code).
  2. You might want to look into afterEach / afterAll and parallel.pool.DataQueue. The current code (waiting on DataQueue/poll(inf)) will have periods of time where the app does not respond to button presses. By adding callbacks to when data messages are received (instead of waiting for those data messages directly), you allow other callbacks like button presses to still work. A full example of this is Create Responsive Apps by Running Calculations in the Background.
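The two tips can be combined in a short standalone script. This is a minimal sketch, assuming R2021b or later. The names runSequence and logMessage are hypothetical, and the pause call stands in for the real measurement.

```matlab
% Sketch: stream progress messages from a background worker to the client.
% runSequence and logMessage are hypothetical names for illustration.
queue = parallel.pool.DataQueue;
afterEach(queue, @logMessage);    % called on the client once per message

fut = parfeval(backgroundPool, @runSequence, 0, queue, 3);

wait(fut);     % only for this demo; an app callback would simply return
pause(0.5);    % let any pending afterEach callbacks run

function runSequence(queue, n)
    % Runs on the worker: it gets plain data and the queue, not the app.
    for k = 1:n
        send(queue, sprintf('TC %d started', k));
        pause(0.2);                       % stand-in for the measurement
        send(queue, sprintf('TC %d passed', k));
    end
end

function logMessage(msg)
    % Runs on the client; in an app this could call updateUILog(app, msg).
    disp(msg);
end
```

Because the client never blocks inside poll, button callbacks stay responsive while the worker is running.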
  3 comments
Rick Amos on 3 Jun 2025
My sincere apologies, there was a typo in what I put down. Instead of parallel.internal.PollableDataQueue, it was meant to be parallel.pool.PollableDataQueue.
In answer to the questions:
  1. That is correct. "send(dataQueue, someResult)" adds one piece of data to that dataqueue that can be retrieved by any "poll(dataQueue, Inf)" in the app.
  2. The switch block only does the jump once, at the beginning. Changing the text to 'Pause' afterwards should have no effect until the next time someone presses that button. I'm not sure I understand the question about 'Pause' and the running parallel thread; you don't necessarily need to do this. If you want that parallel thread to stop when the user clicks pause, you can call cancel(fut) to stop the work.
function EnterTestState(app, event)
    switch app.StartButton.Text
        case 'Start'
            app.StartButton.Text = 'Pause';
            app.TCindex = 1;
            asycnWorker = parpool('Threads',1);
            clientQueue = parallel.pool.DataQueue;
            myIn1 = app.StartButton.Text;
            myIn2 = app.StartButton.Text; % placeholder value for the second input
            n = 10;
            dataQueue = parallel.pool.PollableDataQueue;
            app.Future = parfeval(@ExampleWorkflowAbortAsyn.testExecutionWorker, 0, dataQueue, n, myIn1, myIn2);
            afterAll(app.Future, @app.checkIfFutureFailed, 0, PassFuture=true);
        case 'Pause'
            % This will stop the parallel work
            cancel(app.Future);
            ..
    end
end
This would require storing the future in a property, e.g. the "app.Future" above.
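The effect of cancel on a future can be inspected in isolation. This is a minimal sketch, assuming R2021b or later for backgroundPool, with pause standing in for a long measurement. As far as the documentation for cancel describes it, a cancelled future ends up finished with a non-empty Error, so a check such as checkIfFutureFailed may also fire after a deliberate cancel. That is an assumption worth verifying on your release.

```matlab
% Sketch: cancel a long-running background task and inspect the future.
fut = parfeval(backgroundPool, @pause, 0, 60);   % stand-in for a long task

cancel(fut);                     % stops the work on the worker

disp(fut.State)                  % state of the future after cancelling
disp(isempty(fut.Error))         % shows whether cancel left an error behind
```

If the error check should stay silent after a user-triggered abort, one option is to set a flag property before calling cancel and test it in the callback.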
Ursina on 12 Jun 2025
No worries :) a typo happens easily. Regarding your feedback:
  • Using parallel.internal.PollableDataQueue does not work together with afterEach. I changed the code (see attached file for the full code), which allows display in the text area, and the abort button works (see below). If I understand it correctly, with parallel.internal.PollableDataQueue I have to send and poll individually?
methods (Access = public, Static)
    function output = testExecutionWorker(dataQueue, TCindex, n, myIn1)
        output = struct('TCindex', TCindex);
        for TCindex = 1:n
            output.TCindex = TCindex;
            disp(output.TCindex)
            send(dataQueue, sprintf('TC %i', TCindex));
            pause(1)
            send(dataQueue, sprintf('\t Prepare measurement...'))
            pause(0.5)
            send(dataQueue, sprintf('\t Execute measurement...'))
            pause(0.5)
            send(dataQueue, sprintf('\t Analyze measurement...'))
            pause(0.5)
            send(dataQueue, sprintf('\t TC %i passed :)', TCindex))
            send(dataQueue, sprintf('\t ---------------------------'))
        end
    end
end
methods (Access = private)
    % Button pushed function: StartButton
    function EnterTestState(app, event)
        switch app.StartButton.Text
            case 'Start'
                app.StartButton.Text = 'Pause';
                app.TCindex = 1;
                asycnWorker = parpool('Threads',1);
                myIn1 = app.StartButton.Text;
                n = 10;
                dataQueue = parallel.pool.DataQueue;
                app.Future = parfeval(@ExampleWorkflowAbortAsyn.testExecutionWorker, 1, dataQueue, app.TCindex, n, myIn1);
                afterEach(dataQueue, @app.updateUILog);
                afterAll(app.Future, @app.checkIfFutureFailed, 0, PassFuture=true);
            case 'Pause'
                app.StartButton.Text = 'Resume';
                updateUILog(app,'Test execution enters pause state')
                % ..... do other stuff here
        end
    end
    % Button pushed function: AbortButton
    function AbortButtonPushed(app, event)
        cancel(app.Future)
        message = 'Test plan execution aborted. No data are saved and all instruments turned off.';
        updateUILog(app, message)
        app.StartButton.Text = 'Start';
        poolobj = gcp('nocreate');
        delete(poolobj)
    end
  • Regarding the 'Pause' state: I want to send a signal from the main thread in App Designer so that the parallel thread finishes in a defined state. In this case, I want the current iteration TCindex in testExecutionWorker to finish before the parallel execution stops. I introduced a second queue and send the string 'Pause' through it. In testExecutionWorker I added a poll (see code below). But 1) it does not output the messages anymore, and 2) the parallel thread seems to ignore the "Pause" string.
So what am I missing when I want to send information to the parallel thread?
function output = testExecutionWorker(flagQueue, dataQueue, TCindex, n)
    output = struct('TCindex', TCindex);
    for TCindex = 1:n
        output.TCindex = TCindex;
        disp(output.TCindex)
        send(dataQueue, sprintf('TC %i', TCindex));
        pause(1)
        send(dataQueue, sprintf('\t Prepare measurement...'))
        pause(0.5)
        send(dataQueue, sprintf('\t Execute measurement...'))
        pause(0.5)
        send(dataQueue, sprintf('\t Analyze measurement...'))
        pause(0.5)
        send(dataQueue, sprintf('\t TC %i passed :)', TCindex))
        send(dataQueue, sprintf('\t ---------------------------'))
        b = poll(flagQueue);
        disp(b)
        if strcmp(b, 'Pause')
            return
        end
    end
end
function EnterTestState(app, event)
    dataQueue = parallel.pool.DataQueue;
    flagQueue = parallel.pool.PollableDataQueue;
    switch app.StartButton.Text
        case 'Start'
            app.StartButton.Text = 'Pause';
            app.TCindex = 1;
            n = 10;
            asycnWorker = parpool('Threads',1);
            app.Future = parfeval(@ExampleWorkflowAbortAsyn.testExecutionWorker, 1,flagQueue, dataQueue, app.TCindex, n);
            afterEach(dataQueue, @app.updateUILog);
            afterAll(app.Future, @app.checkIfFutureFailed, 0, PassFuture=true);
        case 'Pause'
            app.StartButton.Text = 'Resume';
            updateUILog(app,'Test execution enters pause state')
            send(flagQueue,'Pause')
            app.TCindex = app.Future.OutputArguments;
            cancel(app.Future)
            disp(app.TCindex)
            %updateUILog(app,sprintf('Test execution
    end
end
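Two things stand out in the code above. First, dataQueue and flagQueue are created at the top of EnterTestState, so every button press creates new queues, and the flagQueue used in the 'Pause' branch is not the one that was handed to the worker in the 'Start' branch. Second, as far as the R2024b documentation describes it, a PollableDataQueue can only be polled where it was created, so a queue created on the client cannot be polled on the worker. One pattern, which the original post already hints at with helperClientQueue, is to let the worker create its own queue and send it back to the client. The following is a minimal sketch under those assumptions. The names runUntilPaused, handoff, results and inbox are hypothetical, and it has not been verified on every release or pool type.

```matlab
% Sketch: ask a worker to stop after it finishes its current step.
% runUntilPaused, handoff, results and inbox are hypothetical names.
handoff = parallel.pool.PollableDataQueue;  % worker -> client, carries the inbox
results = parallel.pool.DataQueue;          % worker -> client, carries log lines
afterEach(results, @disp);

fut = parfeval(backgroundPool, @runUntilPaused, 1, handoff, results, 10);

inbox = poll(handoff, 10);     % wait up to 10 s for the worker's own queue
pause(1);                      % let a few steps run
send(inbox, "Pause");          % in an app: store inbox in a property and
                               % send from the 'Pause' branch

lastStep = fetchOutputs(fut);  % blocks until the worker has returned
disp(lastStep)

function lastStep = runUntilPaused(handoff, results, n)
    % Created on the worker, so the worker is allowed to poll it.
    inbox = parallel.pool.PollableDataQueue;
    send(handoff, inbox);
    lastStep = 0;
    for k = 1:n
        send(results, sprintf('TC %d', k));
        pause(0.3);                        % stand-in for one measurement
        lastStep = k;
        [msg, gotMsg] = poll(inbox);       % non-blocking check between steps
        if gotMsg && strcmp(msg, "Pause")
            return                         % stop only after a finished step
        end
    end
end
```

With this arrangement the worker returns on its own, so the 'Pause' branch does not need cancel, and the last completed step arrives as the output of the future.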

Release

R2024b
