From fe6d3adf5083b9d098cd6868c9ff21ddd43866b3 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Fri, 23 May 2025 18:51:13 +0300 Subject: [PATCH 01/17] Add draft implementation of log upload tracking --- .../epam/reportportal/service/LaunchImpl.java | 33 ++- .../logs/TrackingLoggingSubscriber.java | 153 +++++++++++ .../service/LaunchLogCompletionTest.java | 252 ++++++++++++++++++ 3 files changed, 424 insertions(+), 14 deletions(-) create mode 100644 src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java create mode 100644 src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 32687807..a97eb223 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -22,7 +22,7 @@ import com.epam.reportportal.listeners.ListenerParameters; import com.epam.reportportal.message.TypeAwareByteSource; import com.epam.reportportal.service.logs.LogBatchingFlowable; -import com.epam.reportportal.service.logs.LoggingSubscriber; +import com.epam.reportportal.service.logs.TrackingLoggingSubscriber; import com.epam.reportportal.service.statistics.StatisticsService; import com.epam.reportportal.utils.*; import com.epam.reportportal.utils.files.ByteSource; @@ -127,6 +127,7 @@ public class LaunchImpl extends Launch { private final Supplier> launch; private final AtomicBoolean isShutDownHook = new AtomicBoolean(false); private final PublishSubject logEmitter; + private final TrackingLoggingSubscriber loggingSubscriber; private final ExecutorService executor; private final Scheduler scheduler; private StatisticsService statisticsService; @@ -142,11 +143,15 @@ private static Supplier> getLaunchSupplier(@Nonnull final ReportPo private static PublishSubject getLogEmitter(@Nonnull final ReportPortalClient client, @Nonnull final ListenerParameters parameters, @Nonnull final Scheduler scheduler, - @Nonnull final FlowableSubscriber loggingSubscriber) { + @Nonnull final TrackingLoggingSubscriber loggingSubscriber) { PublishSubject emitter = PublishSubject.create(); RxJavaPlugins.onAssembly(new LogBatchingFlowable(new FlowableFromObservable<>(emitter), parameters)) - .flatMap((Function, Flowable>) rqs -> client.log(HttpRequestUtils.buildLogMultiPartRequest( - rqs)).retry(DEFAULT_REQUEST_RETRY).toFlowable()) + .flatMap((Function, Flowable>) rqs -> { + // Notify the subscriber that a batch is being processed - do this first + loggingSubscriber.onBatchProcessing(); + // Then make the HTTP request + return client.log(HttpRequestUtils.buildLogMultiPartRequest(rqs)).retry(DEFAULT_REQUEST_RETRY).toFlowable(); + }) .onBackpressureBuffer(parameters.getRxBufferSize(), false, true) .cache() .subscribeOn(scheduler) @@ -176,13 +181,19 @@ protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonn LOGGER.info("Rerun: {}", parameters.isRerun()); launch = getLaunchSupplier(getClient(), getScheduler(), startRq); - logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), loggingSubscriber); + // Use TrackingLoggingSubscriber with delegation to the provided subscriber + if (loggingSubscriber instanceof TrackingLoggingSubscriber) { + this.loggingSubscriber = (TrackingLoggingSubscriber) loggingSubscriber; + } else { + this.loggingSubscriber = new TrackingLoggingSubscriber(loggingSubscriber); + } + logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), this.loggingSubscriber); projectSettings = getProjectSettings(getClient(), getScheduler()); } protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonnull final ListenerParameters parameters, @Nonnull final StartLaunchRQ rq, @Nonnull final ExecutorService executorService) { - this(reportPortalClient, parameters, rq, executorService, new LoggingSubscriber()); + this(reportPortalClient, parameters, rq, executorService, new TrackingLoggingSubscriber()); } protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonnull final ListenerParameters parameters, @@ -199,7 +210,8 @@ protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonn LOGGER.info("Rerun: {}", parameters.isRerun()); launch = () -> launchMaybe.cache().subscribeOn(getScheduler()); - logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), new LoggingSubscriber()); + loggingSubscriber = new TrackingLoggingSubscriber(); + logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), loggingSubscriber); projectSettings = getProjectSettings(getClient(), getScheduler()); } @@ -420,13 +432,6 @@ public void finish(final FinishExecutionRQ request) { throw new InternalReportPortalClientException("Executor service is already shut down"); } - try { - // FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // Close and re-create statistics service getStatisticsService().close(); statisticsService = new StatisticsService(getParameters()); diff --git a/src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java b/src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java new file mode 100644 index 00000000..efe9f799 --- /dev/null +++ b/src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java @@ -0,0 +1,153 @@ +/* + * Copyright 2025 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.epam.reportportal.service.logs; + +import com.epam.ta.reportportal.ws.model.BatchSaveOperatingRS; +import io.reactivex.Completable; +import io.reactivex.FlowableSubscriber; +import io.reactivex.subjects.CompletableSubject; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A logging subscriber that tracks the completion of log processing. + * This subscriber allows waiting for all log batches to be processed before completing. + * It can also delegate to another subscriber for compatibility. + */ +public final class TrackingLoggingSubscriber implements FlowableSubscriber { + private static final Logger LOGGER = LoggerFactory.getLogger(TrackingLoggingSubscriber.class); + + private final AtomicInteger pendingBatches = new AtomicInteger(0); + private final AtomicReference completionSubject = new AtomicReference<>(CompletableSubject.create()); + private final FlowableSubscriber delegate; + private volatile boolean completed = false; + + /** + * Creates a tracking subscriber that logs errors but doesn't delegate. + */ + public TrackingLoggingSubscriber() { + this(null); + } + + /** + * Creates a tracking subscriber that delegates to another subscriber. + * + * @param delegate The subscriber to delegate to, can be null + */ + public TrackingLoggingSubscriber(@Nullable FlowableSubscriber delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(@Nonnull Subscription s) { + // Request unlimited items to ensure all batches are processed + s.request(Long.MAX_VALUE); + if (delegate != null) { + delegate.onSubscribe(s); + } + } + + @Override + public void onNext(BatchSaveOperatingRS result) { + // Decrement pending batches when we receive a batch response + int remaining = pendingBatches.decrementAndGet(); + if (remaining < 0) { + LOGGER.warn( + "[{}] Pending batches counter went negative: {}. This indicates a bug in batch tracking.", + Thread.currentThread().getId(), + remaining + ); + // Reset to 0 to prevent issues + pendingBatches.set(0); + } + checkCompletion(); + if (delegate != null) { + delegate.onNext(result); + } + } + + @Override + public void onError(Throwable e) { + LOGGER.error("[{}] ReportPortal logging error", Thread.currentThread().getId(), e); + // Complete with error + CompletableSubject subject = completionSubject.get(); + if (subject != null && !subject.hasComplete() && !subject.hasThrowable()) { + subject.onError(e); + } + if (delegate != null) { + delegate.onError(e); + } + } + + @Override + public void onComplete() { + completed = true; + checkCompletion(); + if (delegate != null) { + delegate.onComplete(); + } + } + + /** + * Notifies that a log batch is being processed. + * This should be called when a batch is sent to the network layer. + */ + public void onBatchProcessing() { + int pending = pendingBatches.incrementAndGet(); + LOGGER.debug("[{}] Log batch processing started. Pending batches: {}", Thread.currentThread().getId(), pending); + } + + /** + * Returns a Completable that completes when all log processing is finished. + * This includes both the completion of the log stream and all pending batch requests. + * + * @return A Completable that signals when all logging is complete + */ + @Nonnull + public Completable getCompletion() { + return completionSubject.get(); + } + + /** + * Checks if all conditions for completion are met and completes the subject if so. + * This method handles the race condition between onComplete() and the last onNext() calls. + *

+ * Race condition scenario: + * 1. Log stream emits completion (onComplete called, completed = true) + * 2. But there are still pending HTTP batch requests + * 3. When those HTTP responses arrive (onNext called), we need to check if we can now complete + *

+ * Alternative scenario: + * 1. All HTTP responses arrive first (pendingBatches = 0) + * 2. Then log stream completes (onComplete called) + * 3. We need to complete immediately since all conditions are met + */ + private void checkCompletion() { + // Both conditions must be true: stream completed AND no pending batches + if (completed && pendingBatches.get() == 0) { + CompletableSubject subject = completionSubject.get(); + if (subject != null && !subject.hasComplete() && !subject.hasThrowable()) { + subject.onComplete(); + } + } + } +} \ No newline at end of file diff --git a/src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java b/src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java new file mode 100644 index 00000000..247a5e03 --- /dev/null +++ b/src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java @@ -0,0 +1,252 @@ +/* + * Copyright 2025 EPAM Systems + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.reportportal.service; + +import com.epam.reportportal.listeners.ListenerParameters; +import com.epam.reportportal.listeners.LogLevel; +import com.epam.reportportal.test.TestUtils; +import com.epam.reportportal.util.test.CommonUtils; +import com.epam.ta.reportportal.ws.model.BatchSaveOperatingRS; +import com.epam.ta.reportportal.ws.model.log.SaveLogRQ; +import io.reactivex.Maybe; +import okhttp3.MultipartBody; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.mockito.ArgumentCaptor; + +import java.util.Calendar; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +public class LaunchLogCompletionTest { + + private ExecutorService executor; + private ListenerParameters parameters; + + @BeforeEach + public void setUp() { + parameters = TestUtils.standardParameters(); + parameters.setBatchLogsSize(3); // Small batch size for testing + executor = Executors.newSingleThreadExecutor(); + } + + @AfterEach + public void tearDown() { + CommonUtils.shutdownExecutorService(executor); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS) + @SuppressWarnings("unchecked") + public void test_launch_finish_waits_for_log_completion() throws InterruptedException { + ReportPortalClient client = mock(ReportPortalClient.class); + TestUtils.mockLaunch(client, "launchUuid"); + + // Track the order of operations + AtomicInteger operationOrder = new AtomicInteger(0); + AtomicInteger logResponseTime = new AtomicInteger(-1); + AtomicInteger finishStartTime = new AtomicInteger(-1); + AtomicInteger finishEndTime = new AtomicInteger(-1); + + when(client.log(any(List.class))).thenAnswer(invocation -> { + return Maybe.fromCallable(() -> { + // Simulate network delay for log processing + Thread.sleep(300); // Simulate network delay + logResponseTime.set(operationOrder.incrementAndGet()); + return new BatchSaveOperatingRS(); + }); + }); + + Launch launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); + launch.start().blockingGet(); + + // Emit logs that will create at least one batch + for (int i = 0; i < 3; i++) { + final int logIndex = i; + launch.log(launchUuid -> { + SaveLogRQ rq = new SaveLogRQ(); + rq.setLaunchUuid(launchUuid); + rq.setLevel(LogLevel.INFO.name()); + rq.setLogTime(Calendar.getInstance().getTime()); + rq.setMessage("Test log " + logIndex); + return rq; + }); + } + + // Give some time for logs to be emitted + Thread.sleep(100); + + // Record when finish starts + finishStartTime.set(operationOrder.incrementAndGet()); + + // Finish the launch - this should wait for all logs to be processed + launch.finish(TestUtils.positiveFinishRequest()); + + // Record when finish ends + finishEndTime.set(operationOrder.incrementAndGet()); + + // Verify that log response happened between finish start and finish end + assertThat("Log response should have completed", logResponseTime.get(), greaterThan(0)); + assertThat("Log should have started before finish started", finishStartTime.get(), greaterThan(0)); + assertThat("Finish should have waited for log response", logResponseTime.get(), greaterThan(finishStartTime.get())); + assertThat("Finish should have completed after log response", finishEndTime.get(), greaterThan(logResponseTime.get())); + + // Verify log call was made + verify(client, atLeastOnce()).log(any(List.class)); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS) + @SuppressWarnings("unchecked") + public void test_launch_finish_without_logs_completes_quickly() { + ReportPortalClient client = mock(ReportPortalClient.class); + TestUtils.mockLaunch(client, "launchUuid"); + + Launch launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); + launch.start().blockingGet(); + + // Finish the launch without any logs + long startTime = System.currentTimeMillis(); + launch.finish(TestUtils.positiveFinishRequest()); + long endTime = System.currentTimeMillis(); + + // Verify that finish() completed quickly since there were no logs + long duration = endTime - startTime; + assertThat("Launch finish should complete quickly without logs", duration, lessThan(1000L)); + + // Verify no log calls were made + verify(client, never()).log(any(List.class)); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS) + @SuppressWarnings("unchecked") + public void test_launch_finish_handles_log_errors_gracefully() throws InterruptedException { + ReportPortalClient client = mock(ReportPortalClient.class); + TestUtils.mockLaunch(client, "launchUuid"); + + // Simulate log failure + when(client.log(any(List.class))).thenReturn(Maybe.error(new RuntimeException("Network error"))); + + Launch launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); + launch.start().blockingGet(); + + // Emit some logs + for (int i = 0; i < 3; i++) { + final int logIndex = i; + launch.log(launchUuid -> { + SaveLogRQ rq = new SaveLogRQ(); + rq.setLaunchUuid(launchUuid); + rq.setLevel(LogLevel.INFO.name()); + rq.setLogTime(Calendar.getInstance().getTime()); + rq.setMessage("Test log " + logIndex); + return rq; + }); + } + + // Finish the launch - this should handle log errors gracefully + long startTime = System.currentTimeMillis(); + launch.finish(TestUtils.positiveFinishRequest()); + long endTime = System.currentTimeMillis(); + + // Verify that finish() completed (even with log errors) + long duration = endTime - startTime; + assertThat("Launch finish should complete despite log errors", duration, lessThan(5000L)); + + // Verify log call was attempted + verify(client, atLeastOnce()).log(any(List.class)); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS) + @SuppressWarnings("unchecked") + public void test_virtual_item_logs_wait_for_completion() { + ReportPortalClient client = mock(ReportPortalClient.class); + TestUtils.mockLaunch(client, "launchUuid"); + TestUtils.mockStartTestItem(client, "itemUuid"); + + // Track the order of operations for virtual item logs + AtomicInteger operationOrder = new AtomicInteger(0); + AtomicInteger logResponseTime = new AtomicInteger(-1); + AtomicInteger finishStartTime = new AtomicInteger(-1); + AtomicInteger finishEndTime = new AtomicInteger(-1); + + when(client.log(any(List.class))).thenAnswer(invocation -> { + return Maybe.fromCallable(() -> { + // Simulate network delay for log processing + Thread.sleep(200); // Simulate network delay + logResponseTime.set(operationOrder.incrementAndGet()); + return new BatchSaveOperatingRS(); + }); + }); + + LaunchImpl launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); + launch.start().blockingGet(); + + // Create a virtual item + Maybe virtualItem = launch.createVirtualItem(); + + // Log to the virtual item using ReportPortal.emitLog (this goes through LoggingContext) + ReportPortal.emitLog(virtualItem, itemUuid -> { + SaveLogRQ rq = new SaveLogRQ(); + rq.setItemUuid(itemUuid); + rq.setLevel(LogLevel.INFO.name()); + rq.setLogTime(Calendar.getInstance().getTime()); + rq.setMessage("Virtual item log"); + return rq; + }); + + // Populate the virtual item + launch.startVirtualTestItem(virtualItem, TestUtils.standardStartStepRequest()); + + // Give some time for logs to be processed + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Record when finish starts + finishStartTime.set(operationOrder.incrementAndGet()); + + // Finish the launch - this should wait for virtual item logs to complete + launch.finish(TestUtils.positiveFinishRequest()); + + // Record when finish ends + finishEndTime.set(operationOrder.incrementAndGet()); + + // Verify that log response happened and finish waited for it + assertThat("Log response should have completed", logResponseTime.get(), greaterThan(0)); + assertThat("Finish should have waited for virtual item log response", logResponseTime.get(), greaterThan(finishStartTime.get())); + assertThat("Finish should have completed after log response", finishEndTime.get(), greaterThan(logResponseTime.get())); + + // Verify log call was made + verify(client, atLeastOnce()).log(any(List.class)); + } +} \ No newline at end of file From 525612609bc05606b727ee86a7b6624ba5cbb2e1 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 13:56:07 +0300 Subject: [PATCH 02/17] Remove tests --- .../service/LaunchLogCompletionTest.java | 252 ------------------ 1 file changed, 252 deletions(-) delete mode 100644 src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java diff --git a/src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java b/src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java deleted file mode 100644 index 247a5e03..00000000 --- a/src/test/java/com/epam/reportportal/service/LaunchLogCompletionTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Copyright 2025 EPAM Systems - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.epam.reportportal.service; - -import com.epam.reportportal.listeners.ListenerParameters; -import com.epam.reportportal.listeners.LogLevel; -import com.epam.reportportal.test.TestUtils; -import com.epam.reportportal.util.test.CommonUtils; -import com.epam.ta.reportportal.ws.model.BatchSaveOperatingRS; -import com.epam.ta.reportportal.ws.model.log.SaveLogRQ; -import io.reactivex.Maybe; -import okhttp3.MultipartBody; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.mockito.ArgumentCaptor; - -import java.util.Calendar; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; - -public class LaunchLogCompletionTest { - - private ExecutorService executor; - private ListenerParameters parameters; - - @BeforeEach - public void setUp() { - parameters = TestUtils.standardParameters(); - parameters.setBatchLogsSize(3); // Small batch size for testing - executor = Executors.newSingleThreadExecutor(); - } - - @AfterEach - public void tearDown() { - CommonUtils.shutdownExecutorService(executor); - } - - @Test - @Timeout(value = 30, unit = TimeUnit.SECONDS) - @SuppressWarnings("unchecked") - public void test_launch_finish_waits_for_log_completion() throws InterruptedException { - ReportPortalClient client = mock(ReportPortalClient.class); - TestUtils.mockLaunch(client, "launchUuid"); - - // Track the order of operations - AtomicInteger operationOrder = new AtomicInteger(0); - AtomicInteger logResponseTime = new AtomicInteger(-1); - AtomicInteger finishStartTime = new AtomicInteger(-1); - AtomicInteger finishEndTime = new AtomicInteger(-1); - - when(client.log(any(List.class))).thenAnswer(invocation -> { - return Maybe.fromCallable(() -> { - // Simulate network delay for log processing - Thread.sleep(300); // Simulate network delay - logResponseTime.set(operationOrder.incrementAndGet()); - return new BatchSaveOperatingRS(); - }); - }); - - Launch launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); - launch.start().blockingGet(); - - // Emit logs that will create at least one batch - for (int i = 0; i < 3; i++) { - final int logIndex = i; - launch.log(launchUuid -> { - SaveLogRQ rq = new SaveLogRQ(); - rq.setLaunchUuid(launchUuid); - rq.setLevel(LogLevel.INFO.name()); - rq.setLogTime(Calendar.getInstance().getTime()); - rq.setMessage("Test log " + logIndex); - return rq; - }); - } - - // Give some time for logs to be emitted - Thread.sleep(100); - - // Record when finish starts - finishStartTime.set(operationOrder.incrementAndGet()); - - // Finish the launch - this should wait for all logs to be processed - launch.finish(TestUtils.positiveFinishRequest()); - - // Record when finish ends - finishEndTime.set(operationOrder.incrementAndGet()); - - // Verify that log response happened between finish start and finish end - assertThat("Log response should have completed", logResponseTime.get(), greaterThan(0)); - assertThat("Log should have started before finish started", finishStartTime.get(), greaterThan(0)); - assertThat("Finish should have waited for log response", logResponseTime.get(), greaterThan(finishStartTime.get())); - assertThat("Finish should have completed after log response", finishEndTime.get(), greaterThan(logResponseTime.get())); - - // Verify log call was made - verify(client, atLeastOnce()).log(any(List.class)); - } - - @Test - @Timeout(value = 30, unit = TimeUnit.SECONDS) - @SuppressWarnings("unchecked") - public void test_launch_finish_without_logs_completes_quickly() { - ReportPortalClient client = mock(ReportPortalClient.class); - TestUtils.mockLaunch(client, "launchUuid"); - - Launch launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); - launch.start().blockingGet(); - - // Finish the launch without any logs - long startTime = System.currentTimeMillis(); - launch.finish(TestUtils.positiveFinishRequest()); - long endTime = System.currentTimeMillis(); - - // Verify that finish() completed quickly since there were no logs - long duration = endTime - startTime; - assertThat("Launch finish should complete quickly without logs", duration, lessThan(1000L)); - - // Verify no log calls were made - verify(client, never()).log(any(List.class)); - } - - @Test - @Timeout(value = 30, unit = TimeUnit.SECONDS) - @SuppressWarnings("unchecked") - public void test_launch_finish_handles_log_errors_gracefully() throws InterruptedException { - ReportPortalClient client = mock(ReportPortalClient.class); - TestUtils.mockLaunch(client, "launchUuid"); - - // Simulate log failure - when(client.log(any(List.class))).thenReturn(Maybe.error(new RuntimeException("Network error"))); - - Launch launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); - launch.start().blockingGet(); - - // Emit some logs - for (int i = 0; i < 3; i++) { - final int logIndex = i; - launch.log(launchUuid -> { - SaveLogRQ rq = new SaveLogRQ(); - rq.setLaunchUuid(launchUuid); - rq.setLevel(LogLevel.INFO.name()); - rq.setLogTime(Calendar.getInstance().getTime()); - rq.setMessage("Test log " + logIndex); - return rq; - }); - } - - // Finish the launch - this should handle log errors gracefully - long startTime = System.currentTimeMillis(); - launch.finish(TestUtils.positiveFinishRequest()); - long endTime = System.currentTimeMillis(); - - // Verify that finish() completed (even with log errors) - long duration = endTime - startTime; - assertThat("Launch finish should complete despite log errors", duration, lessThan(5000L)); - - // Verify log call was attempted - verify(client, atLeastOnce()).log(any(List.class)); - } - - @Test - @Timeout(value = 30, unit = TimeUnit.SECONDS) - @SuppressWarnings("unchecked") - public void test_virtual_item_logs_wait_for_completion() { - ReportPortalClient client = mock(ReportPortalClient.class); - TestUtils.mockLaunch(client, "launchUuid"); - TestUtils.mockStartTestItem(client, "itemUuid"); - - // Track the order of operations for virtual item logs - AtomicInteger operationOrder = new AtomicInteger(0); - AtomicInteger logResponseTime = new AtomicInteger(-1); - AtomicInteger finishStartTime = new AtomicInteger(-1); - AtomicInteger finishEndTime = new AtomicInteger(-1); - - when(client.log(any(List.class))).thenAnswer(invocation -> { - return Maybe.fromCallable(() -> { - // Simulate network delay for log processing - Thread.sleep(200); // Simulate network delay - logResponseTime.set(operationOrder.incrementAndGet()); - return new BatchSaveOperatingRS(); - }); - }); - - LaunchImpl launch = new LaunchImpl(client, parameters, TestUtils.standardLaunchRequest(parameters), executor); - launch.start().blockingGet(); - - // Create a virtual item - Maybe virtualItem = launch.createVirtualItem(); - - // Log to the virtual item using ReportPortal.emitLog (this goes through LoggingContext) - ReportPortal.emitLog(virtualItem, itemUuid -> { - SaveLogRQ rq = new SaveLogRQ(); - rq.setItemUuid(itemUuid); - rq.setLevel(LogLevel.INFO.name()); - rq.setLogTime(Calendar.getInstance().getTime()); - rq.setMessage("Virtual item log"); - return rq; - }); - - // Populate the virtual item - launch.startVirtualTestItem(virtualItem, TestUtils.standardStartStepRequest()); - - // Give some time for logs to be processed - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - // Record when finish starts - finishStartTime.set(operationOrder.incrementAndGet()); - - // Finish the launch - this should wait for virtual item logs to complete - launch.finish(TestUtils.positiveFinishRequest()); - - // Record when finish ends - finishEndTime.set(operationOrder.incrementAndGet()); - - // Verify that log response happened and finish waited for it - assertThat("Log response should have completed", logResponseTime.get(), greaterThan(0)); - assertThat("Finish should have waited for virtual item log response", logResponseTime.get(), greaterThan(finishStartTime.get())); - assertThat("Finish should have completed after log response", finishEndTime.get(), greaterThan(logResponseTime.get())); - - // Verify log call was made - verify(client, atLeastOnce()).log(any(List.class)); - } -} \ No newline at end of file From c495f735f43d14ccaf1a7383c6ecfe00d475c5ac Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 13:57:24 +0300 Subject: [PATCH 03/17] Revert LaunchImpl.java --- .../epam/reportportal/service/LaunchImpl.java | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index a97eb223..32687807 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -22,7 +22,7 @@ import com.epam.reportportal.listeners.ListenerParameters; import com.epam.reportportal.message.TypeAwareByteSource; import com.epam.reportportal.service.logs.LogBatchingFlowable; -import com.epam.reportportal.service.logs.TrackingLoggingSubscriber; +import com.epam.reportportal.service.logs.LoggingSubscriber; import com.epam.reportportal.service.statistics.StatisticsService; import com.epam.reportportal.utils.*; import com.epam.reportportal.utils.files.ByteSource; @@ -127,7 +127,6 @@ public class LaunchImpl extends Launch { private final Supplier> launch; private final AtomicBoolean isShutDownHook = new AtomicBoolean(false); private final PublishSubject logEmitter; - private final TrackingLoggingSubscriber loggingSubscriber; private final ExecutorService executor; private final Scheduler scheduler; private StatisticsService statisticsService; @@ -143,15 +142,11 @@ private static Supplier> getLaunchSupplier(@Nonnull final ReportPo private static PublishSubject getLogEmitter(@Nonnull final ReportPortalClient client, @Nonnull final ListenerParameters parameters, @Nonnull final Scheduler scheduler, - @Nonnull final TrackingLoggingSubscriber loggingSubscriber) { + @Nonnull final FlowableSubscriber loggingSubscriber) { PublishSubject emitter = PublishSubject.create(); RxJavaPlugins.onAssembly(new LogBatchingFlowable(new FlowableFromObservable<>(emitter), parameters)) - .flatMap((Function, Flowable>) rqs -> { - // Notify the subscriber that a batch is being processed - do this first - loggingSubscriber.onBatchProcessing(); - // Then make the HTTP request - return client.log(HttpRequestUtils.buildLogMultiPartRequest(rqs)).retry(DEFAULT_REQUEST_RETRY).toFlowable(); - }) + .flatMap((Function, Flowable>) rqs -> client.log(HttpRequestUtils.buildLogMultiPartRequest( + rqs)).retry(DEFAULT_REQUEST_RETRY).toFlowable()) .onBackpressureBuffer(parameters.getRxBufferSize(), false, true) .cache() .subscribeOn(scheduler) @@ -181,19 +176,13 @@ protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonn LOGGER.info("Rerun: {}", parameters.isRerun()); launch = getLaunchSupplier(getClient(), getScheduler(), startRq); - // Use TrackingLoggingSubscriber with delegation to the provided subscriber - if (loggingSubscriber instanceof TrackingLoggingSubscriber) { - this.loggingSubscriber = (TrackingLoggingSubscriber) loggingSubscriber; - } else { - this.loggingSubscriber = new TrackingLoggingSubscriber(loggingSubscriber); - } - logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), this.loggingSubscriber); + logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), loggingSubscriber); projectSettings = getProjectSettings(getClient(), getScheduler()); } protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonnull final ListenerParameters parameters, @Nonnull final StartLaunchRQ rq, @Nonnull final ExecutorService executorService) { - this(reportPortalClient, parameters, rq, executorService, new TrackingLoggingSubscriber()); + this(reportPortalClient, parameters, rq, executorService, new LoggingSubscriber()); } protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonnull final ListenerParameters parameters, @@ -210,8 +199,7 @@ protected LaunchImpl(@Nonnull final ReportPortalClient reportPortalClient, @Nonn LOGGER.info("Rerun: {}", parameters.isRerun()); launch = () -> launchMaybe.cache().subscribeOn(getScheduler()); - loggingSubscriber = new TrackingLoggingSubscriber(); - logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), loggingSubscriber); + logEmitter = getLogEmitter(getClient(), getParameters(), getScheduler(), new LoggingSubscriber()); projectSettings = getProjectSettings(getClient(), getScheduler()); } @@ -432,6 +420,13 @@ public void finish(final FinishExecutionRQ request) { throw new InternalReportPortalClientException("Executor service is already shut down"); } + try { + // FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // Close and re-create statistics service getStatisticsService().close(); statisticsService = new StatisticsService(getParameters()); From 1f3359180f2be952d6fc511267938077ea8b62dc Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 14:05:49 +0300 Subject: [PATCH 04/17] Add LaunchImpl.log(Maybe, jFunction) method --- CHANGELOG.md | 2 ++ .../epam/reportportal/service/LaunchImpl.java | 24 +++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e299823..05cb5422 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## [Unreleased] +### Added +- Add LaunchImpl.log(Maybe, jFunction) method, by @HardNorth ## [5.3.11] ### Fixed diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 32687807..17723e21 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -84,6 +84,8 @@ public class LaunchImpl extends Launch { private static final int ITEM_FINISH_MAX_RETRIES = 10; private static final int ITEM_FINISH_RETRY_TIMEOUT = 10; + private static final int LOG_REMOVE_FACTOR = 100; + private static final Predicate INTERNAL_CLIENT_EXCEPTION_PREDICATE = throwable -> throwable instanceof InternalReportPortalClientException; private static final Predicate TEST_ITEM_FINISH_RETRY_PREDICATE = throwable -> (throwable instanceof ReportPortalException && ErrorType.FINISH_ITEM_NOT_ALLOWED.equals(((ReportPortalException) throwable).getError().getErrorType())) @@ -825,8 +827,7 @@ public Maybe finishTestItem(final Maybe item, fin getStepReporter().removeParent(item); LoggingContext.dispose(); - int removeFactor = 100; - if (rq.hashCode() % removeFactor == 0) { + if (rq.hashCode() % LOG_REMOVE_FACTOR == 0) { logCompletables.removeIf(c -> c.test().completions() > 0); } return finishResponse; @@ -880,6 +881,25 @@ public void log(@Nonnull final java.util.function.Function lo result.subscribe(SubscriptionUtils.logMaybeResults("Log item")); } + /** + * Logs message to the ReportPortal Launch. + * + * @param logItemUuid Test Item ID promise + * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. + */ + public void log(@Nonnull final Maybe logItemUuid, @Nonnull final java.util.function.Function logSupplier) { + Maybe result = Maybe.zip( + getLaunch(), logItemUuid, (launchUuid, itemUuid) -> { + SaveLogRQ rq = prepareRequest(logSupplier.apply(launchUuid)); + rq.setItemUuid(itemUuid); + emitLog(rq); + return rq; + } + ).cache(); + logCompletables.add(result.ignoreElement()); + result.subscribe(SubscriptionUtils.logMaybeResults("Log item")); + } + /** * Wrapper around TestItem entity to be able to track parent and children items */ From 4df521b73c55a0c2ddd63c5ffd09c5b50270d732 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 14:31:05 +0300 Subject: [PATCH 05/17] Add Launch.log(Maybe, jFunction) method --- CHANGELOG.md | 2 +- .../com/epam/reportportal/service/Launch.java | 16 ++++++++++++++-- .../epam/reportportal/service/LaunchImpl.java | 3 +++ 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05cb5422..d7846ba3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## [Unreleased] ### Added -- Add LaunchImpl.log(Maybe, jFunction) method, by @HardNorth +- Add Launch.log(Maybe, jFunction) method, by @HardNorth ## [5.3.11] ### Fixed diff --git a/src/main/java/com/epam/reportportal/service/Launch.java b/src/main/java/com/epam/reportportal/service/Launch.java index 5627690a..82fe7b25 100644 --- a/src/main/java/com/epam/reportportal/service/Launch.java +++ b/src/main/java/com/epam/reportportal/service/Launch.java @@ -145,19 +145,27 @@ abstract public Maybe startVirtualTestItem(final Maybe parentId, final StartTestItemRQ rq); /** - * Logs message to the ReportPortal Launch, root item. + * // * Logs message to the ReportPortal Launch, provided request. * * @param rq Log request. */ abstract public void log(@Nonnull final SaveLogRQ rq); /** - * Logs message to the ReportPortal Launch. + * Logs message to the ReportPortal Launch, request factory. * * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. */ abstract public void log(@Nonnull final Function logSupplier); + /** + * Logs message to the ReportPortal Launch, request factory with specified Test Item UUID. + * + * @param logItemUuid Test Item ID promise + * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. + */ + public abstract void log(@Nonnull Maybe logItemUuid, @Nonnull Function logSupplier); + /** * Finishes Test Item in ReportPortal asynchronously (non-blocking). Schedules finish after success of all child items. * @@ -284,6 +292,10 @@ public void log(@Nonnull SaveLogRQ rq) { public void log(@Nonnull final Function logSupplier) { } + @Override + public void log(@Nonnull Maybe logItemUuid, @Nonnull Function logSupplier) { + } + @Override @Nonnull public Maybe finishTestItem(Maybe itemId, FinishTestItemRQ rq) { diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 17723e21..7cd04370 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -857,6 +857,7 @@ private void emitLog(@Nonnull final SaveLogRQ rq) { * * @param rq Log request. */ + @Override public void log(@Nonnull final SaveLogRQ rq) { Maybe result = getLaunch().map(launchUuid -> { emitLog(prepareRequest(launchUuid, rq)); @@ -871,6 +872,7 @@ public void log(@Nonnull final SaveLogRQ rq) { * * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. */ + @Override public void log(@Nonnull final java.util.function.Function logSupplier) { Maybe result = getLaunch().map(launchUuid -> { SaveLogRQ rq = prepareRequest(logSupplier.apply(launchUuid)); @@ -887,6 +889,7 @@ public void log(@Nonnull final java.util.function.Function lo * @param logItemUuid Test Item ID promise * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. */ + @Override public void log(@Nonnull final Maybe logItemUuid, @Nonnull final java.util.function.Function logSupplier) { Maybe result = Maybe.zip( getLaunch(), logItemUuid, (launchUuid, itemUuid) -> { From c32f14a7d80f2f0abf73853b58ddc8b66f7bc581 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 14:40:52 +0300 Subject: [PATCH 06/17] Simplify LoggingContext class --- .../epam/reportportal/service/LaunchImpl.java | 8 --- .../reportportal/service/LoggingContext.java | 52 +++---------------- 2 files changed, 6 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 7cd04370..9f26c547 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -407,7 +407,6 @@ protected void waitForItemsCompletion(Completable itemCompletable) { getLaunch().ignoreElement(), createVirtualItemCompletable(), itemCompletable, - LoggingContext.completed(), completeLogEmitter() ); } @@ -422,13 +421,6 @@ public void finish(final FinishExecutionRQ request) { throw new InternalReportPortalClientException("Executor service is already shut down"); } - try { - // FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // Close and re-create statistics service getStatisticsService().close(); statisticsService = new StatisticsService(getParameters()); diff --git a/src/main/java/com/epam/reportportal/service/LoggingContext.java b/src/main/java/com/epam/reportportal/service/LoggingContext.java index 5c0df685..348a828c 100644 --- a/src/main/java/com/epam/reportportal/service/LoggingContext.java +++ b/src/main/java/com/epam/reportportal/service/LoggingContext.java @@ -15,19 +15,18 @@ */ package com.epam.reportportal.service; -import com.epam.reportportal.utils.SubscriptionUtils; import com.epam.ta.reportportal.ws.model.log.SaveLogRQ; -import io.reactivex.Completable; import io.reactivex.Flowable; import io.reactivex.Maybe; import org.apache.commons.lang3.tuple.Pair; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.*; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.Collectors; import static java.util.Optional.ofNullable; @@ -44,7 +43,6 @@ * @see LoggingContext#init(Maybe) */ public class LoggingContext { - private static final Queue USED_CONTEXTS = new ConcurrentLinkedQueue<>(); private static final ThreadLocal>> CONTEXT_THREAD_LOCAL = new InheritableThreadLocal<>(); private static final Set THREAD_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -95,31 +93,9 @@ public static LoggingContext init(@Nonnull final Maybe itemUuid) { * Disposes current logging context */ public static void dispose() { - int removeFactor = 100; - ofNullable(getContext()).map(Deque::poll).ifPresent(context -> { - USED_CONTEXTS.add(context); - if (context.hashCode() % removeFactor == 0) { - USED_CONTEXTS.removeIf(ctx -> { - ctx.completables.removeIf(c -> c.test().completions() > 0); - return ctx.completables.isEmpty(); - }); - } - }); + ofNullable(getContext()).ifPresent(Deque::poll); } - public static Completable completed() { - Completable completable = Completable.merge(USED_CONTEXTS.stream().map(LoggingContext::complete).collect(Collectors.toList())); - Deque context = getContext(); - return ofNullable(context).map(ctx -> Completable.concat(Arrays.asList( - completable, - Completable.merge(ctx.stream().map(LoggingContext::complete).collect(Collectors.toList())) - ))).orElse(completable); - } - - /** - * Messages queue to track items execution order - */ - private final Queue completables = new ConcurrentLinkedQueue<>(); /* a UUID of TestItem in ReportPortal to report into */ private final Maybe itemUuid; @@ -138,12 +114,7 @@ public void emit(@Nonnull final Maybe logItemUuid, @Nonnull final java.u if (launch == null) { return; } - Maybe future = logItemUuid.map(itemUuid -> { - launch.log(logSupplier.apply(itemUuid)); - return itemUuid; - }).cache(); - completables.add(future.ignoreElement()); - future.subscribe(SubscriptionUtils.logMaybeResults("LoggingContext")); + launch.log(logItemUuid, logSupplier); } /** @@ -154,15 +125,4 @@ public void emit(@Nonnull final Maybe logItemUuid, @Nonnull final java.u public void emit(@Nonnull final java.util.function.Function logSupplier) { emit(itemUuid, logSupplier); } - - /** - * Complete the context. - * - * @return the instance will be completed when all logs are sent - */ - public Completable complete() { - Completable completable = Completable.merge(completables); - completables.clear(); - return completable; - } } From a2e727dfe2661d2abebddef8b72390664b8cf862 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 15:11:10 +0300 Subject: [PATCH 07/17] Return wait --- .../com/epam/reportportal/service/LaunchImpl.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 9f26c547..8d3cb626 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -421,6 +421,13 @@ public void finish(final FinishExecutionRQ request) { throw new InternalReportPortalClientException("Executor service is already shut down"); } + try { + // FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // Close and re-create statistics service getStatisticsService().close(); statisticsService = new StatisticsService(getParameters()); @@ -883,14 +890,13 @@ public void log(@Nonnull final java.util.function.Function lo */ @Override public void log(@Nonnull final Maybe logItemUuid, @Nonnull final java.util.function.Function logSupplier) { - Maybe result = Maybe.zip( + Maybe result = RxJavaPlugins.onAssembly(Maybe.zip( getLaunch(), logItemUuid, (launchUuid, itemUuid) -> { - SaveLogRQ rq = prepareRequest(logSupplier.apply(launchUuid)); - rq.setItemUuid(itemUuid); + SaveLogRQ rq = prepareRequest(launchUuid, logSupplier.apply(itemUuid)); emitLog(rq); return rq; } - ).cache(); + ).cache()); logCompletables.add(result.ignoreElement()); result.subscribe(SubscriptionUtils.logMaybeResults("Log item")); } From fc4eca49998cf7332dab10431ae7c5ea87632c96 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 17:44:23 +0300 Subject: [PATCH 08/17] Simplify --- .../com/epam/reportportal/service/logs/BufferSubscriber.java | 4 ++-- .../epam/reportportal/service/logs/LogBatchingFlowable.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java b/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java index 0e414dd6..37b00069 100644 --- a/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java +++ b/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java @@ -32,7 +32,7 @@ */ public class BufferSubscriber implements FlowableSubscriber, Subscription { private final ReentrantLock lock = new ReentrantLock(); - private final Subscriber> downstream; + private final Subscriber> downstream; private final int maxSize; private final long payloadLimit; @@ -41,7 +41,7 @@ public class BufferSubscriber implements FlowableSubscriber, Subscrip private volatile Subscription upstream; private volatile boolean done; - public BufferSubscriber(Subscriber> actual, int batchMaxSize, long batchPayloadLimit) { + public BufferSubscriber(Subscriber> actual, int batchMaxSize, long batchPayloadLimit) { downstream = actual; maxSize = batchMaxSize; payloadLimit = batchPayloadLimit; diff --git a/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java b/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java index 730b39b1..00858534 100644 --- a/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java +++ b/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java @@ -45,7 +45,7 @@ public LogBatchingFlowable(Flowable flowableSource, ListenerParameter @Override protected void subscribeActual(Subscriber> s) { - source.subscribe(new BufferSubscriber(new SerializedSubscriber<>(s), maxSize, payloadLimit)); + source.subscribe(new BufferSubscriber(s, maxSize, payloadLimit)); } @Override From 4f61048396bfd7f1f2ec09abafd303ba3565ff2e Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 17:49:58 +0300 Subject: [PATCH 09/17] Return back hard wait --- src/main/java/com/epam/reportportal/service/LaunchImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 8d3cb626..e3bd9d1d 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -423,7 +423,7 @@ public void finish(final FinishExecutionRQ request) { try { // FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed - Thread.sleep(500); + Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } From 2ca875c0cb8a23b219444d5e9282b8f3885306f9 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:02:15 +0300 Subject: [PATCH 10/17] Remove TrackingLoggingSubscriber.java --- .../logs/TrackingLoggingSubscriber.java | 153 ------------------ 1 file changed, 153 deletions(-) delete mode 100644 src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java diff --git a/src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java b/src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java deleted file mode 100644 index efe9f799..00000000 --- a/src/main/java/com/epam/reportportal/service/logs/TrackingLoggingSubscriber.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2025 EPAM Systems - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.epam.reportportal.service.logs; - -import com.epam.ta.reportportal.ws.model.BatchSaveOperatingRS; -import io.reactivex.Completable; -import io.reactivex.FlowableSubscriber; -import io.reactivex.subjects.CompletableSubject; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A logging subscriber that tracks the completion of log processing. - * This subscriber allows waiting for all log batches to be processed before completing. - * It can also delegate to another subscriber for compatibility. - */ -public final class TrackingLoggingSubscriber implements FlowableSubscriber { - private static final Logger LOGGER = LoggerFactory.getLogger(TrackingLoggingSubscriber.class); - - private final AtomicInteger pendingBatches = new AtomicInteger(0); - private final AtomicReference completionSubject = new AtomicReference<>(CompletableSubject.create()); - private final FlowableSubscriber delegate; - private volatile boolean completed = false; - - /** - * Creates a tracking subscriber that logs errors but doesn't delegate. - */ - public TrackingLoggingSubscriber() { - this(null); - } - - /** - * Creates a tracking subscriber that delegates to another subscriber. - * - * @param delegate The subscriber to delegate to, can be null - */ - public TrackingLoggingSubscriber(@Nullable FlowableSubscriber delegate) { - this.delegate = delegate; - } - - @Override - public void onSubscribe(@Nonnull Subscription s) { - // Request unlimited items to ensure all batches are processed - s.request(Long.MAX_VALUE); - if (delegate != null) { - delegate.onSubscribe(s); - } - } - - @Override - public void onNext(BatchSaveOperatingRS result) { - // Decrement pending batches when we receive a batch response - int remaining = pendingBatches.decrementAndGet(); - if (remaining < 0) { - LOGGER.warn( - "[{}] Pending batches counter went negative: {}. This indicates a bug in batch tracking.", - Thread.currentThread().getId(), - remaining - ); - // Reset to 0 to prevent issues - pendingBatches.set(0); - } - checkCompletion(); - if (delegate != null) { - delegate.onNext(result); - } - } - - @Override - public void onError(Throwable e) { - LOGGER.error("[{}] ReportPortal logging error", Thread.currentThread().getId(), e); - // Complete with error - CompletableSubject subject = completionSubject.get(); - if (subject != null && !subject.hasComplete() && !subject.hasThrowable()) { - subject.onError(e); - } - if (delegate != null) { - delegate.onError(e); - } - } - - @Override - public void onComplete() { - completed = true; - checkCompletion(); - if (delegate != null) { - delegate.onComplete(); - } - } - - /** - * Notifies that a log batch is being processed. - * This should be called when a batch is sent to the network layer. - */ - public void onBatchProcessing() { - int pending = pendingBatches.incrementAndGet(); - LOGGER.debug("[{}] Log batch processing started. Pending batches: {}", Thread.currentThread().getId(), pending); - } - - /** - * Returns a Completable that completes when all log processing is finished. - * This includes both the completion of the log stream and all pending batch requests. - * - * @return A Completable that signals when all logging is complete - */ - @Nonnull - public Completable getCompletion() { - return completionSubject.get(); - } - - /** - * Checks if all conditions for completion are met and completes the subject if so. - * This method handles the race condition between onComplete() and the last onNext() calls. - *

- * Race condition scenario: - * 1. Log stream emits completion (onComplete called, completed = true) - * 2. But there are still pending HTTP batch requests - * 3. When those HTTP responses arrive (onNext called), we need to check if we can now complete - *

- * Alternative scenario: - * 1. All HTTP responses arrive first (pendingBatches = 0) - * 2. Then log stream completes (onComplete called) - * 3. We need to complete immediately since all conditions are met - */ - private void checkCompletion() { - // Both conditions must be true: stream completed AND no pending batches - if (completed && pendingBatches.get() == 0) { - CompletableSubject subject = completionSubject.get(); - if (subject != null && !subject.hasComplete() && !subject.hasThrowable()) { - subject.onComplete(); - } - } - } -} \ No newline at end of file From a20f57cf3bf52cb7809f636340ce62f37949cf79 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:03:23 +0300 Subject: [PATCH 11/17] Change log waiting order --- .../epam/reportportal/service/LaunchImpl.java | 16 +++++----------- .../service/step/ManualNestedStepTest.java | 2 +- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index e3bd9d1d..f42f88a2 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -303,11 +303,10 @@ private void truncateAttributes(@Nonnull final FinishExecutionRQ rq) { * * @return {@link Completable} */ - public Completable completeLogEmitter() { + public Completable completeLogCompletables() { Completable items = Completable.merge(logCompletables); logCompletables.clear(); - logEmitter.onComplete(); - return Completable.concat(Arrays.asList(items, logEmitter.ignoreElements())); + return items; } /** @@ -407,7 +406,7 @@ protected void waitForItemsCompletion(Completable itemCompletable) { getLaunch().ignoreElement(), createVirtualItemCompletable(), itemCompletable, - completeLogEmitter() + completeLogCompletables() ); } @@ -421,13 +420,6 @@ public void finish(final FinishExecutionRQ request) { throw new InternalReportPortalClientException("Executor service is already shut down"); } - try { - // FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // Close and re-create statistics service getStatisticsService().close(); statisticsService = new StatisticsService(getParameters()); @@ -455,6 +447,8 @@ public void finish(final FinishExecutionRQ request) { d.dispose(); return true; }); + logEmitter.onComplete(); + waitForCompletable(logEmitter.ignoreElements()); } private static Maybe createErrorResponse(Throwable cause) { diff --git a/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java b/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java index ac61d8a9..6305e53d 100644 --- a/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java +++ b/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java @@ -322,7 +322,7 @@ public void verify_passed_actions_nested_step() { assertThat(nestedStepFinish.getStatus(), equalTo(ItemStatus.PASSED.name())); assertThat(nestedStepFinish.getEndTime(), notNullValue()); - launch.completeLogEmitter().blockingAwait(10, TimeUnit.SECONDS); + launch.completeLogCompletables().blockingAwait(10, TimeUnit.SECONDS); ArgumentCaptor> logCaptor = ArgumentCaptor.forClass(List.class); verify(client, timeout(1000).atLeastOnce()).log(logCaptor.capture()); List> logRequests = logCaptor.getAllValues() From 0c516a1d1bbe8a167fa7061cae074a89ec996388 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:17:29 +0300 Subject: [PATCH 12/17] Revert `onAssembly` to test --- src/main/java/com/epam/reportportal/service/LaunchImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index f42f88a2..74409137 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -884,13 +884,13 @@ public void log(@Nonnull final java.util.function.Function lo */ @Override public void log(@Nonnull final Maybe logItemUuid, @Nonnull final java.util.function.Function logSupplier) { - Maybe result = RxJavaPlugins.onAssembly(Maybe.zip( + Maybe result = Maybe.zip( getLaunch(), logItemUuid, (launchUuid, itemUuid) -> { SaveLogRQ rq = prepareRequest(launchUuid, logSupplier.apply(itemUuid)); emitLog(rq); return rq; } - ).cache()); + ).cache(); logCompletables.add(result.ignoreElement()); result.subscribe(SubscriptionUtils.logMaybeResults("Log item")); } From b584f90287cfff039d54ba0e9a7482b1dc321e13 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:20:13 +0300 Subject: [PATCH 13/17] Revert "Revert `onAssembly` to test" This reverts commit 0c516a1d1bbe8a167fa7061cae074a89ec996388. --- src/main/java/com/epam/reportportal/service/LaunchImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 74409137..f42f88a2 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -884,13 +884,13 @@ public void log(@Nonnull final java.util.function.Function lo */ @Override public void log(@Nonnull final Maybe logItemUuid, @Nonnull final java.util.function.Function logSupplier) { - Maybe result = Maybe.zip( + Maybe result = RxJavaPlugins.onAssembly(Maybe.zip( getLaunch(), logItemUuid, (launchUuid, itemUuid) -> { SaveLogRQ rq = prepareRequest(launchUuid, logSupplier.apply(itemUuid)); emitLog(rq); return rq; } - ).cache(); + ).cache()); logCompletables.add(result.ignoreElement()); result.subscribe(SubscriptionUtils.logMaybeResults("Log item")); } From 213e068974bb454b2645a20337ba4e7702c52113 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:22:49 +0300 Subject: [PATCH 14/17] A try to fix test --- .../reportportal/utils/formatting/ExceptionUtils.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java b/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java index 97259c75..9103052a 100644 --- a/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java +++ b/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java @@ -41,15 +41,16 @@ private ExceptionUtils() { */ public static String getStackTrace(Throwable throwable, Throwable baseThrowable, boolean preserveCause) { String[] mainFrames = org.apache.commons.lang3.exception.ExceptionUtils.getStackFrames(throwable); - Set baseFrames = Arrays.stream(org.apache.commons.lang3.exception.ExceptionUtils.getStackFrames(baseThrowable)).collect( - Collectors.toSet()); + Set baseFrames = Arrays.stream(org.apache.commons.lang3.exception.ExceptionUtils.getStackFrames(baseThrowable)) + .map(String::trim) + .collect(Collectors.toSet()); StringBuilder sb = new StringBuilder(); if (mainFrames.length > 0) { sb.append(mainFrames[0]).append(LINE_DELIMITER); boolean skipping = false; for (int i = 1; i < mainFrames.length; i++) { - String frame = mainFrames[i]; - if(baseFrames.contains(frame) && (!frame.startsWith("Caused by:") || !preserveCause)) { + String frame = mainFrames[i].trim(); + if (baseFrames.contains(frame) && (!frame.startsWith("Caused by:") || !preserveCause)) { if (!skipping) { sb.append(SKIP_TRACE_MARKER); skipping = true; @@ -66,7 +67,7 @@ public static String getStackTrace(Throwable throwable, Throwable baseThrowable, /** * Get stack trace of the throwable excluding the stack trace of the base throwable. * - * @param throwable Throwable to get stack trace from + * @param throwable Throwable to get stack trace from * @param baseThrowable Throwable to exclude stack trace from * @return Formatted stack trace */ From 4037445a7dc63dbb9837e025a99d609382fb0c32 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:30:31 +0300 Subject: [PATCH 15/17] Revert "A try to fix test" This reverts commit 213e068974bb454b2645a20337ba4e7702c52113. --- .../reportportal/utils/formatting/ExceptionUtils.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java b/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java index 9103052a..97259c75 100644 --- a/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java +++ b/src/main/java/com/epam/reportportal/utils/formatting/ExceptionUtils.java @@ -41,16 +41,15 @@ private ExceptionUtils() { */ public static String getStackTrace(Throwable throwable, Throwable baseThrowable, boolean preserveCause) { String[] mainFrames = org.apache.commons.lang3.exception.ExceptionUtils.getStackFrames(throwable); - Set baseFrames = Arrays.stream(org.apache.commons.lang3.exception.ExceptionUtils.getStackFrames(baseThrowable)) - .map(String::trim) - .collect(Collectors.toSet()); + Set baseFrames = Arrays.stream(org.apache.commons.lang3.exception.ExceptionUtils.getStackFrames(baseThrowable)).collect( + Collectors.toSet()); StringBuilder sb = new StringBuilder(); if (mainFrames.length > 0) { sb.append(mainFrames[0]).append(LINE_DELIMITER); boolean skipping = false; for (int i = 1; i < mainFrames.length; i++) { - String frame = mainFrames[i].trim(); - if (baseFrames.contains(frame) && (!frame.startsWith("Caused by:") || !preserveCause)) { + String frame = mainFrames[i]; + if(baseFrames.contains(frame) && (!frame.startsWith("Caused by:") || !preserveCause)) { if (!skipping) { sb.append(SKIP_TRACE_MARKER); skipping = true; @@ -67,7 +66,7 @@ public static String getStackTrace(Throwable throwable, Throwable baseThrowable, /** * Get stack trace of the throwable excluding the stack trace of the base throwable. * - * @param throwable Throwable to get stack trace from + * @param throwable Throwable to get stack trace from * @param baseThrowable Throwable to exclude stack trace from * @return Formatted stack trace */ From 1738036c7a68c3ee78675a4c4c3b27f071412a02 Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:30:47 +0300 Subject: [PATCH 16/17] A try to fix test --- src/main/java/com/epam/reportportal/service/ReportPortal.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/epam/reportportal/service/ReportPortal.java b/src/main/java/com/epam/reportportal/service/ReportPortal.java index ab883c0f..b439b4ee 100644 --- a/src/main/java/com/epam/reportportal/service/ReportPortal.java +++ b/src/main/java/com/epam/reportportal/service/ReportPortal.java @@ -415,6 +415,7 @@ public static boolean emitLaunchLog(final ReportPortalMessage message, final Str */ public static void sendStackTraceToRP(final Throwable cause) { ListenerParameters myParameters = ofNullable(Launch.currentLaunch()).map(Launch::getParameters).orElseGet(ListenerParameters::new); + Throwable base = new Throwable(); ReportPortal.emitLog(itemUuid -> { SaveLogRQ rq = new SaveLogRQ(); rq.setItemUuid(itemUuid); @@ -422,7 +423,7 @@ public static void sendStackTraceToRP(final Throwable cause) { rq.setLogTime(Calendar.getInstance().getTime()); if (cause != null) { if (myParameters.isExceptionTruncate()) { - rq.setMessage(getStackTrace(cause, new Throwable())); + rq.setMessage(getStackTrace(cause, base)); } else { rq.setMessage(ExceptionUtils.getStackTrace(cause)); } From dc0dade27e9f15d63da0997aa1d3429d72d218da Mon Sep 17 00:00:00 2001 From: Vadzim Hushchanskou Date: Mon, 26 May 2025 18:35:11 +0300 Subject: [PATCH 17/17] CHANGELOG.md update --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7846ba3..a40fa2be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ## [Unreleased] ### Added - Add Launch.log(Maybe, jFunction) method, by @HardNorth +### Changed +- Simplified `LoggingContext` class, by @HardNorth +### Fixed +- Log items wait mechanism, by @HardNorth ## [5.3.11] ### Fixed