Skip to content

Class MockCluster

Class \RdKafka\Test\MockCluster

Note: MockCluster is experimental - even in librdkafka! Expect breaking changes within minor versions of this library.

Methods

__destruct()

public __destruct (  ): 

create()

public static create ( 
    int $brokerCount, 
    ?\RdKafka\Conf $conf = null
 ): self
Parameters
brokerCount int
conf ?\RdKafka\Conf
Returns
self

createTopic()

public createTopic ( 
    string $topic, 
    int $partitionCount, 
    int $replicationFactor
 ): void

Creates a topic.

This is an alternative to automatic topic creation as performed by the client itself. The Topic Admin API (CreateTopics) is not supported by the mock broker.

Parameters
topic string
partitionCount int
replicationFactor int

fromProducer()

public static fromProducer ( 
    \RdKafka\Producer $producer
 ): self

Derive mock cluster from Producer created by setting the test.mock.num.brokers configuration property.

Parameters
producer \RdKafka\Producer
Returns
self

getBootstraps()

public getBootstraps (  ): string
Returns
string the mock cluster’s bootstrap.servers list

pushBrokerRequestErrorRtts()

public pushBrokerRequestErrorRtts ( 
    int $brokerId, 
    int $apiKey, 
    int $count, 
    int[] $errorCodeAndRttTuples
 ): void

Same as {@link MockCluster::pushBrokerRequestErrors()} but for a specific broker.

The broker errors take precedence over the cluster errors.

Parameters
brokerId int
apiKey int
count int
errorCodeAndRttTuples int[] plain tuples of error code or 0 (int) and response RTT/delay in millisecond (int)

pushBrokerRequestErrors()

public pushBrokerRequestErrors ( 
    int $brokerId, 
    int $apiKey, 
    int $count, 
    int[] $errorCodes
 ): void

Same as {@link MockCluster::pushRequestErrors()} but for a specific broker.

The broker errors take precedence over the cluster errors.

Parameters
brokerId int
apiKey int
count int
errorCodes int[] a list of error codes or 0

pushRequestErrors()

public pushRequestErrors ( 
    int $apiKey, 
    int $count, 
    int $errorCodes
 ): void

Push cnt errors onto the cluster’s error stack for the given apiKey.

ApiKey is the Kafka protocol request type, e.g., Produce (0).

The following cnt protocol requests matching apiKey will fail with the provided error code and removed from the stack, starting with the first error code, then the second, etc.

Parameters
apiKey int
count int
errorCodes int

pushRequestErrorsArray()

public pushRequestErrorsArray ( 
    int $apiKey, 
    int $count, 
    int[] $errorCodes
 ): void

See {@link MockCluster::pushRequestErrors()}

Parameters
apiKey int
count int
errorCodes int[]

setApiVersion()

public setApiVersion ( 
    int $apiKey, 
    int $minVersion, 
    int $maxVersion
 ): void

Set the allowed ApiVersion range for apiKey.

Set minVersion and maxVersion to -1 to disable the API completely. MaxVersion MUST not exceed the maximum implemented value.

Parameters
apiKey int Protocol request type/key
minVersion int Minimum version supported (or -1 to disable).
maxVersion int Maximum version supported (or -1 to disable).

setBrokerDown()

public setBrokerDown ( 
    int $brokerId
 ): void

Disconnects the broker and disallows any new connections.

This does NOT trigger leader change.

Parameters
brokerId int

setBrokerRack()

public setBrokerRack ( 
    int $brokerId, 
    string $rack
 ): void

Sets the broker’s rack as reported in Metadata to the client.

Parameters
brokerId int
rack string

setBrokerUp()

public setBrokerUp ( 
    int $brokerId
 ): void

Makes the broker accept connections again.

This does NOT trigger leader change.

Parameters
brokerId int

setCoordinator()

public setCoordinator ( 
    string $keyType, 
    string $key, 
    int $brokerId
 ): void

Explicitly sets the coordinator.

If this API is not a standard hashing scheme will be used.

Parameters
keyType string “transaction” or “group”
key string The transactional.id or group.id
brokerId int The new coordinator, does not have to be a valid broker.

setPartitionFollower()

public setPartitionFollower ( 
    string $topic, 
    int $partition, 
    int $brokerId
 ): void

Sets the partition’s preferred replica / follower.

The topic will be created if it does not exist.

Parameters
topic string
partition int
brokerId int does not need to point to an existing broker.

setPartitionFollowerWatermarks()

public setPartitionFollowerWatermarks ( 
    string $topic, 
    int $partition, 
    int $low, 
    int $high
 ): void

Sets the partition’s preferred replica / follower low and high watermarks.

The topic will be created if it does not exist. Setting an offset to -1 will revert back to the leader’s corresponding watermark.

Parameters
topic string
partition int
low int
high int

setPartitionLeader()

public setPartitionLeader ( 
    string $topic, 
    int $partition, 
    int $brokerId
 ): void

Sets the partition leader.

The topic will be created if it does not exist.

Parameters
topic string
partition int
brokerId int needs to be an existing broker

setRtt()

public setRtt ( 
    int $brokerId, 
    int $roundTripTimeDelayMs
 ): void

Set broker round-trip-time delay in milliseconds.

Parameters
brokerId int
roundTripTimeDelayMs int

setTopicError()

public setTopicError ( 
    string $topic, 
    int $errorCode
 ): void

Set the topic error to return in protocol requests.

Currently only used for TopicMetadataRequest and AddPartitionsToTxnRequest.

Parameters
topic string
errorCode int

Test Coverage 🧡

  • 🧡 Lines: 67.71% (65 / 96)
  • ❤️ Methods: 35% (7 / 20)