Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Changes

### Fixes

- Kept large Git snapshot imports and FTS rebuilds from exhausting memory on small hosts by using file-backed SQLite temp storage and a bounded import cache. (#65) Thanks @hxy91819.

## 0.7.2 - 2026-05-11

### Changes
Expand Down
220 changes: 220 additions & 0 deletions internal/share/import_memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package share

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/openclaw/discrawl/internal/store"
)

// TestImportRealSnapshot imports a real snapshot repository (pointed at by
// DISCRAWL_REAL_REPO) and checks that the import reports a change and that
// every imported message row has a matching full-text-search row.
func TestImportRealSnapshot(t *testing.T) {
	repo := strings.TrimSpace(os.Getenv("DISCRAWL_REAL_REPO"))
	if len(repo) == 0 {
		t.Skip("set DISCRAWL_REAL_REPO to run real snapshot import validation")
	}

	ctx := context.Background()
	dst, err := store.Open(ctx, filepath.Join(t.TempDir(), "dst.db"))
	require.NoError(t, err)
	defer func() { _ = dst.Close() }()

	opts := Options{
		RepoPath: repo,
		Branch:   "main",
		Progress: func(p ImportProgress) {
			// Only the coarse phases are worth logging on a real repo.
			switch p.Phase {
			case "start", "rebuild_fts", "done":
				t.Logf("import progress phase=%s total_rows=%d", p.Phase, p.TotalRows)
			}
		},
	}
	_, changed, err := ImportIfChanged(ctx, dst, opts)
	require.NoError(t, err)
	require.True(t, changed)

	// The FTS index must cover exactly the imported message set.
	var messages, fts int
	require.NoError(t, dst.DB().QueryRowContext(ctx, `select count(*) from messages`).Scan(&messages))
	require.Positive(t, messages)
	require.NoError(t, dst.DB().QueryRowContext(ctx, `select count(*) from message_fts`).Scan(&fts))
	require.Equal(t, messages, fts)
}

// TestImportMemoryBounded is the OOM regression: it builds a large synthetic
// snapshot, imports it, and asserts the FTS rebuild phase ran and a unique
// needle is searchable afterwards. Gated behind DISCRAWL_OOM_REGRESSION=1
// because it is deliberately heavy.
func TestImportMemoryBounded(t *testing.T) {
	if os.Getenv("DISCRAWL_OOM_REGRESSION") != "1" {
		t.Skip("set DISCRAWL_OOM_REGRESSION=1 to run the memory-bounded import regression")
	}

	ctx := context.Background()
	repo := t.TempDir()
	rows := envInt(t, "DISCRAWL_OOM_ROWS", 80000)
	textLen := envInt(t, "DISCRAWL_OOM_TEXT_BYTES", 2048)
	t.Logf("building synthetic snapshot rows=%d text_bytes=%d", rows, textLen)
	writeSyntheticMemorySnapshot(t, repo, rows, textLen)
	t.Log("synthetic snapshot built; starting import")

	dst, err := store.Open(ctx, filepath.Join(t.TempDir(), "dst.db"))
	require.NoError(t, err)
	defer func() { _ = dst.Close() }()

	var seen []ImportProgress
	_, changed, err := ImportIfChanged(ctx, dst, Options{
		RepoPath: repo,
		Branch:   "main",
		Progress: func(p ImportProgress) { seen = append(seen, p) },
	})
	require.NoError(t, err)
	require.True(t, changed)
	require.Contains(t, progressPhases(seen), "rebuild_fts")

	// Row 42 carries this unique token; exactly one hit proves FTS works.
	const needle = "oomunique000042"
	hits, err := dst.SearchMessages(ctx, store.SearchOptions{Query: needle, Limit: 10})
	require.NoError(t, err)
	require.Len(t, hits, 1)
	require.Contains(t, hits[0].Content, needle)
}

