Process Kafka Events Using MATLAB

This example shows how to use the Streaming Data Framework for MATLAB® Production Server™ to process events from a Kafka® stream. The example provides and explains two streaming analytic functions, recamanSum and initRecamanSum, that process event streams. It also provides the demoRecaman script, which creates event streams, validates their creation, uses the streaming analytic function to process them, and writes the results to an output stream.

The example functions and script are located in the support_package_root\toolbox\mps\streaming\Examples\Numeric folder, where support_package_root is the root folder of support packages on your system. To get the path to this folder, use this command:

fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')
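Before calling the example functions, they must be on the MATLAB path. The following is a minimal sketch of one way to do that, assuming the support package is installed; the variable name exampleDir is chosen here for illustration and does not come from the example files.

```matlab
% Build the path to the example folder (same command as above).
exampleDir = fullfile(matlabshared.supportpkg.getSupportPackageRoot, ...
    'toolbox','mps','streaming','Examples','Numeric');

% Add the folder to the path only if it exists on this system.
if isfolder(exampleDir)
    addpath(exampleDir);
else
    warning("Example folder not found: %s", exampleDir);
end
```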

Prerequisites

  • You must have Streaming Data Framework for MATLAB Production Server installed on your system. For more information, see Install Streaming Data Framework for MATLAB Production Server.

  • You must have a running Kafka server where you have the necessary permissions to create topics. The example assumes that the network address of your Kafka host is kafka.host.com:9092.

Write Streaming Analytic MATLAB Function

For this example, use the sample MATLAB functions recamanSum and initRecamanSum. Later, you iterate the recamanSum streaming function over several events to compute results.

Write Stateful Function

The recamanSum function is stateful. In a stateful function, state is shared between events, so past events can influence how current events are processed. recamanSum computes the cumulative sum of the numeric sequence in the stream variable R and returns a timetable cSum and a structure state. The timetable cSum contains the cumulative sum of the elements in R along with their timestamps and keys. The structure state stores the final value of the cumulative sum in its field cumsum, which the next iteration adds to its own results.

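function [cSum, state] = recamanSum(data, state)
    % Row times and keys of the current window of events.
    timestamp = data.Properties.RowTimes;
    key = data.key;

    % Cumulative sum of this window, offset by the total carried over
    % from earlier windows. The variable name sum shadows the built-in
    % function, but it also names the output column read later.
    sum = cumsum(data.R) + state.cumsum;

    % Save the running total for the next window.
    state.cumsum = sum(end);

    cSum = timetable(timestamp, key, sum);
end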

Write State Initialization Function

The initRecamanSum function initializes state for the first iteration of the recamanSum function.

function state = initRecamanSum(config)
    state.cumsum = 0;
end
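To see how the state carries a running total from one window to the next, the two functions can be called by hand, without Kafka. The following is a minimal local sketch, assuming recamanSum.m and initRecamanSum.m are on the MATLAB path. The six sample values, the fixed start time, and the two windows of three rows are chosen here for illustration.

```matlab
% Small stand-in for one stream: six values with timestamps and keys.
R = [0; 1; 3; 6; 2; 7];
timestamp = datetime(2024,1,1,0,0,0,TimeZone="UTC") + seconds(1:6)';
key = string((0:5)');
tt = timetable(timestamp, R, key);

% Initialize the state, then process two windows of three rows each.
state = initRecamanSum(struct());   % the config argument is unused
[out1, state] = recamanSum(tt(1:3,:), state);
[out2, state] = recamanSum(tt(4:6,:), state);

% The windowed results should match one cumulative sum over all rows.
windowed = [out1.sum; out2.sum];
assert(isequal(windowed, cumsum(R)));
```

Without the state argument, the second window would restart its sum from zero instead of continuing from the total of the first window.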

Create Sample Stream Events

Running the example requires sample streaming data. The demoRecaman script contains the following code, which creates streaming data consisting of the first 1000 elements of Recamán's sequence and writes that sequence to the Kafka topic recamanSum_data.

  1. Set the Kafka hostname and port number.

    kafkaHost = "kafka.host.com";
    kafkaPort = 9092;

  2. Create the first 1000 elements of Recamán's sequence.

    To create the sequence, you can use the following recamanTimeTable function also located in the \Examples\Numeric folder. recamanTimeTable creates a timetable containing the first N elements of Recamán's sequence.

    function tt = recamanTimeTable(N)
        rs = zeros(1,N);
        for k=2:N
            n = k-1;
            subtract = rs(k-1) - n;
            if subtract > 0 && ~any(rs == subtract)
                rs(k) = subtract;
            else
                rs(k) = rs(k-1) + n;
            end
        end
    
        incr = seconds(1:N);
    
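        % Round-trip the current time through integer epoch seconds so
        % that the timestamp has whole-second resolution.
        thisVeryInstant = ...
            convertTo(datetime, "epochtime", "Epoch", "1970-1-1");
        thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom",...
            "epochtime", "Epoch", "1970-1-1");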
    
        thisVeryInstant.TimeZone = "UTC";
        timestamp = (thisVeryInstant - seconds(N)) + incr';
    
        key = (0:N-1)';
        key = string(key);
        R = rs';
        tt = timetable(timestamp,R,key);
    
    end

  3. Store the results of recamanTimeTable in a timetable.

    tt0 = recamanTimeTable(1000);

  4. Create a stream object connected to the recamanSum_data topic. Later, you write the timetable that contains the Recamán sequence to recamanSum_data.

    dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);

  5. If the recamanSum_data topic already exists, delete it.

    try deleteTopic(dataKS); catch, end

  6. Write the entire Recamán sequence to the recamanSum_data topic.

    writetimetable(dataKS, tt0);
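The rule that recamanTimeTable applies in step 2 is easier to follow on a short run. The following is a minimal sketch that computes only the first ten elements with the same loop and omits the timestamps and keys. The variable name candidate is used here in place of subtract for readability.

```matlab
N = 10;
rs = zeros(1,N);            % rs(1) is the starting value 0
for k = 2:N
    n = k-1;
    candidate = rs(k-1) - n;
    % Step backward only if the result is positive and not yet used;
    % otherwise step forward by n.
    if candidate > 0 && ~any(rs(1:k-1) == candidate)
        rs(k) = candidate;
    else
        rs(k) = rs(k-1) + n;
    end
end
disp(rs)   % displays 0 1 3 6 2 7 13 20 12 21
```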

Validate Sample Data Creation

To validate the sample stream events that you created, confirm that the first 100 rows that you read from the recamanSum_data topic are the same as the sample data you created and wrote to the recamanSum_data topic. The demoRecaman script contains the following code.

  1. Read one window of data (100 rows) from the recamanSum_data topic into a timetable tt1.

    tt1 = readtimetable(dataKS);

  2. Check whether the data read into tt1 equals the first 100 rows of the Recamán sequence that you wrote.

    if isequal(tt0(1:height(tt1),:), tt1)
        fprintf(1,"Success writing data to topic %s.\n", dataKS.Name);
    end

  3. Stop reading from the dataKS stream, since later you use dataKS to read again from the recamanSum_data topic. Reading from the same topic using multiple streams is not permitted.

    stop(dataKS);
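The check in step 2 prints a message on success and does nothing on a mismatch. The following is a minimal sketch of a variant that also reports failure. It uses small local stand-ins for tt0 and tt1 so that it runs without a Kafka connection; the error message text is an assumption and is not part of the demoRecaman script.

```matlab
% Local stand-ins: tt0 is the data written, tt1 the window read back.
timestamp = datetime(2024,1,1,0,0,0,TimeZone="UTC") + seconds(1:5)';
R = [0; 1; 3; 6; 2];
key = string((0:4)');
tt0 = timetable(timestamp, R, key);
tt1 = tt0(1:3,:);

% Compare the window against the matching leading rows of tt0.
expected = tt0(1:height(tt1),:);
if isequal(expected, tt1)
    fprintf(1,"Success: %d rows match.\n", height(tt1));
else
    error("Validation failed: the window read back differs from the data written.");
end
```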

Process Stream Events with Streaming Analytic Function

Iterate the recamanSum streaming analytic function multiple times to read the numeric sequence from the input stream, compute its cumulative sum, and write the results to the output stream. The demoRecaman script contains the following code.

  1. Create an output stream connected to the recamanSum_results topic. Use recamanSum_results to store the output of the recamanSum streaming function.

    resultKS = kafkaStream(kafkaHost,kafkaPort,"recamanSum_results", ...
        Rows=100);

  2. Create an event stream processor to iterate the recamanSum streaming function over the input topic connected to the stream dataKS. Write the results to the output topic connected to the stream resultKS. Use a persistent storage connection named RR to store data state between iterations.

    rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,...
        StateStore="RR",OutputStream=resultKS);

  3. Execute the stream function ten times. Because the window size (the number of rows read at a time) is 100, ten iterations consume the entire sequence of 1000 elements.

    fprintf(1,"Computing cumulative sum of Recaman sequence.\n");
    execute(rsp, 10);

  4. Delete the event stream processor. Deleting it shuts down the state store, which you must do before you can run this script more than once in a row.

    clear rsp;

  5. Read the results from the output stream, and compare the final cumulative sum with the value computed directly from the original sequence.

    fprintf(1,"Reading results from %s.\n", resultKS.Name);
    tt2 = timetable.empty;
    for n = 1:10
        tt2 = [ tt2 ; readtimetable(resultKS) ];
    end
    
    cSum = cumsum(tt0.R);
    if tt2(end,:).sum == cSum(end)
        fprintf(1,"Cumulative sum computed successfully: %d.\n", ...
            tt2(end,:).sum);
    else
        fprintf(1,"Expected cumulative sum %d. Computed %d instead.\n", ...
            cSum(end), tt2(end,:).sum);
    end
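The iteration count in step 3 follows from the window size: 1000 rows read 100 at a time take ten iterations. The following is a minimal local sketch of that arithmetic, using placeholder data rather than the Recamán sequence and plain array indexing rather than a Kafka stream. The variable names are chosen here for illustration.

```matlab
R = (1:1000)';        % placeholder data, one value per row
windowSize = 100;     % corresponds to Rows=100 on the stream
numIterations = ceil(numel(R) / windowSize);

% Process one window per iteration, carrying the running total forward.
running = 0;
for n = 1:numIterations
    first = (n-1)*windowSize + 1;
    last = min(n*windowSize, numel(R));
    w = cumsum(R(first:last)) + running;
    running = w(end);
end

assert(numIterations == 10);
assert(running == sum(R));
```

If the row count were not a multiple of the window size, ceil would add one more iteration for the final, shorter window in this local sketch.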

When you run the entire demoRecaman script, you see the following output.

Success writing data to topic recamanSum_data.
Computing cumulative sum of Recaman sequence.
Reading results from recamanSum_results.
Cumulative sum computed successfully: 837722.
