A .NET Core distributed process orchestration platform built as an experiment in scalable job processing across multiple nodes. TDIE explores horizontal scaling by distributing workloads across a cluster, using a component-based architecture in which business logic is packaged into reusable components that can be deployed and managed on any node.
- Distributed Process Orchestration: Attempts to distribute and manage component instances across cluster nodes
- Dynamic Component Loading: Supports hot-deployment of components without system restarts
- Cluster Management: Basic node discovery, health monitoring, and workload balancing
- Message-Driven Architecture: Simple publish/subscribe messaging between components
- Package Management: Basic deployment and versioning of component packages
- RESTful APIs: REST APIs for cluster and node management
- Extensible Framework: Plugin architecture for custom components and message publishers
The following diagram shows how TDIE operates in a 3-node cluster environment, demonstrating the distributed orchestration, component hosting, and inter-node communication patterns:
graph TB
subgraph "TDIE Distributed Cluster Architecture"
subgraph "Master Node (Node 1 - tdie-node-1)"
subgraph "Node Manager Process"
NM[NodeManagerComponent<br/>IComponent Implementation]
CM[ClusterManager<br/>Node Distribution Logic]
NS[NodeSynchronizer<br/>Package Synchronization]
DLF[DistributedLockFactory<br/>SQL Server Locks]
MAPI[Master Web API<br/>:7777/api/master]
end
subgraph "Component Host Instances"
CH1A[Component Host A<br/>:5000]
CH1B[Component Host B<br/>:5001]
CH1C[Component Host C<br/>:5002]
end
subgraph "Node API"
N1API[Node API<br/>:5100/api/node]
PM1[Package Manager<br/>LiteDB Storage]
PROC1[Process Manager<br/>Local Process Control]
end
end
subgraph "Worker Node 2 (tdie-node-2)"
subgraph "Component Host Instances"
CH2A[Component Host A<br/>:5000]
CH2B[Component Host B<br/>:5001]
CH2C[Component Host C<br/>:5002]
end
subgraph "Node API"
N2API[Node API<br/>:5100/api/node]
PM2[Package Manager<br/>LiteDB Storage]
PROC2[Process Manager<br/>Local Process Control]
end
end
subgraph "Worker Node 3 (tdie-node-3)"
subgraph "Component Host Instances"
CH3A[Component Host A<br/>:5000]
CH3B[Component Host B<br/>:5001]
CH3C[Component Host C<br/>:5002]
end
subgraph "Node API"
N3API[Node API<br/>:5100/api/node]
PM3[Package Manager<br/>LiteDB Storage]
PROC3[Process Manager<br/>Local Process Control]
end
end
subgraph "Components Running in Component Hosts"
subgraph "Component Types (IComponent Implementations)"
FW[File Watcher<br/>Monitor Directories]
QS[Quartz Scheduler<br/>Cron Jobs]
WA[Web API<br/>REST Endpoints]
DR[Database Replication<br/>Data Sync]
CUSTOM[Custom Components<br/>Business Logic]
end
end
subgraph "Shared Infrastructure"
subgraph "SQL Server Database"
LOCKS[Distributed Locks Table<br/>Medallion.Threading.Sql]
META[Metadata Storage<br/>Component Configurations]
end
subgraph "Package Repository"
PKG1[componentHost.zip]
PKG2[fileWatcher.zip]
PKG3[quartzScheduler.zip]
PKG4[webApiComponent.zip]
PKG5[customComponent.zip]
end
end
end
%% Connections - Cluster Management
NM -->|Timer: 50s Sync| CM
CM -->|Distributed Operations| DLF
CM -->|Package Deployment| NS
NS -->|HTTP Requests| N1API
NS -->|HTTP Requests| N2API
NS -->|HTTP Requests| N3API
%% Connections - Component Host Management
CM -->|Start/Stop/Configure| CH1A
CM -->|Start/Stop/Configure| CH1B
CM -->|Start/Stop/Configure| CH1C
CM -->|Start/Stop/Configure| CH2A
CM -->|Start/Stop/Configure| CH2B
CM -->|Start/Stop/Configure| CH2C
CM -->|Start/Stop/Configure| CH3A
CM -->|Start/Stop/Configure| CH3B
CM -->|Start/Stop/Configure| CH3C
%% Connections - Node API to Local Services
N1API -->|Process Control| PROC1
N1API -->|Package Operations| PM1
N2API -->|Process Control| PROC2
N2API -->|Package Operations| PM2
N3API -->|Process Control| PROC3
N3API -->|Package Operations| PM3
%% Connections - Component Hosting
CH1A -.->|Hosts| FW
CH1B -.->|Hosts| QS
CH1C -.->|Hosts| WA
CH2A -.->|Hosts| DR
CH2B -.->|Hosts| CUSTOM
CH2C -.->|Hosts| FW
CH3A -.->|Hosts| QS
CH3B -.->|Hosts| WA
CH3C -.->|Hosts| DR
%% Connections - Database Access
DLF -->|Acquire/Release Locks| LOCKS
CM -->|Read Configurations| META
NS -->|Read Configurations| META
%% Connections - Package Repository
NS -->|Deploy Packages| PKG1
NS -->|Deploy Packages| PKG2
NS -->|Deploy Packages| PKG3
NS -->|Deploy Packages| PKG4
NS -->|Deploy Packages| PKG5
%% Process Management
PROC1 -->|Launch/Kill| CH1A
PROC1 -->|Launch/Kill| CH1B
PROC1 -->|Launch/Kill| CH1C
PROC2 -->|Launch/Kill| CH2A
PROC2 -->|Launch/Kill| CH2B
PROC2 -->|Launch/Kill| CH2C
PROC3 -->|Launch/Kill| CH3A
PROC3 -->|Launch/Kill| CH3B
PROC3 -->|Launch/Kill| CH3C
%% External API Access
EXT[External Clients<br/>REST API Calls] -->|Cluster Management| MAPI
EXT -->|Direct Node Access| N1API
EXT -->|Direct Node Access| N2API
EXT -->|Direct Node Access| N3API
classDef masterNode fill:#e1f5fe,stroke:#0277bd,stroke-width:2px
classDef workerNode fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef componentHost fill:#fff3e0,stroke:#f57c00,stroke-width:2px
classDef component fill:#e8f5e8,stroke:#388e3c,stroke-width:2px
classDef infrastructure fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef api fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
class NM,CM,NS,DLF,MAPI masterNode
class N2API,PM2,PROC2,N3API,PM3,PROC3 workerNode
class CH1A,CH1B,CH1C,CH2A,CH2B,CH2C,CH3A,CH3B,CH3C componentHost
class FW,QS,WA,DR,CUSTOM component
class LOCKS,META,PKG1,PKG2,PKG3,PKG4,PKG5 infrastructure
class N1API,PM1,PROC1 api
- NodeManagerComponent: Runs as an IComponent within a Component Host, demonstrating the experimental extensible architecture
- Timer-Based Synchronization: Every 50 seconds, triggers cluster-wide synchronization operations
- ClusterManager: Coordinates component distribution, node expansion/shrinkage, and instance lifecycle management
- Distributed Locking: Uses SQL Server-based locks via Medallion.Threading.Sql for cluster-wide coordination
Master Node Decision → Distributed Lock Acquisition → Package Synchronization →
Component Host Provisioning → Component Instantiation → Health Monitoring
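The flow above can be sketched as a guarded synchronization pass: the cycle only proceeds when the cluster-wide lock is obtained, otherwise it yields to whichever node holds it. This is a minimal illustration; the `tryAcquireLock` delegate stands in for the SQL Server lock from Medallion.Threading.Sql, and the step names are ours, not TDIE's:

```csharp
using System;
using System.Collections.Generic;

public static class SyncCycle
{
    // One synchronization pass. The delegate stands in for the distributed
    // lock: it returns a releasable handle when acquired, or null when some
    // other node already holds the lock.
    public static IReadOnlyList<string> Run(Func<IDisposable> tryAcquireLock)
    {
        var steps = new List<string>();
        using var handle = tryAcquireLock();
        if (handle is null)
            return steps; // another master is synchronizing; skip this pass

        // Steps run in the documented order while the lock is held.
        steps.Add("SynchronizePackages");
        steps.Add("ProvisionComponentHosts");
        steps.Add("InstantiateComponents");
        steps.Add("MonitorHealth");
        return steps;
    }
}
```

Running two masters against the same lock means at most one of them executes the steps per pass; the other returns an empty step list and retries on the next timer tick.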
- RESTful APIs: All inter-node communication via HTTP/HTTPS REST APIs
- Node API Endpoints: Each node exposes standardized /api/node endpoints for:
  - Process management (/processes)
  - Package management (/packages)
  - Health monitoring (/stats)
- Process Isolation: Each component runs in its own Component Host process
- Dynamic Port Allocation: Ports assigned dynamically (starting from 5000)
- Independent Lifecycles: Components can be started/stopped/updated independently
- Configuration-Driven: All components use the same IComponent interface regardless of complexity
Package Upload → Validation → Distribution to Nodes → Local Storage →
Component Host Provisioning → Assembly Loading → Component Instantiation
- Cluster Expansion: New nodes automatically receive all required packages and component instances
- Cluster Shrinkage: Components gracefully shut down before node removal
- Load Balancing: Components distributed across available nodes based on capacity
- Fault Tolerance: Failed nodes detected and workloads redistributed automatically
- Uniform Interface: File Watchers, Schedulers, Web APIs, and the Node Manager itself all implement IComponent
- Same Lifecycle: All components use StartAsync/StopAsync pattern
- Configuration Consistency: All components configured via key-value pairs
- Deployment Uniformity: All components packaged and deployed identically
This architecture demonstrates an experimental approach where complex distributed orchestration is built from the same foundational abstractions as simple components, exploring scalability through a unified component model.
1. Node Manager (TDIE.Components.NodeManager)
The Node Manager acts as the central orchestrator for the distributed cluster. It coordinates cluster-wide operations and maintains the overall health of the distributed system.
Responsibilities:
- Cluster Synchronization: Monitors and synchronizes the state of all nodes in the cluster
- Component Distribution: Distributes component instances across available nodes
- Node Health Monitoring: Tracks the health and performance of individual nodes
- Package Deployment: Manages the deployment and versioning of component packages
- Distributed Locking: Coordinates access to shared resources using distributed locks
Architecture Components:
- Cluster Manager: Handles node addition, removal, and load distribution
- Node Synchronizer: Ensures package versions and configurations are consistent across nodes
- Distributed Lock Factory: Provides cluster-wide locking mechanisms using SQL Server-based distributed locks
- Component Instance Manager: Tracks and manages component instances across the cluster
Operational Flow:
- Cluster Discovery: Reads cluster configuration and discovers available nodes
- Node Synchronization: Ensures all nodes have the required packages and configurations
- Instance Distribution: Distributes component instances based on configured rules
- Health Monitoring: Monitors node health and redistributes workloads when failures occur
- Package Synchronization: Deploys new package versions across the cluster
REST API Endpoints:
GET /api/master - Cluster management endpoints (implementation in progress)
Configuration Management:
The Node Manager uses JSON configuration files to define cluster topology, node specifications, and component deployment rules. The cluster configuration is defined in clusterSettings.json:
{
"nodePackages": [
{
"name": "ComponentHostProcess",
"version": "1.0.0",
"packagePath": "\\\\node-share\\repos\\Integration Platform\\TDIE.ComponentHost.WebApi\\TDIE.ComponentHost.WebApi\\bin\\Release\\netcoreapp2.2\\componentHost.zip"
}
],
"componnetHostPackageName": "componentHostProcess",
"nodeServers": [
{
"networkName": "tdie-node-1",
"networkIp": "10.0.10.21",
"nodeApiUri": "https://n1.tdie.skynet.home"
},
{
"networkName": "tdie-node-2",
"networkIp": "10.0.10.22",
"nodeApiUri": "https://n2.tdie.skynet.home"
},
{
"networkName": "tdie-node-3",
"networkIp": "10.0.10.23",
"nodeApiUri": "https://n3.tdie.skynet.home"
}
]
}

Configuration Properties:
- nodePackages: Array of packages that need to be deployed to cluster nodes
  - name: Package identifier used for reference
  - version: Package version for tracking updates
  - packagePath: File system or network path to the package ZIP file
- componnetHostPackageName: Name of the Component Host package that manages component instances
- nodeServers: Array of nodes available in the cluster
  - networkName: Unique identifier for the node
  - networkIp: IP address of the node for network communication
  - nodeApiUri: Base URI for the Node API endpoint
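Programmatically, clusterSettings.json can be bound to plain classes with System.Text.Json. The class and property names below are illustrative, not the actual TDIE types; they simply mirror the JSON keys (including the componnetHostPackageName spelling used in the file):

```csharp
using System;
using System.Text.Json;

// Illustrative POCOs mirroring clusterSettings.json (assumed names, not TDIE's).
public class ClusterSettings
{
    public NodePackage[] NodePackages { get; set; } = Array.Empty<NodePackage>();
    // Property name mirrors the config key's spelling in clusterSettings.json.
    public string ComponnetHostPackageName { get; set; } = "";
    public NodeServer[] NodeServers { get; set; } = Array.Empty<NodeServer>();
}

public class NodePackage
{
    public string Name { get; set; } = "";
    public string Version { get; set; } = "";
    public string PackagePath { get; set; } = "";
}

public class NodeServer
{
    public string NetworkName { get; set; } = "";
    public string NetworkIp { get; set; } = "";
    public string NodeApiUri { get; set; } = "";
}

public static class ClusterSettingsLoader
{
    // camelCase JSON keys bind to PascalCase properties case-insensitively.
    private static readonly JsonSerializerOptions Options =
        new() { PropertyNameCaseInsensitive = true };

    public static ClusterSettings Parse(string json) =>
        JsonSerializer.Deserialize<ClusterSettings>(json, Options)!;
}
```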
2. Component Host (TDIE.ComponentHost)
The Component Host is a lightweight runtime environment that serves as a building block for component execution within the distributed system, providing process isolation, lifecycle management, and dynamic service composition.
Responsibilities:
- Dynamic Component Loading: Loads and instantiates components from assemblies at runtime
- Isolated Runtime Environment: Provides process isolation for component instances
- Lifecycle Orchestration: Manages the complete component lifecycle from configuration through startup to shutdown
- Dependency Injection Container: Creates and manages a DI container with automatic service registration
- Configuration-Driven Assembly Loading: Loads components and message publishers based on assembly paths and class names
- State Management: Tracks and manages component and host states with error handling
Architecture Components:
- ComponentHostBackgroundService: The core orchestration engine that manages component instances
- Service Provider Management: Creates isolated IServiceProvider instances with component-specific service registrations
- Assembly Loading: Dynamically loads assemblies using GetTypeFromAssembly() for both components and message publishers
- Configuration Validation: Validates assembly paths, class names, and component compatibility before instantiation
- Error Handling: Implements error handling with state tracking and rollback capabilities
Dependency Injection Pipeline:
_serviceProvider = new ServiceCollection()
.AddSingleton(_loggerFactory)
.AddLogging()
.AddOptions()
.AddSingleton(_componentSettings)
.AddSingleton(_messagePublisherSettings)
.AddSingleton<IObjectMapperService, ObjectMapperService>()
.AddSingleton(typeof(IMessagePublisher), MessagePublisherType)
.AddSingleton(typeof(IComponent), ComponentType)
.BuildServiceProvider();

Dynamic Type Resolution:
- Components are loaded using ComponentType = GetTypeFromAssembly(assemblyPath, fullyQualifiedClassName)
- Message publishers are optionally loaded based on component requirements
- Type validation ensures components implement required interfaces (IComponent, IMessagePublisher)
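A minimal sketch of what GetTypeFromAssembly plus the interface-compatibility check might look like; the real Component Host adds caching, validation, and richer error reporting beyond this:

```csharp
using System;
using System.Reflection;

public static class AssemblyTypeResolver
{
    // Loads the assembly from disk and resolves the fully qualified class name.
    public static Type GetTypeFromAssembly(string assemblyPath, string fullyQualifiedClassName)
    {
        Assembly assembly = Assembly.LoadFrom(assemblyPath);
        return assembly.GetType(fullyQualifiedClassName, throwOnError: true)!;
    }

    // Compatibility check performed before instantiation, e.g.
    // Implements<IComponent>(type) for components.
    public static bool Implements<TInterface>(Type candidate) =>
        typeof(TInterface).IsAssignableFrom(candidate);
}
```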
Component Host State Machine: The Component Host implements a state machine with the following states:
- AwaitingConfiguration: Initial state, waiting for component and message publisher configurations
- Configured: Assembly loading completed, services configured, ready to start
- Started: All services running, component and message publisher active
- Stopping: Graceful shutdown in progress, stopping services in proper order
- Stopped: All services stopped, ready for reconfiguration or disposal
- Errored: Error state due to configuration, loading, or runtime failures
- Destroyed: Resources cleaned up, host ready for termination
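The states can be modeled as an enum with an explicit transition table. The table below is our reading of the documented lifecycle (happy path plus error and teardown edges); the actual host may permit additional transitions:

```csharp
using System;
using System.Collections.Generic;

// Host states as named in the documentation above.
public enum ComponentHostState
{
    AwaitingConfiguration, Configured, Started, Stopping, Stopped, Errored, Destroyed
}

public static class HostStateMachine
{
    // Assumed transition table: configure -> start -> stop -> reconfigure or
    // destroy, with Errored reachable from any active state.
    private static readonly Dictionary<ComponentHostState, ComponentHostState[]> Allowed = new()
    {
        [ComponentHostState.AwaitingConfiguration] = new[] { ComponentHostState.Configured, ComponentHostState.Errored },
        [ComponentHostState.Configured] = new[] { ComponentHostState.Started, ComponentHostState.Errored },
        [ComponentHostState.Started]    = new[] { ComponentHostState.Stopping, ComponentHostState.Errored },
        [ComponentHostState.Stopping]   = new[] { ComponentHostState.Stopped, ComponentHostState.Errored },
        [ComponentHostState.Stopped]    = new[] { ComponentHostState.Configured, ComponentHostState.Destroyed },
        [ComponentHostState.Errored]    = new[] { ComponentHostState.Destroyed },
        [ComponentHostState.Destroyed]  = Array.Empty<ComponentHostState>(),
    };

    public static bool CanTransition(ComponentHostState from, ComponentHostState to) =>
        Array.IndexOf(Allowed[from], to) >= 0;
}
```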
Lifecycle Management:

Configuration Phase:
- Assembly Validation: Validates assembly paths and component types
- Message Publisher Resolution: Determines if component requires message publisher
- Settings Injection: Creates component and message publisher settings from configuration
- Service Registration: Registers all services in dependency injection container
- Instance Creation: Creates component and message publisher instances
Startup Sequence:
- Message Publisher First: Starts message publisher before component to ensure immediate publishing capability
- Component Activation: Starts component with cancellation token for graceful shutdown support
- State Tracking: Monitors component and message publisher states independently
- Error Recovery: Implements rollback mechanisms for failed startup attempts
Shutdown Orchestration:
- Component First: Stops component first to complete any pending work
- Message Publisher Last: Stops message publisher after component to allow final message publishing
- Resource Cleanup: Disposes all resources and clears service provider
- State Reset: Resets host state for potential reconfiguration
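The mirrored startup and shutdown ordering described above can be captured in a small orchestrator. `ILifecycleService` is a local stand-in for the IHostedService-style contract; the point is the ordering, publisher first on the way up and component first on the way down:

```csharp
using System.Threading;
using System.Threading.Tasks;

// Local stand-in for the IHostedService-style lifecycle used by TDIE extensions.
public interface ILifecycleService
{
    Task StartAsync(CancellationToken ct);
    Task StopAsync(CancellationToken ct);
}

public sealed class HostOrchestrator
{
    private readonly ILifecycleService _component;
    private readonly ILifecycleService _messagePublisher;

    public HostOrchestrator(ILifecycleService component, ILifecycleService messagePublisher)
    {
        _component = component;
        _messagePublisher = messagePublisher;
    }

    // Publisher first, so the component can publish as soon as it starts.
    public async Task StartAsync(CancellationToken ct)
    {
        await _messagePublisher.StartAsync(ct);
        await _component.StartAsync(ct);
    }

    // Component first, so final messages can still be published during shutdown.
    public async Task StopAsync(CancellationToken ct)
    {
        await _component.StopAsync(ct);
        await _messagePublisher.StopAsync(ct);
    }
}
```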
REST API Endpoints:
Component Host Management:
- GET /api/componentHost/configuration - Get detailed host configuration and runtime state
- POST /api/componentHost/shutdown - Gracefully shut down the component host with cleanup
Package Management:
- POST /api/componentHost/packages - Upload a new component package with validation
- PUT /api/componentHost/packages - Update an existing component package with version control
- GET /api/componentHost/packages/configuration - Get configuration for all installed packages
- GET /api/componentHost/packages/{packageName}/configuration - Get configuration for a specific package
Service Management:
- PUT /api/componentHost/services/start - Start all configured services (component and message publisher)
- PUT /api/componentHost/services/stop - Stop all running services with proper sequencing
- PUT /api/componentHost/services/component/{packageName}/init - Initialize a component service with configuration
- PUT /api/componentHost/services/component/{packageName}/start - Start a specific component service
- PUT /api/componentHost/services/component/{packageName}/stop - Stop a specific component service
- PUT /api/componentHost/services/messagePublisher/{packageName}/init - Initialize a message publisher service
- PUT /api/componentHost/services/messagePublisher/{packageName}/start - Start a message publisher service
- PUT /api/componentHost/services/messagePublisher/{packageName}/stop - Stop a message publisher service
Extensible Architecture Pattern:
The Component Host demonstrates an experimental extensible architecture where complex orchestration components like the Node Manager are implementations of the same IComponent interface. This creates a recursive architecture where:
Uniform Interface Implementation:
public sealed class NodeManagerComponent : IComponent
{
public IComponentSettings Settings { get; }
public string Name => Settings.Name;
public Guid InstanceId { get; } = Guid.NewGuid();
public ObjectState State { get; }
public async Task StartAsync(CancellationToken cancellationToken) { /* Cluster orchestration logic */ }
public async Task StopAsync(CancellationToken cancellationToken) { /* Graceful cluster shutdown */ }
}

Recursive Component Hosting:
- The Node Manager runs as a component within a Component Host
- File Watchers run as components within Component Hosts
- Schedulers run as components within Component Hosts
- Web APIs run as components within Component Hosts
- Custom Business Logic runs as components within Component Hosts
Configuration-Driven Deployment: Every component, regardless of complexity, uses the same deployment model:
{
"packageName": "NodeManager",
"assemblyPath": "/path/to/TDIE.Components.NodeManager.dll",
"fullyQualifiedClassName": "TDIE.Components.NodeManager.NodeManagerComponent",
"settings": {
"masterConfigurationFile": "clusterSettings.json",
"syncInterval": "30000"
}
}

Unified Lifecycle Management:
- All components use the same IHostedService lifecycle pattern
- Dependency injection works identically for simple and complex components
- Error handling, logging, and monitoring are consistent across all component types
- State management follows the same patterns regardless of component complexity
Runtime Component Composition: The Component Host enables runtime composition:
Dynamic Service Discovery:
- Components can be loaded from any assembly path
- Message publishers are automatically detected and configured
- Dependencies are resolved through the unified DI container
Hot Deployment:
- New components can be deployed without system restart
- Existing components can be stopped, updated, and restarted
- Package versioning supports updates with rollback capabilities
Isolation and Safety:
- Each component instance runs in its own process
- Configuration errors are isolated to individual components
- Component failures don't affect other components or the broader system
Features:

Message Publisher Integration:
- Automatic detection of components that require message publishers
- Dynamic loading and configuration of publisher implementations
- Proper startup/shutdown sequencing for reliable message handling
Configuration Flexibility:
- Key-value pair configuration system works for any component type
- Assembly paths and class names are resolved at runtime
- Settings are injected automatically through dependency injection
Monitoring:
- Host state tracking with detailed error information
- Component and message publisher state monitoring
- Correlation IDs for distributed logging and troubleshooting
This extensible design means that adding new functionality to TDIE requires only:
- Implementing the IComponent interface
- Packaging the assembly with a manifest
- Deploying through the standard package management system
The Node Manager orchestrating components across nodes is architecturally similar to a simple file watcher component - both are implementations of IComponent running within Component Hosts.
3. Node API (TDIE.NodeApi)
The Node API is a RESTful service that runs on each node in the cluster, acting as the local agent for the Node Manager. It provides an interface for managing the node's lifecycle, deploying component packages, and monitoring local processes.
Responsibilities:
- Process Management: Starting, stopping, and monitoring component host processes
- Package Management: Receiving, installing, and updating component packages
- Health and Status Reporting: Providing health status, performance metrics, and operational logs
- Local Resource Access: Exposing controlled access to local node resources
REST API Endpoints:
Node Management:
- GET /api/node/stats - Get node system statistics including environment info, process details, and network interfaces
- GET /api/node/stats/processes - Get statistics for all running processes on the node
- GET /api/node/stats/processes/{packageName} - Get process statistics filtered by package name
Package Management:
- POST /api/node/packages - Upload a new component package to the node
- PUT /api/node/packages - Update an existing component package
- GET /api/node/packages/configuration - Get configuration for all installed packages
- GET /api/node/packages/{packageName}/configuration - Get configuration for a specific package
Process Management:
- GET /api/node/processes - List all running component host processes
- GET /api/node/processes/{packageName} - List processes for a specific package
- GET /api/node/processes/{packageName}/{processId} - Get details for a specific process instance
- GET /api/node/processes/{packageName}/{settingsId} - Get processes by settings configuration ID
- PUT /api/node/processes/{packageName}/start - Start a new instance of a component package
- PUT /api/node/processes/{packageName}/{processId}/kill - Terminate a specific process
- PUT /api/node/processes/kill - Terminate all processes on the node
- PUT /api/node/processes/{packageName}/kill - Terminate all processes for a specific package
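For a caller such as the Node Manager, these routes are straightforward to construct and invoke over HTTP. The small helper below is ours, not part of TDIE, and only makes the route shape concrete:

```csharp
using System;

public static class NodeApiRoutes
{
    // Builds the process-start endpoint, e.g.
    // https://n1.tdie.skynet.home/api/node/processes/fileWatcher/start
    public static Uri StartProcess(Uri nodeApiUri, string packageName) =>
        new Uri(nodeApiUri, $"/api/node/processes/{Uri.EscapeDataString(packageName)}/start");

    // Builds the kill endpoint for one specific process instance.
    public static Uri KillProcess(Uri nodeApiUri, string packageName, int processId) =>
        new Uri(nodeApiUri, $"/api/node/processes/{Uri.EscapeDataString(packageName)}/{processId}/kill");
}
```

A client would then issue, for example, `await httpClient.PutAsync(NodeApiRoutes.StartProcess(baseUri, "fileWatcher"), null)`.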
The Node API abstracts node-specific operations, allowing the Node Manager to orchestrate the cluster without needing to know the underlying details of each machine. It maintains a local process store using LiteDB for tracking component instances and their states.
4. Package Management (TDIE.PackageManager.Basic)
The Package Management system provides capabilities for deploying, versioning, and managing component packages across the distributed cluster. It handles the lifecycle of component packages from upload through deployment and cleanup.
Responsibilities:
- Package Deployment: Handles upload, extraction, and installation of component packages
- Version Management: Manages package versioning with support for upgrades and rollbacks
- Package Validation: Validates package structure, metadata, and dependencies before deployment
- Storage Management: Manages package storage, archiving, and cleanup operations
- Configuration Management: Handles package-specific configuration and metadata
Package Structure: Component packages are ZIP files containing:
package.zip
├── manifest.json # Package metadata and configuration
├── content/ # Package content root
│ ├── MyComponent.dll # Component assembly
│ ├── dependencies/ # Component dependencies
│ └── config/ # Configuration files
└── docs/ # Documentation (optional)
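A package with this layout can be assembled with System.IO.Compression. The helper below is a hypothetical sketch of that structure, not the TDIE packaging tool:

```csharp
using System.IO;
using System.IO.Compression;

public static class PackageBuilder
{
    // Creates a package ZIP with the layout shown above: manifest.json at the
    // root and component files under content/.
    public static void Build(string zipPath, string manifestJson,
        params (string relativePath, byte[] bytes)[] contentFiles)
    {
        using var zip = ZipFile.Open(zipPath, ZipArchiveMode.Create);

        var manifestEntry = zip.CreateEntry("manifest.json");
        using (var writer = new StreamWriter(manifestEntry.Open()))
            writer.Write(manifestJson);

        foreach (var (relativePath, bytes) in contentFiles)
        {
            var entry = zip.CreateEntry($"content/{relativePath}");
            using var stream = entry.Open();
            stream.Write(bytes, 0, bytes.Length);
        }
    }
}
```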
Package Manifest (manifest.json):
{
"packageName": "MyFileProcessor",
"packageVersion": "1.2.0",
"description": "Advanced file processing component",
"contentRoot": "content",
"extensionProperties": {
"AssemblyName": "MyComponent.dll",
"AssemblyExtension": ".dll",
"ComponentType": "MyNamespace.MyComponent",
"Dependencies": ["Newtonsoft.Json >= 12.0.0"]
}
}

Package Management Operations:
Import/Upload Process:
- Package Reception: Receives ZIP package via REST API
- Validation: Validates ZIP structure and manifest.json content
- Extraction: Extracts package contents to designated directory
- Metadata Storage: Stores package metadata in LiteDB database
- Archival: Archives previous versions if updating existing package
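The validation step can be sketched against the manifest format shown above: open the ZIP, locate manifest.json, and require at minimum a package name and version. This is an illustrative check, not the actual TDIE validator:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text.Json;

public static class PackageValidator
{
    // Validates a package ZIP stream and returns its declared identity.
    // Field names follow the manifest.json example in this document.
    public static (string Name, string Version) ValidateManifest(Stream zipStream)
    {
        using var zip = new ZipArchive(zipStream, ZipArchiveMode.Read, leaveOpen: true);
        var entry = zip.GetEntry("manifest.json")
            ?? throw new InvalidDataException("Package is missing manifest.json");

        using var doc = JsonDocument.Parse(entry.Open());
        var root = doc.RootElement;

        if (!root.TryGetProperty("packageName", out var name) ||
            !root.TryGetProperty("packageVersion", out var version))
            throw new InvalidDataException("Manifest must declare packageName and packageVersion");

        return (name.GetString()!, version.GetString()!);
    }
}
```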
Package Storage Structure:
/packages/
├── MyFileProcessor/ # Package name directory
│ ├── manifest.json # Current package manifest
│ ├── content/ # Package content
│ └── archive/ # Previous versions
└── database.db # LiteDB metadata store
REST API Operations: The package manager exposes operations through both Node API and Component Host API:
Core Operations:
- ImportPackageAsync(Stream) - Import new package from ZIP stream
- UpdatePackageAsync(Stream) - Update existing package
- DeletePackageAsync(string) - Remove package and clean up storage
- GetPackageConfigurationAsync(string) - Retrieve package configuration
- GetAllPackageConfigurationsAsync() - List all installed packages
Storage Operations:
- GetPackagePathAsync(string) - Get physical path to package
- GetPackageContentRootAsync(string) - Get path to package content root
Package Lifecycle Management:
- Upload: Package uploaded via REST API as multipart form data
- Validation: Package structure and manifest validated
- Staging: Package temporarily extracted for validation
- Installation: Package moved to permanent location
- Registration: Package metadata stored in database
- Deployment: Package available for component instantiation
- Updates: New versions replace old versions with archival support
- Cleanup: Old packages archived or removed based on retention policies
Error Handling and Recovery:
- Validation Failures: Invalid packages are rejected with detailed error messages
- Partial Installations: Failed installations are rolled back automatically
- Storage Issues: Disk space and permission issues are handled gracefully
- Corruption Detection: Package integrity is verified during operations
Configuration Management: Package manager behavior is configured through application settings:
{
"Configuration": {
"PackageManager": {
"Directory": "./packages",
"ConfigurationFileName": "manifest.json",
"StorePath": "./packages/database.db",
"RetentionPolicy": "KeepLast5Versions"
}
}
}

Core interface that all business logic components must implement:
public interface IComponent : IIntegrationExtension
{
IComponentSettings Settings { get; }
}

Interface for implementing custom message publishers:
public interface IMessagePublisher : IIntegrationExtension
{
IMessagePublisherSettings Settings { get; }
Task PublishAsync(IMessage message);
}

Base interface providing lifecycle management:
public interface IIntegrationExtension : IHostedService, IDisposable
{
string Name { get; }
Guid InstanceId { get; }
ObjectState State { get; }
}

Components are the building blocks of TDIE, encapsulating specific business logic. Creating a custom component involves implementing the IComponent interface and understanding the component lifecycle.
The TDIE Component Host is responsible for instantiating and managing your component. It uses dependency injection to provide essential services:
- IComponentSettings: Component-specific configuration, such as connection strings or file paths. Properties are defined in the package manifest and can be customized per instance.
- IMessagePublisher: The gateway for sending messages or results to other components or external systems. The actual publisher implementation is determined by the runtime configuration.
- ILogger: A standard logger for writing diagnostic messages.
The component lifecycle is managed through the IHostedService interface:
- StartAsync(CancellationToken): Called by the host to begin component execution. This is where you establish connections, start timers, or begin listening for events.
- StopAsync(CancellationToken): Called to gracefully shut down the component. Use this method to release resources and complete any pending work.
- Dispose(): Called for final cleanup of unmanaged resources.
Components also maintain an ObjectState property that tracks their current status (Starting, Started, Stopping, Stopped, Error).
Components receive configuration through the IComponentSettings.Properties dictionary. This allows for flexible, key-value based configuration:
// Reading configuration values
var inputPath = Settings.Properties["inputPath"];
var batchSize = int.Parse(Settings.Properties["batchSize"]);
var connectionString = Settings.Properties["connectionString"];
var enableRetry = bool.Parse(Settings.Properties["enableRetry"]);

Components can communicate with other components or external systems through the IMessagePublisher interface:
// Publishing a message
await _publisher.PublishAsync(new Message
{
Source = Name,
Content = "Processing completed successfully",
Timestamp = DateTime.UtcNow
});

Here's an example of a file processing component that demonstrates typical patterns:
public class FileProcessorComponent : IComponent
{
private readonly ILogger<FileProcessorComponent> _logger;
private readonly IMessagePublisher _publisher;
private FileSystemWatcher _watcher;
private Timer _heartbeatTimer;
private readonly string _inputPath;
private readonly string _fileFilter;
private readonly int _maxRetries;
private readonly TimeSpan _retryDelay;
public FileProcessorComponent(
IComponentSettings settings,
IMessagePublisher publisher,
ILogger<FileProcessorComponent> logger)
{
Settings = settings;
_publisher = publisher;
_logger = logger;
InstanceId = Guid.NewGuid();
// Parse configuration
_inputPath = Settings.Properties["inputPath"];
_fileFilter = Settings.Properties.GetValueOrDefault("filter", "*.*");
_maxRetries = int.Parse(Settings.Properties.GetValueOrDefault("maxRetries", "3"));
_retryDelay = TimeSpan.FromSeconds(int.Parse(Settings.Properties.GetValueOrDefault("retryDelaySeconds", "30")));
}
public IComponentSettings Settings { get; }
public string Name => Settings.Name;
public Guid InstanceId { get; }
public ObjectState State { get; private set; }
public async Task StartAsync(CancellationToken cancellationToken)
{
State = ObjectState.Starting;
_logger.LogInformation("Starting File Processor Component {InstanceId}. Watching: {Path}",
InstanceId, _inputPath);
try
{
// Validate configuration
if (!Directory.Exists(_inputPath))
{
throw new DirectoryNotFoundException($"Input directory not found: {_inputPath}");
}
// Set up file system watcher
_watcher = new FileSystemWatcher(_inputPath, _fileFilter)
{
NotifyFilter = NotifyFilters.CreationTime | NotifyFilters.LastWrite,
EnableRaisingEvents = false
};
_watcher.Created += OnFileCreated;
_watcher.Error += OnWatcherError;
// Set up heartbeat timer
_heartbeatTimer = new Timer(SendHeartbeat, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
// Start watching
_watcher.EnableRaisingEvents = true;
State = ObjectState.Started;
_logger.LogInformation("File Processor Component {InstanceId} started successfully", InstanceId);
// Send startup notification
await _publisher.PublishAsync(new Message
{
Source = Name,
Content = $"Component {InstanceId} started watching {_inputPath}",
MessageType = "ComponentStarted"
});
}
catch (Exception ex)
{
State = ObjectState.Error;
_logger.LogError(ex, "Failed to start File Processor Component {InstanceId}", InstanceId);
throw;
}
}
private async void OnFileCreated(object sender, FileSystemEventArgs e)
{
_logger.LogDebug("File detected: {File}", e.FullPath);
var retryCount = 0;
while (retryCount <= _maxRetries)
{
try
{
// Wait for file to be completely written
await WaitForFileCompletion(e.FullPath);
// Process the file
var content = await File.ReadAllTextAsync(e.FullPath);
var processedData = await ProcessFileContent(content, e.Name);
_logger.LogInformation("Successfully processed file {File}. Content length: {Length}",
e.Name, content.Length);
// Publish success message
await _publisher.PublishAsync(new Message
{
Source = Name,
Content = processedData,
MessageType = "FileProcessed",
Properties = new Dictionary<string, string>
{
{"FileName", e.Name},
{"FilePath", e.FullPath},
{"ProcessedAt", DateTime.UtcNow.ToString("O")},
{"InstanceId", InstanceId.ToString()}
}
});
// Archive or delete the processed file
await ArchiveFile(e.FullPath);
break;
}
catch (Exception ex)
{
retryCount++;
_logger.LogWarning(ex, "Failed to process file {File}. Retry {Retry}/{MaxRetries}",
e.FullPath, retryCount, _maxRetries);
if (retryCount > _maxRetries)
{
_logger.LogError("Max retries exceeded for file {File}. Moving to error folder.", e.FullPath);
await MoveToErrorFolder(e.FullPath);
// Publish error message
await _publisher.PublishAsync(new Message
{
Source = Name,
Content = $"Failed to process file {e.Name} after {_maxRetries} retries",
MessageType = "FileProcessingError",
Properties = new Dictionary<string, string>
{
{"FileName", e.Name},
{"FilePath", e.FullPath},
{"Error", ex.Message},
{"InstanceId", InstanceId.ToString()}
}
});
}
else
{
await Task.Delay(_retryDelay);
}
}
}
}
private Task<string> ProcessFileContent(string content, string fileName)
{
// Implement your business logic here; this example upper-cases each line.
// The synchronous work is wrapped in Task.FromResult so the async call site
// compiles without an "async method lacks await" warning.
var lines = content.Split('\n');
var processedLines = lines.Select(line => line.Trim().ToUpperInvariant()).ToList();
return Task.FromResult(string.Join('\n', processedLines));
}
private async Task WaitForFileCompletion(string filePath)
{
// Poll until the file can be opened exclusively (i.e., the writer is done).
// If it is still locked after the timeout, fall through and let the retry
// loop in OnFileCreated handle the IOException from the subsequent read.
var maxWait = TimeSpan.FromSeconds(30);
var start = DateTime.UtcNow;
while (DateTime.UtcNow - start < maxWait)
{
try
{
using var stream = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.None);
break; // File is not locked, ready to process
}
catch (IOException)
{
await Task.Delay(100);
}
}
}
private Task ArchiveFile(string filePath)
{
var archivePath = Settings.Properties.GetValueOrDefault("archivePath", Path.Combine(Path.GetDirectoryName(filePath), "archive"));
Directory.CreateDirectory(archivePath);
var destinationPath = Path.Combine(archivePath, Path.GetFileName(filePath));
File.Move(filePath, destinationPath);
return Task.CompletedTask;
}
private Task MoveToErrorFolder(string filePath)
{
var errorPath = Settings.Properties.GetValueOrDefault("errorPath", Path.Combine(Path.GetDirectoryName(filePath), "error"));
Directory.CreateDirectory(errorPath);
var destinationPath = Path.Combine(errorPath, Path.GetFileName(filePath));
File.Move(filePath, destinationPath);
return Task.CompletedTask;
}
private void OnWatcherError(object sender, ErrorEventArgs e)
{
_logger.LogError(e.GetException(), "File system watcher error in component {InstanceId}", InstanceId);
State = ObjectState.Error;
}
private async void SendHeartbeat(object state)
{
try
{
await _publisher.PublishAsync(new Message
{
Source = Name,
Content = $"Component {InstanceId} heartbeat",
MessageType = "Heartbeat",
Properties = new Dictionary<string, string>
{
{"InstanceId", InstanceId.ToString()},
{"State", State.ToString()},
{"Timestamp", DateTime.UtcNow.ToString("O")}
}
});
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to send heartbeat for component {InstanceId}", InstanceId);
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
State = ObjectState.Stopping;
_logger.LogInformation("Stopping File Processor Component {InstanceId}", InstanceId);
try
{
// Stop file watching
if (_watcher != null)
{
_watcher.EnableRaisingEvents = false;
_watcher.Created -= OnFileCreated;
_watcher.Error -= OnWatcherError;
}
// Stop heartbeat timer
_heartbeatTimer?.Dispose();
// Send shutdown notification
await _publisher.PublishAsync(new Message
{
Source = Name,
Content = $"Component {InstanceId} shutting down",
MessageType = "ComponentStopped"
});
State = ObjectState.Stopped;
_logger.LogInformation("File Processor Component {InstanceId} stopped successfully", InstanceId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error stopping File Processor Component {InstanceId}", InstanceId);
State = ObjectState.Error;
}
}
public void Dispose()
{
_watcher?.Dispose();
_heartbeatTimer?.Dispose();
_logger.LogDebug("File Processor Component {InstanceId} disposed", InstanceId);
}
}

A component package is a ZIP file containing:
- Component assembly (DLL)
- Dependencies (if not available in the runtime)
- `manifest.json` file defining component metadata
Example manifest.json:
{
"name": "FileProcessor",
"version": "1.2.0",
"description": "Processes files from a designated directory",
"componentType": "TDIE.Components.FileProcessor.FileProcessorComponent",
"dependencies": [
"Newtonsoft.Json >= 12.0.0"
],
"defaultConfiguration": {
"inputPath": "C:\\input",
"filter": "*.*",
"maxRetries": "3",
"retryDelaySeconds": "30",
"archivePath": "C:\\archive",
"errorPath": "C:\\error"
}
}

- Package Creation: Create a ZIP file with your component assembly and manifest
- Upload: Use the Node Manager API to upload the package to the cluster
- Distribution: The Node Manager distributes the package to target nodes
- Instantiation: Create component instances with specific configurations
- Monitoring: Monitor component health and performance through the Node API
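The validation step in this workflow can be sketched with `System.Text.Json`. The `Manifest` POCO and the rejection rules below are assumptions for illustration; the real `PackageManager` model may name or shape these fields differently.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public class Manifest
{
    public string Name { get; set; }
    public string Version { get; set; }
    public string ComponentType { get; set; }
    public Dictionary<string, string> DefaultConfiguration { get; set; }
}

public static class ManifestValidator
{
    public static Manifest Parse(string json)
    {
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        var manifest = JsonSerializer.Deserialize<Manifest>(json, options);

        // Reject packages missing the fields a host needs to load the component.
        if (string.IsNullOrWhiteSpace(manifest?.Name) ||
            string.IsNullOrWhiteSpace(manifest.ComponentType) ||
            !System.Version.TryParse(manifest.Version, out _))
        {
            throw new InvalidOperationException("Invalid manifest.json");
        }

        return manifest;
    }
}
```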
1. File Watcher Component (TDIE.Components.FileWatcher)
Monitors file system directories for new files and publishes events when files are detected.
Configuration:
{
"path": "C:\\input\\directory",
"filter": "*.xml",
"bufferSize": "8192"
}

2. Quartz Scheduler Component (TDIE.Components.QuartzScheduler)
Provides cron-based scheduling capabilities using Quartz.NET.
Configuration:
{
"cronSchedule": "0 */5 * * * ?",
"isReentrant": "false"
}

3. Web API Component (TDIE.Components.WebApi)
Creates REST endpoints for receiving external requests.
4. Database Replication Component (TDIE.Components.DatabaseReplication)
Template for database synchronization operations.
Basic Publisher (TDIE.Publishers.Basic)
Simple implementation for testing and development that logs messages and can perform basic file operations.
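For unit-testing a component without the Basic publisher, an in-memory `IMessagePublisher` can stand in. The `Message` and `IMessagePublisher` shapes below are inferred from the component example above and may not match `TDIE.Core` exactly.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Message
{
    public string Source { get; set; }
    public string Content { get; set; }
    public string MessageType { get; set; }
    public Dictionary<string, string> Properties { get; set; }
}

public interface IMessagePublisher
{
    Task PublishAsync(Message message);
}

// Captures published messages so tests can assert on them afterwards.
public class InMemoryPublisher : IMessagePublisher
{
    public ConcurrentQueue<Message> Published { get; } = new ConcurrentQueue<Message>();

    public Task PublishAsync(Message message)
    {
        Published.Enqueue(message);
        return Task.CompletedTask;
    }
}
```

Inject an `InMemoryPublisher` into the component under test, trigger it, then inspect `Published` to verify the expected `MessageType` and properties were emitted.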
TDIE explores distributed orchestration through timer-based synchronization, distributed locking, and automated health monitoring. The following detailed flows show how the platform is intended to handle distributed processing scenarios in practice.
Process Description:
The Node Manager acts as the cluster orchestrator, running a timer-based synchronization process (default: every 50 seconds). This process, managed by NodeManagerComponent, maintains cluster consistency, handles dynamic scaling, and ensures operational integrity.
Detailed Flow:
- Configuration Load: The `NodeManagerComponent` starts and loads its configuration from `clusterSettings.json`, which defines the cluster topology and required packages.
- Initial Discovery: It discovers and validates all configured nodes via HTTP health checks to their respective Node APIs.
- Cluster Manager Initialization: A `ClusterManager` instance is created, which immediately populates its view of the cluster's state by querying each node for its running component instances.
- Timer Activation: A recurring timer is started to periodically trigger the `SyncCluster()` method.
- Continuous Synchronization: On each timer tick, `SyncCluster()` performs these actions:
  - It re-reads the node configuration to detect any changes.
  - It compares the configured nodes with the currently active nodes to identify new additions or removals.
  - It triggers cluster expansion or shrinkage as needed.
  - It directs the `NodeSynchronizer` to verify and update packages on each node.
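The timer-driven loop described above can be sketched as follows. The 50-second default matches the stated sync interval; the overlap guard and the delegate body are illustrative, not lifted from `NodeManagerComponent`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class ClusterSyncLoop : IDisposable
{
    private readonly Timer _timer;
    private readonly Func<Task> _syncCluster;
    private int _running; // 1 while a sync pass is in flight

    public ClusterSyncLoop(Func<Task> syncCluster, TimeSpan? period = null)
    {
        _syncCluster = syncCluster;
        // Fire immediately, then on the configured period (default: 50 seconds).
        _timer = new Timer(_ => _ = TickAsync(), null, TimeSpan.Zero, period ?? TimeSpan.FromSeconds(50));
    }

    private async Task TickAsync()
    {
        // Skip this tick if the previous sync pass has not finished yet.
        if (Interlocked.Exchange(ref _running, 1) == 1) return;
        try { await _syncCluster(); }
        finally { Interlocked.Exchange(ref _running, 0); }
    }

    public void Dispose() => _timer.Dispose();
}
```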
Sample Scenario - Adding a New Node:
- Input: A new node, "tdie-node-4", is added to the `nodeServers` array in `clusterSettings.json`.
- Process Flow:
  - The Node Manager's timer triggers the `SyncCluster()` method.
  - The system detects "tdie-node-4" as a new, unmanaged node.
  - The `ClusterManager` invokes `ExpandClusterAsync()` for the new node.
  - A distributed lock is acquired for "tdie-node-4" to prevent concurrent modifications.
  - The `NodeSynchronizer` connects to the new node's API and ensures all required packages (like the Component Host) are uploaded and up-to-date.
  - The `ClusterManager` then starts the necessary component instances on the new node, updating its internal state map.
- Output: "tdie-node-4" becomes an active member of the cluster, sharing the workload by running its own set of component instances.
Process Description: Package deployment is a coordinated process managed by the Node Manager, involving multi-stage validation, cluster-wide distribution, and controlled updates with rollback capabilities.
Detailed Flow:
- Upload: A component package (ZIP file) is uploaded to the Node Manager's API.
- Validation: The package is validated for correct structure, a valid `manifest.json`, and declared dependencies.
- Distribution: The `NodeSynchronizer` distributes the package to all nodes in the cluster by calling each node's `/api/node/packages` endpoint.
- Graceful Shutdown (for Updates): If updating an existing package, the `NodeSynchronizer` first identifies all running instances of that package and gracefully shuts them down via their Component Host APIs.
- Installation: Each Node API receives the package, extracts it, and stores it locally, using LiteDB to track metadata.
- Restart: The Node Manager then commands the Component Hosts to restart the component instances using the new package version.
Sample Scenario - Deploying a File Processor Component:
- Input: A `fileProcessor.zip` package is uploaded via a REST client.
- Package Contents:
  - `manifest.json` (defining metadata, the main class, and default configuration)
  - `FileProcessor.dll` (the component assembly)
  - `dependencies/` (any required libraries)
- Process Flow:
  - The Node Manager validates the package.
  - It iterates through each registered node and calls the `POST /api/node/packages` endpoint on each, sending the ZIP file.
  - Each node's `PackageManager` saves the package.
  - The Node Manager, during its next sync, identifies that this component should be running and issues commands to the Component Hosts to instantiate it.
- Output: `FileProcessor` components are now running across the cluster, configured and ready to process files.
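The distribution call in this flow can be sketched with `HttpClient`. The `/api/node/packages` route comes from the flow above, but the multipart field name `"package"` is an assumption about what the Node API accepts.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

public static class PackageUpload
{
    // Building the body separately lets it be inspected without a live node.
    public static MultipartFormDataContent BuildContent(byte[] zipBytes, string fileName)
    {
        var content = new MultipartFormDataContent();
        var part = new ByteArrayContent(zipBytes);
        part.Headers.ContentType = new MediaTypeHeaderValue("application/zip");
        content.Add(part, "package", fileName); // field name is an assumption
        return content;
    }

    public static async Task<int> UploadAsync(HttpClient client, string nodeBaseUrl, byte[] zipBytes, string fileName)
    {
        using var content = BuildContent(zipBytes, fileName);
        var response = await client.PostAsync($"{nodeBaseUrl}/api/node/packages", content);
        return (int)response.StatusCode;
    }
}
```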
Process Description:
Component instances follow a structured lifecycle managed by the ComponentHostBackgroundService. This service provides process isolation, dependency injection, and state management.
Detailed Flow:
- Placement Decision: The `ClusterManager` decides which node should run a new component instance.
- Host Process Launch: It calls the target Node API's `/api/node/processes/{packageName}/start` endpoint. The Node API launches a new `TDIE.ComponentHost` process, assigning it a dynamic port.
- Initialization: The `ClusterManager` communicates with the new Component Host's API to initialize the services.
  - It first calls `InitializeMessagePublisherServiceAsync` if the component requires a message publisher.
  - It then calls `InitializeComponentServiceAsync`, passing the component's configuration.
- DI and Instantiation: The Component Host uses reflection to load the component's assembly, sets up a dependency injection container, and injects required services (`ILogger`, `IComponentSettings`, `IMessagePublisher`).
- Service Start: The `ClusterManager` calls `StartHostServicesAsync` on the Component Host, which starts the message publisher first, followed by the component itself.
- State Management: The Component Host monitors the component's state and reports errors or status changes.
Sample Scenario - File Watcher Component Startup:
- Input Configuration: `{ "packageName": "FileWatcher", "settings": { "inputPath": "C:\\data\\input", "filter": "*.xml" } }`
- Process Flow:
  - `ClusterManager.StartInstanceOnNode()` is executed for a target node.
  - The Node API on that machine starts a `ComponentHost.exe` process on an available port (e.g., 5001).
  - The `ClusterManager` sends the configuration to `http://node-x:5001`.
  - The Component Host loads the `FileWatcher.dll` assembly.
  - It creates a DI container, providing an `IComponentSettings` object populated with the `inputPath` and `filter`.
  - The `FileWatcherComponent`'s constructor receives the injected dependencies.
  - Its `StartAsync()` method is called, which initializes a `FileSystemWatcher` to monitor `C:\data\input` for XML files.
- Output: An active `FileWatcherComponent` instance is now monitoring the specified directory, ready to process files.
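The reflection step can be reduced to a minimal sketch. The `IComponentSettings` shape here (a single `Get` method) is invented for the example and may differ from `TDIE.Core`; the real host also resolves constructor arguments through a DI container rather than `Activator`.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public interface IComponentSettings
{
    string Get(string key);
}

public class DictionarySettings : IComponentSettings
{
    private readonly Dictionary<string, string> _values;
    public DictionarySettings(Dictionary<string, string> values) => _values = values;
    public string Get(string key) => _values.TryGetValue(key, out var v) ? v : null;
}

// A sample component the loader can instantiate.
public class EchoComponent
{
    public IComponentSettings Settings { get; }
    public EchoComponent(IComponentSettings settings) => Settings = settings;
}

public static class ComponentLoader
{
    // Resolves a type by its full name from a loaded assembly and constructs
    // it, passing the settings object to the constructor.
    public static object Create(Assembly assembly, string componentTypeName, IComponentSettings settings)
    {
        var type = assembly.GetType(componentTypeName, throwOnError: true);
        return Activator.CreateInstance(type, settings);
    }
}
```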
Process Description:
Components are designed to be loosely coupled and communicate through a message-driven pattern. A component processes an event and then uses an IMessagePublisher to send the result onward.
Detailed Flow:
- Trigger: A component is triggered by an external event (e.g., a file is created, a Quartz.NET timer fires, or a Web API endpoint is called).
- Processing: The component executes its business logic.
- Message Creation: It creates an `IMessage` object, populating it with a source identifier, a timestamp, and a dictionary of properties containing the processed data or event details.
- Publishing: It calls `_publisher.PublishAsync(message)`, handing the message off to its injected message publisher.
- Routing: The message publisher (e.g., `TestPublisher`) processes the message. In a more advanced implementation, this could involve routing to a message bus like RabbitMQ or Azure Service Bus, where other components could be subscribed to listen for these messages.
Sample Scenario - File Processing Workflow:
- Input: A new file, `customer_data.xml`, appears in the directory monitored by a `FileWatcherComponent`.
- Flow Sequence:
  - The `FileWatcherComponent`'s `OnFileCreated` event handler is triggered.
  - It creates a `FileWatcherMessage` with properties like `fileName`, `filePath`, and `fileSize`.
  - It calls `_publisher.PublishAsync(fileMessage)`.
  - The message publisher receives the message and, based on its internal logic, might route it to a `DataProcessor` component.
  - The `DataProcessor` component receives the message, reads the file from the specified path, transforms the data, and creates a new `ProcessedMessage`.
  - It publishes this new message, which could then be consumed by a `DatabaseWriter` component to save the results.
- Output: The file is processed through a multi-stage, decoupled workflow.
Process Description: TDIE supports horizontal scaling through the dynamic addition or removal of nodes, with the Node Manager automatically redistributing workloads.
Detailed Flow:
- Detection: The Node Manager's `SyncCluster` method detects a change in the cluster's topology (a node is added or removed from configuration).
- Expansion: For a new node, the flow described in "Cluster Initialization" is followed to synchronize packages and start new component instances, thus increasing overall capacity.
- Shrinkage: For a removed node, the `ClusterManager` calls `ShrinkClusterAsync`. This method connects to the target node's API and gracefully shuts down all running Component Host processes. The workloads previously handled by this node are then automatically picked up by instances on the remaining nodes.
- Locking: All scaling operations are protected by distributed locks to ensure consistency and prevent race conditions.
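The locking pattern can be illustrated with an in-process stand-in. TDIE's `DistributedLockFactory` uses SQL Server locks; the per-key `SemaphoreSlim` below only mimics the acquire/release shape within a single process and is not a distributed lock.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class LocalLockFactory
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    // Waits until the lock for `key` (e.g. a node name) is available, then
    // returns a token whose disposal releases the lock.
    public async Task<IDisposable> AcquireAsync(string key)
    {
        var sem = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
        await sem.WaitAsync();
        return new Releaser(sem);
    }

    private sealed class Releaser : IDisposable
    {
        private readonly SemaphoreSlim _sem;
        public Releaser(SemaphoreSlim sem) => _sem = sem;
        public void Dispose() => _sem.Release();
    }
}
```

Usage mirrors the scaling flow: acquire the node's lock, synchronize packages and start instances, then dispose the token to release the lock.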
Sample Scenario - Auto-scaling During High Load:
- Current State: A 2-node cluster is running 4 `FileWatcher` instances on each node.
- Trigger: An administrator adds `tdie-node-3` to the configuration to handle a higher volume of incoming files.
- Process Flow:
  - `SyncCluster()` detects the new node.
  - It acquires a distributed lock for the cluster modification.
  - It calls `_nodeSynchronizer.SynchronizeNodeAsync(node-3)` to upload all required packages.
  - It then starts 4 new `FileWatcher` instances on `tdie-node-3`.
- Output: The cluster now has 12 `FileWatcher` instances running across 3 nodes, increasing processing capacity by 50%.
As part of exploring different approaches to workload distribution, the codebase includes an implementation of the Shannon Diversity Index calculation in `StatsExtensions.cs`. This was a thought exercise in applying ecological diversity metrics to distributed systems.
The Shannon Diversity Index measures species diversity in ecosystems. The thought experiment was: what if we treated component types as "species" and nodes as "habitats"?
Implementation:
public static double Evenness(this IEnumerable<ComponentHostInstanceSettings> components)
{
    var sampleSize = components.Count();
    var componentTypeGroups = components.GroupBy(x => x.PackageName).ToDictionary(x => x.Key, x => x.ToArray());

    // Guard: evenness is undefined for an empty sample or a single component
    // type (Math.Log(1) == 0 would cause a division by zero below).
    if (sampleSize == 0 || componentTypeGroups.Count < 2)
        return 0;

    // Shannon Diversity Index: H' = -sum(p_i * ln p_i), where p_i is the
    // proportion of instances belonging to component type i.
    var shannonDiversityIndex = componentTypeGroups
        .Select(x => x.Value.Length / (double)sampleSize)
        .Select(p => p * Math.Log(p))
        .Sum() * -1;

    // Pielou's evenness: J = H' / ln(S), where S is the number of types.
    double systemEvenness = shannonDiversityIndex / Math.Log(componentTypeGroups.Count);
    return systemEvenness;
}

- Shannon Diversity Index (H'): Measures how diverse the component types are on a node
- Evenness: A value from 0 to 1, where 1 means all component types are equally distributed
The idea was that nodes with higher evenness might:
- Be more resilient to component-specific failures
- Have better resource utilization if different components have different resource profiles
- Reduce the impact of cascading failures
This remains an unexplored concept in the codebase. The calculation is implemented but not integrated into any scheduling or distribution logic. It represents one of several thought experiments about alternative approaches to distributed system management.
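A worked example of the calculation, using raw per-type instance counts instead of `ComponentHostInstanceSettings` so it stands alone:

```csharp
using System;
using System.Linq;

public static class EvennessDemo
{
    public static double Evenness(int[] countsPerType)
    {
        int sampleSize = countsPerType.Sum();
        // Evenness is undefined for an empty sample or a single type.
        if (sampleSize == 0 || countsPerType.Length < 2) return 0;

        // Shannon Diversity Index: H' = -sum(p_i * ln p_i)
        double h = -countsPerType
            .Select(c => c / (double)sampleSize)
            .Sum(p => p * Math.Log(p));

        // Pielou's evenness: J = H' / ln(S)
        return h / Math.Log(countsPerType.Length);
    }

    public static void Main()
    {
        // 4 FileWatcher + 4 QuartzScheduler instances: perfectly even, J = 1.0
        Console.WriteLine(Evenness(new[] { 4, 4 }));
        // 7 instances of one type, 1 of another: skewed, J falls toward 0
        Console.WriteLine(Evenness(new[] { 7, 1 }));
    }
}
```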
Process Description: System reliability is maintained through continuous health monitoring at both the node and cluster levels, with mechanisms for automatic failure detection and workload redistribution.
Detailed Flow:
- Node-Level Monitoring: Each Node API runs a `ProcessStoreSystemSyncBackgroundService`, which periodically checks for "zombie" processes (processes that are in its database but no longer running on the OS) and cleans them up.
- Cluster-Level Monitoring: The Node Manager periodically calls the `/api/node/stats` endpoint on each node to check its health.
- Failure Detection: If a node is unreachable (e.g., HTTP requests time out), the Node Manager marks it as failed.
- Workload Redistribution: During the next `SyncCluster` run, the failed node is treated as a "removed" node. The `ClusterManager` identifies the components that were running on it and starts replacement instances on the remaining healthy nodes.
Sample Scenario - Node Failure Recovery:
- Initial State: A 3-node cluster, with each node running two `QuartzScheduler` components.
- Failure Event: `tdie-node-2` becomes unreachable due to a hardware failure.
- Recovery Flow:
  - The Node Manager's sync cycle fails to get a health status from `tdie-node-2`.
  - It marks `tdie-node-2` as inactive.
  - It identifies the two `QuartzScheduler` instances that were assigned to the failed node.
  - It acquires distributed locks and starts two new replacement instances, placing one on `tdie-node-1` and the other on `tdie-node-3`.
- Output: Fault tolerance is maintained. All scheduled jobs continue to run without interruption on the remaining healthy nodes.
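The failure-detection step can be sketched as a probe with a timeout. The `/api/node/stats` route comes from the flow above; treating every exception (timeout, connection refused, DNS failure) as "unhealthy" is a simplification.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class NodeHealth
{
    public static async Task<bool> IsHealthyAsync(HttpClient client, string nodeBaseUrl, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            var response = await client.GetAsync($"{nodeBaseUrl}/api/node/stats", cts.Token);
            return response.IsSuccessStatusCode;
        }
        catch // timeout, connection refused, DNS failure, ...
        {
            return false;
        }
    }
}
```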
REST APIs available at multiple levels:
- Cluster Management: Node Manager APIs for cluster operations
- Node Management: Node APIs for local operations
- Component Control: Component Host APIs for instance management
src/
├── TDIE.ComponentHost/ # Component runtime host
├── TDIE.ComponentHost.Core/ # Core interfaces and contracts
├── TDIE.ComponentHost.WebApi/ # Component Host REST API
├── TDIE.Components.DatabaseReplication/ # Database sync component
├── TDIE.Components.FileWatcher/ # File monitoring component
├── TDIE.Components.NodeManager/ # Cluster orchestration
├── TDIE.Components.QuartzScheduler/ # Cron scheduling component
├── TDIE.Components.WebApi/ # HTTP endpoint component
├── TDIE.Core/ # Core framework interfaces
├── TDIE.Extensions.Logging/ # Logging extensions
├── TDIE.NodeApi/ # Node management API
├── TDIE.PackageManager.Basic/ # Package management
├── TDIE.PackageManager.Core/ # Package management interfaces
├── TDIE.Publishers.Basic/ # Basic message publisher
├── TDIE.Server/ # Server engine
└── TDIE.Tester/ # Testing utilities
Documentation Note: This README was generated with assistance from Claude (Anthropic), an AI assistant, to help structure and document the TDIE experimental distributed integration platform.