// writeSyntheticMemorySnapshot materializes a minimal snapshot layout under
// repo: one guild, one channel, one member, and messageRows messages whose
// content is roughly textBytes each, plus the manifest describing the table
// files. Every message embeds a unique "oomuniqueNNNNNN" token so a search
// can be pinned to exactly one row. NOTE(review): no git repository is
// created here — presumably the importer only needs the on-disk
// manifest/table layout; confirm against ImportIfChanged.
func writeSyntheticMemorySnapshot(t *testing.T, repo string, messageRows, textBytes int) {
	t.Helper()
	generatedAt := time.Now().UTC()
	updatedAt := generatedAt.Format(time.RFC3339Nano)
	// Slash-separated relative paths; writeJSONLGzip converts to OS paths.
	guildFile := "tables/guilds/000000.jsonl.gz"
	channelFile := "tables/channels/000000.jsonl.gz"
	memberFile := "tables/members/000000.jsonl.gz"
	messageFile := "tables/messages/000000.jsonl.gz"

	// Single guild row — just enough structure for messages to hang off.
	writeJSONLGzip(t, repo, guildFile, func(enc *json.Encoder) int {
		require.NoError(t, enc.Encode(map[string]any{
			"id":         "g1",
			"name":       "Guild",
			"icon":       "",
			"raw_json":   `{}`,
			"updated_at": updatedAt,
		}))
		return 1
	})
	// Single text channel in the guild above.
	writeJSONLGzip(t, repo, channelFile, func(enc *json.Encoder) int {
		require.NoError(t, enc.Encode(map[string]any{
			"id":                "c1",
			"guild_id":          "g1",
			"parent_id":         "",
			"kind":              "text",
			"name":              "general",
			"topic":             "",
			"position":          0,
			"is_nsfw":           false,
			"is_archived":       false,
			"is_locked":         false,
			"is_private_thread": false,
			"thread_parent_id":  "",
			"archive_timestamp": "",
			"raw_json":          `{}`,
			"updated_at":        updatedAt,
		}))
		return 1
	})
	// Single member who authors every synthetic message.
	writeJSONLGzip(t, repo, memberFile, func(enc *json.Encoder) int {
		require.NoError(t, enc.Encode(map[string]any{
			"guild_id":      "g1",
			"user_id":       "u1",
			"username":      "peter",
			"global_name":   "",
			"display_name":  "Peter",
			"nick":          "",
			"discriminator": "",
			"avatar":        "",
			"bot":           false,
			"joined_at":     "",
			"role_ids_json": `[]`,
			"raw_json":      `{"bio":"memory regression profile"}`,
			"updated_at":    updatedAt,
		}))
		return 1
	})
	// The bulk of the snapshot: messageRows rows of ~textBytes content each,
	// with sequential snowflake-style IDs and a per-row unique token.
	writeJSONLGzip(t, repo, messageFile, func(enc *json.Encoder) int {
		for i := range messageRows {
			messageID := strconv.FormatInt(1456744319972282449+int64(i), 10)
			unique := fmt.Sprintf("oomunique%06d", i)
			content := syntheticMemoryContent(unique, textBytes)
			require.NoError(t, enc.Encode(map[string]any{
				"id":                  messageID,
				"guild_id":            "g1",
				"channel_id":          "c1",
				"author_id":           "u1",
				"message_type":        0,
				"created_at":          updatedAt,
				"edited_at":           "",
				"deleted_at":          "",
				"content":             content,
				"normalized_content":  content,
				"reply_to_message_id": "",
				"pinned":              false,
				"has_attachments":     false,
				"raw_json":            `{"author":{"username":"Peter"}}`,
				"updated_at":          updatedAt,
			}))
		}
		return messageRows
	})

	// Manifest column lists mirror the JSON keys written above; the importer
	// reads this to locate and decode each table file.
	manifest := Manifest{
		Version:     1,
		GeneratedAt: generatedAt,
		Tables: []TableManifest{
			{Name: "guilds", Files: []string{guildFile}, Columns: []string{"id", "name", "icon", "raw_json", "updated_at"}, Rows: 1},
			{Name: "channels", Files: []string{channelFile}, Columns: []string{"id", "guild_id", "parent_id", "kind", "name", "topic", "position", "is_nsfw", "is_archived", "is_locked", "is_private_thread", "thread_parent_id", "archive_timestamp", "raw_json", "updated_at"}, Rows: 1},
			{Name: "members", Files: []string{memberFile}, Columns: []string{"guild_id", "user_id", "username", "global_name", "display_name", "nick", "discriminator", "avatar", "bot", "joined_at", "role_ids_json", "raw_json", "updated_at"}, Rows: 1},
			{Name: "messages", Files: []string{messageFile}, Columns: []string{"id", "guild_id", "channel_id", "author_id", "message_type", "created_at", "edited_at", "deleted_at", "content", "normalized_content", "reply_to_message_id", "pinned", "has_attachments", "raw_json", "updated_at"}, Rows: messageRows},
		},
	}
	body, err := json.MarshalIndent(manifest, "", "  ")
	require.NoError(t, err)
	require.NoError(t, os.WriteFile(filepath.Join(repo, ManifestName), append(body, '\n'), 0o600))
}

