From 60566981e5fd2f421d623ea92de01c31cdf071d1 Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 15 Jun 2025 15:53:25 +0200 Subject: [PATCH 01/14] WIP working encoding of a train clip to h264 mp4 video using gstreamer --- go.mod | 3 + go.sum | 6 ++ internal/pkg/stitch/stitch.go | 148 +++++++++++++++++++++++++++++++++- 3 files changed, 156 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 9f9e06b..8e35223 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,8 @@ require ( github.com/fatih/color v1.18.0 // indirect github.com/fatih/structtag v1.2.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-gst/go-glib v1.4.0 // indirect + github.com/go-gst/go-gst v1.4.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect @@ -68,6 +70,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-pointer v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517 // indirect github.com/mgechev/revive v1.9.0 // indirect diff --git a/go.sum b/go.sum index 5736ca1..46e00bf 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,10 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-gst/go-glib v1.4.0 h1:FB2uVfB0uqz7/M6EaDdWWlBZRQpvFAbWfL7drdw8lAE= +github.com/go-gst/go-glib v1.4.0/go.mod h1:GUIpWmkxQ1/eL+FYSjKpLDyTZx6Vgd9nNXt8dA31d5M= +github.com/go-gst/go-gst v1.4.0 h1:EikB43u4c3wc8d2RzlFRSfIGIXYzDy6Zls2vJqrG2BU= +github.com/go-gst/go-gst v1.4.0/go.mod h1:p8TLGtOxJLcrp6PCkTPdnanwWBxPZvYiHDbuSuwgO3c= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= @@ -143,6 +147,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-mjpeg v0.0.3 h1:0G/+KddrbI5Hnq83B11O1O4vP7Q6L9MsBu6aW71jhUM= github.com/mattn/go-mjpeg v0.0.3/go.mod h1:65z7Cj+u5y5K3B8Sy5NtrJFTWAhguGHs9FEkADdx6kE= +github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= +github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index b5aac3d..77a343e 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -4,11 +4,16 @@ import ( "errors" "fmt" "image" + "image/color" "image/draw" "image/gif" "math" "time" + "github.com/go-gst/go-glib/glib" + "github.com/go-gst/go-gst/gst" + "github.com/go-gst/go-gst/gst/app" + "github.com/go-gst/go-gst/gst/video" "github.com/mccutchen/palettor" "github.com/nfnt/resize" "github.com/rs/zerolog/log" @@ -184,6 +189,146 @@ func createGIF(seq sequence, stitched image.Image) (*gif.GIF, error) { return &g, nil } +func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { + // https://github.com/go-gst/go-gst/blob/v1.4.0/examples/appsrc/main.go + + // SW: x264enc + // HW on RPi: v4l2h264enc + // HW on PC AMD: va264enc + + // appsrc ! x264enc ! mp4mux ! filesync location=/tmp/test.mp4 + + gst.Init(nil) + + pipeline, err := gst.NewPipeline("") + if err != nil { + return nil, err + } + + encoder := "x264enc" + elems, err := gst.NewElementMany("appsrc", "videoconvert", encoder, "h264parse", "mp4mux" /*"autovideosink"*/, "filesink") + //elems, err := gst.NewElementMany("appsrc", "videoconvert", "autovideosink") + if err != nil { + return nil, err + } + + pipeline.AddMany(elems...) + gst.ElementLinkMany(elems...) + + src := app.SrcFromElement(elems[0]) + elems[5].SetArg("location", "/tmp/test.mp4") + //elems[4].SetArg("sync", "false") + + // Specify the format we want to provide as application into the pipeline + // by creating a video info with the given format and creating caps from it for the appsrc element. + videoInfo := video.NewInfo(). + WithFormat(video.FormatRGBA, 300, 300). /*uint(seq.frames[0].Bounds().Dx()), uint(seq.frames[0].Bounds().Dy()))*/ + WithFPS(gst.Fraction(2, 1)) // FIXME + + src.SetCaps(videoInfo.ToCaps()) + src.SetProperty("format", gst.FormatTime) + + // Initialize a frame counter + var i int + //palette := video.FormatRGB8P.Palette() + + // Since our appsrc element operates in pull mode (it asks us to provide data), + // we add a handler for the need-data callback and provide new data from there. + // In our case, we told gstreamer that we do 2 frames per second. While the + // buffers of all elements of the pipeline are still empty, this will be called + // a couple of times until all of them are filled. After this initial period, + // this handler will be called (on average) twice per second. + src.SetCallbacks(&app.SourceCallbacks{ + NeedDataFunc: func(self *app.Source, _ uint) { + + // If we've reached the end of the palette, end the stream. + if i == len(seq.frames) { + src.EndStream() + return + } + + log.Debug().Int("frame", i).Msg("Producing frame") + + // Create a buffer that can hold exactly one video RGBA frame. + buffer := gst.NewBufferWithSize(videoInfo.Size()) + + // For each frame we produce, we set the timestamp when it should be displayed + // The autovideosink will use this information to display the frame at the right time. + buffer.SetPresentationTimestamp(gst.ClockTime(time.Duration(i) * 33 * time.Millisecond)) // FIXME + + // Produce an image frame for this iteration. + pixels := seq.frames[i].(*image.RGBA).Pix + //pixels := produceImageFrame(palette[i]) + + // At this point, buffer is only a reference to an existing memory region somewhere. + // When we want to access its content, we have to map it while requesting the required + // mode of access (read, read/write). + // See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html + // + // There are convenience wrappers for building buffers directly from byte sequences as + // well. + buffer.Map(gst.MapWrite).WriteData(pixels) + buffer.Unmap() + + //buffer := gst.NewBufferFromBytes(pixels) + + // Push the buffer onto the pipeline. + self.PushBuffer(buffer) + + log.Debug().Msg("buffer pushed") + + i++ + }, + }) + + mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) + + pipeline.Ref() + defer pipeline.Unref() + + pipeline.SetState(gst.StatePlaying) + + // Retrieve the bus from the pipeline and add a watch function + pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { + log.Warn().Str("msg", msg.String()).Msg("gstreamer message") + switch msg.Type() { + case gst.MessageEOS: + //time.Sleep(5 * time.Second) + //pipeline.SetState(gst.StateNull) + //time.Sleep(5 * time.Second) + //mainLoop.Quit() + //time.Sleep(5 * time.Second) + //return false + } + //if err := handleMessage(msg); err != nil { + // fmt.Println(err) + // loop.Quit() + // return false + //} + return true + }) + + mainLoop.Run() + //mainLoop.RunError() + + return nil, errors.New("look at /tmp/test.mp4 :)") +} +func produceImageFrame(c color.Color) []uint8 { + width := 300 + height := 300 + upLeft := image.Point{0, 0} + lowRight := image.Point{width, height} + img := image.NewRGBA(image.Rectangle{upLeft, lowRight}) + + for x := 0; x < width; x++ { + for y := 0; y < height; y++ { + img.Set(x, y, c) + } + } + + return img.Pix +} + // fitAndStitch tries to stitch an image from a sequence. // Will first try to fit a constant acceleration speed model for smoothing. // Might modify seq (drops leading frames with no movement). @@ -241,7 +386,8 @@ func fitAndStitch(seq sequence, c Config) (*Train, error) { return nil, fmt.Errorf("unable to assemble image: %w", err) } - gif, err := createGIF(seq, img) + //gif, err := createGIF(seq, img) + gif, err := createH264(seq, img) if err != nil { panic(err) } From d1d51d31fa9f53938abe776c8ad6718ac916227c Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 15 Jun 2025 16:41:59 +0200 Subject: [PATCH 02/14] cleanup, still working --- internal/pkg/stitch/stitch.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index 77a343e..86256f7 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -206,8 +206,7 @@ func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { } encoder := "x264enc" - elems, err := gst.NewElementMany("appsrc", "videoconvert", encoder, "h264parse", "mp4mux" /*"autovideosink"*/, "filesink") - //elems, err := gst.NewElementMany("appsrc", "videoconvert", "autovideosink") + elems, err := gst.NewElementMany("appsrc", "videoconvert", encoder, "h264parse", "mp4mux", "filesink") if err != nil { return nil, err } @@ -217,13 +216,12 @@ func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { src := app.SrcFromElement(elems[0]) elems[5].SetArg("location", "/tmp/test.mp4") - //elems[4].SetArg("sync", "false") // Specify the format we want to provide as application into the pipeline // by creating a video info with the given format and creating caps from it for the appsrc element. videoInfo := video.NewInfo(). - WithFormat(video.FormatRGBA, 300, 300). /*uint(seq.frames[0].Bounds().Dx()), uint(seq.frames[0].Bounds().Dy()))*/ - WithFPS(gst.Fraction(2, 1)) // FIXME + WithFormat(video.FormatRGBA, uint(seq.frames[0].Bounds().Dx()), uint(seq.frames[0].Bounds().Dy())). + WithFPS(gst.Fraction(30, 1)) // FIXME src.SetCaps(videoInfo.ToCaps()) src.SetProperty("format", gst.FormatTime) From dc996be77777f14b54fdf3a698c0319a10013e3c Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 15 Jun 2025 22:21:46 +0200 Subject: [PATCH 03/14] UNTESTED store video clip properly --- cmd/trainbot/main.go | 24 +++++++++++++++---- internal/pkg/db/queries.go | 6 +++++ internal/pkg/stitch/auto.go | 1 + internal/pkg/stitch/stitch.go | 45 ++++++++++++++++++++++------------- 4 files changed, 55 insertions(+), 21 deletions(-) diff --git a/cmd/trainbot/main.go b/cmd/trainbot/main.go index ebe5712..0278b90 100644 --- a/cmd/trainbot/main.go +++ b/cmd/trainbot/main.go @@ -158,6 +158,7 @@ func detectTrainsForever(c config, trainsOut chan<- *stitch.Train) { MaxSpeedKPH: c.MaxSpeedKPH, MinLengthM: c.MinLengthM, MaxFrameCountPerSeq: c.MaxFrameCountPerSeq, + TempDestDir: c.DataDir, }) defer func() { train := stitcher.TryStitchAndReset() @@ -251,12 +252,25 @@ func processTrains(store upload.DataStore, dbx *sqlx.DB, trainsIn <-chan *stitch } // Dump GIF. - err = imutil.DumpGIF(store.GetBlobPath(dbTrain.GIFFileName()), train.GIF) - if err != nil { - log.Err(err).Send() - continue + if train.GIF != nil { + err = imutil.DumpGIF(store.GetBlobPath(dbTrain.GIFFileName()), train.GIF) + if err != nil { + log.Err(err).Send() + continue + } + log.Debug().Str("gifFileName", dbTrain.GIFFileName()).Msg("wrote GIF") + } + + // Move Train Clip + if train.TrainClip != nil { + filename := dbTrain.FileNameWithExt(filepath.Ext(train.TrainClip.Path)) + err = os.Rename(train.TrainClip.Path, store.GetBlobPath(filename)) + if err != nil { + log.Err(err).Send() + continue + } + log.Debug().Str("trainClipFileName", filename).Msg("moved train clip") } - log.Debug().Str("gifFileName", dbTrain.GIFFileName()).Msg("wrote GIF") id, err := db.InsertTrain(dbx, *train) if err != nil { diff --git a/internal/pkg/db/queries.go b/internal/pkg/db/queries.go index 81710c9..fbe2104 100644 --- a/internal/pkg/db/queries.go +++ b/internal/pkg/db/queries.go @@ -53,6 +53,12 @@ func (t *Train) GIFFileName() string { return fmt.Sprintf("train_%s.gif", tsString) } +// FileNameWithExt returns a file name for this train with the given file extension, derived from timestamp. +func (t Train) FileNameWithExt(extension string) string { + tsString := t.StartTS.Format(fileTSFormat) + return fmt.Sprintf("train_%s.%s", tsString, extension) +} + // ImgFileName returns the image file name for this train (derived from timestamp). func (t *Train) ImgFileName() string { tsString := t.StartTS.Format(fileTSFormat) diff --git a/internal/pkg/stitch/auto.go b/internal/pkg/stitch/auto.go index e6497ed..01c020c 100644 --- a/internal/pkg/stitch/auto.go +++ b/internal/pkg/stitch/auto.go @@ -29,6 +29,7 @@ type Config struct { MaxSpeedKPH float64 MinLengthM float64 MaxFrameCountPerSeq int + TempDestDir string } func (c *Config) minPxPerFrame(framePeriodS float64) int { diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index 86256f7..386f099 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -8,6 +8,7 @@ import ( "image/draw" "image/gif" "math" + "os" "time" "github.com/go-gst/go-glib/glib" @@ -119,8 +120,9 @@ type Train struct { Conf Config - Image *image.RGBA `json:"-"` - GIF *gif.GIF `json:"-"` + Image *image.RGBA `json:"-"` + GIF *gif.GIF `json:"-"` + TrainClip *TrainClip `json:"-"` } // LengthM returns the absolute length in m. @@ -189,23 +191,28 @@ func createGIF(seq sequence, stitched image.Image) (*gif.GIF, error) { return &g, nil } -func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { +func createH264(seq sequence, dest_dir string) (*TrainClip, error) { // https://github.com/go-gst/go-gst/blob/v1.4.0/examples/appsrc/main.go - // SW: x264enc - // HW on RPi: v4l2h264enc - // HW on PC AMD: va264enc - - // appsrc ! x264enc ! mp4mux ! filesync location=/tmp/test.mp4 - gst.Init(nil) + dest_file, err := os.CreateTemp(dest_dir, ".temp-createH264.mp4.*") + if err != nil { + return nil, err + } + dest_file.Close() + dest_path := dest_file.Name() + pipeline, err := gst.NewPipeline("") if err != nil { return nil, err } + // SW: x264enc + // HW on RPi: v4l2h264enc + // HW on PC AMD: va264enc encoder := "x264enc" + elems, err := gst.NewElementMany("appsrc", "videoconvert", encoder, "h264parse", "mp4mux", "filesink") if err != nil { return nil, err @@ -215,7 +222,7 @@ func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { gst.ElementLinkMany(elems...) src := app.SrcFromElement(elems[0]) - elems[5].SetArg("location", "/tmp/test.mp4") + elems[5].SetArg("location", dest_path) // Specify the format we want to provide as application into the pipeline // by creating a video info with the given format and creating caps from it for the appsrc element. @@ -241,11 +248,12 @@ func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { // If we've reached the end of the palette, end the stream. if i == len(seq.frames) { + log.Debug().Msg("all frames pushed to gstreamer appsrc") src.EndStream() return } - log.Debug().Int("frame", i).Msg("Producing frame") + log.Trace().Int("frame", i).Msg("Producing frame") // Create a buffer that can hold exactly one video RGBA frame. buffer := gst.NewBufferWithSize(videoInfo.Size()) @@ -273,7 +281,7 @@ func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { // Push the buffer onto the pipeline. self.PushBuffer(buffer) - log.Debug().Msg("buffer pushed") + log.Trace().Msg("buffer pushed") i++ }, @@ -288,7 +296,7 @@ func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { // Retrieve the bus from the pipeline and add a watch function pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { - log.Warn().Str("msg", msg.String()).Msg("gstreamer message") + log.Debug().Str("msg", msg.String()).Msg("gstreamer message") switch msg.Type() { case gst.MessageEOS: //time.Sleep(5 * time.Second) @@ -309,7 +317,7 @@ func createH264(seq sequence, stitched image.Image) (*gif.GIF, error) { mainLoop.Run() //mainLoop.RunError() - return nil, errors.New("look at /tmp/test.mp4 :)") + return &TrainClip{Path: dest_path}, nil } func produceImageFrame(c color.Color) []uint8 { width := 300 @@ -327,6 +335,10 @@ func produceImageFrame(c color.Color) []uint8 { return img.Pix } +type TrainClip struct { + Path string +} + // fitAndStitch tries to stitch an image from a sequence. // Will first try to fit a constant acceleration speed model for smoothing. // Might modify seq (drops leading frames with no movement). @@ -385,7 +397,7 @@ func fitAndStitch(seq sequence, c Config) (*Train, error) { } //gif, err := createGIF(seq, img) - gif, err := createH264(seq, img) + trainClip, err := createH264(seq, c.TempDestDir) if err != nil { panic(err) } @@ -399,6 +411,7 @@ func fitAndStitch(seq sequence, c Config) (*Train, error) { -a, c, img, - gif, + nil, + trainClip, }, nil } From cf4bf47af350c999ff32b5c813a814bcacb5aebf Mon Sep 17 00:00:00 2001 From: clonejo Date: Tue, 17 Jun 2025 19:41:48 +0200 Subject: [PATCH 04/14] UNTESTED correct frame timestamps --- internal/pkg/stitch/stitch.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index 386f099..53737e8 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -233,9 +233,7 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { src.SetCaps(videoInfo.ToCaps()) src.SetProperty("format", gst.FormatTime) - // Initialize a frame counter - var i int - //palette := video.FormatRGB8P.Palette() + frame_no := 0 // Since our appsrc element operates in pull mode (it asks us to provide data), // we add a handler for the need-data callback and provide new data from there. @@ -247,23 +245,23 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { NeedDataFunc: func(self *app.Source, _ uint) { // If we've reached the end of the palette, end the stream. - if i == len(seq.frames) { + if frame_no == len(seq.frames) { log.Debug().Msg("all frames pushed to gstreamer appsrc") src.EndStream() return } - log.Trace().Int("frame", i).Msg("Producing frame") + log.Trace().Int("frame", frame_no).Msg("Producing frame") // Create a buffer that can hold exactly one video RGBA frame. buffer := gst.NewBufferWithSize(videoInfo.Size()) // For each frame we produce, we set the timestamp when it should be displayed // The autovideosink will use this information to display the frame at the right time. - buffer.SetPresentationTimestamp(gst.ClockTime(time.Duration(i) * 33 * time.Millisecond)) // FIXME + buffer.SetPresentationTimestamp(gst.ClockTime(seq.startTS.Sub(seq.ts[frame_no]))) // Produce an image frame for this iteration. - pixels := seq.frames[i].(*image.RGBA).Pix + pixels := seq.frames[frame_no].(*image.RGBA).Pix //pixels := produceImageFrame(palette[i]) // At this point, buffer is only a reference to an existing memory region somewhere. @@ -283,7 +281,7 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { log.Trace().Msg("buffer pushed") - i++ + frame_no++ }, }) From 21f53154489b9e948c2e04edce21fbdd298ee980 Mon Sep 17 00:00:00 2001 From: clonejo Date: Fri, 10 Nov 2023 22:54:08 +0100 Subject: [PATCH 05/14] allow higher resolution --- cmd/trainbot/main.go | 2 +- internal/pkg/stitch/stitch.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/trainbot/main.go b/cmd/trainbot/main.go index 0278b90..1fdddb1 100644 --- a/cmd/trainbot/main.go +++ b/cmd/trainbot/main.go @@ -77,7 +77,7 @@ func (c *config) mustOpenDB() *sqlx.DB { const ( rectSizeMin = 100 - rectSizeMax = 500 + rectSizeMax = 1000 failedFramesMax = 50 diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index 53737e8..1410a16 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -22,7 +22,7 @@ import ( ) const ( - maxMemoryMB = 1024 * 1024 * 50 + maxMemoryMB = 1024 * 1024 * 50 * 10 ) func isign(x int) int { From ef8380b4a410e4e0db8db5088342a21f327036bd Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 20:26:15 +0200 Subject: [PATCH 06/14] fix segfault, image sometimes was a SubImage --- internal/pkg/stitch/stitch.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index 1410a16..eb18b30 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -19,6 +19,7 @@ import ( "github.com/nfnt/resize" "github.com/rs/zerolog/log" "jo-m.ch/go/trainbot/internal/pkg/prometheus" + "jo-m.ch/go/trainbot/pkg/imutil" ) const ( @@ -261,7 +262,9 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { buffer.SetPresentationTimestamp(gst.ClockTime(seq.startTS.Sub(seq.ts[frame_no]))) // Produce an image frame for this iteration. - pixels := seq.frames[frame_no].(*image.RGBA).Pix + // We can't write the pixels from image directly, since it may be a SubImage + frameRGBA := imutil.ToRGBA(seq.frames[frame_no]) + pixels := frameRGBA.Pix //pixels := produceImageFrame(palette[i]) // At this point, buffer is only a reference to an existing memory region somewhere. @@ -271,6 +274,10 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { // // There are convenience wrappers for building buffers directly from byte sequences as // well. + if len(pixels) > len(buffer.Bytes()) { + imutil.Dump("too_long.png", seq.frames[frame_no]) + panic(fmt.Errorf("image frame pixels are too long for gstreamer buffer: %v > %v", len(pixels), len(buffer.Bytes()))) + } buffer.Map(gst.MapWrite).WriteData(pixels) buffer.Unmap() From e28094da1b9548d24bc048102d727a6375c0bc6e Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 20:27:11 +0200 Subject: [PATCH 07/14] start video at t=0 --- internal/pkg/stitch/stitch.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index eb18b30..e11efb6 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -235,6 +235,7 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { src.SetProperty("format", gst.FormatTime) frame_no := 0 + startTS := seq.ts[0] // Since our appsrc element operates in pull mode (it asks us to provide data), // we add a handler for the need-data callback and provide new data from there. @@ -259,7 +260,7 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { // For each frame we produce, we set the timestamp when it should be displayed // The autovideosink will use this information to display the frame at the right time. - buffer.SetPresentationTimestamp(gst.ClockTime(seq.startTS.Sub(seq.ts[frame_no]))) + buffer.SetPresentationTimestamp(gst.ClockTime(startTS.Sub(seq.ts[frame_no]))) // Produce an image frame for this iteration. // We can't write the pixels from image directly, since it may be a SubImage From 0464acaac50972e15f12ebf0cad7dcfbeab980da Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 20:31:28 +0200 Subject: [PATCH 08/14] fix negative PTS d'oh! --- internal/pkg/stitch/stitch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index e11efb6..1712048 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -260,7 +260,7 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { // For each frame we produce, we set the timestamp when it should be displayed // The autovideosink will use this information to display the frame at the right time. - buffer.SetPresentationTimestamp(gst.ClockTime(startTS.Sub(seq.ts[frame_no]))) + buffer.SetPresentationTimestamp(gst.ClockTime(seq.ts[frame_no].Sub(startTS))) // Produce an image frame for this iteration. // We can't write the pixels from image directly, since it may be a SubImage From 6f53110378790c40812c248105e075265736c978 Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 20:41:07 +0200 Subject: [PATCH 09/14] handle multiple trains in one input video --- internal/pkg/stitch/stitch.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index 1712048..bbf0649 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -305,13 +305,11 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { log.Debug().Str("msg", msg.String()).Msg("gstreamer message") switch msg.Type() { case gst.MessageEOS: - //time.Sleep(5 * time.Second) - //pipeline.SetState(gst.StateNull) - //time.Sleep(5 * time.Second) - //mainLoop.Quit() - //time.Sleep(5 * time.Second) - //return false + pipeline.SetState(gst.StateNull) + mainLoop.Quit() + return false } + // FIXME: abort on gstreamer errors //if err := handleMessage(msg); err != nil { // fmt.Println(err) // loop.Quit() From 9ed9182665a9a3447bdf0e0a3bf081b519a25bae Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 20:56:27 +0200 Subject: [PATCH 10/14] create both .mp4 and .gif for now --- internal/pkg/stitch/stitch.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index bbf0649..c1e782a 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -400,7 +400,11 @@ func fitAndStitch(seq sequence, c Config) (*Train, error) { return nil, fmt.Errorf("unable to assemble image: %w", err) } - //gif, err := createGIF(seq, img) + gif, err := createGIF(seq, img) + if err != nil { + panic(err) + } + trainClip, err := createH264(seq, c.TempDestDir) if err != nil { panic(err) @@ -415,7 +419,7 @@ func fitAndStitch(seq sequence, c Config) (*Train, error) { -a, c, img, - nil, + gif, trainClip, }, nil } From 2a918da6e0a45bcd0bcf9590fefb4a9c9428f151 Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 20:59:35 +0200 Subject: [PATCH 11/14] fixup! UNTESTED store video clip properly --- internal/pkg/db/queries.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/pkg/db/queries.go b/internal/pkg/db/queries.go index fbe2104..e32b601 100644 --- a/internal/pkg/db/queries.go +++ b/internal/pkg/db/queries.go @@ -49,8 +49,12 @@ type Train struct { // GIFFileName returns the GIF file name for this train (derived from timestamp). func (t *Train) GIFFileName() string { - tsString := t.StartTS.Format(fileTSFormat) - return fmt.Sprintf("train_%s.gif", tsString) + return t.FileNameWithExt("gif") +} + +// ImgFileName returns the image file name for this train (derived from timestamp). +func (t *Train) ImgFileName() string { + return t.FileNameWithExt("jpg") } // FileNameWithExt returns a file name for this train with the given file extension, derived from timestamp. @@ -59,12 +63,6 @@ func (t Train) FileNameWithExt(extension string) string { return fmt.Sprintf("train_%s.%s", tsString, extension) } -// ImgFileName returns the image file name for this train (derived from timestamp). -func (t *Train) ImgFileName() string { - tsString := t.StartTS.Format(fileTSFormat) - return fmt.Sprintf("train_%s.jpg", tsString) -} - // GetNextUpload returns the next train sighting to upload from the database. func GetNextUpload(db *sqlx.DB) (*Train, error) { const q = ` From 25e152f29bdb577e6717ad83ea664e2776afc37c Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 21:07:22 +0200 Subject: [PATCH 12/14] fix .mp4 file names --- cmd/trainbot/main.go | 2 +- internal/pkg/stitch/stitch.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/trainbot/main.go b/cmd/trainbot/main.go index 1fdddb1..9e4f2df 100644 --- a/cmd/trainbot/main.go +++ b/cmd/trainbot/main.go @@ -263,7 +263,7 @@ func processTrains(store upload.DataStore, dbx *sqlx.DB, trainsIn <-chan *stitch // Move Train Clip if train.TrainClip != nil { - filename := dbTrain.FileNameWithExt(filepath.Ext(train.TrainClip.Path)) + filename := dbTrain.FileNameWithExt(train.TrainClip.Ext) err = os.Rename(train.TrainClip.Path, store.GetBlobPath(filename)) if err != nil { log.Err(err).Send() diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index c1e782a..ff55cb1 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -197,7 +197,8 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { gst.Init(nil) - dest_file, err := os.CreateTemp(dest_dir, ".temp-createH264.mp4.*") + file_extension := "mp4" + dest_file, err := os.CreateTemp(dest_dir, fmt.Sprintf(".temp-createH264.%s.*", file_extension)) if err != nil { return nil, err } @@ -321,7 +322,7 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { mainLoop.Run() //mainLoop.RunError() - return &TrainClip{Path: dest_path}, nil + return &TrainClip{Path: dest_path, Ext: "mp4"}, nil } func produceImageFrame(c color.Color) []uint8 { width := 300 @@ -341,6 +342,7 @@ func produceImageFrame(c color.Color) []uint8 { type TrainClip struct { Path string + Ext string } // fitAndStitch tries to stitch an image from a sequence. From 812d8d82db6d5ec394199af9547e0cb87e773a4b Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 22 Jun 2025 21:12:58 +0200 Subject: [PATCH 13/14] gstreamer debugging hint --- internal/pkg/stitch/stitch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index ff55cb1..06d5550 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -194,6 +194,8 @@ func createGIF(seq sequence, stitched image.Image) (*gif.GIF, error) { func createH264(seq sequence, dest_dir string) (*TrainClip, error) { // https://github.com/go-gst/go-gst/blob/v1.4.0/examples/appsrc/main.go + // + // to debug gstreamer, use the GST_DEBUG and (optionally) GST_DEBUG_FILE env vars gst.Init(nil) From c35bf71ee06ce39d53681eaa2de6a60104b86eb9 Mon Sep 17 00:00:00 2001 From: clonejo Date: Sun, 13 Jul 2025 01:44:50 +0200 Subject: [PATCH 14/14] reimplement with ffmpeg no per-frame PTS yet --- go.mod | 3 - go.sum | 6 -- internal/pkg/stitch/stitch.go | 152 +++++++++------------------------- 3 files changed, 39 insertions(+), 122 deletions(-) diff --git a/go.mod b/go.mod index 8e35223..9f9e06b 100644 --- a/go.mod +++ b/go.mod @@ -53,8 +53,6 @@ require ( github.com/fatih/color v1.18.0 // indirect github.com/fatih/structtag v1.2.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/go-gst/go-glib v1.4.0 // indirect - github.com/go-gst/go-gst v1.4.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect @@ -70,7 +68,6 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mattn/go-pointer v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517 // indirect github.com/mgechev/revive v1.9.0 // indirect diff --git a/go.sum b/go.sum index 46e00bf..5736ca1 100644 --- a/go.sum +++ b/go.sum @@ -67,10 +67,6 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/go-gst/go-glib v1.4.0 h1:FB2uVfB0uqz7/M6EaDdWWlBZRQpvFAbWfL7drdw8lAE= -github.com/go-gst/go-glib v1.4.0/go.mod h1:GUIpWmkxQ1/eL+FYSjKpLDyTZx6Vgd9nNXt8dA31d5M= -github.com/go-gst/go-gst v1.4.0 h1:EikB43u4c3wc8d2RzlFRSfIGIXYzDy6Zls2vJqrG2BU= -github.com/go-gst/go-gst v1.4.0/go.mod h1:p8TLGtOxJLcrp6PCkTPdnanwWBxPZvYiHDbuSuwgO3c= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= @@ -147,8 +143,6 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-mjpeg v0.0.3 h1:0G/+KddrbI5Hnq83B11O1O4vP7Q6L9MsBu6aW71jhUM= github.com/mattn/go-mjpeg v0.0.3/go.mod h1:65z7Cj+u5y5K3B8Sy5NtrJFTWAhguGHs9FEkADdx6kE= -github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= -github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= diff --git a/internal/pkg/stitch/stitch.go b/internal/pkg/stitch/stitch.go index 06d5550..6b464fb 100644 --- a/internal/pkg/stitch/stitch.go +++ b/internal/pkg/stitch/stitch.go @@ -7,17 +7,15 @@ import ( "image/color" "image/draw" "image/gif" + "io" "math" "os" "time" - "github.com/go-gst/go-glib/glib" - "github.com/go-gst/go-gst/gst" - "github.com/go-gst/go-gst/gst/app" - "github.com/go-gst/go-gst/gst/video" "github.com/mccutchen/palettor" "github.com/nfnt/resize" "github.com/rs/zerolog/log" + ffmpeg "github.com/u2takey/ffmpeg-go" "jo-m.ch/go/trainbot/internal/pkg/prometheus" "jo-m.ch/go/trainbot/pkg/imutil" ) @@ -193,12 +191,6 @@ func createGIF(seq sequence, stitched image.Image) (*gif.GIF, error) { } func createH264(seq sequence, dest_dir string) (*TrainClip, error) { - // https://github.com/go-gst/go-gst/blob/v1.4.0/examples/appsrc/main.go - // - // to debug gstreamer, use the GST_DEBUG and (optionally) GST_DEBUG_FILE env vars - - gst.Init(nil) - file_extension := "mp4" dest_file, err := os.CreateTemp(dest_dir, fmt.Sprintf(".temp-createH264.%s.*", file_extension)) if err != nil { @@ -207,124 +199,58 @@ func createH264(seq sequence, dest_dir string) (*TrainClip, error) { dest_file.Close() dest_path := dest_file.Name() - pipeline, err := gst.NewPipeline("") - if err != nil { - return nil, err - } + //// SW: x264enc + //// HW on RPi: v4l2h264enc + //// HW on PC AMD: va264enc + //encoder := "x264enc" - // SW: x264enc - // HW on RPi: v4l2h264enc - // HW on PC AMD: va264enc - encoder := "x264enc" + // WithFPS(gst.Fraction(30, 1)) // FIXME - elems, err := gst.NewElementMany("appsrc", "videoconvert", encoder, "h264parse", "mp4mux", "filesink") - if err != nil { - return nil, err - } + reader, writer := io.Pipe() + input_args := ffmpeg.KwArgs{"format": "rawvideo", "pix_fmt": "rgba", "video_size": fmt.Sprintf("%dx%d", uint(seq.frames[0].Bounds().Dx()), uint(seq.frames[0].Bounds().Dy()))} + input := ffmpeg.Input("pipe:", input_args).WithInput(reader) + output := input.Output(dest_path, ffmpeg.KwArgs{"format": "mp4"}).OverWriteOutput() - pipeline.AddMany(elems...) - gst.ElementLinkMany(elems...) - - src := app.SrcFromElement(elems[0]) - elems[5].SetArg("location", dest_path) - - // Specify the format we want to provide as application into the pipeline - // by creating a video info with the given format and creating caps from it for the appsrc element. - videoInfo := video.NewInfo(). - WithFormat(video.FormatRGBA, uint(seq.frames[0].Bounds().Dx()), uint(seq.frames[0].Bounds().Dy())). - WithFPS(gst.Fraction(30, 1)) // FIXME - - src.SetCaps(videoInfo.ToCaps()) - src.SetProperty("format", gst.FormatTime) - - frame_no := 0 - startTS := seq.ts[0] - - // Since our appsrc element operates in pull mode (it asks us to provide data), - // we add a handler for the need-data callback and provide new data from there. - // In our case, we told gstreamer that we do 2 frames per second. While the - // buffers of all elements of the pipeline are still empty, this will be called - // a couple of times until all of them are filled. After this initial period, - // this handler will be called (on average) twice per second. - src.SetCallbacks(&app.SourceCallbacks{ - NeedDataFunc: func(self *app.Source, _ uint) { - - // If we've reached the end of the palette, end the stream. - if frame_no == len(seq.frames) { - log.Debug().Msg("all frames pushed to gstreamer appsrc") - src.EndStream() - return - } + //startTS := seq.ts[0] + errChan := make(chan error) + go func() { + for frame_no := range len(seq.frames) { log.Trace().Int("frame", frame_no).Msg("Producing frame") - // Create a buffer that can hold exactly one video RGBA frame. - buffer := gst.NewBufferWithSize(videoInfo.Size()) - - // For each frame we produce, we set the timestamp when it should be displayed - // The autovideosink will use this information to display the frame at the right time. - buffer.SetPresentationTimestamp(gst.ClockTime(seq.ts[frame_no].Sub(startTS))) + // // For each frame we produce, we set the timestamp when it should be displayed + // // The autovideosink will use this information to display the frame at the right time. + // buffer.SetPresentationTimestamp(gst.ClockTime(seq.ts[frame_no].Sub(startTS))) // Produce an image frame for this iteration. // We can't write the pixels from image directly, since it may be a SubImage + // TODO: we could skip this copy, by writing from the SubImage directly to the writer frameRGBA := imutil.ToRGBA(seq.frames[frame_no]) pixels := frameRGBA.Pix - //pixels := produceImageFrame(palette[i]) - - // At this point, buffer is only a reference to an existing memory region somewhere. - // When we want to access its content, we have to map it while requesting the required - // mode of access (read, read/write). - // See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html - // - // There are convenience wrappers for building buffers directly from byte sequences as - // well. - if len(pixels) > len(buffer.Bytes()) { - imutil.Dump("too_long.png", seq.frames[frame_no]) - panic(fmt.Errorf("image frame pixels are too long for gstreamer buffer: %v > %v", len(pixels), len(buffer.Bytes()))) - } - buffer.Map(gst.MapWrite).WriteData(pixels) - buffer.Unmap() - - //buffer := gst.NewBufferFromBytes(pixels) - - // Push the buffer onto the pipeline. - self.PushBuffer(buffer) + _, err = writer.Write(pixels) + if err != nil { + errChan <- err + } log.Trace().Msg("buffer pushed") + } + log.Debug().Msg("all frames pushed to ffmpeg pipe") + writer.Close() + errChan <- nil + }() - frame_no++ - }, - }) - - mainLoop := glib.NewMainLoop(glib.MainContextDefault(), false) - - pipeline.Ref() - defer pipeline.Unref() - - pipeline.SetState(gst.StatePlaying) + log.Trace().Msg("Waiting for ffmpeg to finish ...") + err = output.WithErrorOutput(os.Stderr).Run() + if err != nil { + return nil, err + } + log.Trace().Msg("Waiting for raw frame writer to return ...") + err = <-errChan + if err != nil { + return nil, err + } - // Retrieve the bus from the pipeline and add a watch function - pipeline.GetPipelineBus().AddWatch(func(msg *gst.Message) bool { - log.Debug().Str("msg", msg.String()).Msg("gstreamer message") - switch msg.Type() { - case gst.MessageEOS: - pipeline.SetState(gst.StateNull) - mainLoop.Quit() - return false - } - // FIXME: abort on gstreamer errors - //if err := handleMessage(msg); err != nil { - // fmt.Println(err) - // loop.Quit() - // return false - //} - return true - }) - - mainLoop.Run() - //mainLoop.RunError() - - return &TrainClip{Path: dest_path, Ext: "mp4"}, nil + return &TrainClip{Path: dest_path, Ext: file_extension}, nil } func produceImageFrame(c color.Color) []uint8 { width := 300