Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions trivup/apps/KafkaBrokerApp.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
class KafkaBrokerApp (trivup.App):
""" Kafka broker app
Depends on ZookeeperApp (unless KRaft mode) """

def __init__(self, cluster, conf=None, on=None):
"""
@param cluster Current cluster
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(self, cluster, conf=None, on=None):
* fdlimit - RLIMIT_NOFILE (or "max") (default: max)
* conf - arbitary server.properties config as a list of strings.
* realm - Kerberos realm to use when sasl_mechanisms contains GSSAPI
*
"""
super(KafkaBrokerApp, self).__init__(cluster, conf=conf, on=on)

Expand Down Expand Up @@ -171,27 +173,34 @@ def __init__(self, cluster, conf=None, on=None):

conf_blob.append(listener_map)


def sort_listener(a):
""" Sort listener_types list so that the PLAINTEXTs are first,
since the first listener is used by operational(). """
if a.startswith('PLAINTEXT'):
return 0
else:
return 1

# Allocate a port for each listener type
ports = [(x, trivup.TcpPortAllocator(self.cluster).next(

# If brokers ports are specified those would be used, if not it would fallback to TcpPortAllocator logic
if 'user_port' in self.conf and self.conf['user_port']:
ports = [(x, self.conf.get('user_port')) for x in sorted(set(listener_types), key=sort_listener)]
self.conf['port'] = self.conf.get('user_port')
self.conf['address'] = '%s:%d' % (listener_host, self.conf.get('user_port'))
else :
ports = [(x, trivup.TcpPortAllocator(self.cluster).next(
self, self.conf.get('port_base', self.conf.get('port', None))))
for x in sorted(set(listener_types), key=sort_listener)]
self.conf['port'] = ports[0][1] # "Default" port
self.conf['port'] = ports[0][1] # "Default" port
self.conf['address'] = '%s:%d' % (listener_host, self.conf['port'])


if can_docker:
# Add docker listener to allow services (e.g, SchemaRegistry) in
# docker-containers to reach the on-host Kafka.
docker_port = trivup.TcpPortAllocator(self.cluster).next(self)
docker_host = '%s:%d' % (cluster.get_docker_host(), docker_port)

self.conf['address'] = '%s:%d' % (listener_host, self.conf['port'])
# Create a listener for each port
listeners = ['%s://%s:%d' % (x[0], "0.0.0.0", x[1]) for x in ports]
if can_docker:
Expand Down Expand Up @@ -462,4 +471,4 @@ def _add_simple_authorizer(self, conf_blob):
if self.version[0] >= 3:
conf_blob.append('authorizer.class.name=kafka.security.authorizer.AclAuthorizer') # noqa: E501
else:
conf_blob.append('authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer') # noqa: E501
conf_blob.append('authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer') # noqa: E501
73 changes: 68 additions & 5 deletions trivup/clusters/KafkaCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@
import argparse
import subprocess
import copy

import socket

class KafkaCluster(object):

# conf dict structure with defaults:
# commented-out fields are not defaults but show what is available.
default_conf = {
Expand All @@ -92,8 +93,10 @@ class KafkaCluster(object):
'oidc': False,
# Additional broker server.properties configuration
# 'broker_conf': ['connections.max.idle.ms=1234', ..]
# 'broker_ports' : Comma-separated list of Kafka broker ports. If not provided, random ports will be used.
}


def __init__(self, **kwargs):
""" Create and start a KafkaCluster.
See default_conf above for parameters. """
Expand All @@ -108,6 +111,27 @@ def __init__(self, **kwargs):
self.version_num = [int(x) for x in self.version.split('.')][:3]
self.kraft = self.conf.get('kraft')


# Checking if ports being passed are available
if 'broker_ports' in conf and conf['broker_ports']:
self.broker_ports_list = [int(port) for port in conf.get('broker_ports').split(',')]
self._check_ports_availability()
else:
self.broker_ports_list = []

# Scenario where no. of ports being passed doesn't match broker count
if 'broker_cnt' in conf and conf['broker_cnt'] and 'broker_ports' in conf and conf['broker_ports']:
if len(self.broker_ports_list) != self.conf.get('broker_cnt'):
raise ValueError(f"The number of ports :({len(self.broker_ports_list)}) does not match broker_cnt : ({conf['broker_cnt']}).")
else:
if 'broker_ports' in conf and conf['broker_ports']:
self.conf['broker_cnt'] = len(self.broker_ports_list)

# Broker count's default value has been set to None so has to be overwritten
if 'broker_cnt' not in self.conf or not self.conf['broker_cnt']:
self.conf['broker_cnt'] = self.default_conf.get('broker_cnt')


# Create trivup Cluster
self.cluster = Cluster(
self.__class__.__name__,
Expand Down Expand Up @@ -173,6 +197,9 @@ def __init__(self, **kwargs):
self.brokers = dict()
for n in range(0, broker_cnt):
bconf = copy.deepcopy(self.broker_conf)
if self.broker_ports_list:
bconf['user_port'] = self.broker_ports_list[n]

if self.version_num >= [2, 4, 0]:
# Configure rack & replica selector if broker supports
# fetch-from-follower
Expand Down Expand Up @@ -438,7 +465,8 @@ def interactive(self, cmd=None):
retcode, fullcmd))

return retcode



def client_conf(self):
""" Get a dict copy of the client configuration """
return deepcopy(self._client_conf)
Expand All @@ -451,7 +479,37 @@ def write_client_conf(self, path, additional_blob=None):
if additional_blob is not None:
f.write(str('#\n# Additional configuration:'))
f.write(str(additional_blob))



def _check_ports_availability(self):
""" Check availability of the broker ports and exit if any are unavailable. """
unavailable_ports = []

for port in self.broker_ports_list:
if not self._is_port_available(port):
unavailable_ports.append(port)

if unavailable_ports:
print(f"Error: The following broker ports are unavailable: {', '.join(map(str, unavailable_ports))}")
print("Closing application due to unavailable ports.")
sys.exit(1)

print(f"All broker ports are available: {', '.join(map(str, self.broker_ports_list))}")


def _is_port_available(self, port):
""" Check if a port is available by trying to bind to it. """
s = None
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', port))
return True
except socket.error:
return False
finally:
if s:
s.close()


if __name__ == '__main__':

Expand All @@ -472,7 +530,7 @@ def write_client_conf(self, path, additional_blob=None):
default=KafkaCluster.default_conf['with_sr'],
help='Enable SchemaRegistry')
parser.add_argument('--brokers', dest='broker_cnt', type=int,
default=KafkaCluster.default_conf['broker_cnt'],
default=None,
help='Number of Kafka brokers')
parser.add_argument('--version', dest='version', type=str,
default=KafkaCluster.default_conf['version'],
Expand All @@ -494,6 +552,9 @@ def write_client_conf(self, path, additional_blob=None):
action='store_true',
default=KafkaCluster.default_conf['oidc'],
help='Enable Oauthbearer OIDC JWT server')
parser.add_argument('--broker-ports', dest='broker_ports', type=str, default=None,
help='Comma-separated list of Kafka broker ports. If not provided, default ports will be used.')


args = parser.parse_args()

Expand All @@ -507,7 +568,9 @@ def write_client_conf(self, path, additional_blob=None):
'broker_cnt': args.broker_cnt,
'kafka_path': args.kafka_src,
'cleanup': not args.no_cleanup,
'oidc': args.oidc}
'oidc': args.oidc,
'broker_ports': args.broker_ports
}

kc = KafkaCluster(**conf)

Expand Down