Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1432,15 +1432,22 @@ func TestDetermineExecutionMode_SingleExplicitDevice(t *testing.T) {
// ============================================================

func TestExecuteFlowsWithMode_AppiumParallel(t *testing.T) {
// Suppress stdout from setup messages
oldStdout := os.Stdout
os.Stdout, _ = os.Open(os.DevNull)
defer func() { os.Stdout = oldStdout }()

cfg := &RunConfig{
Driver: "appium",
Driver: "appium",
Parallel: 2,
}
_, err := executeFlowsWithMode(cfg, nil, true, []string{"d1", "d2"})
_, err := executeFlowsWithMode(cfg, nil, true, []string{"appium-1", "appium-2"})
if err == nil {
t.Error("expected error for parallel Appium execution")
t.Error("expected error for parallel Appium with no server URL")
}
if !strings.Contains(err.Error(), "parallel execution not yet supported for Appium") {
t.Errorf("unexpected error: %v", err)
// Should fail on session creation (no AppiumURL), not on "not supported"
if strings.Contains(err.Error(), "not yet supported") {
t.Errorf("parallel should be supported now, got: %v", err)
}
}

Expand Down
139 changes: 136 additions & 3 deletions pkg/cli/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,18 @@ func determineExecutionMode(cfg *RunConfig, emulatorMgr *emulator.Manager, simul
return false, nil, nil
}

// Appium driver: no local device management, server allocates devices
if strings.ToLower(cfg.Driver) == "appium" {
if cfg.Parallel > 1 {
ids := make([]string, cfg.Parallel)
for i := range ids {
ids[i] = fmt.Sprintf("appium-%d", i+1)
}
return true, ids, nil
}
return false, nil, nil
}

needsParallel = cfg.Parallel > 0 || len(cfg.Devices) > 1

if needsParallel {
Expand Down Expand Up @@ -1209,10 +1221,11 @@ func executeFlowsWithMode(cfg *RunConfig, flows []flow.Flow, needsParallel bool,
driverType := strings.ToLower(cfg.Driver)

if driverType == "appium" {
if needsParallel {
return nil, fmt.Errorf("parallel execution not yet supported for Appium driver")
count := cfg.Parallel
if count <= 1 {
return executeAppiumSingleSession(cfg, flows)
}
return executeAppiumSingleSession(cfg, flows)
return executeAppiumParallel(cfg, count, flows)
}

if needsParallel {
Expand Down Expand Up @@ -1584,6 +1597,75 @@ func executeAppiumSingleSession(cfg *RunConfig, flows []flow.Flow) (*executor.Ru
return runner.Run(context.Background(), flows)
}

// executeAppiumParallel runs flows across N Appium sessions in parallel.
// Each session hits the same Appium URL — the server allocates devices.
func executeAppiumParallel(cfg *RunConfig, count int, flows []flow.Flow) (*executor.RunResult, error) {
workers, cloudMetas, err := createAppiumWorkers(cfg, count)
if err != nil {
return nil, fmt.Errorf("failed to create Appium workers: %w", err)
}

// Register all worker cleanups for signal handler
allCleanup := func() {
for _, w := range workers {
w.Cleanup()
}
}
cleanupMu.Lock()
activeCleanup = allCleanup
cleanupMu.Unlock()
defer func() {
cleanupMu.Lock()
activeCleanup = nil
cleanupMu.Unlock()
}()

platform := strings.ToLower(cfg.Platform)
if platform == "" {
platform = "android"
}

parallelRunner := createParallelRunner(cfg, workers, platform)
result, err := parallelRunner.Run(context.Background(), flows)
if err != nil {
return nil, err
}

// Report to cloud providers per-worker (each session = separate cloud job)
for i, cm := range cloudMetas {
if cm.provider == nil {
continue
}
// Collect flow results that ran on this worker
var workerFlows []cloud.FlowResult
for _, f := range result.FlowResults {
workerFlows = append(workerFlows, cloud.FlowResult{
Name: f.Name,
File: f.SourceFile,
Passed: f.Status == report.StatusPassed,
Duration: f.Duration,
Error: f.Error,
})
}
cloudResult := &cloud.TestResult{
Passed: result.Status == report.StatusPassed,
Total: result.TotalFlows,
PassedCount: result.PassedFlows,
FailedCount: result.FailedFlows,
Duration: result.Duration,
OutputDir: cfg.OutputDir,
Flows: workerFlows,
}
if err := cm.provider.ReportResult(cfg.AppiumURL, cm.meta, cloudResult); err != nil {
logger.Warn("[appium-%d] %s result reporting failed: %v", i+1, cm.provider.Name(), err)
} else {
logger.Info("[appium-%d] %s job updated: passed=%v", i+1, cm.provider.Name(), cloudResult.Passed)
}
}

return result, nil
}

// CreateDriver creates the appropriate driver for the platform.
// Returns the driver, a cleanup function, and any error.
// Exported for library use - call once, reuse across multiple flows.
Expand Down Expand Up @@ -2119,6 +2201,57 @@ func createBrowserWorkers(cfg *RunConfig, count int) ([]executor.DeviceWorker, e
return workers, nil
}

// appiumWorkerMeta holds per-worker cloud provider state for Appium parallel execution.
type appiumWorkerMeta struct {
provider cloud.Provider
meta map[string]string
}

// createAppiumWorkers creates N Appium session workers against the same server URL.
// Each session is independent — the Appium server (local or cloud) allocates devices.
func createAppiumWorkers(cfg *RunConfig, count int) ([]executor.DeviceWorker, []appiumWorkerMeta, error) {
var workers []executor.DeviceWorker
var cloudMetas []appiumWorkerMeta
var cleanups []func()

cleanupAll := func() {
for _, cleanup := range cleanups {
cleanup()
}
}

for i := 0; i < count; i++ {
workerID := fmt.Sprintf("appium-%d", i+1)
printSetupStep(fmt.Sprintf("[%s] Creating Appium session...", workerID))

driver, cleanup, err := createAppiumDriver(cfg)
if err != nil {
logger.Warn("Failed to create session for %s: %v", workerID, err)
cleanupAll()
return nil, nil, fmt.Errorf("failed to create %s: %w", workerID, err)
}

workers = append(workers, executor.DeviceWorker{
ID: i,
DeviceID: workerID,
Driver: driver,
Cleanup: cleanup,
})
cleanups = append(cleanups, cleanup)

// Capture per-worker cloud metadata (each session = separate cloud job)
cloudMetas = append(cloudMetas, appiumWorkerMeta{
provider: cfg.CloudProvider,
meta: cfg.CloudMeta,
})
// Reset for next worker so createAppiumDriver detects fresh
cfg.CloudProvider = nil
cfg.CloudMeta = nil
}

return workers, cloudMetas, nil
}

// createParallelRunner builds the parallel runner with config.
func createParallelRunner(cfg *RunConfig, workers []executor.DeviceWorker, platform string) *executor.ParallelRunner {
driverName := resolveDriverName(cfg, platform)
Expand Down
Loading