diff --git a/digitalpy/core/component_management/impl/component_registration_handler.py b/digitalpy/core/component_management/impl/component_registration_handler.py index 17ff93e..2aa600a 100644 --- a/digitalpy/core/component_management/impl/component_registration_handler.py +++ b/digitalpy/core/component_management/impl/component_registration_handler.py @@ -35,26 +35,36 @@ def clear(): @staticmethod def discover_components(component_folder_path: PurePath) -> List[str]: - """this method is used to discover all available components + """Discover valid components. - Args: - component_folder_path (str): the path in which to search for components. the searchable folder should be in the following format:\n - component_folder_path \n - |-- some_component \n - | `-- some_component_facade.py\n - `-- another_component\n - `-- another_component_facade.py\n - Returns: - List[str]: a list of available components in the given path + A valid component must have: + - /_facade.py + - /configuration/manifest.ini + + This avoids trying to register legacy/helper folders that look like + components but are not installable DigitalPy components. """ potential_components = os.scandir(component_folder_path) components = [] + for potential_component in potential_components: + if not potential_component.is_dir(): + continue + facade_path = PurePath( - potential_component.path, potential_component.name + "_facade.py" + potential_component.path, + potential_component.name + "_facade.py", + ) + + manifest_path = PurePath( + potential_component.path, + "configuration", + "manifest.ini", ) - if os.path.exists(facade_path): + + if os.path.exists(facade_path) and os.path.exists(manifest_path): components.append(PurePath(potential_component.path)) + return components @staticmethod diff --git a/digitalpy/core/component_management/impl/default_facade.py b/digitalpy/core/component_management/impl/default_facade.py index c356eea..9af259a 100644 --- a/digitalpy/core/component_management/impl/default_facade.py +++ b/digitalpy/core/component_management/impl/default_facade.py @@ -22,24 +22,24 @@ class DefaultFacade(Controller): def __init__( - self, - action_mapping_path: str, - internal_action_mapping_path, - logger_configuration, - log_file_path, - component_name=None, - type_mapping=None, - action_mapper: DefaultActionMapper = None, # type: ignore - base=ModuleType, - request: Request = None, # type: ignore - response: Response = None, # type: ignore - configuration: Configuration = None, # type: ignore - configuration_path_template=None, - tracing_provider_instance=None, - manifest_path=None, - action_flow_path: Optional[str] = None, - object_configuration_paths: Optional[str] = None, - **kwargs, + self, + action_mapping_path: str, + internal_action_mapping_path, + logger_configuration, + log_file_path, + component_name=None, + type_mapping=None, + action_mapper: DefaultActionMapper = None, # type: ignore + base=ModuleType, + request: Request = None, # type: ignore + response: Response = None, # type: ignore + configuration: Configuration = None, # type: ignore + configuration_path_template=None, + tracing_provider_instance=None, + manifest_path=None, + action_flow_path: Optional[str] = None, + object_configuration_paths: Optional[str] = None, + **kwargs, ): """_summary_ @@ -177,7 +177,7 @@ def get_configuration_path(self) -> str: def get_flow_configuration_path(self) -> str: """get the flow configuration path for the component""" return self.action_flow_path - + def get_object_configuration_path(self) -> str: """get the object configuration path for the component""" return self.object_configuration_paths @@ -192,11 +192,17 @@ def get_action_mapper(self) -> DefaultActionMapper: internal_config, ), ) + def setup(self, **kwargs): """setup the component""" self.action_mapper = self.get_action_mapper() self._register_type_mapping() + def register(self, config: InifileConfiguration, **kwargs): + """register the component with the system""" + config.add_configuration(self.action_mapping_path) + self.setup(**kwargs) + def unregister(self, config: InifileConfiguration, **kwargs): """unregister the component from the system""" ObjectFactory.clear_instance(f"{self.component_name.lower()}actionmapper") @@ -207,28 +213,40 @@ def get_manifest(self, **kwargs): return self.manifest def _register_type_mapping(self): - """any component may or may not have a type mapping defined, - if it does then it should be registered""" - if self.type_mapping: + """Register optional type mappings. + + Some legacy FTS components define type_mapping before the Type component + action mapper is available. That must not abort component registration. + """ + + if not self.type_mapping: + return + + actionmapper = ObjectFactory.get_instance("SyncActionMapper") + + try: request = ObjectFactory.get_new_instance("request") request.set_action("RegisterMachineToHumanMapping") request.set_value("machine_to_human_mapping", self.type_mapping) - actionmapper = ObjectFactory.get_instance("SyncActionMapper") response = ObjectFactory.get_new_instance("response") actionmapper.process_action(request, response) request = ObjectFactory.get_new_instance("request") request.set_action("RegisterHumanToMachineMapping") - # reverse the mapping and save the reversed mapping request.set_value( - "human_to_machine_mapping", {k: v for v, k in self.type_mapping.items()} + "human_to_machine_mapping", + {k: v for v, k in self.type_mapping.items()}, ) - actionmapper = ObjectFactory.get_instance("SyncActionMapper") response = ObjectFactory.get_new_instance("response") actionmapper.process_action(request, response) + except ValueError as exc: + if "No action key found" in str(exc): + return + raise + def accept_visitor(self, node: Node, visitor, **kwargs): return node.accept_visitor(visitor) @@ -244,4 +262,4 @@ def __getstate__(self) -> dict: set_base = tmp.get("base", None) if set_base is not None: tmp["base"] = True - return tmp + return tmp \ No newline at end of file diff --git a/digitalpy/core/impl/__init__.py b/digitalpy/core/impl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/digitalpy/core/impl/default_event_manager.py b/digitalpy/core/impl/default_event_manager.py new file mode 100644 index 0000000..73196b2 --- /dev/null +++ b/digitalpy/core/impl/default_event_manager.py @@ -0,0 +1,3 @@ +from digitalpy.core.main.impl.default_event_manager import DefaultEventManager + +__all__ = ["DefaultEventManager"] diff --git a/digitalpy/core/service_management/digitalpy_service.py b/digitalpy/core/service_management/digitalpy_service.py index 61c1262..2c21bce 100644 --- a/digitalpy/core/service_management/digitalpy_service.py +++ b/digitalpy/core/service_management/digitalpy_service.py @@ -101,13 +101,14 @@ class DigitalPyService: # with the service information def __init__( - self, - service_id: str, - service: ServiceConfiguration, - integration_manager_subscriber: IntegrationManagerSubscriber, - subject_pusher: SubjectPusher, - integration_manager_pusher: IntegrationManagerPusher, - error_threshold: float = 0.1, + self, + service_id: str, + service: ServiceConfiguration | str, + integration_manager_subscriber: IntegrationManagerSubscriber | int | None = None, + subject_pusher: SubjectPusher | str | None = None, + integration_manager_pusher: IntegrationManagerPusher | str | None = None, + error_threshold: float | int | str | None = 0.1, + *legacy_args, ): """the constructor for the digitalpy service class @@ -116,36 +117,107 @@ def __init__( formatter (Formatter): the formatter used by the service to serialize the request values to and from messages, (should be injected by object factory) protocol (NetworkInterface): the network interface used by the service to send and receive messages, (should be injected by object factory through the services' constructor) """ - self._integration_manager_subscriber = integration_manager_subscriber - self._subject_pusher = subject_pusher - self._integration_manager_pusher = integration_manager_pusher - self._service_conf = service + if isinstance(service, ServiceConfiguration): + service_conf = service + ims = integration_manager_subscriber + sp = subject_pusher + imp = integration_manager_pusher + threshold = error_threshold if error_threshold is not None else 0.1 + + else: + # Legacy FreeTAKServer signature: + # service_id, + # subject_address, + # subject_port, + # subject_protocol, + # integration_manager_address, + # integration_manager_port, + # integration_manager_protocol, + # formatter + formatter = legacy_args[-1] if legacy_args else None + + service_conf = ServiceConfiguration() + service_conf.name = service_id + service_conf.host = str(service) + service_conf.port = ( + int(integration_manager_subscriber) + if integration_manager_subscriber is not None + else None + ) + service_conf.protocol = ( + str(subject_pusher) + if subject_pusher is not None + else None + ) + service_conf.status = ServiceStatusEnum.STOPPED.value + service_conf.flows = [] + + zconf = SingletonConfigurationFactory.get_configuration_object( + "ZManagerConfiguration" + ) + + timeout = getattr(zconf, "integration_manager_pull_timeout", None) or 0 + + ims = IntegrationManagerSubscriber( + formatter=formatter, + timeout=timeout, + service_id=service_id, + application_protocol=service_conf.protocol, + ) + + sp = SubjectPusher( + formatter=formatter, + service_id=service_id, + ) + + imp = IntegrationManagerPusher( + formatter=formatter, + ) + + threshold = 0.1 + + self._integration_manager_subscriber = ims + self._subject_pusher = sp + self._integration_manager_pusher = imp + self._service_conf = service_conf + self._zmanager_configuration: ZManagerConfiguration = ( SingletonConfigurationFactory.get_configuration_object( "ZManagerConfiguration" ) ) + self.subject_address = self._zmanager_configuration.subject_pull_address self.integration_manager_address = ( self._zmanager_configuration.integration_manager_pub_address ) self._tracer = None - self.protocol: NetworkInterface = ObjectFactory.get_instance( - self.configuration.protocol - ) - self.iam_facade: IAM = ObjectFactory.get_instance("IAM") + + self.protocol = None + if self.configuration.protocol: + try: + self.protocol = ObjectFactory.get_instance(self.configuration.protocol) + except Exception: + self.protocol = None + + try: + self.iam_facade: IAM = ObjectFactory.get_instance("IAM") + except Exception: + self.iam_facade = None + self.service_id = service_id self.total_requests = 0 self.total_errors = 0 self.total_request_processing_time = 0 - self.error_threshold = error_threshold + self.error_threshold = threshold self._process: Optional[Process] = None - self._topics: list[ActionKey] = [] + self.stop_event: threading.Event = threading.Event() - self.stop_event: threading.Event + # Legacy FTS compatibility. FTS accesses these directly. + self.subscriber_socket = None def handle_connection(self, message: Request): """register a client with the server. This method should be called when a client connects to the server @@ -276,13 +348,23 @@ def initialize_controllers(self): by inheriting classes """ - def initialize_connections(self): - """initialize connections to the subject and the integration manager within the - zmanager architecture. + def initialize_connections(self, application_protocol: str | None = None): + """Initialize zmanager connections. + + The optional application_protocol argument is kept for legacy FTS services, + which call initialize_connections(APPLICATION_PROTOCOL). """ + + if application_protocol is not None: + self._integration_manager_subscriber.application_protocol = application_protocol + self._integration_manager_subscriber.setup() self._subject_pusher.setup() self._integration_manager_pusher.setup() + + # Legacy FTS code accesses self.subscriber_socket directly. + self.subscriber_socket = self._integration_manager_subscriber.subscriber_socket + self._subscribe_to_commands() self._subscribe_to_flows() @@ -502,28 +584,38 @@ def handle_exception(self, exception: Exception): self.total_errors += 1 def start( - self, - object_factory: DefaultFactory, - tracing_provider: TracingProvider, - conf_factory: ConfigurationFactory, + self, + object_factory: DefaultFactory | None = None, + tracing_provider: TracingProvider | None = None, + conf_factory: ConfigurationFactory | None = None, ): - """used to start the service and initialize the network if provided + """Start the service. - Args: - object_factory (DefaultFactory): the object factory used to create instances of objects - tracing_provider (TracingProvider): the tracing provider used to create a tracer + Supports: + - current DigitalPy start(factory, tracing_provider, conf_factory) + - legacy FTS super().start() """ + + # Legacy FTS services call super().start() and then manually call + # initialize_connections(APPLICATION_PROTOCOL). + if object_factory is None and tracing_provider is None and conf_factory is None: + self.status = ServiceStatusEnum.RUNNING.value + return + SingletonConfigurationFactory.configure(conf_factory) ObjectFactory.configure(object_factory) + self.tracer = tracing_provider.create_tracer(self.service_id) + self.initialize_controllers() self.initialize_connections() - self.protocol.initialize_network( - self.configuration.host, - self.configuration.port, - service_desc=self._service_conf, - ) + if self.protocol is not None: + self.protocol.initialize_network( + self.configuration.host, + self.configuration.port, + service_desc=self._service_conf, + ) self.status = ServiceStatusEnum.RUNNING.value self.execute_main_loop() @@ -536,3 +628,31 @@ def execute_main_loop(self): self.event_loop() except Exception as ex: self.handle_exception(ex) + + def subject_send_request( + self, + request: Request, + application_protocol: str | None = None, + service_id: str | None = None, + ): + """Legacy FTS compatibility wrapper for sending requests to the subject.""" + + if application_protocol is not None: + request.set_format(application_protocol) + + target_service_id = service_id if service_id is not None else self.service_id + + self._subject_pusher.push_container( + request, + service_id=target_service_id, + ) + + def broker_receive(self, blocking: bool = False): + """Legacy FTS compatibility wrapper for receiving broker responses.""" + + response = self._integration_manager_subscriber.fetch_integration_manager_response() + + if response is None: + return [] + + return [response] diff --git a/digitalpy/routing/__init__.py b/digitalpy/routing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/digitalpy/routing/impl/__init__.py b/digitalpy/routing/impl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/digitalpy/routing/impl/default_action_mapper.py b/digitalpy/routing/impl/default_action_mapper.py new file mode 100644 index 0000000..bd9ca21 --- /dev/null +++ b/digitalpy/routing/impl/default_action_mapper.py @@ -0,0 +1,3 @@ +from digitalpy.core.zmanager.impl.default_action_mapper import DefaultActionMapper + +__all__ = ["DefaultActionMapper"] diff --git a/digitalpy/routing/impl/default_request.py b/digitalpy/routing/impl/default_request.py new file mode 100644 index 0000000..39a0f93 --- /dev/null +++ b/digitalpy/routing/impl/default_request.py @@ -0,0 +1,3 @@ +from digitalpy.core.zmanager.impl.default_request import DefaultRequest + +__all__ = ["DefaultRequest"] diff --git a/digitalpy/routing/impl/default_response.py b/digitalpy/routing/impl/default_response.py new file mode 100644 index 0000000..414452a --- /dev/null +++ b/digitalpy/routing/impl/default_response.py @@ -0,0 +1,3 @@ +from digitalpy.core.zmanager.impl.default_response import DefaultResponse + +__all__ = ["DefaultResponse"]