From 6406ccf39f367096a2b2fad7998bdfe031566532 Mon Sep 17 00:00:00 2001 From: poushalibanik Date: Thu, 6 Mar 2025 13:05:05 +0530 Subject: [PATCH] Added a new functionality which now enables us to add broker ports of our own --- trivup/apps/KafkaBrokerApp.py | 21 +++++++--- trivup/clusters/KafkaCluster.py | 73 ++++++++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/trivup/apps/KafkaBrokerApp.py b/trivup/apps/KafkaBrokerApp.py index b5e2712..20080ec 100644 --- a/trivup/apps/KafkaBrokerApp.py +++ b/trivup/apps/KafkaBrokerApp.py @@ -41,6 +41,7 @@ class KafkaBrokerApp (trivup.App): """ Kafka broker app Depends on ZookeeperApp (unless KRaft mode) """ + def __init__(self, cluster, conf=None, on=None): """ @param cluster Current cluster @@ -73,6 +74,7 @@ def __init__(self, cluster, conf=None, on=None): * fdlimit - RLIMIT_NOFILE (or "max") (default: max) * conf - arbitary server.properties config as a list of strings. * realm - Kerberos realm to use when sasl_mechanisms contains GSSAPI + * """ super(KafkaBrokerApp, self).__init__(cluster, conf=conf, on=on) @@ -171,6 +173,7 @@ def __init__(self, cluster, conf=None, on=None): conf_blob.append(listener_map) + def sort_listener(a): """ Sort listener_types list so that the PLAINTEXTs are first, since the first listener is used by operational(). """ @@ -178,12 +181,19 @@ def sort_listener(a): return 0 else: return 1 - - # Allocate a port for each listener type - ports = [(x, trivup.TcpPortAllocator(self.cluster).next( + + # If brokers ports are specified those would be used, if not it would fallback to TcpPortAllocator logic + if 'user_port' in self.conf and self.conf['user_port']: + ports = [(x, self.conf.get('user_port')) for x in sorted(set(listener_types), key=sort_listener)] + self.conf['port'] = self.conf.get('user_port') + self.conf['address'] = '%s:%d' % (listener_host, self.conf.get('user_port')) + else : + ports = [(x, trivup.TcpPortAllocator(self.cluster).next( self, self.conf.get('port_base', self.conf.get('port', None)))) for x in sorted(set(listener_types), key=sort_listener)] - self.conf['port'] = ports[0][1] # "Default" port + self.conf['port'] = ports[0][1] # "Default" port + self.conf['address'] = '%s:%d' % (listener_host, self.conf['port']) + if can_docker: # Add docker listener to allow services (e.g, SchemaRegistry) in @@ -191,7 +201,6 @@ def sort_listener(a): docker_port = trivup.TcpPortAllocator(self.cluster).next(self) docker_host = '%s:%d' % (cluster.get_docker_host(), docker_port) - self.conf['address'] = '%s:%d' % (listener_host, self.conf['port']) # Create a listener for each port listeners = ['%s://%s:%d' % (x[0], "0.0.0.0", x[1]) for x in ports] if can_docker: @@ -462,4 +471,4 @@ def _add_simple_authorizer(self, conf_blob): if self.version[0] >= 3: conf_blob.append('authorizer.class.name=kafka.security.authorizer.AclAuthorizer') # noqa: E501 else: - conf_blob.append('authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer') # noqa: E501 + conf_blob.append('authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer') # noqa: E501 \ No newline at end of file diff --git a/trivup/clusters/KafkaCluster.py b/trivup/clusters/KafkaCluster.py index 8cae55c..31dd4da 100755 --- a/trivup/clusters/KafkaCluster.py +++ b/trivup/clusters/KafkaCluster.py @@ -64,9 +64,10 @@ import argparse import subprocess import copy - +import socket class KafkaCluster(object): + # conf dict structure with defaults: # commented-out fields are not defaults but show what is available. default_conf = { @@ -92,8 +93,10 @@ class KafkaCluster(object): 'oidc': False, # Additional broker server.properties configuration # 'broker_conf': ['connections.max.idle.ms=1234', ..] + # 'broker_ports' : Comma-separated list of Kafka broker ports. If not provided, random ports will be used. } + def __init__(self, **kwargs): """ Create and start a KafkaCluster. See default_conf above for parameters. """ @@ -108,6 +111,27 @@ def __init__(self, **kwargs): self.version_num = [int(x) for x in self.version.split('.')][:3] self.kraft = self.conf.get('kraft') + + # Checking if ports being passed are available + if 'broker_ports' in conf and conf['broker_ports']: + self.broker_ports_list = [int(port) for port in conf.get('broker_ports').split(',')] + self._check_ports_availability() + else: + self.broker_ports_list = [] + + # Scenario where no. of ports being passed doesn't match broker count + if 'broker_cnt' in conf and conf['broker_cnt'] and 'broker_ports' in conf and conf['broker_ports']: + if len(self.broker_ports_list) != self.conf.get('broker_cnt'): + raise ValueError(f"The number of ports :({len(self.broker_ports_list)}) does not match broker_cnt : ({conf['broker_cnt']}).") + else: + if 'broker_ports' in conf and conf['broker_ports']: + self.conf['broker_cnt'] = len(self.broker_ports_list) + + # Broker count's default value has been set to None so has to be overwritten + if 'broker_cnt' not in self.conf or not self.conf['broker_cnt']: + self.conf['broker_cnt'] = self.default_conf.get('broker_cnt') + + # Create trivup Cluster self.cluster = Cluster( self.__class__.__name__, @@ -173,6 +197,9 @@ def __init__(self, **kwargs): self.brokers = dict() for n in range(0, broker_cnt): bconf = copy.deepcopy(self.broker_conf) + if self.broker_ports_list: + bconf['user_port'] = self.broker_ports_list[n] + if self.version_num >= [2, 4, 0]: # Configure rack & replica selector if broker supports # fetch-from-follower @@ -438,7 +465,8 @@ def interactive(self, cmd=None): retcode, fullcmd)) return retcode - + + def client_conf(self): """ Get a dict copy of the client configuration """ return deepcopy(self._client_conf) @@ -451,7 +479,37 @@ def write_client_conf(self, path, additional_blob=None): if additional_blob is not None: f.write(str('#\n# Additional configuration:')) f.write(str(additional_blob)) - + + + def _check_ports_availability(self): + """ Check availability of the broker ports and exit if any are unavailable. """ + unavailable_ports = [] + + for port in self.broker_ports_list: + if not self._is_port_available(port): + unavailable_ports.append(port) + + if unavailable_ports: + print(f"Error: The following broker ports are unavailable: {', '.join(map(str, unavailable_ports))}") + print("Closing application due to unavailable ports.") + sys.exit(1) + + print(f"All broker ports are available: {', '.join(map(str, self.broker_ports_list))}") + + + def _is_port_available(self, port): + """ Check if a port is available by trying to bind to it. """ + s = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('', port)) + return True + except socket.error: + return False + finally: + if s: + s.close() + if __name__ == '__main__': @@ -472,7 +530,7 @@ def write_client_conf(self, path, additional_blob=None): default=KafkaCluster.default_conf['with_sr'], help='Enable SchemaRegistry') parser.add_argument('--brokers', dest='broker_cnt', type=int, - default=KafkaCluster.default_conf['broker_cnt'], + default=None, help='Number of Kafka brokers') parser.add_argument('--version', dest='version', type=str, default=KafkaCluster.default_conf['version'], @@ -494,6 +552,9 @@ def write_client_conf(self, path, additional_blob=None): action='store_true', default=KafkaCluster.default_conf['oidc'], help='Enable Oauthbearer OIDC JWT server') + parser.add_argument('--broker-ports', dest='broker_ports', type=str, default=None, + help='Comma-separated list of Kafka broker ports. If not provided, default ports will be used.') + args = parser.parse_args() @@ -507,7 +568,9 @@ def write_client_conf(self, path, additional_blob=None): 'broker_cnt': args.broker_cnt, 'kafka_path': args.kafka_src, 'cleanup': not args.no_cleanup, - 'oidc': args.oidc} + 'oidc': args.oidc, + 'broker_ports': args.broker_ports + } kc = KafkaCluster(**conf)