diff --git a/go.mod b/go.mod index 67c214789..3dce92a3f 100644 --- a/go.mod +++ b/go.mod @@ -35,12 +35,20 @@ require ( k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 ) +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect + github.com/spf13/pflag v1.0.5 // indirect +) + require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect + github.com/go-sql-driver/mysql v1.8.1 github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -52,6 +60,8 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/sevlyar/go-daemon v0.1.6 + github.com/spf13/cobra v1.8.1 github.com/tklauser/go-sysconf v0.3.11 // indirect github.com/tklauser/numcpus v0.6.0 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/go.sum b/go.sum index 8c3980ced..ace2d980b 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/AlekSi/pointer v1.2.0 h1:glcy/gc4h8HnG2Z3ZECSzZ1IX1x2JxRVuDzaJwQE0+w= github.com/AlekSi/pointer v1.2.0/go.mod h1:gZGfd3dpW4vEc/UlyfKKi1roIqcCgwOIvb0tSNSBle0= github.com/Ladicle/tabwriter v1.0.0 h1:DZQqPvMumBDwVNElso13afjYLNp0Z7pHqHnu0r4t9Dg= @@ -17,6 +19,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -30,6 +33,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -51,8 +56,12 @@ github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUq github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef h1:A9HsByNhogrvm9cWb28sjiS3i7tcKCkflWFEkHfuAgM= github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef/go.mod h1:lADxMC39cJJqL93Duh1xhAs4I2Zs8mKS89XWXFGp9cs= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= +github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= @@ -91,10 +100,15 @@ github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99 github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sevlyar/go-daemon v0.1.6 h1:EUh1MDjEM4BI109Jign0EaknA2izkOyi0LV3ro3QQGs= +github.com/sevlyar/go-daemon v0.1.6/go.mod h1:6dJpPatBT9eUwM5VCw9Bt6CdX9Tk6UWvhW3MebLDRKE= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= +github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/src/go/pt-stalk/README.md b/src/go/pt-stalk/README.md new file mode 100644 index 000000000..bf36f2262 --- /dev/null +++ b/src/go/pt-stalk/README.md @@ -0,0 +1,138 @@ +# pt-stalk (Go Version) + +A Go implementation of the Percona pt-stalk tool for collecting MySQL and system metrics. + +## Installation + +Install using go: + + go install github.com/percona/pt-stalk@latest + +## Usage Examples + +### Basic MySQL Monitoring + +Collect only MySQL metrics: + + pt-stalk --collectors=mysql \ + --mysql-host=localhost \ + --mysql-user=root \ + --mysql-password=secret \ + --dest=/var/log/mysql/samples \ + --interval=1 + +### MySQL and System Monitoring + +Collect both MySQL and system metrics: + + pt-stalk --collectors=mysql,system \ + --mysql-host=localhost \ + --mysql-user=root \ + --mysql-password=secret \ + --collect-gdb=true \ + --collect-tcpdump=true \ + --dest=/var/log/mysql/samples \ + --interval=1 + +### Running as a Daemon + +Run pt-stalk in the background: + + pt-stalk --collectors=mysql,system \ + --mysql-host=localhost \ + --mysql-user=root \ + --mysql-password=secret \ + --daemonize=true \ + --pid=/var/run/pt-stalk.pid \ + --log=/var/log/pt-stalk.log \ + --dest=/var/log/mysql/samples + +### Using a Custom Plugin + +Run with a custom plugin script: + + pt-stalk --collectors=mysql \ + --mysql-host=localhost \ + --mysql-user=root \ + --mysql-password=secret \ + --plugin=/path/to/custom/plugin.sh \ + --dest=/var/log/mysql/samples + +Example plugin script (plugin.sh): + + #!/bin/bash + # Environment variables available: + # PT_DEST - destination directory + # PT_PREFIX - file prefix + # PT_INTERVAL - check interval + # PT_RUNTIME - collection duration + + echo "Custom collection started" > "$PT_DEST/${PT_PREFIX}_custom.txt" + # Add your custom collection logic here + +### Basic MongoDB Monitoring + +Collect MongoDB metrics: + + pt-stalk --collectors=mongodb \ + --mongodb-host=localhost \ + --mongodb-user=myuser \ + --mongodb-password=secret \ + --dest=/var/log/mongodb/samples \ + --interval=1 + +## Configuration Options + +### Common Options +- --collectors: Comma-separated list of collectors to enable (mysql,system) +- --interval: Check interval in seconds (default: 1) +- --run-time: How long to collect data in seconds (default: 30) +- --sleep: Sleep time between collections in seconds (default: 1) +- --dest: Destination directory for collected data (default: /var/lib/pt-stalk) +- --prefix: Filename prefix for samples +- --daemonize: Run as daemon (default: false) + +### MySQL Collector Options +- --mysql-host: MySQL host (default: localhost) +- --mysql-port: MySQL port (default: 3306) +- --mysql-user: MySQL user +- --mysql-password: MySQL password +- --mysql-socket: MySQL socket file +- --mysql-defaults-file: MySQL configuration file + +### System Collector Options +- --collect-gdb: Collect GDB stacktraces (default: false) +- --collect-oprofile: Collect OProfile data (default: false) +- --collect-strace: Collect strace data (default: false) +- --collect-tcpdump: Collect tcpdump data (default: false) + +### Retention Options +- --retention-time: Days to retain samples (default: 30) +- --retention-count: Number of samples to retain (default: 0) +- --retention-size: Maximum size in MB to retain (default: 0) +- --disk-bytes-free: Minimum bytes free (default: 100MB) +- --disk-pct-free: Minimum percent free (default: 5) + +### Notification Options +- --notify-by-email: Email address for notifications +- --verbose: Verbosity level (0-3) (default: 2) + +### MongoDB Collector Options +- --mongodb-host: MongoDB host (default: localhost) +- --mongodb-port: MongoDB port (default: 27017) +- --mongodb-user: MongoDB user +- --mongodb-password: MongoDB password + +## Output Files + +Each collection creates files with the specified prefix and timestamps: +- {prefix}_status.txt: MySQL status variables +- {prefix}_variables.txt: MySQL system variables +- {prefix}_processlist.txt: MySQL process list +- {prefix}_diskstats.txt: System disk statistics +- {prefix}_meminfo.txt: System memory information +- {prefix}_loadavg.txt: System load average +- {prefix}_plugin.txt: Custom plugin output (if configured) +- {prefix}_server_status.txt: MongoDB server status metrics +- {prefix}_current_op.txt: MongoDB currently running operations +- {prefix}_db_stats.txt: MongoDB database statistics \ No newline at end of file diff --git a/src/go/pt-stalk/collect.go b/src/go/pt-stalk/collect.go new file mode 100644 index 000000000..07e40f2eb --- /dev/null +++ b/src/go/pt-stalk/collect.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + + "github.com/spf13/cobra" +) + +type CollectorRegistration struct { + Name string + AddFlags func(*cobra.Command, map[string]interface{}) + NewCollector func(*Config) Collector +} + +var registeredCollectors = make(map[string]CollectorRegistration) + +func RegisterCollector(reg CollectorRegistration) { + registeredCollectors[reg.Name] = reg +} + +// Base interface that all collectors must implement +type Collector interface { + Collect(ctx context.Context) error +} diff --git a/src/go/pt-stalk/collect_mongodb.go b/src/go/pt-stalk/collect_mongodb.go new file mode 100644 index 000000000..c2f099dc2 --- /dev/null +++ b/src/go/pt-stalk/collect_mongodb.go @@ -0,0 +1,140 @@ +package main + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/spf13/cobra" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type MongoDBCollector struct { + stalker *Stalker + client *mongo.Client + outDir string + prefix string + wg sync.WaitGroup + mongoCfg *MongoDBConfig +} + +func NewMongoDBCollector(config *Config) Collector { + mongoCfg := config.CollectorConfigs["mongodb"].(*MongoDBConfig) + return &MongoDBCollector{ + stalker: nil, + client: nil, + outDir: config.Dest, + prefix: config.Prefix, + mongoCfg: mongoCfg, + } +} + +func (c *MongoDBCollector) Collect(ctx context.Context) error { + if c.client == nil { + uri := fmt.Sprintf("mongodb://%s:%s@%s:%d", + c.mongoCfg.User, + c.mongoCfg.Password, + c.mongoCfg.Host, + c.mongoCfg.Port, + ) + + client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri)) + if err != nil { + return fmt.Errorf("failed to connect to MongoDB: %v", err) + } + c.client = client + defer client.Disconnect(ctx) + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.collectServerStatus(ctx) + c.collectCurrentOp(ctx) + c.collectDatabaseStats(ctx) + }() + + c.wg.Wait() + return nil +} + +func (c *MongoDBCollector) collectServerStatus(ctx context.Context) error { + result := bson.M{} + err := c.client.Database("admin").RunCommand(ctx, bson.D{{Key: "serverStatus", Value: 1}}).Decode(&result) + if err != nil { + return err + } + return c.writeResults(result, c.prefix+"_server_status.txt") +} + +func (c *MongoDBCollector) collectCurrentOp(ctx context.Context) error { + result := bson.M{} + err := c.client.Database("admin").RunCommand(ctx, bson.D{{Key: "currentOp", Value: 1}}).Decode(&result) + if err != nil { + return err + } + return c.writeResults(result, c.prefix+"_current_op.txt") +} + +func (c *MongoDBCollector) collectDatabaseStats(ctx context.Context) error { + dbs, err := c.client.ListDatabaseNames(ctx, bson.D{}) + if err != nil { + return err + } + + stats := make(map[string]bson.M) + for _, dbName := range dbs { + result := bson.M{} + err := c.client.Database(dbName).RunCommand(ctx, bson.D{{Key: "dbStats", Value: 1}}).Decode(&result) + if err != nil { + return err + } + stats[dbName] = result + } + return c.writeResults(stats, c.prefix+"_db_stats.txt") +} + +func (c *MongoDBCollector) writeResults(data interface{}, filename string) error { + f, err := os.Create(filepath.Join(c.outDir, filename)) + if err != nil { + return err + } + defer f.Close() + + formatted, err := bson.MarshalExtJSON(data, true, false) + if err != nil { + return err + } + + _, err = f.Write(formatted) + return err +} + +type MongoDBConfig struct { + Host string + Port int + User string + Password string +} + +func addMongoDBFlags(cmd *cobra.Command, cfg map[string]interface{}) { + mongoCfg := &MongoDBConfig{} + cfg["mongodb"] = mongoCfg + + cmd.PersistentFlags().StringVar(&mongoCfg.Host, "mongodb-host", "localhost", "MongoDB host") + cmd.PersistentFlags().IntVar(&mongoCfg.Port, "mongodb-port", 27017, "MongoDB port") + cmd.PersistentFlags().StringVar(&mongoCfg.User, "mongodb-user", "", "MongoDB user") + cmd.PersistentFlags().StringVar(&mongoCfg.Password, "mongodb-password", "", "MongoDB password") +} + +func init() { + RegisterCollector(CollectorRegistration{ + Name: "mongodb", + AddFlags: addMongoDBFlags, + NewCollector: NewMongoDBCollector, + }) +} diff --git a/src/go/pt-stalk/collect_mongodb_test.go b/src/go/pt-stalk/collect_mongodb_test.go new file mode 100644 index 000000000..a88db2976 --- /dev/null +++ b/src/go/pt-stalk/collect_mongodb_test.go @@ -0,0 +1,86 @@ +package main + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMongoDBCollector(t *testing.T) { + // Skip if no MongoDB connection available + mongoURI := os.Getenv("TEST_MONGODB_URI") + if mongoURI == "" { + t.Skip("Skipping MongoDB tests: TEST_MONGODB_URI not set") + } + + // Create temp directory for test outputs + tmpDir, err := os.MkdirTemp("", "mongodb-collector-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // Create test config + cfg := &Config{ + Dest: tmpDir, + Prefix: "test", + CollectorConfigs: map[string]interface{}{ + "mongodb": &MongoDBConfig{ + Host: "localhost", + Port: 27017, + User: "testuser", + Password: "testpass", + }, + }, + } + + // Create collector + collector := NewMongoDBCollector(cfg) + assert.NotNil(t, collector) + + // Test collection + ctx := context.Background() + err = collector.Collect(ctx) + assert.NoError(t, err) + + // Verify output files exist + expectedFiles := []string{ + "test_server_status.txt", + "test_current_op.txt", + "test_db_stats.txt", + } + + for _, file := range expectedFiles { + path := filepath.Join(tmpDir, file) + _, err := os.Stat(path) + assert.NoError(t, err, "Expected file %s to exist", file) + + // Verify file is not empty + content, err := os.ReadFile(path) + assert.NoError(t, err) + assert.NotEmpty(t, content) + } +} + +func TestMongoDBCollectorConnection(t *testing.T) { + // Test invalid connection + cfg := &Config{ + Dest: os.TempDir(), + Prefix: "test", + CollectorConfigs: map[string]interface{}{ + "mongodb": &MongoDBConfig{ + Host: "nonexistent", + Port: 27017, + User: "invalid", + Password: "invalid", + }, + }, + } + + collector := NewMongoDBCollector(cfg) + err := collector.Collect(context.Background()) + assert.Error(t, err) +} diff --git a/src/go/pt-stalk/collect_mysql.go b/src/go/pt-stalk/collect_mysql.go new file mode 100644 index 000000000..a83da745a --- /dev/null +++ b/src/go/pt-stalk/collect_mysql.go @@ -0,0 +1,152 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/spf13/cobra" +) + +type MySQLCollector struct { + stalker *Stalker + db *sql.DB + outDir string + prefix string + wg sync.WaitGroup + mysqlCfg *MySQLConfig +} + +func NewMySQLCollector(config *Config) Collector { + mysqlCfg := config.CollectorConfigs["mysql"].(*MySQLConfig) + return &MySQLCollector{ + stalker: nil, + db: nil, + outDir: config.Dest, + prefix: config.Prefix, + mysqlCfg: mysqlCfg, + } +} + +func (c *MySQLCollector) Collect(ctx context.Context) error { + if c.db == nil { + mysqlCfg := c.mysqlCfg + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/", mysqlCfg.User, mysqlCfg.Password, mysqlCfg.Host, mysqlCfg.Port) + + db, err := sql.Open("mysql", dsn) + if err != nil { + return fmt.Errorf("failed to connect to MySQL: %v", err) + } + c.db = db + defer db.Close() + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.collectStatus(ctx) + c.collectVariables(ctx) + c.collectProcesslist(ctx) + }() + + c.wg.Wait() + return nil +} + +func (c *MySQLCollector) collectStatus(ctx context.Context) error { + rows, err := c.db.QueryContext(ctx, "SHOW GLOBAL STATUS") + if err != nil { + return err + } + defer rows.Close() + + return c.writeResults(rows, c.prefix+"_status.txt") +} + +func (c *MySQLCollector) collectVariables(ctx context.Context) error { + rows, err := c.db.QueryContext(ctx, "SHOW GLOBAL VARIABLES") + if err != nil { + return err + } + defer rows.Close() + + return c.writeResults(rows, c.prefix+"_variables.txt") +} + +func (c *MySQLCollector) collectProcesslist(ctx context.Context) error { + rows, err := c.db.QueryContext(ctx, "SHOW FULL PROCESSLIST") + if err != nil { + return err + } + defer rows.Close() + + return c.writeProcesslist(rows, c.prefix+"_processlist.txt") +} + +func (c *MySQLCollector) writeResults(rows *sql.Rows, filename string) error { + f, err := os.Create(filepath.Join(c.outDir, filename)) + if err != nil { + return err + } + defer f.Close() + + for rows.Next() { + var name, value string + if err := rows.Scan(&name, &value); err != nil { + return err + } + fmt.Fprintf(f, "%s\t%s\n", name, value) + } + return rows.Err() +} + +func (c *MySQLCollector) writeProcesslist(rows *sql.Rows, filename string) error { + f, err := os.Create(filepath.Join(c.outDir, filename)) + if err != nil { + return err + } + defer f.Close() + + for rows.Next() { + var id, user, host, db, command, time, state, info sql.NullString + if err := rows.Scan(&id, &user, &host, &db, &command, &time, &state, &info); err != nil { + return err + } + fmt.Fprintf(f, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", + id.String, user.String, host.String, db.String, + command.String, time.String, state.String, info.String) + } + return rows.Err() +} + +type MySQLConfig struct { + Host string + Port int + User string + Password string + Socket string + DefaultsFile string +} + +func addMySQLFlags(cmd *cobra.Command, cfg map[string]interface{}) { + mysqlCfg := &MySQLConfig{} + cfg["mysql"] = mysqlCfg + + cmd.PersistentFlags().StringVar(&mysqlCfg.Host, "mysql-host", "", "MySQL host") + cmd.PersistentFlags().IntVar(&mysqlCfg.Port, "mysql-port", 3306, "MySQL port") + cmd.PersistentFlags().StringVar(&mysqlCfg.User, "mysql-user", "", "MySQL user") + cmd.PersistentFlags().StringVar(&mysqlCfg.Password, "mysql-password", "", "MySQL password") + cmd.PersistentFlags().StringVar(&mysqlCfg.Socket, "mysql-socket", "", "MySQL socket") + cmd.PersistentFlags().StringVar(&mysqlCfg.DefaultsFile, "mysql-defaults-file", "", "MySQL defaults file") +} + +func init() { + RegisterCollector(CollectorRegistration{ + Name: "mysql", + AddFlags: addMySQLFlags, + NewCollector: NewMySQLCollector, + }) +} diff --git a/src/go/pt-stalk/collect_mysql_test.go b/src/go/pt-stalk/collect_mysql_test.go new file mode 100644 index 000000000..805e853f9 --- /dev/null +++ b/src/go/pt-stalk/collect_mysql_test.go @@ -0,0 +1,48 @@ +package main + +import ( + "context" + "os" + "path/filepath" + "testing" +) + +func TestMySQLCollector(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "pt-stalk-mysql-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + cfg := &Config{ + Dest: tmpDir, + Prefix: "test", + CollectorConfigs: map[string]interface{}{ + "mysql": &MySQLConfig{ + Host: "localhost", + Port: 3306, + User: "root", + }, + }, + } + + collector := NewMySQLCollector(cfg) + err = collector.Collect(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Verify MySQL specific files + expectedFiles := []string{ + "test_status.txt", + "test_variables.txt", + "test_processlist.txt", + } + + for _, file := range expectedFiles { + path := filepath.Join(tmpDir, file) + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Errorf("Expected file not found: %s", file) + } + } +} diff --git a/src/go/pt-stalk/collect_system.go b/src/go/pt-stalk/collect_system.go new file mode 100644 index 000000000..1518d9fba --- /dev/null +++ b/src/go/pt-stalk/collect_system.go @@ -0,0 +1,103 @@ +package main + +import ( + "context" + "os" + "path/filepath" + "sync" + + "github.com/spf13/cobra" +) + +type SystemCollector struct { + stalker *Stalker + outDir string + prefix string + wg sync.WaitGroup + systemCfg *SystemConfig +} + +func NewSystemCollector(config *Config) Collector { + systemCfg := config.CollectorConfigs["system"].(*SystemConfig) + return &SystemCollector{ + stalker: nil, + outDir: config.Dest, + prefix: config.Prefix, + systemCfg: systemCfg, + } +} + +func (c *SystemCollector) Collect(ctx context.Context) error { + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.collectDiskStats(ctx) + c.collectMemInfo(ctx) + c.collectLoadAvg(ctx) + if c.systemCfg.CollectGDB { + c.collectGDB(ctx) + } + if c.systemCfg.CollectTcpdump { + c.collectTcpdump(ctx) + } + }() + + c.wg.Wait() + return nil +} + +func (c *SystemCollector) collectDiskStats(ctx context.Context) error { + return c.readAndWriteFile("/proc/diskstats", c.prefix+"_diskstats.txt") +} + +func (c *SystemCollector) collectMemInfo(ctx context.Context) error { + return c.readAndWriteFile("/proc/meminfo", c.prefix+"_meminfo.txt") +} + +func (c *SystemCollector) collectLoadAvg(ctx context.Context) error { + return c.readAndWriteFile("/proc/loadavg", c.prefix+"_loadavg.txt") +} + +func (c *SystemCollector) collectGDB(ctx context.Context) error { + // GDB collection implementation + return nil +} + +func (c *SystemCollector) collectTcpdump(ctx context.Context) error { + // Tcpdump collection implementation + return nil +} + +func (c *SystemCollector) readAndWriteFile(srcPath, destName string) error { + content, err := os.ReadFile(srcPath) + if err != nil { + return err + } + + return os.WriteFile(filepath.Join(c.outDir, destName), content, 0644) +} + +type SystemConfig struct { + CollectGDB bool + CollectOProfile bool + CollectStrace bool + CollectTcpdump bool +} + +func addSystemFlags(cmd *cobra.Command, cfg map[string]interface{}) { + systemCfg := &SystemConfig{} + cfg["system"] = systemCfg + + cmd.PersistentFlags().BoolVar(&systemCfg.CollectGDB, "collect-gdb", false, "Collect GDB stacktraces") + cmd.PersistentFlags().BoolVar(&systemCfg.CollectOProfile, "collect-oprofile", false, "Collect OProfile data") + cmd.PersistentFlags().BoolVar(&systemCfg.CollectStrace, "collect-strace", false, "Collect strace data") + cmd.PersistentFlags().BoolVar(&systemCfg.CollectTcpdump, "collect-tcpdump", false, "Collect tcpdump data") +} + +func init() { + RegisterCollector(CollectorRegistration{ + Name: "system", + AddFlags: addSystemFlags, + NewCollector: NewSystemCollector, + }) +} diff --git a/src/go/pt-stalk/collect_system_test.go b/src/go/pt-stalk/collect_system_test.go new file mode 100644 index 000000000..220f8367d --- /dev/null +++ b/src/go/pt-stalk/collect_system_test.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "os" + "path/filepath" + "testing" +) + +func TestSystemCollector(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "pt-stalk-system-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + cfg := &Config{ + Dest: tmpDir, + Prefix: "test", + CollectorConfigs: map[string]interface{}{ + "system": &SystemConfig{ + CollectGDB: true, + CollectTcpdump: true, + }, + }, + } + + collector := NewSystemCollector(cfg) + err = collector.Collect(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Verify system specific files + expectedFiles := []string{ + "test_diskstats.txt", + "test_meminfo.txt", + "test_loadavg.txt", + } + + for _, file := range expectedFiles { + path := filepath.Join(tmpDir, file) + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Errorf("Expected file not found: %s", file) + } + } + + // Verify optional collectors + if cfg.CollectorConfigs["system"].(*SystemConfig).CollectGDB { + gdbFile := filepath.Join(tmpDir, "test_gdb.txt") + if _, err := os.Stat(gdbFile); os.IsNotExist(err) { + t.Error("Expected GDB file not found") + } + } + + if cfg.CollectorConfigs["system"].(*SystemConfig).CollectTcpdump { + tcpdumpFile := filepath.Join(tmpDir, "test_tcpdump.cap") + if _, err := os.Stat(tcpdumpFile); os.IsNotExist(err) { + t.Error("Expected tcpdump file not found") + } + } +} diff --git a/src/go/pt-stalk/collect_test.go b/src/go/pt-stalk/collect_test.go new file mode 100644 index 000000000..53b6228b9 --- /dev/null +++ b/src/go/pt-stalk/collect_test.go @@ -0,0 +1,20 @@ +package main + +import ( + "testing" +) + +func TestCollectorRegistry(t *testing.T) { + // Test collector registration + if len(registeredCollectors) == 0 { + t.Error("No collectors registered") + } + + // Verify expected collectors are registered + expectedCollectors := []string{"mysql", "system"} + for _, name := range expectedCollectors { + if _, ok := registeredCollectors[name]; !ok { + t.Errorf("Expected collector %s not registered", name) + } + } +} diff --git a/src/go/pt-stalk/examples/plugins/sample.sh b/src/go/pt-stalk/examples/plugins/sample.sh new file mode 100644 index 000000000..c33665c1f --- /dev/null +++ b/src/go/pt-stalk/examples/plugins/sample.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +# This is a sample plugin for pt-stalk that demonstrates the available hooks + +before_stalk() { + echo "Starting stalker with:" + echo " Function: $PT_FUNCTION" + echo " Variable: $PT_VARIABLE" + echo " Threshold: $PT_THRESHOLD" +} + +before_collect() { + local prefix="$1" + echo "About to collect metrics with prefix: $prefix" + echo "Output directory: $PT_DEST/$prefix" +} + +after_collect() { + local prefix="$1" + echo "Finished collecting metrics with prefix: $prefix" + + # Example: Calculate total size of collected data + du -sh "$PT_DEST/$prefix" +} + +after_collect_sleep() { + echo "Finished sleeping after collection" +} + +after_interval_sleep() { + echo "Finished interval sleep" +} + +after_stalk() { + echo "Stalker finished" +} \ No newline at end of file diff --git a/src/go/pt-stalk/logger.go b/src/go/pt-stalk/logger.go new file mode 100644 index 000000000..8d35425bd --- /dev/null +++ b/src/go/pt-stalk/logger.go @@ -0,0 +1,73 @@ +package main + +import ( + "fmt" + "log" + "os" + "time" +) + +type Logger struct { + logger *log.Logger + verbose int + filename string +} + +func NewLogger(filename string, verbose int) (*Logger, error) { + var output *os.File + var err error + + if filename == "" || filename == "-" { + output = os.Stdout + } else { + output, err = os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open log file: %v", err) + } + } + + return &Logger{ + logger: log.New(output, "", 0), + verbose: verbose, + filename: filename, + }, nil +} + +func (l *Logger) formatMessage(level string, format string, args ...interface{}) string { + timestamp := time.Now().Format("2006-01-02 15:04:05") + message := fmt.Sprintf(format, args...) + return fmt.Sprintf("%s [%s] %s", timestamp, level, message) +} + +func (l *Logger) Error(format string, args ...interface{}) { + if l.verbose >= 0 { + l.logger.Println(l.formatMessage("ERROR", format, args...)) + } +} + +func (l *Logger) Warn(format string, args ...interface{}) { + if l.verbose >= 1 { + l.logger.Println(l.formatMessage("WARN", format, args...)) + } +} + +func (l *Logger) Info(format string, args ...interface{}) { + if l.verbose >= 2 { + l.logger.Println(l.formatMessage("INFO", format, args...)) + } +} + +func (l *Logger) Debug(format string, args ...interface{}) { + if l.verbose >= 3 { + l.logger.Println(l.formatMessage("DEBUG", format, args...)) + } +} + +func (l *Logger) Close() error { + if l.filename != "" && l.filename != "-" { + if logger, ok := l.logger.Writer().(*os.File); ok { + return logger.Close() + } + } + return nil +} diff --git a/src/go/pt-stalk/main.go b/src/go/pt-stalk/main.go new file mode 100644 index 000000000..f4da9bf10 --- /dev/null +++ b/src/go/pt-stalk/main.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "log" + "os" + + "github.com/spf13/cobra" +) + +type Config struct { + // Common configuration only + Collectors string + Interval int + RunTime int + Sleep int + SleepCollect int + Dest string + Prefix string + Log string + Pid string + Daemonize bool + RetentionTime int + RetentionCount int + RetentionSize int + DiskBytesFree int64 + DiskPctFree int + NotifyByEmail string + Verbose int + Plugin string + + // Collector configs + CollectorConfigs map[string]interface{} +} + +func newRootCmd() *cobra.Command { + rootCmd := &cobra.Command{ + Use: "pt-stalk", + Short: "MySQL and system metrics collector", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := cmd.Context().Value("config").(*Config) + logger := log.New(os.Stderr, "", log.LstdFlags) + + stalker, err := NewStalker(cfg, logger) + if err != nil { + return fmt.Errorf("failed to initialize stalker: %v", err) + } + + return stalker.Run(cmd.Context()) + }, + } + + cfg := &Config{ + CollectorConfigs: make(map[string]interface{}), + } + + rootCmd.PersistentFlags().StringVar(&cfg.Collectors, "collectors", "", "Comma-separated list of collectors to enable (mysql,system)") + rootCmd.PersistentFlags().IntVar(&cfg.Interval, "interval", 1, "Check interval in seconds") + rootCmd.PersistentFlags().IntVar(&cfg.RunTime, "run-time", 30, "How long to collect data in seconds") + rootCmd.PersistentFlags().IntVar(&cfg.Sleep, "sleep", 1, "Sleep time between collections in seconds") + rootCmd.PersistentFlags().StringVar(&cfg.Dest, "dest", "/var/lib/pt-stalk", "Destination directory for collected data") + rootCmd.PersistentFlags().StringVar(&cfg.Prefix, "prefix", "", "Filename prefix for samples") + rootCmd.PersistentFlags().StringVar(&cfg.Log, "log", "/var/log/pt-stalk.log", "Log file when daemonized") + rootCmd.PersistentFlags().StringVar(&cfg.Pid, "pid", "/var/run/pt-stalk.pid", "PID file") + rootCmd.PersistentFlags().BoolVar(&cfg.Daemonize, "daemonize", false, "Run as daemon") + rootCmd.PersistentFlags().IntVar(&cfg.RetentionTime, "retention-time", 30, "Days to retain samples") + rootCmd.PersistentFlags().IntVar(&cfg.RetentionCount, "retention-count", 0, "Number of samples to retain") + rootCmd.PersistentFlags().IntVar(&cfg.RetentionSize, "retention-size", 0, "Maximum size in MB to retain") + rootCmd.PersistentFlags().Int64Var(&cfg.DiskBytesFree, "disk-bytes-free", 100*1024*1024, "Minimum bytes free") + rootCmd.PersistentFlags().IntVar(&cfg.DiskPctFree, "disk-pct-free", 5, "Minimum percent free") + rootCmd.PersistentFlags().StringVar(&cfg.NotifyByEmail, "notify-by-email", "", "Email address for notifications") + rootCmd.PersistentFlags().IntVar(&cfg.Verbose, "verbose", 2, "Verbosity level (0-3)") + rootCmd.PersistentFlags().StringVar(&cfg.Plugin, "plugin", "", "Path to plugin script") + + // Add collector-specific flags + for _, reg := range registeredCollectors { + reg.AddFlags(rootCmd, cfg.CollectorConfigs) + } + + return rootCmd +} + +func main() { + cmd := newRootCmd() + if err := cmd.Execute(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} diff --git a/src/go/pt-stalk/main_test.go b/src/go/pt-stalk/main_test.go new file mode 100644 index 000000000..7b37f74ef --- /dev/null +++ b/src/go/pt-stalk/main_test.go @@ -0,0 +1,143 @@ +package main + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" +) + +func TestMainCommand(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "pt-stalk-main-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // Test configuration + cfg := &Config{ + Collectors: "mysql,system", + Interval: 1, + RunTime: 2, + Sleep: 1, + Dest: tmpDir, + Prefix: "test", + CollectorConfigs: map[string]interface{}{ + "mysql": &MySQLConfig{ + Host: "localhost", + Port: 3306, + User: os.Getenv("MYSQL_TEST_USER"), + Password: os.Getenv("MYSQL_TEST_PASS"), + }, + "system": &SystemConfig{ + CollectGDB: true, + }, + }, + } + + // Create context with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Set up command + cmd := newRootCmd() + cmd.SetContext(context.WithValue(ctx, "config", cfg)) + + // Execute command + if err := cmd.Execute(); err != nil && err != context.DeadlineExceeded { + t.Errorf("Command execution failed: %v", err) + } + + // Verify output directory structure + files, err := os.ReadDir(tmpDir) + if err != nil { + t.Fatalf("Failed to read output directory: %v", err) + } + + if len(files) == 0 { + t.Error("No output files were created") + } + + // Check for collector outputs + expectedFiles := map[string]bool{ + "mysql": false, + "system": false, + } + + for _, file := range files { + if file.IsDir() { + for collector := range expectedFiles { + if _, err := os.Stat(filepath.Join(tmpDir, file.Name(), collector)); !os.IsNotExist(err) { + expectedFiles[collector] = true + } + } + } + } + + for collector, found := range expectedFiles { + if !found { + t.Errorf("Expected output for %s collector not found", collector) + } + } +} + +func TestMainCommandFlags(t *testing.T) { + cmd := newRootCmd() + + // Test required flags + if cmd.Flag("collectors") == nil { + t.Error("Required flag 'collectors' not found") + } + + // Test MySQL collector flags + if cmd.Flag("mysql-host") == nil { + t.Error("MySQL flag 'mysql-host' not found") + } + if cmd.Flag("mysql-port") == nil { + t.Error("MySQL flag 'mysql-port' not found") + } + + // Test System collector flags + if cmd.Flag("collect-gdb") == nil { + t.Error("System flag 'collect-gdb' not found") + } + if cmd.Flag("collect-tcpdump") == nil { + t.Error("System flag 'collect-tcpdump' not found") + } +} + +func TestMainCommandValidation(t *testing.T) { + tests := []struct { + name string + args []string + wantErr bool + }{ + { + name: "no collectors", + args: []string{}, + wantErr: true, + }, + { + name: "invalid collector", + args: []string{"--collectors=invalid"}, + wantErr: true, + }, + { + name: "valid collectors", + args: []string{"--collectors=mysql,system"}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cmd := newRootCmd() + cmd.SetArgs(tt.args) + err := cmd.Execute() + if (err != nil) != tt.wantErr { + t.Errorf("Command execution error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/src/go/pt-stalk/plugin.go b/src/go/pt-stalk/plugin.go new file mode 100644 index 000000000..3812a5763 --- /dev/null +++ b/src/go/pt-stalk/plugin.go @@ -0,0 +1,75 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" +) + +type Plugin struct { + path string + config *Config + env map[string]string +} + +func NewPlugin(path string, config *Config) (*Plugin, error) { + if path == "" { + return nil, nil // No plugin configured + } + + if _, err := os.Stat(path); err != nil { + return nil, fmt.Errorf("plugin not found: %v", err) + } + + return &Plugin{ + path: path, + config: config, + env: make(map[string]string), + }, nil +} + +func (p *Plugin) SetEnv(key, value string) { + p.env[key] = value +} + +func (p *Plugin) Execute(ctx context.Context) error { + if p == nil { + return nil // No plugin configured + } + + cmd := exec.CommandContext(ctx, p.path) + + // Set up environment + cmd.Env = os.Environ() // Start with current environment + for k, v := range p.env { + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v)) + } + + // Add standard plugin environment variables + cmd.Env = append(cmd.Env, + fmt.Sprintf("PT_DEST=%s", p.config.Dest), + fmt.Sprintf("PT_PREFIX=%s", p.config.Prefix), + fmt.Sprintf("PT_INTERVAL=%d", p.config.Interval), + fmt.Sprintf("PT_RUNTIME=%d", p.config.RunTime), + ) + + // Set up output + outputFile := filepath.Join(p.config.Dest, p.config.Prefix+"_plugin.txt") + output, err := os.Create(outputFile) + if err != nil { + return fmt.Errorf("failed to create plugin output file: %v", err) + } + defer output.Close() + + cmd.Stdout = output + cmd.Stderr = output + + // Execute plugin + if err := cmd.Run(); err != nil { + return fmt.Errorf("plugin execution failed: %v", err) + } + + return nil +} diff --git a/src/go/pt-stalk/plugin_test.go b/src/go/pt-stalk/plugin_test.go new file mode 100644 index 000000000..34e80191b --- /dev/null +++ b/src/go/pt-stalk/plugin_test.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "os" + "path/filepath" + "testing" +) + +func TestPlugin(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "pt-stalk-plugin-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // Create test plugin + pluginContent := `#!/bin/sh +echo "Test plugin output" +echo "PT_DEST=$PT_DEST" +echo "PT_PREFIX=$PT_PREFIX" +` + pluginPath := filepath.Join(tmpDir, "test-plugin.sh") + if err := os.WriteFile(pluginPath, []byte(pluginContent), 0755); err != nil { + t.Fatal(err) + } + + cfg := &Config{ + Dest: tmpDir, + Prefix: "test", + Interval: 1, + RunTime: 30, + } + + // Test plugin creation + plugin, err := NewPlugin(pluginPath, cfg) + if err != nil { + t.Fatal(err) + } + + // Test plugin environment + plugin.SetEnv("TEST_VAR", "test_value") + + // Test plugin execution + err = plugin.Execute(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Verify plugin output + outputFile := filepath.Join(tmpDir, "test_plugin.txt") + content, err := os.ReadFile(outputFile) + if err != nil { + t.Fatal(err) + } + + if len(content) == 0 { + t.Error("Plugin output is empty") + } +} + +func TestPluginNotFound(t *testing.T) { + cfg := &Config{ + Dest: "/tmp", + Prefix: "test", + } + + _, err := NewPlugin("/nonexistent/plugin", cfg) + if err == nil { + t.Error("Expected error for nonexistent plugin") + } +} + +func TestNoPlugin(t *testing.T) { + cfg := &Config{ + Dest: "/tmp", + Prefix: "test", + } + + plugin, err := NewPlugin("", cfg) + if err != nil { + t.Fatal(err) + } + if plugin != nil { + t.Error("Expected nil plugin when no path provided") + } +} diff --git a/src/go/pt-stalk/stalk.go b/src/go/pt-stalk/stalk.go new file mode 100644 index 000000000..eff32fb5d --- /dev/null +++ b/src/go/pt-stalk/stalk.go @@ -0,0 +1,77 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "strings" + "time" +) + +type Stalker struct { + config *Config + logger *log.Logger + plugin *Plugin +} + +func NewStalker(config *Config, logger *log.Logger) (*Stalker, error) { + s := &Stalker{ + config: config, + logger: logger, + } + + // Initialize plugin if configured + if config.Plugin != "" { + plugin, err := NewPlugin(config.Plugin, config) + if err != nil { + return nil, fmt.Errorf("failed to initialize plugin: %v", err) + } + s.plugin = plugin + } + + return s, nil +} + +func (s *Stalker) Run(ctx context.Context) error { + // Create destination directory if it doesn't exist + if err := os.MkdirAll(s.config.Dest, 0755); err != nil { + return fmt.Errorf("failed to create destination directory: %v", err) + } + + // Main collection loop + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if err := s.runCollectors(ctx); err != nil { + s.logger.Printf("Collection error: %v", err) + } + time.Sleep(time.Duration(s.config.Sleep) * time.Second) + } + } +} + +func (s *Stalker) runCollectors(ctx context.Context) error { + // Run collectors + enabledCollectors := strings.Split(s.config.Collectors, ",") + for _, name := range enabledCollectors { + name = strings.TrimSpace(name) + if reg, ok := registeredCollectors[name]; ok { + collector := reg.NewCollector(s.config) + if err := collector.Collect(ctx); err != nil { + return fmt.Errorf("collector %s failed: %v", name, err) + } + } + } + + // Run plugin if configured + if s.plugin != nil { + if err := s.plugin.Execute(ctx); err != nil { + return fmt.Errorf("plugin execution failed: %v", err) + } + } + + return nil +} diff --git a/src/go/pt-stalk/stalk_test.go b/src/go/pt-stalk/stalk_test.go new file mode 100644 index 000000000..62ae5c914 --- /dev/null +++ b/src/go/pt-stalk/stalk_test.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "log" + "os" + "path/filepath" + "testing" + "time" +) + +func TestStalkerBasicOperation(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "pt-stalk-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + logger := log.New(os.Stderr, "", log.LstdFlags) + + cfg := &Config{ + Collectors: "mysql,system", + Interval: 1, + Sleep: 1, + Dest: tmpDir, + Prefix: "test", + CollectorConfigs: map[string]interface{}{ + "mysql": &MySQLConfig{ + Host: "localhost", + Port: 3306, + }, + "system": &SystemConfig{ + CollectGDB: true, + }, + }, + } + + stalker, err := NewStalker(cfg, logger) + if err != nil { + t.Fatal(err) + } + + // Run stalker with timeout + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err = stalker.Run(ctx) + if err != context.DeadlineExceeded { + t.Errorf("Expected deadline exceeded error, got: %v", err) + } + + // Verify destination directory was created + if _, err := os.Stat(tmpDir); os.IsNotExist(err) { + t.Error("Destination directory was not created") + } +} + +func TestStalkerWithPluginExecution(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "pt-stalk-plugin-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // Create test plugin + pluginContent := `#!/bin/sh +echo "Test plugin output" +` + pluginPath := filepath.Join(tmpDir, "test-plugin.sh") + if err := os.WriteFile(pluginPath, []byte(pluginContent), 0755); err != nil { + t.Fatal(err) + } + + logger := log.New(os.Stderr, "", log.LstdFlags) + + cfg := &Config{ + Collectors: "mysql", + Interval: 1, + Sleep: 1, + Dest: tmpDir, + Prefix: "test", + Plugin: pluginPath, + CollectorConfigs: map[string]interface{}{ + "mysql": &MySQLConfig{ + Host: "localhost", + Port: 3306, + }, + }, + } + + stalker, err := NewStalker(cfg, logger) + if err != nil { + t.Fatal(err) + } + + // Run stalker with timeout + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err = stalker.Run(ctx) + if err != context.DeadlineExceeded { + t.Errorf("Expected deadline exceeded error, got: %v", err) + } + + // Verify plugin output file exists + pluginOutput := filepath.Join(tmpDir, "test_plugin.txt") + if _, err := os.Stat(pluginOutput); os.IsNotExist(err) { + t.Error("Plugin output file was not created") + } +} diff --git a/src/go/pt-stalk/utils.go b/src/go/pt-stalk/utils.go new file mode 100644 index 000000000..d4e547ba0 --- /dev/null +++ b/src/go/pt-stalk/utils.go @@ -0,0 +1,169 @@ +package main + +import ( + "database/sql" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "strings" + "syscall" + "time" +) + +// Common size units for parsing +const ( + _ = iota + KB = 1 << (10 * iota) + MB + GB + TB +) + +// Regular expression for parsing size strings (e.g., "100M", "1.5G") +var sizeRegex = regexp.MustCompile(`^(\d+(?:\.\d+)?)\s*([kKmMgGtT])?[bB]?$`) + +// ParseSize converts a human-readable size string to bytes +func ParseSize(size string) (int64, error) { + matches := sizeRegex.FindStringSubmatch(strings.TrimSpace(size)) + if matches == nil { + return 0, fmt.Errorf("invalid size format: %s", size) + } + + value, err := strconv.ParseFloat(matches[1], 64) + if err != nil { + return 0, fmt.Errorf("invalid size value: %s", matches[1]) + } + + var multiplier int64 = 1 + if len(matches) > 2 && matches[2] != "" { + switch strings.ToUpper(matches[2]) { + case "K": + multiplier = KB + case "M": + multiplier = MB + case "G": + multiplier = GB + case "T": + multiplier = TB + } + } + + return int64(value * float64(multiplier)), nil +} + +// FormatSize converts bytes to a human-readable string +func FormatSize(bytes int64) string { + switch { + case bytes >= TB: + return fmt.Sprintf("%.2fTB", float64(bytes)/float64(TB)) + case bytes >= GB: + return fmt.Sprintf("%.2fGB", float64(bytes)/float64(GB)) + case bytes >= MB: + return fmt.Sprintf("%.2fMB", float64(bytes)/float64(MB)) + case bytes >= KB: + return fmt.Sprintf("%.2fKB", float64(bytes)/float64(KB)) + default: + return fmt.Sprintf("%dB", bytes) + } +} + +// GetDirectorySize calculates the total size of a directory +func GetDirectorySize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + size += info.Size() + } + return nil + }) + return size, err +} + +// IsProcessRunning checks if a process with the given PID is running +func IsProcessRunning(pid int) bool { + process, err := os.FindProcess(pid) + if err != nil { + return false + } + + // On Unix systems, FindProcess always succeeds, so we need to send + // signal 0 to actually check if the process exists + err = process.Signal(syscall.Signal(0)) + return err == nil +} + +// EnsureDirectoryExists creates a directory if it doesn't exist +func EnsureDirectoryExists(path string) error { + if _, err := os.Stat(path); os.IsNotExist(err) { + return os.MkdirAll(path, 0755) + } + return nil +} + +// ReadPIDFile reads a PID from a file +func ReadPIDFile(path string) (int, error) { + content, err := os.ReadFile(path) + if err != nil { + return 0, err + } + + pid, err := strconv.Atoi(strings.TrimSpace(string(content))) + if err != nil { + return 0, fmt.Errorf("invalid PID in file: %v", err) + } + + return pid, nil +} + +// WritePIDFile writes the current process PID to a file +func WritePIDFile(path string) error { + return os.WriteFile(path, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0644) +} + +// CleanOldFiles removes files older than the specified retention time +func CleanOldFiles(dir string, retentionDays int) error { + if retentionDays <= 0 { + return nil + } + + cutoff := time.Now().AddDate(0, 0, -retentionDays) + return filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() && info.ModTime().Before(cutoff) { + if err := os.Remove(path); err != nil { + return fmt.Errorf("failed to remove old file %s: %v", path, err) + } + } + return nil + }) +} + +// SendEmail sends a notification email +func SendEmail(to, subject, body string) error { + if to == "" { + return nil + } + + cmd := exec.Command("mail", "-s", subject, to) + cmd.Stdin = strings.NewReader(body) + return cmd.Run() +} + +// GetMySQLProcessID gets the process ID of the MySQL server +func GetMySQLProcessID(db *sql.DB) (int, error) { + var pid int + err := db.QueryRow("SELECT @@pid").Scan(&pid) + if err != nil { + return 0, fmt.Errorf("failed to get MySQL PID: %v", err) + } + return pid, nil +}