MATLAB Answers

How do I distribute N 3-dimensional (large) arrays for processing across N workers?

I would like to accelerate processing of a large set of radar event data using Parallel Computing. I have a server with 48 cores and 512GB of RAM, so all of the data I need to process will fit into the local computer's memory, with enough cores to process each independent set of events. The data I want each core to process consists of 8 channels of IQ data, each of which is a matrix of S samples x P pulses; i.e., I would like to distribute an 8 x S x P matrix to each worker.
Currently the data is loaded from Nx8 files into an Nx8xSxP matrix which I would like to distribute to N workers. The file reading is actually quite slow since it is done by a single processor, so perhaps the first question is whether I could have each worker load its own set of 8 files.
Otherwise, how do I distribute each 8xSxP matrix to my workers?

  4 Comments

Chris Steenhoek on 11 Jun 2020
Hi Walter,
Thanks for the response. Are you suggesting that by moving the N dimension to the "last" dimension that Matlab will by default slice the data off to the workers along this dimension?
I can actually build the matrix with N as the last dimension but it still seems that parfor chokes when a very large set of data is processed via parfor. I believe it tries to pass a copy of the matrix (or at least the worker's share of the data) to each worker which takes a very long time and requires far greater time than simply processing the data in a serial for loop.
Walter Roberson on 11 Jun 2020
You want to distribute 8 x S x P to each worker when you currently have N x 8 x S x P. Assuming that you are asking to
parfor n =1 : N
then if you were to address N8SP(n,:,:,:) inside the worker, parfor is able to transfer only the appropriate portion of data to the worker instead of sending the whole array. However, think about the organization of data in memory: the first dimension is the one that varies most quickly, so the memory order would be like 1111 2111 3111 1211 2211 3211 1311 2311 3311 and so on. Therefore to transfer N8SP(n,:,:,:) to the worker, parfor would have to transfer every N'th item in memory to the worker. That does not permit much in the way of hardware acceleration. The hardware cache line size on Intel i7 CPUs is 64 bytes, so if the 4 bytes at offset 0 are being sent to worker 1 and the 4 bytes at offset 4 are being sent to worker 2 and the 4 bytes at offset 8 are being sent to worker 3, then you have a whole bunch of cache line thrashing. This is the least efficient memory arrangement possible.
For the most efficient arrangement, you would prefer that all of the memory being sent to a worker is already consecutive in memory. The arrangement that achieves that is the one where the index you are slicing over is the last index in the array, 8SPN(:,:,:,n).
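To make the layout argument concrete, here is a minimal sketch using MATLAB's column-major linear indexing. The sizes are small made-up values, not the real radar dimensions, and the variable names are only illustrative.

```matlab
% Small made-up sizes, chosen only to keep the numbers readable.
N = 3; C = 8; S = 4; P = 5;

% N-first layout (N x C x S x P): step along the channel dimension of event 1.
idxFirst = sub2ind([N C S P], [1 1 1], [1 2 3], [1 1 1], [1 1 1]);
strideFirst = diff(idxFirst);   % gap in memory between neighbouring elements

% N-last layout (C x S x P x N): the same walk, with the event index last.
idxLast = sub2ind([C S P N], [1 2 3], [1 1 1], [1 1 1], [1 1 1]);
strideLast = diff(idxLast);

% With N last, event n occupies one unbroken run of linear indices.
A = reshape(1:C*S*P*N, [C S P N]);
slice2 = A(:, :, :, 2);
isContiguous = isequal(slice2(:).', (C*S*P + 1):(2*C*S*P));
```

In the N-first layout consecutive elements of one event sit N positions apart, while in the N-last layout they are adjacent and the whole event is a single contiguous block.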
"I believe it tries to pass a copy of the matrix (or at least the worker's share of the data) to each worker"
Yes, exactly. And for regular arrays, it needs to do that no matter what order the array is in memory in the client; having N as the last dimension reduces the work involved.
"which takes a very long time and requires far greater time than simply processing the data in a serial for loop"
Yes, that is a common problem for parfor. parfor workers need access to the data they are working on, and since they are separate processes, the memory has to be copied over to them.
There are configurations for using SPMD (not parfor) that in theory can use shared memory segments for labSend() instead of having to serialize and copy that; you don't hear much about those, but historically there were a couple of postings.
In the versions before MathWorks redesigned how complex arrays are handled, it was possible to use a File Exchange contribution to create shared memory segments that could be shared with the workers; it happened that the change of headers required for the redesigned complex arrays is not compatible with the FEX contribution; I don't know if anyone has worked out a substitute.
The overhead of sending data to a parfor worker (or a GPU) is a real one, and it is common for parfor to be slower than serial because of it.
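One way to see this overhead directly is to measure how many bytes a parfor loop moves. The following is a rough sketch, assuming Parallel Computing Toolbox is available and that gcp is allowed to start a pool; the array A and its size are made up for illustration.

```matlab
pool = gcp;                      % current pool (starts one if auto-create is on)

A = rand(200, 200, 8);           % made-up data, sliced along the last dimension
out = zeros(1, 8);

ticBytes(pool);                  % start counting bytes moved to/from workers
parfor n = 1:8
    out(n) = sum(A(:, :, n), 'all');   % A is a sliced input variable here
end
bytes = tocBytes(pool);          % one row per worker: [sent to, received from]
```

Comparing the reported byte counts against the loop's run time gives a feel for whether data transfer or computation dominates.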
This can lead to needing to redesign the program so that the appropriate data is created on the worker instead of transferred there. For example, if the data is layers being read in from different files, then have the worker load its appropriate file. This has hidden performance restrictions that are reduced if each of the files is stored on a different hardware controller (not just a different hard drive); having the files stored on different hard drives on the same controller is better than having them all on the same hard drive but is still bottlenecked by controller bandwidth. Having the files all on the same hard drive risks disk contention... but in some cases can still improve performance. In particular, if the files are fragmented then sometimes you can get a bit more performance by having two requests queued at the same time with the controller, as the controller can read data off of one track as it happens to spin underneath instead of wasting that time waiting for the drive to spin to collect parts of the first file. (But having fragments is still slower than if the files are fully defragmented.)
Multiple workers each wanting to read a different file: "throw hardware at it" if practical.
If not practical, if everything has to be stored on the same drive, then typically drive performance is optimized with two processes asking for files at the same time (so that when the data for the first process is finished transferring, there is already a request in the controller that it can move on to handling immediately without having to wait for a round trip to MATLAB to decide what the next request is). With more than two processes asking for file access at the same time on the same drive on the same controller, it is common that performance starts to degrade due to contention for resources.
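If you want to experiment with limiting how many workers hit the disk at once, parfor accepts an optional maximum worker count. This is only a sketch: the file names and the temporary folder are hypothetical stand-ins created by the example itself, and whether two readers is really the sweet spot depends on your hardware.

```matlab
N = 4;

% Create small stand-in files (hypothetical names) in a temporary folder.
folder = tempname;
mkdir(folder);
for n = 1:N
    data = n * ones(10, 5);
    save(fullfile(folder, sprintf('event%d.mat', n)), 'data');
end

% Read the files in parallel, but with at most 2 workers active.
totals = zeros(1, N);
parfor (n = 1:N, 2)
    s = load(fullfile(folder, sprintf('event%d.mat', n)));
    totals(n) = sum(s.data(:));
end

rmdir(folder, 's');              % clean up the stand-in files
```

Note that this caps the workers for the whole loop body, so it suits a loading-only loop; processing could then run in a second parfor using the full pool.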
Chris Steenhoek on 12 Jun 2020
Thanks Walter. Great explanation on the memory considerations for the data organization. I certainly understand the need for the data to get created on the worker. This is the reason Edric's recommended approach "works" -- with the caveat of the disk controller limitations both he and you have pointed out (great info there too, btw).
I was hoping there was some way to allocate "shared" memory such that the workers would just get a pointer to their slice of the memory (rather than the data itself needing to be transferred to the workers before the parfor executes). The distributed/codistributed functions seem to almost be what I want but not quite.
I'll do some experiments on our server to see what my limitations are for having the workers load the data files within the parfor loop.


Accepted Answer

Edric Ellis on 9 Jun 2020
The best approach probably depends on the operations you need to perform on this Nx8xSxP array. Are the operations that you wish to perform such that you can consider "slices" of the array independently? I.e. can each 8xSxP slice of the array be operated on independently? If so, you could consider an approach like this:
parfor i = 1:N
    myData = zeros(8, S, P);
    for f = 1:8
        % Here, readData reads one file returning a matrix
        % of size SxP
        myData(f, :, :) = readData(i, f);
    end
    % Here, "compute" operates on 8xSxP array, giving some result
    result(i) = compute(myData);
end
Even with this approach, be aware that the file reading might be slow because of the limitations of the disk hardware you're reading from. If this is a spinning disk, it might actually be counter-productive to try and have multiple workers attempting to read different files simultaneously.
If the operations you need to perform are not as easily "sliced" as in the example above, then it might be better to consider using "distributed arrays".
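For orientation, a codistributed array split along a chosen dimension might be set up roughly as follows. This is a sketch under assumptions: it requires Parallel Computing Toolbox with an open pool, the sizes are made up, and it only allocates zeros rather than loading real data.

```matlab
N = 8; C = 2; S = 30; P = 20;    % made-up sizes

spmd
    % Distribute along dimension 4 (the event index N), default partition.
    codist = codistributor1d(4);
    X = zeros(C, S, P, N, codist);       % codistributed C x S x P x N array

    % Each worker holds only its own block of events.
    localPart = getLocalPart(X);
    nLocal = size(localPart, 4);         % number of events on this worker
end

% nLocal is a Composite; the per-worker counts add up to N.
totalSlices = sum([nLocal{:}]);
```

Each worker then operates on its local part, so no single process has to hold or ship the full array.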

  3 Comments

Chris Steenhoek on 11 Jun 2020
Hi Edric,
Thank you for the response. Yes, each 8xSxP array is processed independently (sorry, I should have made that more clear in my description). For simplicity of this discussion and clarity in my included code, I made the number of channels a variable, C (C=8 in the original post).
I haven't had a chance to get on our server to try out the real deal, but I mocked up a small test on a desktop. I've inserted this code below this discussion. The first set of code is similar in structure to my original code. The second set reflects your recommended approach.
Processing time for original approach (using mock code; the real code requires about 90 mins in the for loop and effectively locks up in the parfor loop):
  • for loop: 16.4 sec
  • parfor loop: 255.4 sec
Processing time for recommended approach (mock code):
  • for loop: 11.9 sec
  • parfor loop: 6.3 sec
Unfortunately, I do have a spinning disk so I may be forced to load the array outside of the parfor loop. It's unfortunate, but the file loading is not nearly as long as the processing, so if I can get the processing to run in parallel I'll still be substantially reducing our run time. So... that leads back to my original question. Assuming I have a very large matrix which is NxCxSxP, is there a way to allocate it and load it such that each of the N workers gets a CxSxP slice of my overall matrix?
I previously looked into distributed and codistributed arrays. As I understand it, I should be able to allocate my NxCxSxP matrix across cores, and it seems that "distributed" slices across the last dimension (thus Walter's suggestion) while codistributed can be specified more completely. Both work with spmd and not with parfor. I tried both with limited success. I've seen examples where a matrix is declared and distributed both inside and prior to the spmd statement. Specifying prior to spmd with my large sizes results in very long run times; specifying within the spmd results in relatively long initialization times but short processing times, and is overall worse than a basic for loop. Specifying the matrix in an spmd statement, then loading it outside an spmd statement (as I suspect will be needed for the file reads), and subsequently processing it inside an spmd also results in very long run times.
Sorry for the super long response. Please ask for clarification if needed. My mock code is inserted below (I didn't include the code variants I used for distributed and codistributed but am happy to pass that along also).
Mock code of original approach:
N=4; C=2; S=12000; P=6000; %values use most of 32GB RAM in PC
tInit = tic;
NCSP=complex(zeros(N, C, S, P));
for iN = 1:N
    for iC = 1:C
        NCSP(iN,iC,:,:)=ones(S,P)*iC*iN;
    end
end
results = zeros(N,C);
fprintf(' ** Init time = %.1f sec ** \n', toc(tInit));
tLoop = tic;
parfor iN = 1 : N
    for iC=1:C
        results(iN,iC) = sum(sum(NCSP(iN,iC,:,:)));
    end
end
fprintf(' ** Loop time = %.1f sec ** \n', toc(tLoop));
fprintf(' ** Total time = %.1f sec ** \n', toc(tInit));
Mock code of recommended approach:
%N sets of data: C channels, P pulses of S samples
N=4; C=2; S=12000; P=6000; %values use most of 32GB RAM in PC
tLoop = tic;
results = zeros(N,C);
parfor iN = 1 : N
    NCSP=complex(zeros(C, S, P));
    for iC=1:C
        NCSP(iC,:,:)=ones(S,P)*iC*iN;
        results(iN,iC) = sum(sum(NCSP(iC,:,:)));
    end
end
fprintf(' ** Loop time = %.1f sec ** \n', toc(tLoop));
Edric Ellis on 12 Jun 2020
"Assuming I have a very large matrix which is NxCxSxP, is there a way to allocate it and load it such that each of the N workers gets a CxSxP slice of my overall matrix?"
If you read the data on the client, then the parfor machinery knows how to copy only the necessary portions of NCSP needed by each worker. But it is a copy, and there's some additional transitory memory overhead there. As Walter so rightly points out, it's more efficient if you "slice" the matrix in the final dimension so that when parfor needs to copy out a bunch of slices to send to the workers, it's copying a contiguous block of memory.
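As a small illustration of slicing in the final dimension, the sketch below reorders a client-side array with permute and then indexes it by the last dimension inside parfor. The sizes are made up and far smaller than the real data, and the sum is only a stand-in for the actual processing.

```matlab
N = 4; C = 2; S = 30; P = 20;    % made-up small sizes

% Build the array in the original N-first layout, as in the mock code.
NCSP = complex(zeros(N, C, S, P));
for iN = 1:N
    for iC = 1:C
        NCSP(iN, iC, :, :) = ones(S, P) * iC * iN;
    end
end

% Move N to the last dimension so each event is contiguous in memory.
CSPN = permute(NCSP, [2 3 4 1]);

results = zeros(C, N);
parfor iN = 1:N
    slice = CSPN(:, :, :, iN);                   % sliced input: one C x S x P block
    results(:, iN) = real(sum(slice, [2 3]));    % per-channel sum as stand-in work
end
```

The permute itself costs a full copy on the client, so where possible it is cheaper to build the array in C x S x P x N order from the start.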
If your workload is well-balanced (i.e. you can expect each slice operation to take the same amount of time, give or take), then you could try an approach using spmd which gives you more control. The idea here is similar to my original parfor suggestion, but spmd lets you co-ordinate things such that only a single worker is accessing the disk at any time. Here's a rough sketch:
N=4; C=2; S=1200; P=600;
spmd
    % First, divide up the N pieces across the workers
    partition = codistributor1d.defaultPartition(N);
    % partition is now a vector of length numlabs specifying
    % the number of slices each worker takes on.
    % We need an offset for each worker - compute this using
    % cumsum. The result is a vector telling us how many elements
    % the preceding workers own.
    nOffsetByLab = [0, cumsum(partition)];
    % Next, we can force the workers to operate one at a time
    % using labBarrier like so:
    for activeLab = 1:numlabs
        if labindex == activeLab
            % My turn to load data
            % The partition tells me how many values to load on this worker
            myNumSlices = partition(labindex);
            % Allocate my local piece of NCSP
            myNCSP = complex(zeros(myNumSlices, C, S, P));
            % The offset here tells me what the "global" index is in the
            % first dimension
            myNOffset = nOffsetByLab(labindex);
            % Loop over my slices and "load" the data.
            for nIdx = 1:myNumSlices
                globalN = nIdx + myNOffset;
                myNCSP(nIdx, :, :, :) = globalN .* ones(C,S,P);
            end
        end
        % Force all workers to wait here
        labBarrier
    end
    % At this point, each worker has myNumSlices x C x S x P array
    % myNCSP, and can perform computations.
    myResult = zeros(myNumSlices, 1);
    for nIdx = 1:myNumSlices
        % myNCSP(nIdx, :) flattens all trailing dimensions of slice nIdx
        myResult(nIdx) = sum(myNCSP(nIdx, :));
    end
end
% At the end of the spmd block, myResult is a Composite. We
% can simply concatenate the portions of that to get the overall
% result
overallResult = vertcat(myResult{:});
This is quite a bit more complex than the simple parfor approach, but it ensures no large data transfer, and also that only one worker at a time is "loading" data...
Chris Steenhoek on 12 Jun 2020
Thanks Edric. I had a bit of time on our server today and did a quick test where I reordered my matrix from NCSP to CSPN and did the file reads inside the parfor. I started slow with N=9 and it was loading 9x 280MB files in parallel with seemingly zero issues. There's a file for each channel for each event so with N=9 and C=8, that's 72 of these files. My load time reduced from 467 seconds to 45 seconds which is somehow actually greater than a factor of 9 reduction. I didn't get a chance to push this to see how far it will scale (my ultimate goal is N=45) but it is certainly promising.
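For reference, the arithmetic behind those load-time figures can be worked through as below. This is a back-of-the-envelope sketch that assumes every file is exactly 280 MB, which the post only states approximately.

```matlab
nFiles  = 9 * 8;        % N = 9 events, C = 8 channels
fileMB  = 280;          % assumed size of each file, in MB
tSerial = 467;          % load time with a single reader, in seconds
tPar    = 45;           % load time with reads inside parfor, in seconds

totalMB    = nFiles * fileMB;     % total data read
speedup    = tSerial / tPar;      % ratio of the two load times
rateSerial = totalMB / tSerial;   % effective MB/s, single reader
ratePar    = totalMB / tPar;      % effective MB/s, parallel readers

fprintf('Total %.0f MB, speedup %.1fx, %.0f MB/s -> %.0f MB/s\n', ...
    totalMB, speedup, rateSerial, ratePar);
```

Under that assumption the speedup is a little over 10x, with the effective read rate going from roughly 43 MB/s to roughly 448 MB/s. That suggests the serial load was not limited by raw disk bandwidth alone.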
With the SPMD approach you've provided as a backup plan I feel very good about the path I'm on. I greatly appreciate the explanations and well commented examples that both you and Walter have provided. I'll try to post an update once I get this all worked out. In my experiment with the first 9 events, I found that the code I'm working with isn't exactly memory efficient so I need to add some memory management to keep within my 512GB. That will probably keep me busy for a few days.
Many thanks.
