werk365/larakafka

Kafka client for Laravel that makes it easy to produce and consume messages

1.1.8 2021-08-19 13:10 UTC

This package is auto-updated.

Last update: 2024-03-19 18:48:08 UTC



Kafka client package for use in Laravel. Based on jobcloud/php-kafka-lib.

This package supports an extension that automatically produces Spatie activitylog activity: werk365/larakafka-activity.

With minimal configuration (the config only needs the required broker information and credentials), you can enable Spatie activity logging on a model, and this package will also send that activity to a Kafka topic named after your application.

Besides this basic logging feature, the package also lets you produce and consume any other messages you need.

Producing can be easily done in-code, and you can start any number of consumers through:

$ php artisan larakafka:consume {topic}

Installation

Via Composer

$ composer require werk365/larakafka

Publish the config file using

$ php artisan vendor:publish --provider="Werk365\LaraKafka\LaraKafkaServiceProvider"

Configuration

The published config file looks as follows. Each section is explained below it.

<?php

return [
    'client' => [
        'client_name' =>  str_replace(' ', '-', strtolower(env('APP_NAME'))),
        'configs' => [
            'producer' => [
                'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))),
                'compression.codec' => 'snappy',
                'security.protocol' => 'SASL_SSL',
                'sasl.mechanisms' => 'PLAIN',
                'sasl.username' => '',
                'sasl.password' => ''
            ],
            'consumer' => [
                'client.id' => str_replace(' ', '-', strtolower(env('APP_NAME'))),
                'security.protocol' => 'SASL_SSL',
                'sasl.mechanisms' => 'PLAIN',
                'sasl.username' => '',
                'sasl.password' => '',
                'auto.offset.reset' => 'earliest'
            ]
        ],
        'broker' => '',
    ],
    'functions' => [
        'consumer' => [
            'example' => [
                'function' => 'ingest',
                'namespace' => '\App\Services\KafkaService'
            ]
        ],
    ],
    'maps' => [
        'consumer' => [
            'example' => [
                'model' => 'App\Models\Example',
                'event_id' => 'uuid',
                'model_id' => 'id',
                'attributes' => [
                    'uuid' => 'id',
                    'first_name' => 'name',
                ]
            ]
        ]
    ]
];

Not all of the above configuration is needed for every use case. If you only want the activity logging feature, you just need to configure client.configs.producer and client.broker.

Client

The producer and consumer configuration lives here and should be fairly straightforward. Note that the default topic for the producer is the client.client_name value.
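As a small illustration of how that default name is derived, the expression used for client_name in the published config can be run on its own. The helper name clientNameFromAppName and the sample application name are hypothetical; only the str_replace/strtolower expression comes from the config above.

```php
<?php

// Hypothetical helper wrapping the expression the published config uses
// for 'client_name' (and therefore for the default producer topic).
function clientNameFromAppName(string $appName): string
{
    // Lowercase the name, then replace spaces with dashes.
    return str_replace(' ', '-', strtolower($appName));
}

// Example application name, chosen only for illustration.
echo clientNameFromAppName('My Laravel App'), PHP_EOL; // my-laravel-app
```

With APP_NAME set to "My Laravel App", messages produced without an explicit topic would therefore go to a topic called my-laravel-app.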

Functions

This is currently only used for the consumer. When you consume a topic, the corresponding function is called for each message received. In this case, while consuming the example topic, the static function \App\Services\KafkaService::ingest($key, $headers, $body) would be called. It is then up to your application to process that data.
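The lookup described above can be sketched in plain PHP. This is not the package's actual implementation, only an illustration of resolving a topic to a static handler. DemoKafkaService and dispatchToHandler are hypothetical names; the array mirrors the shape of the functions section of the config.

```php
<?php

// Hypothetical stand-in for \App\Services\KafkaService.
class DemoKafkaService
{
    public static function ingest($key, $headers, $body)
    {
        return "handled {$key}";
    }
}

// Same shape as the 'functions' section of the published config.
$functions = [
    'consumer' => [
        'example' => [
            'function' => 'ingest',
            'namespace' => '\DemoKafkaService',
        ],
    ],
];

// Hypothetical dispatcher: find the handler configured for a topic and call it.
function dispatchToHandler(array $functions, string $topic, $key, array $headers, $body)
{
    if (!isset($functions['consumer'][$topic])) {
        throw new InvalidArgumentException("No consumer function configured for topic '{$topic}'");
    }

    $entry = $functions['consumer'][$topic];

    // Strip the leading backslash so the class name forms a valid callable.
    $callable = [ltrim($entry['namespace'], '\\'), $entry['function']];

    return call_user_func($callable, $key, $headers, $body);
}

echo dispatchToHandler($functions, 'example', 'order-1', [], ['id' => 1]), PHP_EOL;
```

Keeping the handler as a static method means the consumer does not need to know how to construct your service; it only needs the class and function names from the config.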

Maps

Lastly, this is the configuration for the storeMessage() function, which helps process the data received from Kafka. It expects an array of data and can map it and store it in a database for you. The Usage section has more information about this function. The configuration keys are:

model = The model that should be used to store the data

event_id = The key name of the unique id that belongs to the object

model_id = The key name of the unique id as it is called in the model

attributes = The attributes that should be stored for this model. Not all attributes configured here have to be present in the consumed message, since a message may contain only the attributes that were updated. The key is the attribute name as it appears in the event; the value is the attribute name as it is called in the model.
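To make the attributes mapping concrete, here is a minimal sketch of how event keys could be renamed to model keys. It is an illustration under the assumptions above, not the package's storeMessage() code; mapEventAttributes is a hypothetical helper and the sample event data is made up.

```php
<?php

// Hypothetical helper: rename event keys to model keys using an
// 'attributes' map shaped like the one in the example config.
function mapEventAttributes(array $event, array $attributeMap): array
{
    $mapped = [];

    foreach ($attributeMap as $eventKey => $modelKey) {
        // Only copy attributes that are actually present in the event,
        // since a message may carry just the updated attributes.
        if (array_key_exists($eventKey, $event)) {
            $mapped[$modelKey] = $event[$eventKey];
        }
    }

    return $mapped;
}

// Map taken from the example config: event key => model key.
$attributeMap = ['uuid' => 'id', 'first_name' => 'name'];

// A partial event that only contains one of the configured attributes.
$event = ['first_name' => 'Ada'];

print_r(mapEventAttributes($event, $attributeMap));
```

Keys that are in the event but not in the map are ignored, and keys that are in the map but missing from the event are simply skipped.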

Usage

Produce

use Werk365\LaraKafka\LaraKafka;

$kafka = new LaraKafka();
$kafka->setTopic("string") //optional, defaults to application name
    ->setKey("string") // optional, default will be the caller classname
    ->setHeaders(["key" => "value"]) // optional, default will contain more information about caller
    ->setBody("string") // Body can also be set like: $kafka = new LaraKafka("body")
    ->produce();

Other available methods:

->setBroker("string") Sets broker other than defined in config

->setProducerConfig([]) Overrides config settings

->addProducerConfig("key", "value") Adds a value to the current producer config

->addHeaders([]) Merges the given headers array into the headers already set

Octane Consumer

To run a consumer when using Laravel Octane, you can either run it through a Swoole worker or run it as a normal PHP process using the console command described in the next section. To use the Octane version, first make a new consumer using:

$ php artisan kafka:consumer topic

This will create a new consumer class in the App\Consumers namespace. For example: App\Consumers\TestConsumer when using test as the topic name.

In this consumer you will find a handleMessage() method which has everything you need to start processing your messages. To make sure this consumer is started with your Octane application, add the consumer to the listeners in the Octane config file. I recommend adding it as a listener for the TickReceived event like so:

'listeners' => [
    // ...

    TickReceived::class => [
        ...Octane::prepareApplicationForNextOperation(),
        \App\Consumers\TestConsumer::class,
    ],

    // ...
],

The consumer is only started once and then keeps running. Registering it here means the listener is invoked on every Octane tick (every second), so it can check that the consumer is still running. If the initial consumer has given no sign of life for 60 seconds, a new consumer is started.
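The tick-based check described above can be sketched as a small pure function. This is only an illustration of the idea, not the package's implementation: shouldStartConsumer and the notion of a stored "last sign of life" timestamp are assumptions, and only the 60-second threshold comes from the text.

```php
<?php

// Hypothetical check run on every tick: decide whether a (new) consumer
// needs to be started, based on when the running one last reported in.
function shouldStartConsumer(?int $lastSignOfLife, int $now, int $timeoutSeconds = 60): bool
{
    // No consumer has ever reported in, so one has to be started.
    if ($lastSignOfLife === null) {
        return true;
    }

    // Start a replacement once the consumer has been silent for the full timeout.
    return ($now - $lastSignOfLife) >= $timeoutSeconds;
}

$now = time();

var_dump(shouldStartConsumer(null, $now));      // bool(true)
var_dump(shouldStartConsumer($now - 10, $now)); // bool(false)
var_dump(shouldStartConsumer($now - 75, $now)); // bool(true)
```

Because the check is cheap, running it every second costs little, and a crashed consumer is replaced roughly a minute after it last reported in.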

Console Consumer

To run a consumer, you can simply run

$ php artisan larakafka:consume {topic}

When a message is read, the function defined in your config is called. With the function given in the example config, it could simply look like this:

namespace App\Services;

use Illuminate\Support\Facades\Log;

class KafkaService
{
    public static function ingest($key, $headers, $body)
    {
        // Write the message body to the application log as JSON.
        Log::info(json_encode($body));
    }
}

To store the data received in the body, you can use the storeMessage method. It takes an array of attributes and an array of types, so one array of attributes can be mapped and stored to several models (types). The example below stores a single model, assuming the example config and a body with an event_attributes property that holds the attributes.

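namespace App\Services;

use Werk365\LaraKafka\LaraKafka;

class KafkaService
{
    public static function ingest($key, $headers, $body)
    {
        $kafka = new LaraKafka();

        // The type is assumed to match a key under maps.consumer in the
        // config, which is "example" in the example config shown earlier.
        $kafka->storeMessage($body->event_attributes, ["example"]);
    }
}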

Change log

Please see the changelog for more information on what has changed recently.

Testing

WIP

$ composer test

Security

If you discover any security-related issues, please email the author instead of using the issue tracker.

Credits

License

Please see the license file for more information.