Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.graylog.collectors.db.CoalescedActions;
import org.graylog.collectors.db.FleetReassignedPayload;
import org.graylog.collectors.db.MarkerPayload;
import org.graylog.collectors.db.MarkerType;
import org.graylog.collectors.db.TransactionMarker;
import org.graylog2.database.MongoCollections;
Expand All @@ -39,32 +41,37 @@
import java.util.List;
import java.util.Set;

import static org.graylog.collectors.db.TransactionMarker.FIELD_CREATED_AT;
import static org.graylog.collectors.db.TransactionMarker.FIELD_CREATED_BY;
import static org.graylog.collectors.db.TransactionMarker.FIELD_CREATED_BY_USER;
import static org.graylog.collectors.db.TransactionMarker.FIELD_ID;
import static org.graylog.collectors.db.TransactionMarker.FIELD_PAYLOAD;
import static org.graylog.collectors.db.TransactionMarker.FIELD_TARGET;
import static org.graylog.collectors.db.TransactionMarker.FIELD_TARGET_ID;
import static org.graylog.collectors.db.TransactionMarker.FIELD_TYPE;
import static org.graylog.collectors.db.TransactionMarker.TARGET_COLLECTOR;
import static org.graylog.collectors.db.TransactionMarker.TARGET_FLEET;

