From 32f5f15613b4eda275f33c09e050e3971e35db23 Mon Sep 17 00:00:00 2001 From: George Rybakov Date: Thu, 18 Jun 2026 15:53:20 +0300 Subject: [PATCH] backup: implement manifest aggregation Add backup manifest data types, fragment decoding, aggregation, validation, warnings, and unit tests with testdata fixtures. Closes TNTP-8208 --- lib/backup/aggregation.go | 255 ++++++++++++++++ lib/backup/aggregation_test.go | 195 +++++++++++++ lib/backup/fixtures_test.go | 65 +++++ lib/backup/manifest.go | 59 ++++ lib/backup/testdata/cluster_manifest.json | 109 +++++++ lib/backup/testdata/fragment_a.json | 18 ++ lib/backup/testdata/fragment_b.json | 18 ++ .../fragment_with_empty_recovery_points.json | 15 + .../fragment_without_recovery_points.json | 14 + lib/backup/types.go | 29 ++ lib/backup/validation.go | 179 ++++++++++++ lib/backup/validation_test.go | 274 ++++++++++++++++++ lib/backup/warnings.go | 67 +++++ 13 files changed, 1297 insertions(+) create mode 100644 lib/backup/aggregation.go create mode 100644 lib/backup/aggregation_test.go create mode 100644 lib/backup/fixtures_test.go create mode 100644 lib/backup/manifest.go create mode 100644 lib/backup/testdata/cluster_manifest.json create mode 100644 lib/backup/testdata/fragment_a.json create mode 100644 lib/backup/testdata/fragment_b.json create mode 100644 lib/backup/testdata/fragment_with_empty_recovery_points.json create mode 100644 lib/backup/testdata/fragment_without_recovery_points.json create mode 100644 lib/backup/types.go create mode 100644 lib/backup/validation.go create mode 100644 lib/backup/validation_test.go create mode 100644 lib/backup/warnings.go diff --git a/lib/backup/aggregation.go b/lib/backup/aggregation.go new file mode 100644 index 000000000..f3d26bda0 --- /dev/null +++ b/lib/backup/aggregation.go @@ -0,0 +1,255 @@ +package backup + +import ( + "encoding/json" + "fmt" + "time" +) + +const artifactCompression = "zstd" + +// RecoveryPoint describes one engine recovery point returned by Tarantool. +type RecoveryPoint struct { + UUID string `json:"uuid"` + ReplicaID uint32 `json:"replica_id"` + LSN uint64 `json:"lsn"` + Timestamp int64 `json:"timestamp"` // unix-time +} + +// Fragment is a per-replicaset backup description stored in an instance archive. +type Fragment struct { + ReplicasetUUID string `json:"replicaset_uuid"` + InstanceUUID string `json:"instance_uuid"` + InstanceName string `json:"instance_name"` + Hostname string `json:"hostname"` + Type BackupType `json:"type"` + VclockBegin Vclock `json:"vclock_begin"` + VclockEnd Vclock `json:"vclock_end"` + Files []string `json:"files"` + ChecksumSHA256 string `json:"checksum_sha256"` + RecoveryPoints []*RecoveryPoint `json:"recovery_points,omitempty"` +} + +// AggregateInput contains all external data needed to build a manifest. +type AggregateInput struct { + BackupID BackupID + BaseFullBackupID BackupID + PreviousBackupID BackupID + CreationTime time.Time + CreationDuration time.Duration + Topology Topology + Shards []*ShardInput +} + +// ShardInput describes one expected replicaset backup result. +type ShardInput struct { + ReplicasetUUID string + Fragment *Fragment + Location *ArtifactLocation + Err error +} + +// ArtifactLocation identifies an already uploaded shard archive. +type ArtifactLocation struct { + Path string + SizeBytes int64 +} + +// DecodeFragment decodes and validates one instance_backup.json payload. +func DecodeFragment(data []byte) (*Fragment, error) { + var fragment Fragment + + if err := json.Unmarshal(data, &fragment); err != nil { + return nil, fmt.Errorf("decode fragment: %w", err) + } + + if err := fragment.Validate(); err != nil { + return nil, fmt.Errorf("validate fragment: %w", err) + } + + return &fragment, nil +} + +// NewAggregateInput collects backup metadata, topology and shard inputs. +func NewAggregateInput( + backupID BackupID, + previousBackupID BackupID, + baseFullBackupID BackupID, + creationTime time.Time, + creationDuration time.Duration, + topology Topology, + shards []*ShardInput, +) AggregateInput { + return AggregateInput{ + BackupID: backupID, + PreviousBackupID: previousBackupID, + BaseFullBackupID: baseFullBackupID, + CreationTime: creationTime, + CreationDuration: creationDuration, + Topology: topology, + Shards: shards, + } +} + +// Aggregate builds and validates a cluster manifest from shard fragments. +func Aggregate(in AggregateInput) (*ClusterManifest, error) { + manifest := newClusterManifest(in) + + for _, shardInput := range in.Shards { + if err := aggregateShard(manifest, shardInput); err != nil { + return nil, fmt.Errorf("aggregate shard: %w", err) + } + } + + manifest.Status = calculateStatus(manifest) + if err := manifest.Validate(); err != nil { + return nil, fmt.Errorf("validate cluster manifest: %w", err) + } + + return manifest, nil +} + +// newClusterManifest initializes a manifest with immutable aggregate metadata. +func newClusterManifest(in AggregateInput) *ClusterManifest { + return &ClusterManifest{ + SchemaVersion: SchemaVersion, + BackupID: in.BackupID, + PreviousBackupID: in.PreviousBackupID, + BaseFullBackupID: in.BaseFullBackupID, + Status: StatusFailed, + CreationTime: in.CreationTime, + CreationDuration: in.CreationDuration, + Shards: make(map[string]Shard, len(in.Shards)), + Topology: in.Topology, + Warnings: make([]Warning, 0), + } +} + +// aggregateShard adds one shard input to the manifest. +func aggregateShard(manifest *ClusterManifest, shardInput *ShardInput) error { + replicasetUUID := shardInput.ReplicasetUUID + + if shardInput.Fragment == nil { + aggregateFailedShard(manifest, replicasetUUID, shardInput.Err) + return nil + } + + if shardInput.Err != nil { + manifest.Shards[replicasetUUID] = Shard{Error: shardInput.Err.Error()} + manifest.Warnings = append( + manifest.Warnings, + NewShardPartialWarning( + replicasetUUID, + shardInput.Fragment.InstanceUUID, + shardInput.Err.Error(), + ), + ) + return nil + } + + if shardInput.Fragment.ReplicasetUUID != replicasetUUID { + return fmt.Errorf( + "fragment replicaset_uuid %q does not match shard input replicaset_uuid %q", + shardInput.Fragment.ReplicasetUUID, + replicasetUUID, + ) + } + + aggregateSuccessfulShard(manifest, replicasetUUID, shardInput) + return nil +} + +// aggregateFailedShard adds an error result for a shard. +func aggregateFailedShard(manifest *ClusterManifest, replicasetUUID string, err error) { + if err == nil { + manifest.Shards[replicasetUUID] = Shard{Error: "shard unreachable"} + manifest.Warnings = append(manifest.Warnings, + NewShardUnreachableWarning(replicasetUUID)) + return + } + + manifest.Shards[replicasetUUID] = Shard{Error: err.Error()} +} + +// aggregateSuccessfulShard adds an instance result for a shard. +func aggregateSuccessfulShard( + manifest *ClusterManifest, + replicasetUUID string, + shardInput *ShardInput, +) { + fragment := shardInput.Fragment + location := ArtifactLocation{} + if shardInput.Location != nil { + location = *shardInput.Location + } + + manifest.Shards[replicasetUUID] = Shard{ + Instance: &ShardInstance{ + InstanceUUID: fragment.InstanceUUID, + InstanceName: fragment.InstanceName, + Hostname: fragment.Hostname, + VclockBegin: fragment.VclockBegin, + VclockEnd: fragment.VclockEnd, + Artifact: Artifact{ + Path: location.Path, + SizeBytes: location.SizeBytes, + ChecksumSHA256: fragment.ChecksumSHA256, + Compression: artifactCompression, + Files: append([]string(nil), fragment.Files...), + RecoveryPoints: recoveryPointsFromFragment(manifest, replicasetUUID, fragment), + Type: fragment.Type, + }, + }, + } +} + +// recoveryPointsFromFragment converts optional fragment recovery points. +func recoveryPointsFromFragment( + manifest *ClusterManifest, + replicasetUUID string, + fragment *Fragment, +) []RecoveryPoint { + recoveryPoints := make([]RecoveryPoint, 0) + if fragment.RecoveryPoints == nil { + manifest.Warnings = append(manifest.Warnings, + NewRecoveryPointsUnavailableWarning(replicasetUUID, "recovery points unavailable")) + return recoveryPoints + } + + for _, point := range fragment.RecoveryPoints { + if point != nil { + recoveryPoints = append(recoveryPoints, *point) + } + } + return recoveryPoints +} + +// calculateStatus derives cluster backup health from shard results and warnings. +func calculateStatus(manifest *ClusterManifest) Status { + successful := 0 + failed := 0 + + for _, shard := range manifest.Shards { + if shard.Instance != nil { + successful++ + } + if shard.Error != "" { + failed++ + } + } + + if successful == 0 { + return StatusFailed + } + if isDegraded(manifest, successful, failed) { + return StatusDegraded + } + return StatusOK +} + +// isDegraded reports whether a partially useful manifest has issues. +func isDegraded(manifest *ClusterManifest, successful, failed int) bool { + return failed > 0 || + len(manifest.Warnings) > 0 || + successful < len(manifest.Topology.Replicasets) +} diff --git a/lib/backup/aggregation_test.go b/lib/backup/aggregation_test.go new file mode 100644 index 000000000..d81a38b9d --- /dev/null +++ b/lib/backup/aggregation_test.go @@ -0,0 +1,195 @@ +package backup + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestAggregateSuccessfulManifest(t *testing.T) { + fragment := mustDecodeFragment(t, fixtureFragmentA) + + manifest, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: 2300 * time.Millisecond, + Topology: topologyFromClusterManifestFixture(t, testRSA), + Shards: []*ShardInput{ + { + ReplicasetUUID: testRSA, + Fragment: &fragment, + Location: &ArtifactLocation{Path: "data/rs-a.tar.zst", SizeBytes: 42}, + }, + }, + }) + require.NoError(t, err) + require.Equal(t, SchemaVersion, manifest.SchemaVersion) + require.Equal(t, StatusOK, manifest.Status) + require.Empty(t, manifest.Warnings) + + shard := manifest.Shards[testRSA] + require.NotNil(t, shard.Instance) + require.Equal(t, "data/rs-a.tar.zst", shard.Instance.Artifact.Path) + require.Equal(t, int64(42), shard.Instance.Artifact.SizeBytes) + require.Equal(t, "zstd", shard.Instance.Artifact.Compression) + require.Len(t, shard.Instance.Artifact.RecoveryPoints, 2) + require.Equal(t, 2300*time.Millisecond, manifest.CreationDuration) +} + +func TestAggregateUnavailableShard(t *testing.T) { + manifest, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: time.Second, + Topology: topologyFromClusterManifestFixture(t, testRSA), + Shards: []*ShardInput{{ReplicasetUUID: testRSA}}, + }) + require.NoError(t, err) + require.Equal(t, StatusFailed, manifest.Status) + require.Equal(t, "shard unreachable", manifest.Shards[testRSA].Error) + require.Len(t, manifest.Warnings, 1) + require.Equal(t, WarnShardUnreachable, manifest.Warnings[0].Code) +} + +func TestAggregateNilRecoveryPointsAddsWarningAndEmptySlice(t *testing.T) { + fragment := mustDecodeFragment(t, fixtureFragmentWithoutRecoveryPoints) + + manifest, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: time.Second, + Topology: topologyFromClusterManifestFixture(t, testRSA), + Shards: []*ShardInput{{ReplicasetUUID: testRSA, Fragment: &fragment}}, + }) + require.NoError(t, err) + require.Equal(t, StatusDegraded, manifest.Status) + require.Len(t, manifest.Warnings, 1) + require.Equal(t, WarnRecoveryPointsUnavailable, manifest.Warnings[0].Code) + require.NotNil(t, manifest.Shards[testRSA].Instance.Artifact.RecoveryPoints) + require.Empty(t, manifest.Shards[testRSA].Instance.Artifact.RecoveryPoints) +} + +func TestAggregateEmptyRecoveryPointsDoesNotAddWarning(t *testing.T) { + fragment := mustDecodeFragment(t, fixtureFragmentWithEmptyRecoveryPoints) + + manifest, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: time.Second, + Topology: topologyFromClusterManifestFixture(t, testRSA), + Shards: []*ShardInput{{ReplicasetUUID: testRSA, Fragment: &fragment}}, + }) + require.NoError(t, err) + require.Equal(t, StatusOK, manifest.Status) + require.Empty(t, manifest.Warnings) + require.NotNil(t, manifest.Shards[testRSA].Instance.Artifact.RecoveryPoints) +} + +func TestAggregateShardErrorUsesErrorShard(t *testing.T) { + manifest, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: time.Second, + Topology: topologyFromClusterManifestFixture(t, testRSA), + Shards: []*ShardInput{{ + ReplicasetUUID: testRSA, + Err: errors.New("timeout: replicaset unreachable"), + }}, + }) + require.NoError(t, err) + require.Equal(t, "timeout: replicaset unreachable", manifest.Shards[testRSA].Error) + require.Equal(t, StatusFailed, manifest.Status) + require.Empty(t, manifest.Warnings) +} + +func TestAggregateRejectsInvalidFragment(t *testing.T) { + fragment := mustDecodeFragment(t, fixtureFragmentA) + fragment.Type = BackupType("bad") + + _, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: time.Second, + Topology: topologyFromClusterManifestFixture(t, testRSA), + Shards: []*ShardInput{{ReplicasetUUID: testRSA, Fragment: &fragment}}, + }) + require.ErrorContains(t, err, "invalid backup type") +} + +func TestAggregateRejectsReplicasetMismatch(t *testing.T) { + fragment := mustDecodeFragment(t, fixtureFragmentA) + + _, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: time.Second, + Topology: topologyFromClusterManifestFixture(t, testRSB), + Shards: []*ShardInput{{ReplicasetUUID: testRSB, Fragment: &fragment}}, + }) + require.ErrorContains(t, err, "does not match shard input") +} + +func TestAggregateBuildsClusterManifest(t *testing.T) { + fragmentA := mustDecodeFragment(t, fixtureFragmentA) + fragmentB := mustDecodeFragment(t, fixtureFragmentB) + + manifest, err := Aggregate(AggregateInput{ + BackupID: testBackupID, + BaseFullBackupID: testBackupID, + CreationTime: testCreationTime(), + CreationDuration: 2300 * time.Millisecond, + Topology: topologyFromClusterManifestFixture(t, testRSA, testRSB, testRSC), + Shards: []*ShardInput{ + { + ReplicasetUUID: testRSA, + Fragment: &fragmentA, + Location: &ArtifactLocation{ + Path: "20260312T120000Z-replicaset_A_uuid.tar.zst", + SizeBytes: 104857600, + }, + }, + { + ReplicasetUUID: testRSB, + Fragment: &fragmentB, + Location: &ArtifactLocation{ + Path: "20260312T120000Z-replicaset_B_uuid.tar.zst", + SizeBytes: 98304000, + }, + }, + {ReplicasetUUID: testRSC, Err: errors.New("timeout: replicaset unreachable")}, + }, + }) + require.NoError(t, err) + require.NoError(t, manifest.Validate()) + require.Equal(t, StatusDegraded, manifest.Status) + require.Equal(t, "timeout: replicaset unreachable", manifest.Shards[testRSC].Error) + require.Equal( + t, + manifest.Shards[testRSA].Instance.Artifact.RecoveryPoints[0].UUID, + manifest.Shards[testRSB].Instance.Artifact.RecoveryPoints[0].UUID, + ) +} + +func topologyFromClusterManifestFixture(t *testing.T, replicasetUUIDs ...string) Topology { + t.Helper() + + manifest := mustDecodeClusterManifest(t, fixtureClusterManifest) + topology := Topology{Replicasets: make(map[string][]TopologyInstance, len(replicasetUUIDs))} + for _, replicasetUUID := range replicasetUUIDs { + topology.Replicasets[replicasetUUID] = manifest.Topology.Replicasets[replicasetUUID] + } + return topology +} + +func testCreationTime() time.Time { + return time.Date(2026, 3, 12, 12, 0, 2, 456000000, time.UTC) +} diff --git a/lib/backup/fixtures_test.go b/lib/backup/fixtures_test.go new file mode 100644 index 000000000..dcc4f710a --- /dev/null +++ b/lib/backup/fixtures_test.go @@ -0,0 +1,65 @@ +package backup + +import ( + _ "embed" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +var ( + //go:embed testdata/cluster_manifest.json + fixtureClusterManifest []byte + //go:embed testdata/fragment_a.json + fixtureFragmentA []byte + //go:embed testdata/fragment_b.json + fixtureFragmentB []byte + //go:embed testdata/fragment_without_recovery_points.json + fixtureFragmentWithoutRecoveryPoints []byte + //go:embed testdata/fragment_with_empty_recovery_points.json + fixtureFragmentWithEmptyRecoveryPoints []byte +) + +func mustDecodeClusterManifest(t *testing.T, data []byte) ClusterManifest { + t.Helper() + + type clusterManifestJSON struct { + SchemaVersion int `json:"schema_version"` + BackupID BackupID `json:"backup_id"` + PreviousBackupID BackupID `json:"previous_backup_id"` + BaseFullBackupID BackupID `json:"base_full_backup_id"` + Status Status `json:"status"` + CreationTime time.Time `json:"creation_time"` + CreationDuration string `json:"creation_duration"` + Shards map[string]Shard `json:"shards"` + Topology Topology `json:"topology"` + Warnings []Warning `json:"warnings"` + } + + var raw clusterManifestJSON + require.NoError(t, json.Unmarshal(data, &raw)) + duration, err := time.ParseDuration(raw.CreationDuration) + require.NoError(t, err) + return ClusterManifest{ + SchemaVersion: raw.SchemaVersion, + BackupID: raw.BackupID, + PreviousBackupID: raw.PreviousBackupID, + BaseFullBackupID: raw.BaseFullBackupID, + Status: raw.Status, + CreationTime: raw.CreationTime, + CreationDuration: duration, + Shards: raw.Shards, + Topology: raw.Topology, + Warnings: raw.Warnings, + } +} + +func mustDecodeFragment(t *testing.T, data []byte) Fragment { + t.Helper() + + var fragment Fragment + require.NoError(t, json.Unmarshal(data, &fragment)) + return fragment +} diff --git a/lib/backup/manifest.go b/lib/backup/manifest.go new file mode 100644 index 000000000..4fa5913c5 --- /dev/null +++ b/lib/backup/manifest.go @@ -0,0 +1,59 @@ +package backup + +import "time" + +// BackupID is an opaque identifier of a backup in a chain. +type BackupID string + +// ClusterManifest is the complete cluster-level backup manifest. +type ClusterManifest struct { + SchemaVersion int `json:"schema_version"` + BackupID BackupID `json:"backup_id"` + PreviousBackupID BackupID `json:"previous_backup_id"` + BaseFullBackupID BackupID `json:"base_full_backup_id"` + Status Status `json:"status"` + CreationTime time.Time `json:"creation_time"` + CreationDuration time.Duration `json:"creation_duration"` + Shards map[string]Shard `json:"shards"` // Key is replicaset. + Topology Topology `json:"topology"` + Warnings []Warning `json:"warnings"` // Empty is [] +} + +// Shard is one replicaset result: either a backed-up instance or an error. +type Shard struct { + Instance *ShardInstance `json:"instance,omitempty"` + Error string `json:"error,omitempty"` +} + +// ShardInstance contains successful backup metadata for one instance. +type ShardInstance struct { + InstanceUUID string `json:"instance_uuid"` + InstanceName string `json:"instance_name"` + Hostname string `json:"hostname"` + VclockBegin Vclock `json:"vclock_begin"` + VclockEnd Vclock `json:"vclock_end"` + Artifact Artifact `json:"artifact"` +} + +// Artifact describes the stored archive produced for a shard. +type Artifact struct { + Path string `json:"path"` + SizeBytes int64 `json:"size_bytes"` + ChecksumSHA256 string `json:"checksum_sha256"` + Compression string `json:"compression"` + Files []string `json:"files"` + RecoveryPoints []RecoveryPoint `json:"recovery_points"` + Type BackupType `json:"type"` +} + +// Topology lists expected instances grouped by replicaset UUID. +type Topology struct { + Replicasets map[string][]TopologyInstance `json:"replicasets"` +} + +// TopologyInstance is one expected instance in cluster topology. +type TopologyInstance struct { + InstanceUUID string `json:"instance_uuid"` + InstanceName string `json:"instance_name"` + Hostname string `json:"hostname"` +} diff --git a/lib/backup/testdata/cluster_manifest.json b/lib/backup/testdata/cluster_manifest.json new file mode 100644 index 000000000..646f251b7 --- /dev/null +++ b/lib/backup/testdata/cluster_manifest.json @@ -0,0 +1,109 @@ +{ + "schema_version": 1, + "backup_id": "20260312T120000Z", + "previous_backup_id": null, + "base_full_backup_id": "20260312T120000Z", + "status": "OK", + "creation_time": "2026-03-12T12:00:02.456Z", + "creation_duration": "2.3s", + "shards": { + "11111111-1111-1111-1111-111111111111": { + "instance": { + "instance_uuid": "aaaaaaaa-0000-0000-0000-000000000001", + "instance_name": "router-001", + "hostname": "tarantool-node-01.prod.example.com", + "vclock_begin": {"1": 1500, "2": 230}, + "vclock_end": {"1": 1502, "2": 230}, + "artifact": { + "path": "20260312T120000Z-replicaset_A_uuid.tar.zst", + "size_bytes": 104857600, + "checksum_sha256": "e3b0c44298fc1c149afbf4c8996fb92400000000000000000000000000000000", + "compression": "zstd", + "files": [ + "00000000000000001500.snap", + "00000000000000001500.xlog" + ], + "recovery_points": [ + {"uuid": "550e8400-e29b-41d4-a716-446655440000", "replica_id": 1, "lsn": 1501, "timestamp": 1741780500}, + {"uuid": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", "replica_id": 1, "lsn": 1502, "timestamp": 1741780780} + ], + "type": "full" + } + } + }, + "22222222-2222-2222-2222-222222222222": { + "instance": { + "instance_uuid": "bbbbbbbb-0000-0000-0000-000000000001", + "instance_name": "storage-001", + "hostname": "tarantool-node-03.prod.example.com", + "vclock_begin": {"3": 800, "4": 110}, + "vclock_end": {"3": 801, "4": 110}, + "artifact": { + "path": "20260312T120000Z-replicaset_B_uuid.tar.zst", + "size_bytes": 98304000, + "checksum_sha256": "d7a8fbb307d7809469d49b82694fae5700000000000000000000000000000000", + "compression": "zstd", + "files": [ + "00000000000000000800.snap", + "00000000000000000800.xlog" + ], + "recovery_points": [ + {"uuid": "550e8400-e29b-41d4-a716-446655440000", "replica_id": 3, "lsn": 800, "timestamp": 1741780501}, + {"uuid": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", "replica_id": 3, "lsn": 801, "timestamp": 1741780781} + ], + "type": "full" + } + } + }, + "33333333-3333-3333-3333-333333333333": { + "error": "timeout: replicaset unreachable" + } + }, + "topology": { + "replicasets": { + "11111111-1111-1111-1111-111111111111": [ + { + "instance_uuid": "aaaaaaaa-0000-0000-0000-000000000001", + "instance_name": "router-001", + "hostname": "tarantool-node-01.prod.example.com" + }, + { + "instance_uuid": "aaaaaaaa-0000-0000-0000-000000000002", + "instance_name": "router-002", + "hostname": "tarantool-node-02.prod.example.com" + } + ], + "22222222-2222-2222-2222-222222222222": [ + { + "instance_uuid": "bbbbbbbb-0000-0000-0000-000000000001", + "instance_name": "storage-001", + "hostname": "tarantool-node-03.prod.example.com" + }, + { + "instance_uuid": "bbbbbbbb-0000-0000-0000-000000000002", + "instance_name": "storage-002", + "hostname": "tarantool-node-04.prod.example.com" + } + ], + "33333333-3333-3333-3333-333333333333": [ + { + "instance_uuid": "cccccccc-0000-0000-0000-000000000001", + "instance_name": "storage-003", + "hostname": "tarantool-node-05.prod.example.com" + }, + { + "instance_uuid": "cccccccc-0000-0000-0000-000000000002", + "instance_name": "storage-004", + "hostname": "tarantool-node-06.prod.example.com" + } + ] + } + }, + "warnings": [ + { + "code": "shard_unreachable", + "message": "replicaset C did not respond within timeout", + "details": {"replicaset_uuid": "33333333-3333-3333-3333-333333333333"} + } + ] +} diff --git a/lib/backup/testdata/fragment_a.json b/lib/backup/testdata/fragment_a.json new file mode 100644 index 000000000..654f7aa04 --- /dev/null +++ b/lib/backup/testdata/fragment_a.json @@ -0,0 +1,18 @@ +{ + "replicaset_uuid": "11111111-1111-1111-1111-111111111111", + "instance_uuid": "aaaaaaaa-0000-0000-0000-000000000001", + "instance_name": "router-001", + "hostname": "tarantool-node-01.prod.example.com", + "type": "full", + "vclock_begin": {"1": 1500, "2": 230}, + "vclock_end": {"1": 1502, "2": 230}, + "files": [ + "00000000000000001500.snap", + "00000000000000001500.xlog" + ], + "checksum_sha256": "e3b0c44298fc1c149afbf4c8996fb92400000000000000000000000000000000", + "recovery_points": [ + {"uuid": "550e8400-e29b-41d4-a716-446655440000", "replica_id": 1, "lsn": 1501, "timestamp": 1741780500}, + {"uuid": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", "replica_id": 1, "lsn": 1502, "timestamp": 1741780780} + ] +} diff --git a/lib/backup/testdata/fragment_b.json b/lib/backup/testdata/fragment_b.json new file mode 100644 index 000000000..ed0db2117 --- /dev/null +++ b/lib/backup/testdata/fragment_b.json @@ -0,0 +1,18 @@ +{ + "replicaset_uuid": "22222222-2222-2222-2222-222222222222", + "instance_uuid": "bbbbbbbb-0000-0000-0000-000000000001", + "instance_name": "storage-001", + "hostname": "tarantool-node-03.prod.example.com", + "type": "full", + "vclock_begin": {"3": 800, "4": 110}, + "vclock_end": {"3": 801, "4": 110}, + "files": [ + "00000000000000000800.snap", + "00000000000000000800.xlog" + ], + "checksum_sha256": "d7a8fbb307d7809469d49b82694fae5700000000000000000000000000000000", + "recovery_points": [ + {"uuid": "550e8400-e29b-41d4-a716-446655440000", "replica_id": 3, "lsn": 800, "timestamp": 1741780501}, + {"uuid": "6ba7b810-9dad-11d1-80b4-00c04fd430c8", "replica_id": 3, "lsn": 801, "timestamp": 1741780781} + ] +} diff --git a/lib/backup/testdata/fragment_with_empty_recovery_points.json b/lib/backup/testdata/fragment_with_empty_recovery_points.json new file mode 100644 index 000000000..0e4785a92 --- /dev/null +++ b/lib/backup/testdata/fragment_with_empty_recovery_points.json @@ -0,0 +1,15 @@ +{ + "replicaset_uuid": "11111111-1111-1111-1111-111111111111", + "instance_uuid": "aaaaaaaa-0000-0000-0000-000000000001", + "instance_name": "router-001", + "hostname": "tarantool-node-01.prod.example.com", + "type": "full", + "vclock_begin": {"0": 12, "1": 1500}, + "vclock_end": {"0": 12, "1": 1502}, + "files": [ + "00000000000000001500.snap", + "00000000000000001500.xlog" + ], + "checksum_sha256": "e3b0c44298fc1c149afbf4c8996fb92400000000000000000000000000000000", + "recovery_points": [] +} diff --git a/lib/backup/testdata/fragment_without_recovery_points.json b/lib/backup/testdata/fragment_without_recovery_points.json new file mode 100644 index 000000000..d5de47526 --- /dev/null +++ b/lib/backup/testdata/fragment_without_recovery_points.json @@ -0,0 +1,14 @@ +{ + "replicaset_uuid": "11111111-1111-1111-1111-111111111111", + "instance_uuid": "aaaaaaaa-0000-0000-0000-000000000001", + "instance_name": "router-001", + "hostname": "tarantool-node-01.prod.example.com", + "type": "full", + "vclock_begin": {"0": 12, "1": 1500}, + "vclock_end": {"0": 12, "1": 1502}, + "files": [ + "00000000000000001500.snap", + "00000000000000001500.xlog" + ], + "checksum_sha256": "e3b0c44298fc1c149afbf4c8996fb92400000000000000000000000000000000" +} diff --git a/lib/backup/types.go b/lib/backup/types.go new file mode 100644 index 000000000..4a9943308 --- /dev/null +++ b/lib/backup/types.go @@ -0,0 +1,29 @@ +package backup + +// SchemaVersion is the current cluster manifest JSON schema version. +const SchemaVersion = 1 + +const ( + // BackupTypeFull marks a complete backup chain starting point. + BackupTypeFull BackupType = "full" + // BackupTypeIncremental marks a backup based on a previous one. + BackupTypeIncremental BackupType = "incremental" +) + +const ( + // StatusOK means all expected shards were backed up without warnings. + StatusOK Status = "OK" + // StatusDegraded means some data exists, but the backup has warnings or shard errors. + StatusDegraded Status = "degraded" + // StatusFailed means no shard was successfully backed up. + StatusFailed Status = "failed" +) + +// Vclock maps replica IDs to their LSNs, including replica 0. +type Vclock map[uint32]uint64 + +// BackupType is the backup mode: full or incremental. +type BackupType string + +// Status is the aggregate health of the cluster backup. +type Status string diff --git a/lib/backup/validation.go b/lib/backup/validation.go new file mode 100644 index 000000000..8c450c4a6 --- /dev/null +++ b/lib/backup/validation.go @@ -0,0 +1,179 @@ +package backup + +import "fmt" + +// Validate checks that a fragment has required structural fields. +func (fragment Fragment) Validate() error { + if fragment.ReplicasetUUID == "" { + return fmt.Errorf("replicaset_uuid is empty") + } + if fragment.InstanceUUID == "" { + return fmt.Errorf("instance_uuid is empty") + } + if fragment.InstanceName == "" { + return fmt.Errorf("instance_name is empty") + } + if fragment.Hostname == "" { + return fmt.Errorf("hostname is empty") + } + if !isValidBackupType(fragment.Type) { + return fmt.Errorf("invalid backup type %q", fragment.Type) + } + if len(fragment.VclockBegin) == 0 { + return fmt.Errorf("vclock_begin is empty") + } + if len(fragment.VclockEnd) == 0 { + return fmt.Errorf("vclock_end is empty") + } + + return nil +} + +// Validate checks manifest structure without chain or storage verification. +func (manifest ClusterManifest) Validate() error { + if err := manifest.validateHeader(); err != nil { + return fmt.Errorf("validate manifest header: %w", err) + } + if err := manifest.validateShards(); err != nil { + return fmt.Errorf("validate manifest shards: %w", err) + } + if err := manifest.validateWarnings(); err != nil { + return fmt.Errorf("validate manifest warnings: %w", err) + } + return nil +} + +// validateHeader checks top-level manifest fields. +func (manifest ClusterManifest) validateHeader() error { + if manifest.SchemaVersion != SchemaVersion { + return fmt.Errorf("unsupported schema_version %d", manifest.SchemaVersion) + } + if manifest.BackupID == "" { + return fmt.Errorf("backup_id is empty") + } + if manifest.BaseFullBackupID == "" { + return fmt.Errorf("base_full_backup_id is empty") + } + if !isValidStatus(manifest.Status) { + return fmt.Errorf("invalid status %q", manifest.Status) + } + if manifest.Shards == nil { + return fmt.Errorf("shards is nil") + } + if manifest.Topology.Replicasets == nil { + return fmt.Errorf("topology.replicasets is nil") + } + if manifest.Warnings == nil { + return fmt.Errorf("warnings is nil") + } + return nil +} + +// validateShards checks shard keys and shard payloads. +func (manifest ClusterManifest) validateShards() error { + for replicasetUUID, shard := range manifest.Shards { + if err := manifest.validateShard(replicasetUUID, shard); err != nil { + return fmt.Errorf("validate shard %q: %w", replicasetUUID, err) + } + } + return nil +} + +// validateShard checks one shard entry. +func (manifest ClusterManifest) validateShard(replicasetUUID string, shard Shard) error { + if replicasetUUID == "" { + return fmt.Errorf("shards contains empty replicaset uuid") + } + if _, ok := manifest.Topology.Replicasets[replicasetUUID]; !ok { + return fmt.Errorf("shard %q is not present in topology", replicasetUUID) + } + + hasInstance := shard.Instance != nil + hasError := shard.Error != "" + if hasInstance == hasError { + return fmt.Errorf( + "shard %q must contain exactly one of instance or error", + replicasetUUID, + ) + } + if hasInstance { + if err := validateShardInstance(replicasetUUID, *shard.Instance); err != nil { + return fmt.Errorf("validate shard %q instance: %w", replicasetUUID, err) + } + } + return nil +} + +// validateWarnings checks warning codes. +func (manifest ClusterManifest) validateWarnings() error { + for i, warning := range manifest.Warnings { + if !isValidWarningCode(warning.Code) { + return fmt.Errorf("warnings[%d] has invalid code %q", i, warning.Code) + } + } + return nil +} + +// validateShardInstance checks successful shard metadata. +func validateShardInstance(replicasetUUID string, instance ShardInstance) error { + if instance.InstanceUUID == "" { + return fmt.Errorf("shard %q instance_uuid is empty", replicasetUUID) + } + if instance.InstanceName == "" { + return fmt.Errorf("shard %q instance_name is empty", replicasetUUID) + } + if instance.Hostname == "" { + return fmt.Errorf("shard %q hostname is empty", replicasetUUID) + } + if len(instance.VclockBegin) == 0 { + return fmt.Errorf("shard %q vclock_begin is empty", replicasetUUID) + } + if len(instance.VclockEnd) == 0 { + return fmt.Errorf("shard %q vclock_end is empty", replicasetUUID) + } + if !isValidBackupType(instance.Artifact.Type) { + return fmt.Errorf( + "shard %q has invalid backup type %q", + replicasetUUID, + instance.Artifact.Type, + ) + } + if instance.Artifact.RecoveryPoints == nil { + return fmt.Errorf("shard %q artifact.recovery_points is nil", replicasetUUID) + } + + return nil +} + +// isValidBackupType reports whether backupType is known by this schema. +func isValidBackupType(backupType BackupType) bool { + switch backupType { + case BackupTypeFull, BackupTypeIncremental: + return true + default: + return false + } +} + +// isValidStatus reports whether status is known by this schema. +func isValidStatus(status Status) bool { + switch status { + case StatusOK, StatusDegraded, StatusFailed: + return true + default: + return false + } +} + +// isValidWarningCode reports whether code is known by this schema. +func isValidWarningCode(code WarningCode) bool { + switch code { + case WarnShardPartial, + WarnShardUnreachable, + WarnRecoveryPointsUnavailable, + WarnStoragePartialUpload: + return true + default: + return false + } +} diff --git a/lib/backup/validation_test.go b/lib/backup/validation_test.go new file mode 100644 index 000000000..c8c02eeba --- /dev/null +++ b/lib/backup/validation_test.go @@ -0,0 +1,274 @@ +package backup + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const ( + testBackupID BackupID = "20260312T120000Z" + testRSA string = "11111111-1111-1111-1111-111111111111" + testRSB string = "22222222-2222-2222-2222-222222222222" + testRSC string = "33333333-3333-3333-3333-333333333333" +) + +func TestFragmentValidate(t *testing.T) { + valid := mustDecodeFragment(t, fixtureFragmentA) + + tests := []struct { + name string + fragment Fragment + wantError string + }{ + { + name: "valid", + fragment: valid, + }, + { + name: "empty replicaset uuid", + fragment: func() Fragment { + fragment := valid + fragment.ReplicasetUUID = "" + return fragment + }(), + wantError: "replicaset_uuid is empty", + }, + { + name: "empty instance uuid", + fragment: func() Fragment { + fragment := valid + fragment.InstanceUUID = "" + return fragment + }(), + wantError: "instance_uuid is empty", + }, + { + name: "invalid type", + fragment: func() Fragment { + fragment := valid + fragment.Type = BackupType("snapshot") + return fragment + }(), + wantError: "invalid backup type", + }, + { + name: "empty vclock begin", + fragment: func() Fragment { + fragment := valid + fragment.VclockBegin = nil + return fragment + }(), + wantError: "vclock_begin is empty", + }, + { + name: "empty vclock end", + fragment: func() Fragment { + fragment := valid + fragment.VclockEnd = Vclock{} + return fragment + }(), + wantError: "vclock_end is empty", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.fragment.Validate() + if tt.wantError == "" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tt.wantError) + } + }) + } +} + +func TestClusterManifestValidate(t *testing.T) { + valid := func() ClusterManifest { return testClusterManifest(t, StatusOK) } + + tests := []struct { + name string + manifest ClusterManifest + wantError string + }{ + { + name: "valid", + manifest: valid(), + }, + { + name: "foreign schema version", + manifest: func() ClusterManifest { + manifest := valid() + manifest.SchemaVersion = 2 + return manifest + }(), + wantError: "unsupported schema_version 2", + }, + { + name: "empty backup id", + manifest: func() ClusterManifest { + manifest := valid() + manifest.BackupID = "" + return manifest + }(), + wantError: "backup_id is empty", + }, + { + name: "invalid status", + manifest: func() ClusterManifest { + manifest := valid() + manifest.Status = Status("partial") + return manifest + }(), + wantError: "invalid status", + }, + { + name: "shard has both instance and error", + manifest: func() ClusterManifest { + manifest := valid() + shard := manifest.Shards[testRSA] + shard.Error = "unexpected error" + manifest.Shards[testRSA] = shard + return manifest + }(), + wantError: "must contain exactly one of instance or error", + }, + { + name: "shard has neither instance nor error", + manifest: func() ClusterManifest { + manifest := valid() + manifest.Shards[testRSA] = Shard{} + return manifest + }(), + wantError: "must contain exactly one of instance or error", + }, + { + name: "orphan shard outside topology", + manifest: func() ClusterManifest { + manifest := valid() + manifest.Shards["99999999-9999-9999-9999-999999999999"] = manifest.Shards[testRSA] + return manifest + }(), + wantError: "is not present in topology", + }, + { + name: "successful shard with empty vclock begin", + manifest: func() ClusterManifest { + manifest := valid() + shard := manifest.Shards[testRSA] + shard.Instance.VclockBegin = nil + manifest.Shards[testRSA] = shard + return manifest + }(), + wantError: "vclock_begin is empty", + }, + { + name: "invalid artifact type", + manifest: func() ClusterManifest { + manifest := valid() + shard := manifest.Shards[testRSA] + shard.Instance.Artifact.Type = BackupType("bad") + manifest.Shards[testRSA] = shard + return manifest + }(), + wantError: "invalid backup type", + }, + { + name: "invalid warning code", + manifest: func() ClusterManifest { + manifest := valid() + manifest.Warnings = []Warning{{Code: WarningCode("unknown")}} + return manifest + }(), + wantError: "invalid code", + }, + { + name: "nil warnings", + manifest: func() ClusterManifest { + manifest := valid() + manifest.Warnings = nil + return manifest + }(), + wantError: "warnings is nil", + }, + { + name: "nil recovery points", + manifest: func() ClusterManifest { + manifest := valid() + shard := manifest.Shards[testRSA] + shard.Instance.Artifact.RecoveryPoints = nil + manifest.Shards[testRSA] = shard + return manifest + }(), + wantError: "artifact.recovery_points is nil", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.manifest.Validate() + if tt.wantError == "" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tt.wantError) + } + }) + } +} + +func TestClusterManifestIsValidAndSerializable(t *testing.T) { + manifest := mustDecodeClusterManifest(t, fixtureClusterManifest) + require.NoError(t, manifest.Validate()) + + data, err := json.Marshal(manifest) + require.NoError(t, err) + + jsonText := string(data) + for _, want := range []string{ + `"schema_version":1`, + `"previous_backup_id":""`, + `"creation_duration":2300000000`, + `"status":"OK"`, + `"550e8400-e29b-41d4-a716-446655440000"`, + `"6ba7b810-9dad-11d1-80b4-00c04fd430c8"`, + `"error":"timeout: replicaset unreachable"`, + `"code":"shard_unreachable"`, + } { + require.Contains(t, jsonText, want) + } + + type clusterManifestRoundTrip struct { + SchemaVersion int `json:"schema_version"` + BackupID BackupID `json:"backup_id"` + PreviousBackupID BackupID `json:"previous_backup_id"` + BaseFullBackupID BackupID `json:"base_full_backup_id"` + Status Status `json:"status"` + CreationTime time.Time `json:"creation_time"` + CreationDuration time.Duration `json:"creation_duration"` + Shards map[string]Shard `json:"shards"` + Topology Topology `json:"topology"` + Warnings []Warning `json:"warnings"` + } + + var roundTrip clusterManifestRoundTrip + require.NoError(t, json.Unmarshal(data, &roundTrip)) + decoded := ClusterManifest(roundTrip) + require.NoError(t, decoded.Validate()) + require.Equal(t, 2300*time.Millisecond, decoded.CreationDuration) +} + +func testClusterManifest(t *testing.T, status Status) ClusterManifest { + t.Helper() + + manifest := mustDecodeClusterManifest(t, fixtureClusterManifest) + manifest.Status = status + delete(manifest.Shards, testRSB) + delete(manifest.Shards, testRSC) + delete(manifest.Topology.Replicasets, testRSB) + delete(manifest.Topology.Replicasets, testRSC) + manifest.Warnings = []Warning{} + return manifest +} diff --git a/lib/backup/warnings.go b/lib/backup/warnings.go new file mode 100644 index 000000000..3d6ab6745 --- /dev/null +++ b/lib/backup/warnings.go @@ -0,0 +1,67 @@ +package backup + +// WarningCode identifies a non-fatal manifest issue. +type WarningCode string + +const ( + // WarnShardPartial marks a shard backup that completed only partially. + WarnShardPartial WarningCode = "shard_partial" + // WarnShardUnreachable marks a shard that did not produce a fragment. + WarnShardUnreachable WarningCode = "shard_unreachable" + // WarnRecoveryPointsUnavailable marks a missing recovery_points field. + WarnRecoveryPointsUnavailable WarningCode = "recovery_points_unavailable" + // WarnStoragePartialUpload marks archives that were not fully uploaded. + WarnStoragePartialUpload WarningCode = "storage_partial_upload" +) + +// Warning is a typed, serializable non-fatal backup issue. +type Warning struct { + Code WarningCode `json:"code"` + Message string `json:"message"` + Details map[string]any `json:"details"` +} + +// NewShardPartialWarning reports a partial shard backup. +func NewShardPartialWarning(replicasetUUID, instanceUUID, reason string) Warning { + return Warning{ + Code: WarnShardPartial, + Message: reason, + Details: map[string]any{ + "replicaset_uuid": replicasetUUID, + "instance_uuid": instanceUUID, + }, + } +} + +// NewShardUnreachableWarning reports a missing shard fragment. +func NewShardUnreachableWarning(replicasetUUID string) Warning { + return Warning{ + Code: WarnShardUnreachable, + Message: "shard unreachable", + Details: map[string]any{ + "replicaset_uuid": replicasetUUID, + }, + } +} + +// NewRecoveryPointsUnavailableWarning reports unavailable recovery points. +func NewRecoveryPointsUnavailableWarning(replicasetUUID, errMsg string) Warning { + return Warning{ + Code: WarnRecoveryPointsUnavailable, + Message: errMsg, + Details: map[string]any{ + "replicaset_uuid": replicasetUUID, + }, + } +} + +// NewStoragePartialUploadWarning reports storage keys that failed to upload. +func NewStoragePartialUploadWarning(keys []string) Warning { + return Warning{ + Code: WarnStoragePartialUpload, + Message: "storage partial upload", + Details: map[string]any{ + "keys": keys, + }, + } +}