// writeJSONLGzip writes one gzip-compressed JSONL table file under repo at
// the slash-separated relative path rel. writeRows encodes each row through
// the provided encoder and returns how many rows it wrote; the table must
// end up non-empty.
func writeJSONLGzip(t *testing.T, repo, rel string, writeRows func(*json.Encoder) int) {
	t.Helper()
	path := filepath.Join(repo, filepath.FromSlash(rel))
	require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755))
	file, err := os.Create(path)
	require.NoError(t, err)
	// Best-effort close so the handle is released even when a require
	// inside writeRows (or below) aborts the test via Goexit before the
	// explicit Close. The happy path closes first and still surfaces real
	// close errors; the redundant deferred Close then returns ErrClosed,
	// which is deliberately ignored.
	defer func() { _ = file.Close() }()
	gz := gzip.NewWriter(file)
	rows := writeRows(json.NewEncoder(gz))
	require.Positive(t, rows)
	// Close the gzip writer before the file so the trailer is flushed.
	require.NoError(t, gz.Close())
	require.NoError(t, file.Close())
}

// syntheticMemoryContent builds a deterministic string of exactly size bytes
// that starts with the unique needle, padding with numbered filler tokens.
// When size is no larger than the needle itself, the needle is returned
// intact (possibly exceeding size) so it always survives for search.
func syntheticMemoryContent(unique string, size int) string {
	if len(unique) >= size {
		return unique
	}
	var sb strings.Builder
	sb.Grow(size)
	sb.WriteString(unique)
	for n := 0; sb.Len() < size; n++ {
		fmt.Fprintf(&sb, " token_%s_%04d", unique, n)
	}
	// The last filler token may be cut mid-way; only total length matters.
	return sb.String()[:size]
}

// envInt reads an integer override from the environment variable name,
// returning fallback when it is unset or blank. The value must parse and be
// strictly positive; anything else fails the test immediately.
func envInt(t *testing.T, name string, fallback int) int {
	t.Helper()
	trimmed := strings.TrimSpace(os.Getenv(name))
	if trimmed == "" {
		return fallback
	}
	parsed, err := strconv.Atoi(trimmed)
	require.NoError(t, err)
	require.Positive(t, parsed)
	return parsed
}
7 changes: 4 additions & 3 deletions internal/share/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,11 @@ func Import(ctx context.Context, s *store.Store, opts Options) (Manifest, error)
func applyImportPragmas(ctx context.Context, db *sql.DB) (func(context.Context) error, error) {
// Snapshot imports touch most of the archive. Keep SQLite's crash recovery
// enabled; journal_mode=off can leave the live DB malformed if the process
// or host dies mid-import.
// or host dies mid-import. Keep temporary storage file-backed and bound the
// page cache so large imports and FTS rebuilds do not exhaust small hosts.
for _, stmt := range []string{
`pragma temp_store = memory`,
`pragma cache_size = -262144`,
`pragma temp_store = file`,
`pragma cache_size = -32768`,
`pragma journal_mode = wal`,
`pragma synchronous = normal`,
} {
Expand Down
10 changes: 9 additions & 1 deletion internal/share/share_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func TestImportIfChangedInfersLegacyManifestFilesFromGit(t *testing.T) {
require.Len(t, results, 1)
}

func TestApplyImportPragmasKeepCrashRecoveryEnabled(t *testing.T) {
func TestApplyImportPragmasBoundImportMemory(t *testing.T) {
ctx := context.Background()
s := seedStore(t, filepath.Join(t.TempDir(), "dst.db"))
defer func() { _ = s.Close() }()
Expand All @@ -320,6 +320,14 @@ func TestApplyImportPragmasKeepCrashRecoveryEnabled(t *testing.T) {
require.NoError(t, err)
defer func() { require.NoError(t, restore(ctx)) }()

var tempStore int
require.NoError(t, s.DB().QueryRowContext(ctx, `pragma temp_store`).Scan(&tempStore))
require.Equal(t, 1, tempStore, "snapshot imports should use file-backed temporary storage")

var cacheSize int
require.NoError(t, s.DB().QueryRowContext(ctx, `pragma cache_size`).Scan(&cacheSize))
require.GreaterOrEqual(t, cacheSize, -65536)

var journalMode string
require.NoError(t, s.DB().QueryRowContext(ctx, `pragma journal_mode`).Scan(&journalMode))
require.NotEqual(t, "off", strings.ToLower(journalMode))
Expand Down