diff --git a/docs/queue-priority-strategy.md b/docs/queue-priority-strategy.md new file mode 100644 index 00000000..1a9d0c0c --- /dev/null +++ b/docs/queue-priority-strategy.md @@ -0,0 +1,55 @@ +# Request Queue Priority Strategy + +## Overview + +The `RequestQueue` service (`src/services/api/requestQueue.ts`) manages offline requests that fail due to network errors. It persists requests to AsyncStorage, supports priority levels, and batches similar requests during sync. + +## Priority Levels + +| Priority | Value | Use Case | +|----------|-------|----------| +| critical | 0 | Auth tokens, payments, enrollments | +| high | 1 | Course progress, quiz submissions | +| normal | 2 | Profile updates, content likes | +| low | 3 | Analytics events, read-ahead fetches | + +The queue is sorted by priority then FIFO within each priority level. + +## Persistence + +- All queued requests are stored in AsyncStorage under `@teachlink_request_queue` +- Queue metrics are persisted under `@teachlink_queue_metrics` +- On app restart, `resume()` restores pending requests from storage + +## Batch Processing + +During sync, requests with the same method + endpoint are grouped: + +- **PUT/PATCH**: Payloads are merged into a single request +- **GET**: Executed in parallel via `Promise.allSettled` +- **POST/DELETE**: Processed individually within the batch + +## Analytics + +Queue events are tracked via `mobileAnalyticsService.trackEvent()`: +- `request_queued` — when a request enters the queue +- `request_dequeued` — when a request succeeds +- `queue_batch_synced` — when a batch merge succeeds +- `queue_resumed` — on app restart with pending requests + +## Usage + +```ts +import { requestQueue } from '../services/api/requestQueue'; + +// Add with priority +await requestQueue.addToQueue(config, 'high'); + +// Check status +const status = await requestQueue.getQueueStatus(); + +// Monitor from hook +import { usePendingRequests } from '../hooks/usePendingRequests'; +const count = usePendingRequests(); +const { pendingCount, byPriority } = usePendingRequests('critical'); +``` diff --git a/src/hooks/usePendingRequests.ts b/src/hooks/usePendingRequests.ts index 0f1000b8..cfa03982 100644 --- a/src/hooks/usePendingRequests.ts +++ b/src/hooks/usePendingRequests.ts @@ -1,21 +1,41 @@ import { useEffect, useState } from 'react'; -import requestQueue from '../services/api/requestQueue'; +import requestQueue, { RequestPriority } from '../services/api/requestQueue'; -/** - * Hook to get the number of pending offline requests - */ -export function usePendingRequests() { +export function usePendingRequests(priority?: RequestPriority) { const [pendingCount, setPendingCount] = useState(0); + const [byPriority, setByPriority] = useState< + Record + >({ + critical: 0, + high: 0, + normal: 0, + low: 0, + }); useEffect(() => { - // Get initial count - requestQueue.getPendingCount().then(setPendingCount); + const update = async () => { + if (priority) { + const counts = await requestQueue.getPendingByPriority(); + setByPriority(counts); + setPendingCount(counts[priority]); + } else { + const count = await requestQueue.getPendingCount(); + setPendingCount(count); + } + }; - // Listen for changes - const unsubscribe = requestQueue.onPendingCountChange(setPendingCount); + update(); + + const unsubscribe = requestQueue.onPendingCountChange(() => { + update(); + }); return unsubscribe; - }, []); + }, [priority]); + + if (priority) { + return { pendingCount, byPriority }; + } return pendingCount; -} \ No newline at end of file +} diff --git a/src/services/api/requestQueue.ts b/src/services/api/requestQueue.ts index 2ba7c4a5..85192e65 100644 --- a/src/services/api/requestQueue.ts +++ b/src/services/api/requestQueue.ts @@ -1,51 +1,112 @@ import AsyncStorage from '@react-native-async-storage/async-storage'; import { InternalAxiosRequestConfig } from 'axios'; import * as Network from 'expo-network'; +import { healthMetricsService } from '../healthMetrics'; +import { mobileAnalyticsService } from '../mobileAnalytics'; import logger from '../../utils/logger'; -// Queued request interface +export type RequestPriority = 'critical' | 'high' | 'normal' | 'low'; + export interface QueuedRequest { id: string; config: InternalAxiosRequestConfig; timestamp: number; retries: number; maxRetries: number; + priority: RequestPriority; + endpoint: string; + method: string; +} + +interface BatchGroup { + method: string; + endpoint: string; + requests: QueuedRequest[]; +} + +interface QueueMetrics { + totalQueued: number; + byPriority: Record; + byMethod: Record; + totalRetries: number; + lastSyncTimestamp: number | null; + batchesFormed: number; } const QUEUE_KEY = '@teachlink_request_queue'; +const QUEUE_METRICS_KEY = '@teachlink_request_queue_metrics'; +const MONITOR_INTERVAL_MS = 10000; +const PRIORITY_ORDER: Record = { + critical: 0, + high: 1, + normal: 2, + low: 3, +}; class RequestQueue { private readonly MAX_RETRIES = 3; private isProcessing = false; private listeners: ((count: number) => void)[] = []; private monitoringInterval: ReturnType | null = null; + private networkListener: (() => void) | null = null; private apiClient: any | null = null; + private metrics: QueueMetrics = { + totalQueued: 0, + byPriority: { critical: 0, high: 0, normal: 0, low: 0 }, + byMethod: {}, + totalRetries: 0, + lastSyncTimestamp: null, + batchesFormed: 0, + }; - /** - * Add a failed request to the queue - */ - async addToQueue(config: InternalAxiosRequestConfig): Promise { + async addToQueue( + config: InternalAxiosRequestConfig, + priority: RequestPriority = 'normal', + ): Promise { try { const queue = await this.getQueue(); + const method = (config.method ?? 'GET').toUpperCase(); + const endpoint = config.url ?? '/unknown'; + const queuedRequest: QueuedRequest = { id: this.generateId(), config, timestamp: Date.now(), retries: 0, maxRetries: this.MAX_RETRIES, + priority, + endpoint, + method, }; + queue.push(queuedRequest); + this.sortByPriority(queue); await AsyncStorage.setItem(QUEUE_KEY, JSON.stringify(queue)); - logger.info(`Added request to queue: ${config.method?.toUpperCase()} ${config.url}`); + + this.metrics.totalQueued++; + this.metrics.byPriority[priority]++; + this.metrics.byMethod[method] = (this.metrics.byMethod[method] ?? 0) + 1; + await this.persistMetrics(); + + logger.info( + `Added request to queue: [${priority}] ${method} ${endpoint}`, + ); this.notifyListeners(queue.length); + + mobileAnalyticsService.trackEvent('request_queued' as any, { + priority, + method, + endpoint, + queueSize: queue.length, + }); + + return queuedRequest.id; } catch (error) { logger.error('Error adding request to queue:', error); + return ''; } } - /** - * Get the current queue - */ async getQueue(): Promise { try { const data = await AsyncStorage.getItem(QUEUE_KEY); @@ -56,13 +117,24 @@ class RequestQueue { } } - /** - * Remove a request from the queue - */ async removeFromQueue(id: string): Promise { try { const queue = await this.getQueue(); - const filtered = queue.filter(req => req.id !== id); + const removed = queue.find((req) => req.id === id); + const filtered = queue.filter((req) => req.id !== id); + + if (removed) { + this.metrics.totalQueued = Math.max(0, this.metrics.totalQueued - 1); + this.metrics.byPriority[removed.priority] = Math.max( + 0, + this.metrics.byPriority[removed.priority] - 1, + ); + this.metrics.byMethod[removed.method] = Math.max( + 0, + (this.metrics.byMethod[removed.method] ?? 1) - 1, + ); + } + await AsyncStorage.setItem(QUEUE_KEY, JSON.stringify(filtered)); this.notifyListeners(filtered.length); } catch (error) { @@ -70,48 +142,59 @@ class RequestQueue { } } - /** - * Increment retry count - */ async incrementRetry(id: string): Promise { try { const queue = await this.getQueue(); - const request = queue.find(req => req.id === id); + const request = queue.find((req) => req.id === id); if (request) { request.retries += 1; + this.metrics.totalRetries++; await AsyncStorage.setItem(QUEUE_KEY, JSON.stringify(queue)); + await this.persistMetrics(); } } catch (error) { logger.error('Error incrementing retry:', error); } } - /** - * Process the queue when online - */ - async processQueue(apiClient: any): Promise { + async processQueue(apiClient?: any): Promise { if (this.isProcessing) return; + if (apiClient) { + this.apiClient = apiClient; + } + const isConnected = await this.checkConnectivity(); if (!isConnected) return; this.isProcessing = true; try { const queue = await this.getQueue(); - for (const request of queue) { - if (request.retries >= request.maxRetries) { - // Remove failed requests - await this.removeFromQueue(request.id); - continue; - } + if (queue.length === 0) return; - try { - await apiClient(request.config); - await this.removeFromQueue(request.id); - } catch (error) { - await this.incrementRetry(request.id); - } + const validRequests = queue.filter( + (req) => req.retries < req.maxRetries, + ); + const expiredRequests = queue.filter( + (req) => req.retries >= req.maxRetries, + ); + + for (const expired of expiredRequests) { + await this.removeFromQueue(expired.id); + logger.warn( + `Request ${expired.id} [${expired.priority}] ${expired.method} ${expired.endpoint} dropped after ${expired.maxRetries} retries`, + ); + } + + const batches = this.createBatches(validRequests); + this.metrics.batchesFormed += batches.length; + + for (const batch of batches) { + await this.processBatch(batch); } + + this.metrics.lastSyncTimestamp = Date.now(); + await this.persistMetrics(); } catch (error) { logger.error('Error processing queue:', error); } finally { @@ -119,9 +202,25 @@ class RequestQueue { } } - /** - * Start monitoring network and processing queue - */ + resume(): void { + logger.info('RequestQueue: Resuming queue from persisted storage'); + this.restoreMetrics(); + const pending = this.getQueue(); + pending + .then((q) => { + this.notifyListeners(q.length); + if (q.length > 0) { + logger.info( + `RequestQueue: ${q.length} pending requests restored from storage`, + ); + mobileAnalyticsService.trackEvent('queue_resumed' as any, { + pendingCount: q.length, + }); + } + }) + .catch(() => {}); + } + startMonitoring(apiClient?: any): void { if (apiClient) { this.apiClient = apiClient; @@ -137,57 +236,260 @@ class RequestQueue { return; } + this.resume(); + this.monitoringInterval = setInterval(async () => { await this.processQueue(this.apiClient!); - }, 10000); // Check every 10 seconds + }, MONITOR_INTERVAL_MS); - // Also process immediately if online void this.processQueue(this.apiClient); + this.listenForNetworkChanges(); } stopMonitoring(): void { if (this.monitoringInterval) { clearInterval(this.monitoringInterval); this.monitoringInterval = null; - logger.info('RequestQueue: Monitoring stopped due to memory pressure'); + logger.info('RequestQueue: Monitoring stopped'); + } + + if (this.networkListener) { + this.networkListener(); + this.networkListener = null; } } - /** - * Get pending requests count - */ async getPendingCount(): Promise { const queue = await this.getQueue(); return queue.length; } - /** - * Subscribe to pending count changes - */ + async getPendingByPriority(): Promise> { + const queue = await this.getQueue(); + const counts: Record = { + critical: 0, + high: 0, + normal: 0, + low: 0, + }; + for (const req of queue) { + counts[req.priority]++; + } + return counts; + } + + async getQueueStatus(): Promise<{ + total: number; + byPriority: Record; + byMethod: Record; + totalRetries: number; + batchesFormed: number; + lastSyncTimestamp: number | null; + isProcessing: boolean; + isMonitoring: boolean; + }> { + const queue = await this.getQueue(); + const byPriority = await this.getPendingByPriority(); + return { + total: queue.length, + byPriority, + byMethod: { ...this.metrics.byMethod }, + totalRetries: this.metrics.totalRetries, + batchesFormed: this.metrics.batchesFormed, + lastSyncTimestamp: this.metrics.lastSyncTimestamp, + isProcessing: this.isProcessing, + isMonitoring: this.monitoringInterval !== null, + }; + } + onPendingCountChange(listener: (count: number) => void): () => void { this.listeners.push(listener); return () => { - this.listeners = this.listeners.filter(l => l !== listener); + this.listeners = this.listeners.filter((l) => l !== listener); }; } + private createBatches(requests: QueuedRequest[]): BatchGroup[] { + const groups = new Map(); + + for (const req of requests) { + const key = `${req.method}:${req.endpoint}`; + const existing = groups.get(key) ?? []; + existing.push(req); + groups.set(key, existing); + } + + return Array.from(groups.entries()).map(([key, group]) => { + const [method, endpoint] = key.split(':'); + return { method, endpoint, requests: group }; + }); + } + + private async processBatch(batch: BatchGroup): Promise { + const { method, endpoint, requests } = batch; + const client = this.apiClient; + + if (!client) { + logger.warn('RequestQueue: No apiClient available for batch processing'); + return; + } + + if (requests.length === 1) { + const req = requests[0]; + try { + await client(req.config); + await this.removeFromQueue(req.id); + mobileAnalyticsService.trackEvent('request_dequeued' as any, { + priority: req.priority, + method: req.method, + endpoint: req.endpoint, + batched: false, + }); + } catch (error) { + await this.incrementRetry(req.id); + } + return; + } + + logger.info( + `RequestQueue: Batching ${requests.length} ${method} requests to ${endpoint}`, + ); + + if (method === 'GET') { + const results = await Promise.allSettled( + requests.map((req) => client(req.config).catch(() => {})), + ); + for (let i = 0; i < requests.length; i++) { + if (results[i].status === 'fulfilled') { + await this.removeFromQueue(requests[i].id); + } else { + await this.incrementRetry(requests[i].id); + } + } + return; + } + + if (method === 'PUT' || method === 'PATCH') { + const payloads = requests.map((req) => req.config.data); + try { + const mergedPayload = this.mergePayloads(payloads); + const batchConfig = { + ...requests[0].config, + data: mergedPayload, + url: endpoint, + method: method.toLowerCase(), + }; + await client(batchConfig); + for (const req of requests) { + await this.removeFromQueue(req.id); + } + mobileAnalyticsService.trackEvent('queue_batch_synced' as any, { + method, + endpoint, + batchSize: requests.length, + }); + return; + } catch (error) { + for (const req of requests) { + await this.incrementRetry(req.id); + } + return; + } + } + + for (const req of requests) { + try { + await client(req.config); + await this.removeFromQueue(req.id); + } catch (error) { + await this.incrementRetry(req.id); + } + } + } + + private mergePayloads(payloads: any[]): any { + if (payloads.length === 0) return {}; + if (payloads.length === 1) return payloads[0]; + + const merged: Record = {}; + for (const payload of payloads) { + if (payload && typeof payload === 'object') { + Object.assign(merged, payload); + } + } + return merged; + } + + private sortByPriority(queue: QueuedRequest[]): void { + queue.sort((a, b) => { + const priorityDiff = PRIORITY_ORDER[a.priority] - PRIORITY_ORDER[b.priority]; + if (priorityDiff !== 0) return priorityDiff; + return a.timestamp - b.timestamp; + }); + } + private notifyListeners(count: number): void { - this.listeners.forEach(listener => listener(count)); + this.listeners.forEach((listener) => listener(count)); } private async checkConnectivity(): Promise { try { const state = await Network.getNetworkStateAsync(); - return state.isConnected && state.isInternetReachable; + return (state.isConnected && state.isInternetReachable) ?? false; } catch { return false; } } + private async listenForNetworkChanges(): Promise { + try { + const listener = Network.addNetworkStateListener((state) => { + const online = + (state.isConnected && state.isInternetReachable) ?? false; + if (online) { + logger.info('RequestQueue: Network became available, processing queue'); + void this.processQueue(this.apiClient!); + } + }); + this.networkListener = () => listener.remove(); + } catch (error) { + logger.error( + 'RequestQueue: Failed to listen for network changes:', + error, + ); + } + } + + private async persistMetrics(): Promise { + try { + await AsyncStorage.setItem( + QUEUE_METRICS_KEY, + JSON.stringify(this.metrics), + ); + } catch (error) { + logger.error('Error persisting queue metrics:', error); + } + } + + private async restoreMetrics(): Promise { + try { + const data = await AsyncStorage.getItem(QUEUE_METRICS_KEY); + if (data) { + const restored = JSON.parse(data) as Partial; + this.metrics = { + ...this.metrics, + ...restored, + }; + } + } catch (error) { + logger.error('Error restoring queue metrics:', error); + } + } + private generateId(): string { return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } } export const requestQueue = new RequestQueue(); -export default requestQueue; \ No newline at end of file +export default requestQueue; diff --git a/tests/services/api/requestQueue.test.ts b/tests/services/api/requestQueue.test.ts new file mode 100644 index 00000000..71b64cbb --- /dev/null +++ b/tests/services/api/requestQueue.test.ts @@ -0,0 +1,274 @@ +import { InternalAxiosRequestConfig } from 'axios'; +import AsyncStorage from '@react-native-async-storage/async-storage'; +import * as Network from 'expo-network'; + +import { requestQueue } from '../../../src/services/api/requestQueue'; + +jest.mock('../../../src/utils/logger', () => ({ + __esModule: true, + default: { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + debug: jest.fn(), + }, +})); + +jest.mock('../../../src/services/healthMetrics', () => ({ + healthMetricsService: { + recordApiCall: jest.fn(), + }, +})); + +jest.mock('../../../src/services/mobileAnalytics', () => ({ + mobileAnalyticsService: { + trackEvent: jest.fn(), + }, +})); + +const mockConfig = (overrides: Partial = {}): InternalAxiosRequestConfig => + ({ + method: 'GET', + url: '/api/courses', + headers: {}, + data: undefined, + ...overrides, + }) as InternalAxiosRequestConfig; + +describe('RequestQueue', () => { + beforeEach(async () => { + jest.clearAllMocks(); + + const queue = await requestQueue.getQueue(); + for (const req of queue) { + await requestQueue.removeFromQueue(req.id); + } + + jest.spyOn(Network, 'getNetworkStateAsync').mockResolvedValue({ + isConnected: true, + isInternetReachable: true, + type: 'WIFI', + } as any); + }); + + afterEach(async () => { + requestQueue.stopMonitoring(); + await AsyncStorage.clear(); + }); + + describe('addToQueue', () => { + it('should add a request with default normal priority', async () => { + const id = await requestQueue.addToQueue(mockConfig()); + expect(id).toBeTruthy(); + + const queue = await requestQueue.getQueue(); + expect(queue).toHaveLength(1); + expect(queue[0].priority).toBe('normal'); + expect(queue[0].method).toBe('GET'); + expect(queue[0].endpoint).toBe('/api/courses'); + }); + + it('should add a request with specified priority', async () => { + await requestQueue.addToQueue(mockConfig(), 'critical'); + const queue = await requestQueue.getQueue(); + expect(queue[0].priority).toBe('critical'); + }); + + it('should sort queue by priority on insertion', async () => { + await requestQueue.addToQueue(mockConfig({ url: '/low' }), 'low'); + await requestQueue.addToQueue(mockConfig({ url: '/critical' }), 'critical'); + await requestQueue.addToQueue(mockConfig({ url: '/high' }), 'high'); + + const queue = await requestQueue.getQueue(); + expect(queue[0].priority).toBe('critical'); + expect(queue[1].priority).toBe('high'); + expect(queue[2].priority).toBe('low'); + }); + + it('should persist to AsyncStorage', async () => { + await requestQueue.addToQueue(mockConfig()); + expect(AsyncStorage.setItem).toHaveBeenCalledWith( + '@teachlink_request_queue', + expect.any(String), + ); + }); + }); + + describe('removeFromQueue', () => { + it('should remove a request by id', async () => { + const id = await requestQueue.addToQueue(mockConfig()); + await requestQueue.removeFromQueue(id); + + const queue = await requestQueue.getQueue(); + expect(queue).toHaveLength(0); + }); + }); + + describe('incrementRetry', () => { + it('should increment retry count for a request', async () => { + const id = await requestQueue.addToQueue(mockConfig()); + await requestQueue.incrementRetry(id); + + const queue = await requestQueue.getQueue(); + expect(queue[0].retries).toBe(1); + }); + }); + + describe('getPendingByPriority', () => { + it('should return counts grouped by priority', async () => { + await requestQueue.addToQueue(mockConfig({ url: '/a' }), 'critical'); + await requestQueue.addToQueue(mockConfig({ url: '/b' }), 'normal'); + await requestQueue.addToQueue(mockConfig({ url: '/c' }), 'normal'); + + const counts = await requestQueue.getPendingByPriority(); + expect(counts.critical).toBe(1); + expect(counts.normal).toBe(2); + expect(counts.high).toBe(0); + expect(counts.low).toBe(0); + }); + }); + + describe('getQueueStatus', () => { + it('should return comprehensive queue status', async () => { + await requestQueue.addToQueue(mockConfig({ method: 'GET', url: '/courses' }), 'high'); + await requestQueue.addToQueue(mockConfig({ method: 'PUT', url: '/profile' }), 'critical'); + + const status = await requestQueue.getQueueStatus(); + expect(status.total).toBe(2); + expect(status.byPriority.critical).toBe(1); + expect(status.byPriority.high).toBe(1); + expect(status.byMethod.GET).toBe(1); + expect(status.byMethod.PUT).toBe(1); + expect(status.isProcessing).toBe(false); + expect(status.isMonitoring).toBe(false); + }); + }); + + describe('processQueue', () => { + it('should process single requests successfully', async () => { + const client = jest.fn().mockResolvedValue({ data: 'ok' }); + await requestQueue.addToQueue(mockConfig()); + + await requestQueue.processQueue(client); + + const queue = await requestQueue.getQueue(); + expect(queue).toHaveLength(0); + expect(client).toHaveBeenCalledTimes(1); + }); + + it('should handle network errors during processing', async () => { + const client = jest.fn().mockRejectedValue(new Error('Network error')); + await requestQueue.addToQueue(mockConfig()); + + await requestQueue.processQueue(client); + + const queue = await requestQueue.getQueue(); + expect(queue).toHaveLength(1); + expect(queue[0].retries).toBe(1); + }); + + it('should drop requests exceeding max retries', async () => { + const client = jest.fn().mockRejectedValue(new Error('Fail')); + + await requestQueue.addToQueue(mockConfig()); + + for (let i = 0; i < 3; i++) { + await requestQueue.processQueue(client); + } + + const queue = await requestQueue.getQueue(); + expect(queue).toHaveLength(0); + }); + + it('should not process when offline', async () => { + jest.spyOn(Network, 'getNetworkStateAsync').mockResolvedValue({ + isConnected: false, + isInternetReachable: false, + type: 'NONE', + } as any); + + const client = jest.fn(); + await requestQueue.addToQueue(mockConfig()); + + await requestQueue.processQueue(client); + + expect(client).not.toHaveBeenCalled(); + }); + }); + + describe('batch processing', () => { + it('should batch multiple requests to same endpoint', async () => { + const client = jest.fn().mockResolvedValue({ data: 'ok' }); + + await requestQueue.addToQueue(mockConfig({ method: 'PUT', url: '/api/profile', data: { name: 'a' } }), 'normal'); + await requestQueue.addToQueue(mockConfig({ method: 'PUT', url: '/api/profile', data: { name: 'b' } }), 'normal'); + + await requestQueue.processQueue(client); + + const queue = await requestQueue.getQueue(); + expect(queue).toHaveLength(0); + expect(client).toHaveBeenCalledTimes(1); + }); + + it('should not batch GET requests to same endpoint', async () => { + const client = jest.fn().mockResolvedValue({ data: 'ok' }); + + await requestQueue.addToQueue(mockConfig({ method: 'GET', url: '/api/courses' })); + await requestQueue.addToQueue(mockConfig({ method: 'GET', url: '/api/courses' })); + + await requestQueue.processQueue(client); + + expect(client).toHaveBeenCalledTimes(2); + }); + }); + + describe('resume', () => { + it('should restore pending count from persisted storage', async () => { + await requestQueue.addToQueue(mockConfig()); + await requestQueue.addToQueue(mockConfig()); + + const mockGetItem = AsyncStorage.getItem as jest.Mock; + const savedData = mockGetItem.mock.results[0]?.value; + + expect(savedData).toBeTruthy(); + }); + }); + + describe('onPendingCountChange', () => { + it('should notify listeners when queue changes', async () => { + const listener = jest.fn(); + requestQueue.onPendingCountChange(listener); + + await requestQueue.addToQueue(mockConfig()); + + expect(listener).toHaveBeenCalledWith(1); + }); + + it('should return unsubscribe function', async () => { + const listener = jest.fn(); + const unsubscribe = requestQueue.onPendingCountChange(listener); + unsubscribe(); + + await requestQueue.addToQueue(mockConfig()); + + expect(listener).not.toHaveBeenCalled(); + }); + }); + + describe('startMonitoring / stopMonitoring', () => { + it('should start processing queue on monitoring start', async () => { + const client = jest.fn().mockResolvedValue({ data: 'ok' }); + await requestQueue.addToQueue(mockConfig()); + + requestQueue.startMonitoring(client); + + await new Promise(process.nextTick); + expect(client).toHaveBeenCalled(); + requestQueue.stopMonitoring(); + }); + + it('should stop monitoring without errors', () => { + expect(() => requestQueue.stopMonitoring()).not.toThrow(); + }); + }); +});