@Singleton
public class FleetTransactionLogService {

static final String COLLECTION_NAME = "collector_fleet_transaction_log";
static final String SEQUENCE_TOPIC = "fleet_txn_log";
static final String FIELD_TARGET = "target";
static final String FIELD_TARGET_ID = "target_id";
static final String FIELD_TYPE = "type";
static final String FIELD_PAYLOAD = "payload";
static final String FIELD_CREATED_AT = "created_at";
static final String FIELD_CREATED_BY = "created_by";
static final String FIELD_CREATED_BY_USER = "created_by_user";

/**
* The maximum number of bulk action targets we allow for a transaction log entry.
*/
public static final int MAX_BULK_TARGET_SIZE = 100;

private final MongoCollection<Document> collection;
private final MongoCollection<TransactionMarker> collection;
private final MongoSequenceService sequenceService;
private final NodeId nodeId;

@Inject
public FleetTransactionLogService(MongoCollections mongoCollections,
MongoSequenceService sequenceService,
NodeId nodeId) {
this.collection = mongoCollections.nonEntityCollection(COLLECTION_NAME, Document.class)
this.collection = mongoCollections.nonEntityCollection(COLLECTION_NAME, TransactionMarker.class)
.withWriteConcern(WriteConcern.JOURNALED);
this.sequenceService = sequenceService;
this.nodeId = nodeId;
Expand All @@ -73,7 +80,7 @@ public FleetTransactionLogService(MongoCollections mongoCollections,
collection.createIndex(
Indexes.compoundIndex(
Indexes.ascending(FIELD_TARGET, FIELD_TARGET_ID),
Indexes.ascending("_id")
Indexes.ascending(FIELD_ID)
)
);
}
Expand All @@ -84,27 +91,27 @@ public long appendFleetMarker(String fleetId, MarkerType type) {
}

public long appendFleetMarker(Set<String> fleetIds, MarkerType type) {
return appendMarker(TransactionMarker.TARGET_FLEET, fleetIds, type, null);
return appendMarker(TARGET_FLEET, fleetIds, type, null);
}

public long appendCollectorMarker(Set<String> instanceUids, MarkerType type, @Nullable Document payload) {
public long appendCollectorMarker(Set<String> instanceUids, MarkerType type, @Nullable MarkerPayload payload) {
if (instanceUids == null || instanceUids.isEmpty()) {
throw new IllegalArgumentException("instanceUids must not be empty");
}
if (instanceUids.size() > MAX_BULK_TARGET_SIZE) {
throw new IllegalArgumentException("instanceUids must not exceed " + MAX_BULK_TARGET_SIZE + " elements, got " + instanceUids.size());
}
return appendMarker(TransactionMarker.TARGET_COLLECTOR, instanceUids, type, payload);
return appendMarker(TARGET_COLLECTOR, instanceUids, type, payload);
}

private long appendMarker(String target, Set<String> targetIds, MarkerType type, @Nullable Document payload) {
private long appendMarker(String target, Set<String> targetIds, MarkerType type, @Nullable MarkerPayload payload) {
if (targetIds == null || targetIds.isEmpty() || targetIds.stream().noneMatch(StringUtils::isNotBlank)) {
throw new IllegalArgumentException("targetIds must not be empty");
}

final long seq = sequenceService.incrementAndGet(SEQUENCE_TOPIC);
collection.updateOne(
Filters.eq("_id", seq),
Filters.eq(FIELD_ID, seq),
Updates.combine(
Updates.set(FIELD_TARGET, target),
Updates.set(FIELD_TARGET_ID, targetIds),
Expand All @@ -126,51 +133,35 @@ public List<TransactionMarker> getUnprocessedMarkers(@Nullable String fleetId,
throw new IllegalArgumentException("At least one of fleetId or instanceUid must be non-null");
}

final Bson seqFilter = Filters.gt("_id", lastProcessedSeq);
final Bson seqFilter = Filters.gt(FIELD_ID, lastProcessedSeq);
final Bson filter;

if (fleetId != null && instanceUid != null) {
filter = Filters.and(seqFilter, Filters.or(
Filters.and(
Filters.eq(FIELD_TARGET, TransactionMarker.TARGET_FLEET),
Filters.eq(FIELD_TARGET, TARGET_FLEET),
Filters.eq(FIELD_TARGET_ID, fleetId)
),
Filters.and(
Filters.eq(FIELD_TARGET, TransactionMarker.TARGET_COLLECTOR),
Filters.eq(FIELD_TARGET, TARGET_COLLECTOR),
Filters.eq(FIELD_TARGET_ID, instanceUid)
)
));
} else if (fleetId != null) {
filter = Filters.and(seqFilter,
Filters.eq(FIELD_TARGET, TransactionMarker.TARGET_FLEET),
Filters.eq(FIELD_TARGET, TARGET_FLEET),
Filters.eq(FIELD_TARGET_ID, fleetId));
} else {
filter = Filters.and(seqFilter,
Filters.eq(FIELD_TARGET, TransactionMarker.TARGET_COLLECTOR),
Filters.eq(FIELD_TARGET, TARGET_COLLECTOR),
Filters.eq(FIELD_TARGET_ID, instanceUid));
}

return collection.find(filter)
.sort(new Document("_id", 1))
.map(this::documentToMarker)
.sort(Sorts.ascending(FIELD_ID))
.into(new ArrayList<>());
}

private TransactionMarker documentToMarker(Document doc) {
final String rawType = doc.getString(FIELD_TYPE);
final var createdAt = doc.getDate(FIELD_CREATED_AT);
return new TransactionMarker(
doc.getLong("_id"),
doc.getString(FIELD_TARGET),
Set.copyOf(doc.getList(FIELD_TARGET_ID, String.class)),
MarkerType.fromString(rawType),
rawType,
doc.get(FIELD_PAYLOAD, Document.class),
createdAt != null ? createdAt.toInstant() : null,
doc.getString(FIELD_CREATED_BY_USER)
);
}

@Nullable
private static String resolveCurrentUsername() {
try {
Expand All @@ -194,9 +185,8 @@ public List<TransactionMarker> getRecentMarkers(int limit) {
MarkerType.FLEET_REASSIGNED.name());

return collection.find(filter)
.sort(new Document("_id", -1))
.sort(Sorts.descending(FIELD_ID))
.limit(limit)
.map(this::documentToMarker)
.into(new ArrayList<>());
}

Expand Down Expand Up @@ -233,13 +223,13 @@ static CoalescedActions doCoalesce(List<TransactionMarker> markers) {
if (latestReassignment != null) {
// Fleet reassignment: recompute config from new fleet, discard fleet-level markers
recomputeConfig = true;
newFleetId = latestReassignment.payload() != null
? latestReassignment.payload().getString("new_fleet_id")
newFleetId = latestReassignment.payload() instanceof FleetReassignedPayload(String fleetId)
? fleetId
: null;

// Only process collector-level commands (fleet-level ones are from old fleet)
for (var marker : markers) {
if (TransactionMarker.TARGET_COLLECTOR.equals(marker.target())) {
if (TARGET_COLLECTOR.equals(marker.target())) {
switch (marker.type()) {
case RESTART -> restart = true;
case DISCOVERY_RUN -> runDiscovery = true;
Expand Down Expand Up @@ -267,7 +257,7 @@ static CoalescedActions doCoalesce(List<TransactionMarker> markers) {
}

// Package-private for test access
MongoCollection<Document> getCollection() {
MongoCollection<TransactionMarker> getCollection() {
return collection;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("FLEET_REASSIGNED")
public record FleetReassignedPayload(
@JsonProperty("new_fleet_id") String newFleetId
) implements MarkerPayload {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.collectors.db;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import static org.graylog.collectors.db.TransactionMarker.FIELD_TYPE;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXTERNAL_PROPERTY, property = FIELD_TYPE, defaultImpl = Void.class)
@JsonSubTypes(@JsonSubTypes.Type(FleetReassignedPayload.class))
public sealed interface MarkerPayload permits FleetReassignedPayload {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package org.graylog.collectors.db;

import com.fasterxml.jackson.annotation.JsonCreator;

/**
* Types of markers in the fleet transaction log. Stored as strings in MongoDB.
* Unknown values (from newer server versions) are parsed as {@link #UNKNOWN} to ensure
* forward compatibility across cluster upgrades.
*
* This is persisted in a non-entity collection, thus it has no Jackson mapping.
*/
public enum MarkerType {
CONFIG_CHANGED,
Expand All @@ -35,6 +35,7 @@ public enum MarkerType {
* Parse a marker type from its string representation, returning {@link #UNKNOWN}
* for unrecognized values instead of throwing.
*/
@JsonCreator
public static MarkerType fromString(String value) {
try {
return valueOf(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,43 @@
*/
package org.graylog.collectors.db;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;
import org.bson.Document;
import org.mongojack.Id;

import java.time.Instant;
import java.util.Set;

/**
* A raw marker entry from the fleet transaction log.
* A marker entry from the fleet transaction log.
*
* This is persisted in a non-entity collection, thus it has no Jackson mapping.
*
* @param seq sequence number (the _id in MongoDB)
* @param target "fleet" or "collector"
* @param targetIds fleet IDs or collector instance UIDs (always a set, even for single targets)
* @param type parsed marker type
* @param rawType original string from MongoDB (for logging unknown types)
* @param payload optional type-specific data (e.g., new_fleet_id for FLEET_REASSIGNED)
* @param createdAt timestamp when the marker was created
* @param createdByUser username of the actor, or null for system-initiated actions
* @param seq sequence number (the _id in MongoDB)
* @param target "fleet" or "collector"
* @param targetIds fleet IDs or collector instance UIDs (always a set, even for single targets)
* @param type parsed marker type
* @param payload optional type-specific data (e.g., new_fleet_id for FLEET_REASSIGNED)
* @param createdAt timestamp when the marker was created
* @param createdBy node ID of the server that created the marker
* @param createdByUser username of the actor, or null for system-initiated actions
*/
public record TransactionMarker(long seq,
String target,
Set<String> targetIds,
MarkerType type,
String rawType,
@Nullable Document payload,
@Nullable Instant createdAt,
@Nullable String createdByUser) {
public record TransactionMarker(
@Id @JsonProperty(FIELD_ID) long seq,
@JsonProperty(FIELD_TARGET) String target,
@JsonProperty(FIELD_TARGET_ID) Set<String> targetIds,
@JsonProperty(FIELD_TYPE) MarkerType type,
@JsonProperty(FIELD_PAYLOAD) @Nullable MarkerPayload payload,
@JsonProperty(FIELD_CREATED_AT) @Nullable Instant createdAt,
@JsonProperty(FIELD_CREATED_BY) @Nullable String createdBy,
@JsonProperty(FIELD_CREATED_BY_USER) @Nullable String createdByUser) {

public static final String FIELD_ID = "_id";
public static final String FIELD_TARGET = "target";
public static final String FIELD_TARGET_ID = "target_id";
public static final String FIELD_TYPE = "type";
public static final String FIELD_PAYLOAD = "payload";
public static final String FIELD_CREATED_AT = "created_at";
public static final String FIELD_CREATED_BY = "created_by";
public static final String FIELD_CREATED_BY_USER = "created_by_user";

public static final String TARGET_FLEET = "fleet";
public static final String TARGET_COLLECTOR = "collector";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.bson.Document;
import org.graylog.collectors.db.FleetReassignedPayload;
import org.bson.conversions.Bson;
import org.graylog.collectors.CollectorInstanceService;
import org.graylog.collectors.CollectorsConfigService;
Expand Down Expand Up @@ -316,7 +316,7 @@ public Response reassignInstances(@Valid @NotNull @RequestBody(required = true,
txnLogService.appendCollectorMarker(
permittedInstanceUids,
MarkerType.FLEET_REASSIGNED,
new Document("new_fleet_id", targetFleetId));
new FleetReassignedPayload(targetFleetId));

final AuditActor auditActor = AuditActor.user(getCurrentUser());
permittedInstances.values().forEach(dto ->
Expand Down
Loading
Loading