Is it possible to use parfeval and backgroundPool to write to a Mongo database in a background thread?

I am processing a large table of data, for example 100000x100. After processing this data, I want to write each row of the table into a MongoDB database using the mongoc interface. The insert takes over 150 s, so I have tried various ways to optimize the code. What I am doing now is below:
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
parfor i = 1:numBatches
    start = (i-1)*batchSize + 1;
    stop = min(i*batchSize, height(dataTable));
    % Get table batch
    insertData = table2struct(dataTable(start:stop, :));
    insert(c.Value, 'collection', insertData);
end
The above code works and I am able to write to the Mongo database in about 20 seconds, but this is still too slow. So I want to know whether there is a way to do this write in the background using parfeval. I have tried the following, but I either get errors with parallel.pool.Constant or errors saying a MongoConnection is not supported on thread-based workers. This confuses me, since I am able to use parfor with a MongoConnection, so why can't I use parfeval?
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
% f = parfeval(backgroundPool, @(data, const) TableInsert(data, const.Value), 1, dataTable, c);
% f = parfeval(backgroundPool, TableInsert, 1, dataTable, c);
f = parfeval(backgroundPool, @TableInsert, 1, dataTable, c.Value);
o = fetchOutputs(f);
fprintf(o);
function complete = TableInsert(dataTable, C)
    tic;
    batchSize = 10000;
    numBatches = ceil(height(dataTable)/batchSize);
    % mconn = C.Value;
    for i = 1:numBatches
        start = (i-1)*batchSize + 1;
        stop = min(i*batchSize, height(dataTable));
        % Get table batch
        insertData = table2struct(dataTable(start:stop, :));
        insert(C, 'collection', insertData);
    end
    elapsed = toc;
    complete = sprintf('Table insert was completed in: %f\n', elapsed);
end
In other attempts to speed up the large data write, I have used pymongo instead of mongoc. This is definitely faster, but I am still having trouble handing the write off to a background worker. Below is what I have tried using pymongo:
% Create const connection to mongoclient using pymongo
C = parallel.pool.Constant(@() py.pymongo.MongoClient('xx.xx.xxx.xx:xxxxx')); % host:port connection string
f = parfeval(backgroundPool, @(data, c) pythonTableInsert(data, c.Value), 1, dataTable, C);
% Get output
o = fetchOutputs(f);
fprintf(o);
function complete = pythonTableInsert(dataTable, C)
    tic;
    % Batch insert
    batchSize = 10000;
    numBatches = ceil(height(dataTable)/batchSize);
    parfor i = 1:numBatches
        start = (i-1)*batchSize + 1;
        stop = min(i*batchSize, height(dataTable));
        % Get table batch
        insertData = table2struct(dataTable(start:stop, :));
        % Convert to 1xN cell array where each cell is a 1x1 struct
        insertList = num2cell(insertData)';
        % Insert
        collection = C.get_database('database').get_collection('collection');
        collection.insert_many(insertList);
    end
    elapsed = toc;
    complete = sprintf('Python insert was completed in: %f\n', elapsed);
end

Answers (1)

In your first case using parfor, if you haven't changed any settings, you will be using a process pool. (The default profile is "Processes", and with default settings, a pool will be created there when you first hit parfor).
If you want to run stuff in the background, you need to run a bunch of parfeval requests, a bit like this:
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
for i = 1:numBatches
    start = (i-1)*batchSize + 1;
    stop = min(i*batchSize, height(dataTable));
    fut(i) = parfeval(@iRunInsert, 0, c, dataTable(start:stop, :));
end

function iRunInsert(c, data)
    insert(c.Value, 'collection', data);
end
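If you want a notification when the background inserts finish (for example, to update a UI), you can attach a callback to the futures; a minimal sketch, assuming `fut` is the array of futures built in the loop above:

```matlab
% Run a callback on the client once every future has completed.
% The trailing 0 means iRunInsert returns no outputs to collect.
afterAll(fut, @() disp('All inserts finished'), 0);
```

The callback runs asynchronously, so the client is not blocked while the inserts proceed.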

4 comments

Hello Edric,
I appreciate your response. However, your example does not use the backgroundPool, and it still takes some time to get through the for loop. I tried your example with the parfeval inside the for loop; it took 30 s, which was slower than the 20 s I had seen with just a parfor loop and no parfeval.
To explain better, the function I have right now processes a dataTable in <1s, and this 'processDataTable' function is called by a uifigure GUI. I want to return to the GUI after the dataTable is processed because the GUI is receiving new data. Therefore, I do not want to spend more than a couple of seconds in the 'processDataTable' function and the mongo insert is taking the longest time, which is why I want the insert to be done in the background/separate thread and for the main process to return to the GUI.
Do you have any other suggestions? Perhaps there are other ways to improve the workflow than I have explored?
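One pattern that fits the "return to the GUI quickly" requirement is to submit a single parfeval request that ships the whole table to one process-pool worker and does all the batching there, so the client returns almost immediately. A hypothetical sketch, not from this thread (the worker opens its own connection, since a MongoConnection cannot be sent to a thread-based worker; server name, port, and collection are placeholders):

```matlab
% Submit the entire insert job as ONE parfeval request on a process pool;
% the client can go back to servicing the GUI right away.
f = parfeval(@iInsertAll, 0, dataTable);

function iInsertAll(dataTable)
    % Open the connection on the worker rather than passing it in.
    conn = mongoc('serverIP', 1000, 'dbName');
    batchSize = 10000;
    numBatches = ceil(height(dataTable)/batchSize);
    for i = 1:numBatches
        start = (i-1)*batchSize + 1;
        stop  = min(i*batchSize, height(dataTable));
        insert(conn, 'collection', table2struct(dataTable(start:stop, :)));
    end
end
```

Note that serializing a large table for transfer to the worker has a one-time cost on the client at submission, so this helps most when that transfer is cheaper than the insert itself.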
Why do you need to use the backgroundPool specifically? Any use of parfeval with a pool (in this case, a Process pool) runs asynchronously with respect to the client. Does it help to ensure the pool is already running by calling
parpool()
before starting your code? Or, does it take 30 seconds to issue all the parfeval calls?
I don't need to use backgroundPool specifically, but from reading the documentation and browsing other answers, I thought that was what I had to use to run asynchronously. I have ensured that parpool is running before starting my code, and I time it like this:
tic;
for i = 1:numBatches
    % parfeval calls
end
toc;
So yes, it does take 30 seconds to issue all of the parfeval calls. I have 12 workers in my parpool; I am not sure if that is a limitation or not.
Hm, that seems like a long time simply to issue the parfeval calls. I would normally expect each call to parfeval to run in a few milliseconds or so (in the absence of large amounts of data being transmitted). What's the value of numBatches? It might be worth running with the profile command in effect to see what's going on. It's possible that splitting up dataTable is taking a long time.
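The profiling suggested above can be done with a minimal wrapper like this (the loop body stands in for whatever issues the parfeval calls):

```matlab
profile on
% ... loop that splits dataTable and issues the parfeval calls ...
profile viewer  % inspect where client-side time goes, e.g. table indexing
```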


Category: Background and Parallel Processing

Version: R2022b

Asked: 20 Nov 2023
Commented: 22 Nov 2023
