Skip to content

Create a Producer

Configure the producer

$conf = new \RdKafka\Conf();
$conf->set('bootstrap.servers', 'kafka:9092');
$conf->set('socket.timeout.ms', (string) 50);
$conf->set('queue.buffering.max.messages', (string) 1000);
$conf->set('max.in.flight.requests.per.connection', (string) 1);
$conf->setDrMsgCb(
    function (\RdKafka\Producer $producer, \RdKafka\Message $message): void {
        if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            // Perform your error handling here using $message->errstr()
        }
    }
);
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$conf->setLogCb(
    function (\RdKafka\Producer $producer, int $level, string $facility, string $message): void {
        // Perform your logging mechanism here
    }
);
$conf->set('statistics.interval.ms', (string) 1000);
$conf->setStatsCb(
    function (\RdKafka\Producer $producer, string $json, int $json_len, $opaque): void {
        // Perform your stats mechanism here ...
    }
);

See configuration for more details.

Configure the topic

$topicConf = new \RdKafka\TopicConf();
$topicConf->set('message.timeout.ms', (string) 30000);
$topicConf->set('request.required.acks', (string) -1);
$topicConf->set('request.timeout.ms', (string) 5000);

Create the producer

$producer = new \RdKafka\Producer($conf);

Create a new topic

$topic = $producer->newTopic('playground', $topicConf);

Produce Messages

for ($i = 0; $i < 1000; $i++) {
    $key = $i % 10;
    $payload = sprintf('payload-%d-%s', $i, $key);
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, (string) $key);

    // trigger callback queues
    $producer->poll(1);
}

$producer->flush(5000);

Warning

Always call flush after producing messages to not lose messages on shutdown that are still queued up within librdkafka memory and not yet delivered to a broker.

Flush on shutdown

Best practise is to wrap the producer instance and call flush on destruction.

class ExampleApp {
    private \RdKafka\Producer $producer;

    function __construct(\RdKafka\Conf $conf) {
        $this->producer = new \RdKafka\Producer($conf);
    }

    function __destruct() {
        $err = $this->producer->flush(10000);
        if ($err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
            throw new \RuntimeException('Failed to flush producer. Messages might not have been delivered.');
        }
    }
}

Hint

Call flush(-1) to wait indefinitely until all messages are flushed.

Run example

If you want to test producing messages, you can take a look at the producer example. Please take a look at how to prepare running the examples before.

Run the example:

docker-compose run --rm php74 php examples/producer.php

And now you have produced messages to the topic playground.