Configuration¶
Tip
See librdkafka CONFIGURATION for available parameters
Set, Get & Dump¶
Note
Values are always of type string
.
// create config
$conf = new \RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
var_dump($conf->get('log_level'));
var_dump($conf->dump());
Logging¶
The default is to print to stderr. Alternatively the application may provide its own logger callback. Or call setLogCb(null)
to disable
logging.
// create config
$conf = new \RdKafka\Conf();
// enable logging
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$conf->setLogCb(
function (\RdKafka $rdkafka, int $level, string $facility, string $message): void {
echo sprintf("log: %d %s %s", $level, $fac, $buf) . PHP_EOL;
}
);
Error Handling¶
The error callback is used by librdkafka to signal warnings and errors back to the application.
These errors should generally be considered informational and non-permanent, the client will try to recover automatically from all type of errors. Given that the client and cluster configuration is correct the application should treat these as temporary errors.
The error callback will be triggered with err set to RD_KAFKA_RESP_ERR__FATAL if a fatal error has been raised; in this case use rd_kafka_fatal_error() to retrieve the fatal error code and error string, and then begin terminating the client instance.
// create config
$conf = new \RdKafka\Conf();
$conf->setErrorCb(
function (\RdKafka $rdkafka, int $err, string $reason, $opaque = null): void
{
echo sprintf("Error %d %s. Reason: %s", $err, rd_kafka_err2str($err), $reason) . PHP_EOL;
if ($err === RD_KAFKA_RESP_ERR__FATAL) {
throw new \RuntimeException('fatal error');
}
}
);
Read Statistics¶
Tip
See https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md for a detailed list of emitted metrics.
// create config
$conf = new \RdKafka\Conf();
$conf->setStatsCb(
function ($consumerOrProducer, string $json, int $jsonLength, $opaque = null): void
{
var_dump(\json_decode($json, true));
}
);
Monitor Message Delivery¶
Producer only.
Keep track of message delivery and react on final delivery errors.
Note
Please read about librdkafka & its message reliability to fully understand why delivery of messages can fail and how to handle failures.
$conf->setDrMsgCb(
function (\RdKafka\Producer $producer, \RdKafka\Message $message): void {
if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
// handle delivery errors
}
}
);