Process Kafka Events Using MATLAB
This example shows how to use the Streaming Data Framework for MATLAB® Production Server™ to process events from a Kafka® stream. The example provides and explains the recamanSum and
initRecamanSum streaming analytic functions that process event streams,
and the demoRecaman script that creates event streams, validates event
stream creation, uses the streaming analytic function to process event streams, and writes the
results to an output stream.
The example functions and script are located in the
support_package_root\toolbox\mps\streaming\Examples\Numeric folder, where support_package_root is the root
folder of support packages on your system. To get the path to this folder, use this
command:
fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')
Prerequisites
You must have Streaming Data Framework for MATLAB Production Server installed on your system. For more information, see Install Streaming Data Framework for MATLAB Production Server.
You must have a running Kafka server where you have the necessary permissions to create topics. The example assumes that the network address of your Kafka host is
kafka.host.com:9092.
Write Streaming Analytic MATLAB Function
For this example, use the sample MATLAB functions recamanSum and initRecamanSum.
Later, you iterate the recamanSum streaming function over several events
to compute results.
Write Stateful Function
The recamanSum function is stateful. In
stateful functions, the data state is shared between events, and past events can influence
the way current events are processed. recamanSum computes the
cumulative sum of a numeric sequence in stream variable R, and returns
a timetable cSum and a structure state. The timetable
cSum contains the cumulative sum of the elements in
R along with their timestamps and keys. The structure state
stores the last value of the cumulative sum in its field cumsum, so the next
window of events continues from that running total.
function [cSum, state] = recamanSum(data, state)
    timestamp = data.Properties.RowTimes;
    key = data.key;
    sum = cumsum(data.R) + state.cumsum;
    state.cumsum = sum(end);
    cSum = timetable(timestamp, key, sum);
end
Write State Initialization Function
The initRecamanSum function initializes state for the first
iteration of the recamanSum function.
function state = initRecamanSum(config)
    state.cumsum = 0;
end
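To see how the state carries the running total from one window to the next, the following minimal sketch applies the same logic as recamanSum to an in-memory timetable, without Kafka or an event stream processor. The sample values, the window size of 2, and the names tt, windowRows, and allSums are assumptions made for illustration and are not part of the shipped example.

```matlab
% Hypothetical stand-in data with the same variables (R, key) as the example.
R = [0; 1; 3; 6; 2; 7];
key = string((0:numel(R)-1)');
timestamp = datetime(2024,1,1,0,0,0,"TimeZone","UTC") + seconds(1:numel(R))';
tt = timetable(timestamp, R, key);

% Same starting state that initRecamanSum produces.
state.cumsum = 0;

windowRows = 2;   % hypothetical window size (the example uses 100)
allSums = [];     % collects the per-window cumulative sums

for first = 1:windowRows:height(tt)
    % One window of events, as the streaming function would receive it.
    data = tt(first:first+windowRows-1, :);

    % Same computation as the body of recamanSum: offset the window's
    % cumulative sum by the total carried over from earlier windows.
    s = cumsum(data.R) + state.cumsum;
    state.cumsum = s(end);

    allSums = [allSums; s];
end
```

Because each window adds state.cumsum before storing the new total, allSums equals cumsum(R) over the whole sequence, regardless of how the rows are split into windows.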
Create Sample Stream Events
To run the example, you require sample streaming data. The
demoRecaman script contains the following code to create streaming data
that consists of the first 1000 elements of Recamán's sequence and also contains code to
write the sequence to a Kafka topic recamanSum_data.
Set the Kafka hostname and port number.
kafkaHost = "kafka.host.com";
kafkaPort = 9092;
Create the first 1000 elements of Recamán's sequence.
To create the sequence, you can use the following recamanTimeTable function, also located in the \Examples\Numeric folder. recamanTimeTable creates a timetable containing the first N elements of Recamán's sequence.
function tt = recamanTimeTable(N)
    rs = zeros(1,N);
    for k=2:N
        n = k-1;
        subtract = rs(k-1) - n;
        if subtract > 0 && any(rs == subtract) == false
            rs(k) = subtract;
        else
            rs(k) = rs(k-1) + n;
        end
    end
    incr = seconds(1:N);
    thisVeryInstant = ...
        convertTo(datetime, "epochtime", "Epoch", "1970-1-1");
    thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom",...
        "epochtime", "Epoch", "1970-1-1");
    thisVeryInstant.TimeZone = "UTC";
    timestamp = (thisVeryInstant - seconds(N)) + incr';
    key = (0:N-1)';
    key = string(key);
    R = rs';
    tt = timetable(timestamp,R,key);
end
Store the results of recamanTimeTable in a timetable.
tt0 = recamanTimeTable(1000);
Create a stream object connected to the recamanSum_data topic. Later, you write the timetable that contains the Recamán sequence to recamanSum_data.
dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);
If the recamanSum_data topic already exists, delete it.
try deleteTopic(dataKS); catch, end
Write the entire Recamán sequence to the recamanSum_data topic.
writetimetable(dataKS, tt0);
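As a quick sanity check of the recurrence used in recamanTimeTable, the following sketch runs the same loop for a small N and needs no Kafka connection. The choice of N = 10 is an assumption made only to keep the output short.

```matlab
N = 10;              % hypothetical small size for a quick check
rs = zeros(1,N);     % rs(1) = 0 is the first element of the sequence

for k = 2:N
    n = k-1;
    subtract = rs(k-1) - n;
    % Step backward by n only if the result is positive and has not
    % appeared in the sequence yet; otherwise step forward by n.
    if subtract > 0 && ~any(rs == subtract)
        rs(k) = subtract;
    else
        rs(k) = rs(k-1) + n;
    end
end

disp(rs)
```

For N = 10 the loop produces 0, 1, 3, 6, 2, 7, 13, 20, 12, 21, which are the first ten terms of Recamán's sequence.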
Validate Sample Data Creation
To validate the sample stream events that you created, confirm that the first 100 rows
that you read from the recamanSum_data topic are the same as the sample
data you created and wrote to the recamanSum_data topic. The
demoRecaman script contains the following code.
Read one window of data (100 rows) from the recamanSum_data topic into a timetable tt1.
tt1 = readtimetable(dataKS);
Check if the data read into tt1 is equal to the first 100 elements from the Recamán sequence you wrote.
if isequal(tt0(1:height(tt1),:), tt1)
    fprintf(1,"Success writing data to topic %s.\n", dataKS.Name);
end
Stop reading from the dataKS stream, because later you use dataKS to read again from the recamanSum_data topic. Reading from the same topic using multiple streams is not permitted.
stop(dataKS);
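The comparison in the validation step can be tried on its own with in-memory timetables. In this sketch, tt1 is simply a slice of tt0 standing in for the window that readtimetable would return from the topic; the sample values and the mismatch variable are assumptions for illustration.

```matlab
% Hypothetical stand-in for the data written to the topic.
timestamp = datetime(2024,1,1,0,0,0,"TimeZone","UTC") + seconds(1:10)';
R = (1:10)';
key = string((0:9)');
tt0 = timetable(timestamp, R, key);

% Stand-in for one window read back from the stream (4 rows here).
tt1 = tt0(1:4, :);

% Compare only as many leading rows of tt0 as the window contains.
matches = isequal(tt0(1:height(tt1), :), tt1);

% A window with one altered value no longer compares equal.
mismatch = tt1;
mismatch.R(2) = -1;
differs = ~isequal(tt0(1:height(mismatch), :), mismatch);
```

Indexing tt0 with 1:height(tt1) keeps the check valid for any window size, since it compares only the rows that were actually read.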
Process Stream Events with Streaming Analytic Function
Iterate the recamanSum streaming analytic function multiple times to
read the numeric sequence from the input stream, compute its cumulative sum, and write the
results to the output stream. The demoRecaman script contains the
following code.
Create an output stream connected to the recamanSum_results topic. Use recamanSum_results to store the output of the recamanSum streaming function.
resultKS = kafkaStream(kafkaHost,kafkaPort,"recamanSum_results", ...
    Rows=100);
Create an event stream processor to iterate the recamanSum streaming function over the input topic connected to the stream dataKS. Write the results to the output topic connected to the stream resultKS. Use a persistent storage connection named RR to store data state between iterations.
rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,...
    StateStore="RR",OutputStream=resultKS);
Execute the stream function ten times. Since the window size, or the number of rows read at a time, is 100, ten iterations consume the entire sequence of 1000 elements.
fprintf(1,"Computing cumulative sum of Recaman sequence.\n");
execute(rsp, 10);
Delete the event stream processor. This shuts down StateStore, which is required to run this script more than once in a row.
clear rsp;
Read the results from the output stream.
fprintf(1,"Reading results from %s.\n", resultKS.Name);
tt2 = timetable.empty;
for n = 1:10
    tt2 = [ tt2 ; readtimetable(resultKS) ];
end
cSum = cumsum(tt0.R);
if tt2(end,:).sum == cSum(end)
    fprintf(1,"Cumulative sum computed successfully: %d.\n", ...
        tt2(end,:).sum);
else
    fprintf(1,"Expected cumulative sum %d. Computed %d instead.\n", ...
        cSum(end), tt2(end,:).sum);
end
When you run the entire demoRecaman script, you see the following
output.
Success writing data to topic recamanSum_data.
Computing cumulative sum of Recaman sequence.
Reading results from recamanSum_results.
Cumulative sum computed successfully: 837722.
See Also
readtimetable | writetimetable | kafkaStream | eventStreamProcessor | execute | inMemoryStream | testStream