diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e299823..a40fa2be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,12 @@ # Changelog ## [Unreleased] +### Added +- Add Launch.log(Maybe, jFunction) method, by @HardNorth +### Changed +- Simplified `LoggingContext` class, by @HardNorth +### Fixed +- Log items wait mechanism, by @HardNorth ## [5.3.11] ### Fixed diff --git a/src/main/java/com/epam/reportportal/service/Launch.java b/src/main/java/com/epam/reportportal/service/Launch.java index 5627690a..82fe7b25 100644 --- a/src/main/java/com/epam/reportportal/service/Launch.java +++ b/src/main/java/com/epam/reportportal/service/Launch.java @@ -145,19 +145,27 @@ abstract public Maybe startVirtualTestItem(final Maybe parentId, final StartTestItemRQ rq); /** - * Logs message to the ReportPortal Launch, root item. + * // * Logs message to the ReportPortal Launch, provided request. * * @param rq Log request. */ abstract public void log(@Nonnull final SaveLogRQ rq); /** - * Logs message to the ReportPortal Launch. + * Logs message to the ReportPortal Launch, request factory. * * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. */ abstract public void log(@Nonnull final Function logSupplier); + /** + * Logs message to the ReportPortal Launch, request factory with specified Test Item UUID. + * + * @param logItemUuid Test Item ID promise + * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. + */ + public abstract void log(@Nonnull Maybe logItemUuid, @Nonnull Function logSupplier); + /** * Finishes Test Item in ReportPortal asynchronously (non-blocking). Schedules finish after success of all child items. * @@ -284,6 +292,10 @@ public void log(@Nonnull SaveLogRQ rq) { public void log(@Nonnull final Function logSupplier) { } + @Override + public void log(@Nonnull Maybe logItemUuid, @Nonnull Function logSupplier) { + } + @Override @Nonnull public Maybe finishTestItem(Maybe itemId, FinishTestItemRQ rq) { diff --git a/src/main/java/com/epam/reportportal/service/LaunchImpl.java b/src/main/java/com/epam/reportportal/service/LaunchImpl.java index 32687807..f42f88a2 100644 --- a/src/main/java/com/epam/reportportal/service/LaunchImpl.java +++ b/src/main/java/com/epam/reportportal/service/LaunchImpl.java @@ -84,6 +84,8 @@ public class LaunchImpl extends Launch { private static final int ITEM_FINISH_MAX_RETRIES = 10; private static final int ITEM_FINISH_RETRY_TIMEOUT = 10; + private static final int LOG_REMOVE_FACTOR = 100; + private static final Predicate INTERNAL_CLIENT_EXCEPTION_PREDICATE = throwable -> throwable instanceof InternalReportPortalClientException; private static final Predicate TEST_ITEM_FINISH_RETRY_PREDICATE = throwable -> (throwable instanceof ReportPortalException && ErrorType.FINISH_ITEM_NOT_ALLOWED.equals(((ReportPortalException) throwable).getError().getErrorType())) @@ -301,11 +303,10 @@ private void truncateAttributes(@Nonnull final FinishExecutionRQ rq) { * * @return {@link Completable} */ - public Completable completeLogEmitter() { + public Completable completeLogCompletables() { Completable items = Completable.merge(logCompletables); logCompletables.clear(); - logEmitter.onComplete(); - return Completable.concat(Arrays.asList(items, logEmitter.ignoreElements())); + return items; } /** @@ -405,8 +406,7 @@ protected void waitForItemsCompletion(Completable itemCompletable) { getLaunch().ignoreElement(), createVirtualItemCompletable(), itemCompletable, - LoggingContext.completed(), - completeLogEmitter() + completeLogCompletables() ); } @@ -420,13 +420,6 @@ public void finish(final FinishExecutionRQ request) { throw new InternalReportPortalClientException("Executor service is already shut down"); } - try { - // FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - // Close and re-create statistics service getStatisticsService().close(); statisticsService = new StatisticsService(getParameters()); @@ -454,6 +447,8 @@ public void finish(final FinishExecutionRQ request) { d.dispose(); return true; }); + logEmitter.onComplete(); + waitForCompletable(logEmitter.ignoreElements()); } private static Maybe createErrorResponse(Throwable cause) { @@ -825,8 +820,7 @@ public Maybe finishTestItem(final Maybe item, fin getStepReporter().removeParent(item); LoggingContext.dispose(); - int removeFactor = 100; - if (rq.hashCode() % removeFactor == 0) { + if (rq.hashCode() % LOG_REMOVE_FACTOR == 0) { logCompletables.removeIf(c -> c.test().completions() > 0); } return finishResponse; @@ -856,6 +850,7 @@ private void emitLog(@Nonnull final SaveLogRQ rq) { * * @param rq Log request. */ + @Override public void log(@Nonnull final SaveLogRQ rq) { Maybe result = getLaunch().map(launchUuid -> { emitLog(prepareRequest(launchUuid, rq)); @@ -870,6 +865,7 @@ public void log(@Nonnull final SaveLogRQ rq) { * * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. */ + @Override public void log(@Nonnull final java.util.function.Function logSupplier) { Maybe result = getLaunch().map(launchUuid -> { SaveLogRQ rq = prepareRequest(logSupplier.apply(launchUuid)); @@ -880,6 +876,25 @@ public void log(@Nonnull final java.util.function.Function lo result.subscribe(SubscriptionUtils.logMaybeResults("Log item")); } + /** + * Logs message to the ReportPortal Launch. + * + * @param logItemUuid Test Item ID promise + * @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID. + */ + @Override + public void log(@Nonnull final Maybe logItemUuid, @Nonnull final java.util.function.Function logSupplier) { + Maybe result = RxJavaPlugins.onAssembly(Maybe.zip( + getLaunch(), logItemUuid, (launchUuid, itemUuid) -> { + SaveLogRQ rq = prepareRequest(launchUuid, logSupplier.apply(itemUuid)); + emitLog(rq); + return rq; + } + ).cache()); + logCompletables.add(result.ignoreElement()); + result.subscribe(SubscriptionUtils.logMaybeResults("Log item")); + } + /** * Wrapper around TestItem entity to be able to track parent and children items */ diff --git a/src/main/java/com/epam/reportportal/service/LoggingContext.java b/src/main/java/com/epam/reportportal/service/LoggingContext.java index 5c0df685..348a828c 100644 --- a/src/main/java/com/epam/reportportal/service/LoggingContext.java +++ b/src/main/java/com/epam/reportportal/service/LoggingContext.java @@ -15,19 +15,18 @@ */ package com.epam.reportportal.service; -import com.epam.reportportal.utils.SubscriptionUtils; import com.epam.ta.reportportal.ws.model.log.SaveLogRQ; -import io.reactivex.Completable; import io.reactivex.Flowable; import io.reactivex.Maybe; import org.apache.commons.lang3.tuple.Pair; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.util.*; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.Collectors; import static java.util.Optional.ofNullable; @@ -44,7 +43,6 @@ * @see LoggingContext#init(Maybe) */ public class LoggingContext { - private static final Queue USED_CONTEXTS = new ConcurrentLinkedQueue<>(); private static final ThreadLocal>> CONTEXT_THREAD_LOCAL = new InheritableThreadLocal<>(); private static final Set THREAD_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -95,31 +93,9 @@ public static LoggingContext init(@Nonnull final Maybe itemUuid) { * Disposes current logging context */ public static void dispose() { - int removeFactor = 100; - ofNullable(getContext()).map(Deque::poll).ifPresent(context -> { - USED_CONTEXTS.add(context); - if (context.hashCode() % removeFactor == 0) { - USED_CONTEXTS.removeIf(ctx -> { - ctx.completables.removeIf(c -> c.test().completions() > 0); - return ctx.completables.isEmpty(); - }); - } - }); + ofNullable(getContext()).ifPresent(Deque::poll); } - public static Completable completed() { - Completable completable = Completable.merge(USED_CONTEXTS.stream().map(LoggingContext::complete).collect(Collectors.toList())); - Deque context = getContext(); - return ofNullable(context).map(ctx -> Completable.concat(Arrays.asList( - completable, - Completable.merge(ctx.stream().map(LoggingContext::complete).collect(Collectors.toList())) - ))).orElse(completable); - } - - /** - * Messages queue to track items execution order - */ - private final Queue completables = new ConcurrentLinkedQueue<>(); /* a UUID of TestItem in ReportPortal to report into */ private final Maybe itemUuid; @@ -138,12 +114,7 @@ public void emit(@Nonnull final Maybe logItemUuid, @Nonnull final java.u if (launch == null) { return; } - Maybe future = logItemUuid.map(itemUuid -> { - launch.log(logSupplier.apply(itemUuid)); - return itemUuid; - }).cache(); - completables.add(future.ignoreElement()); - future.subscribe(SubscriptionUtils.logMaybeResults("LoggingContext")); + launch.log(logItemUuid, logSupplier); } /** @@ -154,15 +125,4 @@ public void emit(@Nonnull final Maybe logItemUuid, @Nonnull final java.u public void emit(@Nonnull final java.util.function.Function logSupplier) { emit(itemUuid, logSupplier); } - - /** - * Complete the context. - * - * @return the instance will be completed when all logs are sent - */ - public Completable complete() { - Completable completable = Completable.merge(completables); - completables.clear(); - return completable; - } } diff --git a/src/main/java/com/epam/reportportal/service/ReportPortal.java b/src/main/java/com/epam/reportportal/service/ReportPortal.java index ab883c0f..b439b4ee 100644 --- a/src/main/java/com/epam/reportportal/service/ReportPortal.java +++ b/src/main/java/com/epam/reportportal/service/ReportPortal.java @@ -415,6 +415,7 @@ public static boolean emitLaunchLog(final ReportPortalMessage message, final Str */ public static void sendStackTraceToRP(final Throwable cause) { ListenerParameters myParameters = ofNullable(Launch.currentLaunch()).map(Launch::getParameters).orElseGet(ListenerParameters::new); + Throwable base = new Throwable(); ReportPortal.emitLog(itemUuid -> { SaveLogRQ rq = new SaveLogRQ(); rq.setItemUuid(itemUuid); @@ -422,7 +423,7 @@ public static void sendStackTraceToRP(final Throwable cause) { rq.setLogTime(Calendar.getInstance().getTime()); if (cause != null) { if (myParameters.isExceptionTruncate()) { - rq.setMessage(getStackTrace(cause, new Throwable())); + rq.setMessage(getStackTrace(cause, base)); } else { rq.setMessage(ExceptionUtils.getStackTrace(cause)); } diff --git a/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java b/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java index 0e414dd6..37b00069 100644 --- a/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java +++ b/src/main/java/com/epam/reportportal/service/logs/BufferSubscriber.java @@ -32,7 +32,7 @@ */ public class BufferSubscriber implements FlowableSubscriber, Subscription { private final ReentrantLock lock = new ReentrantLock(); - private final Subscriber> downstream; + private final Subscriber> downstream; private final int maxSize; private final long payloadLimit; @@ -41,7 +41,7 @@ public class BufferSubscriber implements FlowableSubscriber, Subscrip private volatile Subscription upstream; private volatile boolean done; - public BufferSubscriber(Subscriber> actual, int batchMaxSize, long batchPayloadLimit) { + public BufferSubscriber(Subscriber> actual, int batchMaxSize, long batchPayloadLimit) { downstream = actual; maxSize = batchMaxSize; payloadLimit = batchPayloadLimit; diff --git a/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java b/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java index 730b39b1..00858534 100644 --- a/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java +++ b/src/main/java/com/epam/reportportal/service/logs/LogBatchingFlowable.java @@ -45,7 +45,7 @@ public LogBatchingFlowable(Flowable flowableSource, ListenerParameter @Override protected void subscribeActual(Subscriber> s) { - source.subscribe(new BufferSubscriber(new SerializedSubscriber<>(s), maxSize, payloadLimit)); + source.subscribe(new BufferSubscriber(s, maxSize, payloadLimit)); } @Override diff --git a/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java b/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java index ac61d8a9..6305e53d 100644 --- a/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java +++ b/src/test/java/com/epam/reportportal/service/step/ManualNestedStepTest.java @@ -322,7 +322,7 @@ public void verify_passed_actions_nested_step() { assertThat(nestedStepFinish.getStatus(), equalTo(ItemStatus.PASSED.name())); assertThat(nestedStepFinish.getEndTime(), notNullValue()); - launch.completeLogEmitter().blockingAwait(10, TimeUnit.SECONDS); + launch.completeLogCompletables().blockingAwait(10, TimeUnit.SECONDS); ArgumentCaptor> logCaptor = ArgumentCaptor.forClass(List.class); verify(client, timeout(1000).atLeastOnce()).log(logCaptor.capture()); List> logRequests = logCaptor.getAllValues()