Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog

## [Unreleased]
### Added
- Add Launch.log(Maybe<java.lang.String>, jFunction<String, SaveLogRQ>) method, by @HardNorth
### Changed
- Simplified `LoggingContext` class, by @HardNorth
### Fixed
- Log items wait mechanism, by @HardNorth

## [5.3.11]
### Fixed
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/com/epam/reportportal/service/Launch.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,27 @@ abstract public Maybe<String> startVirtualTestItem(final Maybe<String> parentId,
final StartTestItemRQ rq);

/**
* Logs message to the ReportPortal Launch, root item.
* // * Logs message to the ReportPortal Launch, provided request.
*
* @param rq Log request.
*/
abstract public void log(@Nonnull final SaveLogRQ rq);

/**
* Logs message to the ReportPortal Launch.
* Logs message to the ReportPortal Launch, request factory.
*
* @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID.
*/
abstract public void log(@Nonnull final Function<String, SaveLogRQ> logSupplier);

/**
* Logs message to the ReportPortal Launch, request factory with specified Test Item UUID.
*
* @param logItemUuid Test Item ID promise
* @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID.
*/
public abstract void log(@Nonnull Maybe<String> logItemUuid, @Nonnull Function<String, SaveLogRQ> logSupplier);

/**
* Finishes Test Item in ReportPortal asynchronously (non-blocking). Schedules finish after success of all child items.
*
Expand Down Expand Up @@ -284,6 +292,10 @@ public void log(@Nonnull SaveLogRQ rq) {
public void log(@Nonnull final Function<String, SaveLogRQ> logSupplier) {
}

@Override
public void log(@Nonnull Maybe<String> logItemUuid, @Nonnull Function<String, SaveLogRQ> logSupplier) {
}

@Override
@Nonnull
public Maybe<OperationCompletionRS> finishTestItem(Maybe<String> itemId, FinishTestItemRQ rq) {
Expand Down
43 changes: 29 additions & 14 deletions src/main/java/com/epam/reportportal/service/LaunchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class LaunchImpl extends Launch {
private static final int ITEM_FINISH_MAX_RETRIES = 10;
private static final int ITEM_FINISH_RETRY_TIMEOUT = 10;

private static final int LOG_REMOVE_FACTOR = 100;

private static final Predicate<Throwable> INTERNAL_CLIENT_EXCEPTION_PREDICATE = throwable -> throwable instanceof InternalReportPortalClientException;
private static final Predicate<Throwable> TEST_ITEM_FINISH_RETRY_PREDICATE = throwable -> (throwable instanceof ReportPortalException
&& ErrorType.FINISH_ITEM_NOT_ALLOWED.equals(((ReportPortalException) throwable).getError().getErrorType()))
Expand Down Expand Up @@ -301,11 +303,10 @@ private void truncateAttributes(@Nonnull final FinishExecutionRQ rq) {
*
* @return {@link Completable}
*/
public Completable completeLogEmitter() {
public Completable completeLogCompletables() {
Completable items = Completable.merge(logCompletables);
logCompletables.clear();
logEmitter.onComplete();
return Completable.concat(Arrays.asList(items, logEmitter.ignoreElements()));
return items;
}

/**
Expand Down Expand Up @@ -405,8 +406,7 @@ protected void waitForItemsCompletion(Completable itemCompletable) {
getLaunch().ignoreElement(),
createVirtualItemCompletable(),
itemCompletable,
LoggingContext.completed(),
completeLogEmitter()
completeLogCompletables()
);
}

Expand All @@ -420,13 +420,6 @@ public void finish(final FinishExecutionRQ request) {
throw new InternalReportPortalClientException("Executor service is already shut down");
}

try {
// FIXME: Find out a way to ensure that everything in Schedulers, Completables and in the middle were processed
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

// Close and re-create statistics service
getStatisticsService().close();
statisticsService = new StatisticsService(getParameters());
Expand Down Expand Up @@ -454,6 +447,8 @@ public void finish(final FinishExecutionRQ request) {
d.dispose();
return true;
});
logEmitter.onComplete();
waitForCompletable(logEmitter.ignoreElements());
}

private static <T> Maybe<T> createErrorResponse(Throwable cause) {
Expand Down Expand Up @@ -825,8 +820,7 @@ public Maybe<OperationCompletionRS> finishTestItem(final Maybe<String> item, fin

getStepReporter().removeParent(item);
LoggingContext.dispose();
int removeFactor = 100;
if (rq.hashCode() % removeFactor == 0) {
if (rq.hashCode() % LOG_REMOVE_FACTOR == 0) {
logCompletables.removeIf(c -> c.test().completions() > 0);
}
return finishResponse;
Expand Down Expand Up @@ -856,6 +850,7 @@ private void emitLog(@Nonnull final SaveLogRQ rq) {
*
* @param rq Log request.
*/
@Override
public void log(@Nonnull final SaveLogRQ rq) {
Maybe<SaveLogRQ> result = getLaunch().map(launchUuid -> {
emitLog(prepareRequest(launchUuid, rq));
Expand All @@ -870,6 +865,7 @@ public void log(@Nonnull final SaveLogRQ rq) {
*
* @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID.
*/
@Override
public void log(@Nonnull final java.util.function.Function<String, SaveLogRQ> logSupplier) {
Maybe<SaveLogRQ> result = getLaunch().map(launchUuid -> {
SaveLogRQ rq = prepareRequest(logSupplier.apply(launchUuid));
Expand All @@ -880,6 +876,25 @@ public void log(@Nonnull final java.util.function.Function<String, SaveLogRQ> lo
result.subscribe(SubscriptionUtils.logMaybeResults("Log item"));
}

/**
* Logs message to the ReportPortal Launch.
*
* @param logItemUuid Test Item ID promise
* @param logSupplier Log Message Factory. Argument of the function will be actual launch UUID.
*/
@Override
public void log(@Nonnull final Maybe<String> logItemUuid, @Nonnull final java.util.function.Function<String, SaveLogRQ> logSupplier) {
Maybe<SaveLogRQ> result = RxJavaPlugins.onAssembly(Maybe.zip(
getLaunch(), logItemUuid, (launchUuid, itemUuid) -> {
SaveLogRQ rq = prepareRequest(launchUuid, logSupplier.apply(itemUuid));
emitLog(rq);
return rq;
}
).cache());
logCompletables.add(result.ignoreElement());
result.subscribe(SubscriptionUtils.logMaybeResults("Log item"));
}

/**
* Wrapper around TestItem entity to be able to track parent and children items
*/
Expand Down
52 changes: 6 additions & 46 deletions src/main/java/com/epam/reportportal/service/LoggingContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@
*/
package com.epam.reportportal.service;

import com.epam.reportportal.utils.SubscriptionUtils;
import com.epam.ta.reportportal.ws.model.log.SaveLogRQ;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import org.apache.commons.lang3.tuple.Pair;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

import static java.util.Optional.ofNullable;

Expand All @@ -44,7 +43,6 @@
* @see LoggingContext#init(Maybe)
*/
public class LoggingContext {
private static final Queue<LoggingContext> USED_CONTEXTS = new ConcurrentLinkedQueue<>();
private static final ThreadLocal<Pair<Long, Deque<LoggingContext>>> CONTEXT_THREAD_LOCAL = new InheritableThreadLocal<>();

private static final Set<Long> THREAD_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
Expand Down Expand Up @@ -95,31 +93,9 @@ public static LoggingContext init(@Nonnull final Maybe<String> itemUuid) {
* Disposes current logging context
*/
public static void dispose() {
int removeFactor = 100;
ofNullable(getContext()).map(Deque::poll).ifPresent(context -> {
USED_CONTEXTS.add(context);
if (context.hashCode() % removeFactor == 0) {
USED_CONTEXTS.removeIf(ctx -> {
ctx.completables.removeIf(c -> c.test().completions() > 0);
return ctx.completables.isEmpty();
});
}
});
ofNullable(getContext()).ifPresent(Deque::poll);
}

public static Completable completed() {
Completable completable = Completable.merge(USED_CONTEXTS.stream().map(LoggingContext::complete).collect(Collectors.toList()));
Deque<LoggingContext> context = getContext();
return ofNullable(context).map(ctx -> Completable.concat(Arrays.asList(
completable,
Completable.merge(ctx.stream().map(LoggingContext::complete).collect(Collectors.toList()))
))).orElse(completable);
}

/**
* Messages queue to track items execution order
*/
private final Queue<Completable> completables = new ConcurrentLinkedQueue<>();
/* a UUID of TestItem in ReportPortal to report into */
private final Maybe<String> itemUuid;

Expand All @@ -138,12 +114,7 @@ public void emit(@Nonnull final Maybe<String> logItemUuid, @Nonnull final java.u
if (launch == null) {
return;
}
Maybe<String> future = logItemUuid.map(itemUuid -> {
launch.log(logSupplier.apply(itemUuid));
return itemUuid;
}).cache();
completables.add(future.ignoreElement());
future.subscribe(SubscriptionUtils.logMaybeResults("LoggingContext"));
launch.log(logItemUuid, logSupplier);
}

/**
Expand All @@ -154,15 +125,4 @@ public void emit(@Nonnull final Maybe<String> logItemUuid, @Nonnull final java.u
public void emit(@Nonnull final java.util.function.Function<String, SaveLogRQ> logSupplier) {
emit(itemUuid, logSupplier);
}

/**
* Complete the context.
*
* @return the instance will be completed when all logs are sent
*/
public Completable complete() {
Completable completable = Completable.merge(completables);
completables.clear();
return completable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,15 @@ public static boolean emitLaunchLog(final ReportPortalMessage message, final Str
*/
public static void sendStackTraceToRP(final Throwable cause) {
ListenerParameters myParameters = ofNullable(Launch.currentLaunch()).map(Launch::getParameters).orElseGet(ListenerParameters::new);
Throwable base = new Throwable();
ReportPortal.emitLog(itemUuid -> {
SaveLogRQ rq = new SaveLogRQ();
rq.setItemUuid(itemUuid);
rq.setLevel("ERROR");
rq.setLogTime(Calendar.getInstance().getTime());
if (cause != null) {
if (myParameters.isExceptionTruncate()) {
rq.setMessage(getStackTrace(cause, new Throwable()));
rq.setMessage(getStackTrace(cause, base));
} else {
rq.setMessage(ExceptionUtils.getStackTrace(cause));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public class BufferSubscriber implements FlowableSubscriber<SaveLogRQ>, Subscription {
private final ReentrantLock lock = new ReentrantLock();
private final Subscriber<List<SaveLogRQ>> downstream;
private final Subscriber<? super List<SaveLogRQ>> downstream;
private final int maxSize;
private final long payloadLimit;

Expand All @@ -41,7 +41,7 @@ public class BufferSubscriber implements FlowableSubscriber<SaveLogRQ>, Subscrip
private volatile Subscription upstream;
private volatile boolean done;

public BufferSubscriber(Subscriber<List<SaveLogRQ>> actual, int batchMaxSize, long batchPayloadLimit) {
public BufferSubscriber(Subscriber<? super List<SaveLogRQ>> actual, int batchMaxSize, long batchPayloadLimit) {
downstream = actual;
maxSize = batchMaxSize;
payloadLimit = batchPayloadLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public LogBatchingFlowable(Flowable<SaveLogRQ> flowableSource, ListenerParameter

@Override
protected void subscribeActual(Subscriber<? super List<SaveLogRQ>> s) {
source.subscribe(new BufferSubscriber(new SerializedSubscriber<>(s), maxSize, payloadLimit));
source.subscribe(new BufferSubscriber(s, maxSize, payloadLimit));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public void verify_passed_actions_nested_step() {
assertThat(nestedStepFinish.getStatus(), equalTo(ItemStatus.PASSED.name()));
assertThat(nestedStepFinish.getEndTime(), notNullValue());

launch.completeLogEmitter().blockingAwait(10, TimeUnit.SECONDS);
launch.completeLogCompletables().blockingAwait(10, TimeUnit.SECONDS);
ArgumentCaptor<List<MultipartBody.Part>> logCaptor = ArgumentCaptor.forClass(List.class);
verify(client, timeout(1000).atLeastOnce()).log(logCaptor.capture());
List<Pair<String, String>> logRequests = logCaptor.getAllValues()
Expand Down