
Conversation

@aronchick (Contributor) commented Oct 20, 2025

Summary

This pull request introduces a streaming_file input component designed for continuously reading from files with high reliability. It functions similarly to tail -F but adds critical features for production environments, including crash recovery, seamless log rotation handling, and at-least-once delivery guarantees.

Key Features

  • Configurable Polling Mode (NEW): By default uses polling-only mode (disable_fsnotify: true) which is more CPU-efficient for high-volume logs. Based on findings from OpenAI's observability team where inotify caused 35% CPU overhead from excessive fstat calls.
  • Persistent Position Metadata: Each message includes file position metadata (path, inode, byte_offset) that can be used with Bento's cache system for custom persistence.
  • Automatic File Rotation: Uses file inodes (on Unix systems) to reliably detect file rotations. It finishes reading the old file before automatically switching to the new one.
  • Truncation Handling: Detects when a file has been truncated and correctly resets its position to the beginning of the file.
  • No Artificial Timeout: Like tail -F, blocks indefinitely waiting for data (no 30-second timeout causing spurious errors).

Configuration Options

input:
  streaming_file:
    path: /var/log/app.log
    poll_interval: 1s         # How often to poll for new data (default: 1s)
    disable_fsnotify: true    # true = polling only (CPU efficient, default)
                              # false = use fsnotify (lower latency)
    max_buffer_size: 1000     # Max lines to buffer
    max_line_size: 1048576    # Max line size in bytes (1MB)

Performance Considerations

| Mode | Latency | CPU | Use Case |
| --- | --- | --- | --- |
| `disable_fsnotify: true` (default) | ≤ poll_interval | Low | High-volume production logs |
| `disable_fsnotify: false` | Near-instant | Higher under load | Low-volume, latency-critical |

Why polling-only is the default: OpenAI's Fluent Bit team found that inotify fires on every write, causing:

  • 35% CPU time spent on fstat64 calls at high write rates
  • 30,000 CPU cores saved by switching to polling-only mode

Test Plan

Quick Test

# Build
go build -o ./target/bin/bento ./cmd/bento

# Create test file
echo "hello world" > /tmp/test.log

# Run bento (Terminal 1)
./target/bin/bento -c config/examples/streaming_file_tail.yaml

# Append lines (Terminal 2)
echo "new line 1" >> /tmp/test.log
echo "new line 2" >> /tmp/test.log

Run Unit Tests

go test -v ./internal/impl/io/... -run 'TestStreamingFile'

Notes for Reviewer

Design Highlights

  1. Dual Mode Operation: Supports both polling-only (default, CPU-efficient) and fsnotify (lower latency) modes
  2. Platform-Specific Inode Handling: Uses build tags for Unix vs other platforms
  3. Graceful Degradation: If fsnotify fails to initialize, automatically falls back to polling
  4. Concurrency Model: Event loop + buffered channel decouples file I/O from downstream consumption

@gregfurman (Collaborator)

Hey @aronchick 👋 Looks like you need to run go mod tidy and include the new go.mod in your changes. See failing CI step here.

I cloned your branch and it seems the go.opentelemetry.io/otel/metric v1.35.0 dependency is no longer an indirect dependency.

@jem-davies (Collaborator) left a comment

First, thanks for the effort on this PR - many people have asked about such a thing in the past, so it will be a popular feature!

Sorry it's taken a while before I had a look - I think we see a large PR, assume it will take a long time to review, and so it gets left for longer.
Perhaps we need to start giving large PRs a quicker but cursory glance - anyway, I have done that, and I think I can categorise my feedback into the 3 headers below...

Bento idioms

A couple of things: aren't the ShutdownTimeout & Debug fields on the StreamingFileInputConfig superfluous, given shutdown_timeout & logger.level already exist?

I think it will be confusing, as it currently stands, to include a separate debug flag and shutdown timeout alongside them.

One of the features of this new streaming_file input is that it writes position tracking data to disk - this would be at odds with the description of Bento on the landing page:

Bento processes and acknowledges messages using an in-process transaction model with no need for any disk persisted state ... guarantee at-least-once delivery even in the event of crashes, disk corruption ... This behaviour is the default and free of caveats ...

I am conflicted here - I see the need/want to track what has been sent already, and it's already possible to create a stream in Bento that writes to disk ... but then, on the other hand, it seems that with this change the above quote would no longer be true?

Reduce Scope & Explicitness

Currently we don't have much to offer with regards to consuming log data - other than to say use subprocess tail - which is limited. What is the minimum feature set that we need to offer for the log data use-case? Log rotation handling & truncation?

I think it better to look at the scope of the PR and reduce the component's status level to 'experimental'.

It's the explicitness too - which OSes can we handle? fsnotify says it won't work properly with NFS and others, etc. - I think it would be good to document this in the component's description.

LLM / AI

Has an LLM been used to create the PR / PR description? I ask without prejudice.
It's just looking at the code - it would appear (but I cannot be 100% sure) - that an LLM has been used at least for some portion of the PR.

  • code comments such as: NewStreamingFileInput creates a new streaming file input
  • json tags on StreamingFileInputConfig - why do we need these?
  • PR description mentions shutdown_timeout is configurable - but then isn't?

It's not a big deal if an LLM has been used, but what I find is that it can lead to over-engineered solutions which are harder to understand as a PR reviewer, and it also causes, on my part, a reluctance to accept the code as a liability for future potential GitHub issues.


I guess that's my review - seems like we could do a big reduction in the total number of lines of code by:

  • making use of Bento's already existing internal approaches to Debug logging & shutdown
  • simply rethinking the minimum of what we need from a streaming_file input component

aronchick added a commit to aronchick/bento that referenced this pull request Dec 17, 2025
Addresses all reviewer feedback from PR warpstreamlabs#514:

- Remove ShutdownTimeout field (use Bento's shutdown_timeout)
- Remove Debug field (use Bento's logger.level)
- Change status to experimental (default)
- Improve Description with platform limitations and trade-offs
- Remove generic comments that just restate function names
- Remove json tags from StreamingFileInputConfig (not serialized)
- Remove unused HealthStatus struct
- Remove unused lastInode and lastSize fields
- Use context deadline for shutdown instead of custom timeout
@aronchick (Contributor, Author)

@jem-davies thank you SO MUCH for the thorough analysis/feedback. I've taken them all and made updates in isolated PRs which are above and all squashed.

To your direct question, I wrote this code and then passed it all through several GPTs with the criteria to find race conditions, hotspots, etc and took many (but not all) of their feedback because they seemed appropriate and logical. And, for sure, i had it flesh out the PR description (which i'm generally terrible at).

I've also tried to reduce the scope of the PR as well - including making it experimental.

Would love your feedback!

@jem-davies (Collaborator)

OK great - thanks for such a quick reply!

I'll look to get a more detailed review done and then we should merge soon 😄

To your direct question, I wrote this code and then passed it all through several GPTs with the criteria to find race conditions, hotspots, etc and took many (but not all) of their feedback because they seemed appropriate and logical. And, for sure, i had it flesh out the PR description (which i'm generally terrible at).

OK - yeah I find that it is a good way to work with LLMs too 👍

@aronchick (Contributor, Author)

If there was one thing I think we could possibly strip out to get to the MVP, it would be the state tracking. I feel like this is a nice feature but if your process really does crash mid-stream, then I think it's okay to start the tail at the start of the file again and just allow people to handle their dedupe on their own. State tracking seems really advanced but it felt like something that I would want to have

@aronchick (Contributor, Author)

On reflection - I really think we should pull state maintenance out. It's quite complex and is a big leap over what tail -F offers.

Thoughts?

@jem-davies (Collaborator) commented Dec 18, 2025

On reflection - I really think we should pull state maintenance out. It's quite complex and is a big leap over what tail -F offers.

If you think that then I think we could take it out (at least for now) ... I can see why it would be useful.

For now, maybe what we could try is to include the FilePosition struct as metadata, and then a stream pipeline could make use of a cache and implement its own logic around discarding lines already read?
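
That idea could look something like the following config sketch. This is hypothetical: the metadata key names (`inode`, `byte_offset`) are assumed from the PR description, and the dedupe key layout is just one option.

```yaml
cache_resources:
  - label: positions
    memory:
      default_ttl: 5m

input:
  streaming_file:
    path: /var/log/app.log

pipeline:
  processors:
    # Drop any line whose (inode, byte_offset) pair has been seen before.
    - dedupe:
        cache: positions
        key: '${! meta("inode") }:${! meta("byte_offset") }'

output:
  stdout: {}
```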

Also I think that more could come out so:

Metrics

Need to look at the metrics this input is exposing and whether they conflict with the ones the Bento stream engine already exposes, listed here. Also, I notice that in this PR we have a metricsFlusher() func - is this needed? Could we take an approach similar to processor_metric.go?

EDIT: I'm unsure of another component that actually emits its own metrics - would consider removing them entirely at this time.

Logging

It's safe to assume that the *service.Logger is not nil so we can just remove nil checks around the logger:

		if logger != nil {
			logger.Warnf("Failed to load previous position: %v", err)
		}

and:

func (sfi *StreamingFileInput) logDebugf(format string, args ...interface{}) {
	if sfi.logger != nil {
		sfi.logger.Debugf(format, args...)
	}
}

I'll hold off on a finer detail review until we can decide on scope of the new streaming_file input.

@aronchick (Contributor, Author) commented Dec 18, 2025

Ok! I've done a really hard core stripping out :)

I think it's much cleaner now - and only 600 lines. Want to look again?

@jem-davies (Collaborator)

I think it's much cleaner now - and only 600 lines. Want to look again?

Ok great - I'll do a more finer review - and then we can look to merge 😄

@jem-davies (Collaborator)

Started to do a more in-depth review but fyi I went to setup a simple config:

log.txt:

hello Alice

config.yaml:

input:
  streaming_file:
    path: ./log.txt

output:
  stdout: {}

append to the log.txt

echo "hello Bob\n" >> log.txt
ERRO Failed to read message: context deadline exceeded  @service=bento label="" path=root.input

I will continue to review but just wanted to make sure that ☝️ is right - and I am not missing something obvious?

@aronchick (Contributor, Author)

UH. No that definitely worked on my machine. Let me see if i can repro.

How are you setting up and using bento? Do you have a test app/framework you're using i can copy?

@aronchick (Contributor, Author)

OK. I figured it out. I was putting a ReadTimeout in for the purposes of being defensive, but that's actually silly - tail doesn't have a read timeout, it listens forever.

So dropped it and all good. Let me do some more testing, and i'll check in.

@jem-davies (Collaborator)

OK. I figured it out. I was putting a ReadTimeout in for the purposes of being defensive, but that's actually silly - tail doesn't have a read timeout, it listens forever.

Ok it's working now 😄

@aronchick (Contributor, Author)

I went a little nuts and created some scripts to test in a bunch of ways (one is super aggressive - 50% drop rate is expected (5.5M rows in 3/sec ;)).

Adds a new input component that monitors files for new content in real-time,
similar to 'tail -f'. Features include:

- Inode-based file identity tracking for rotation handling
- Optional fsnotify integration for efficient file watching
- Configurable polling intervals and batch sizes
- Automatic checkpoint management for resumption
- Cross-platform support (Unix inodes, Windows fallback)

Includes unit tests, example config, stress test script, and documentation.
aronchick and others added 8 commits December 22, 2025 18:33
Signed-off-by: Jem Davies <jemsot@gmail.com>
Remove Stable() call to use the default experimental status for this
component. This sets appropriate expectations for users as the feature
matures.

Also simplified the Summary text to be more concise.
Address reviewer feedback by documenting:

- Core features: log rotation and truncation handling
- Position tracking trade-off vs Bento's "no disk state" philosophy
- Platform limitations (NFS, fsnotify constraints)
- Delivery semantics clarification

Also removed Stable() to use default experimental status and
simplified the summary.
Signed-off-by: Jem Davies <jemsot@gmail.com>
- Replace manual shutdown coordination (stopCh, readLoopDone, connected,
  connMutex, bufferClosed, inFlightCount) with shutdown.Signaller
- Add configurable backoff for file rotation retries using
  retries.CommonRetryBackOffFields (50ms init, 1s max, 5s elapsed)
- Simplify Close() method from ~70 lines to 15 lines
- Add config lint rule for path validation
- Add streamingFileMetadataDescription() helper for consistent docs
- Use service.ErrEndOfInput instead of io.EOF
- Remove unnecessary inFlightCount tracking (handled at pipeline level)

Net reduction of 66 lines while adding configurable retry behavior.
@aronchick (Contributor, Author)

i cut a bit more of the complexity thanks to @gregfurman's comments on my other PR #624 :)

@jem-davies (Collaborator)

Hey I am still looking at this - been a bit busy with the holidays

@jem-davies (Collaborator) commented Jan 8, 2026

I have taken a good look at various options and have ended up writing this: #645

@aronchick (Contributor, Author)

gotcha! Was there anything i did that i could do better so you didn't have to rewrite? would you like me to close this?

@jem-davies (Collaborator) commented Jan 9, 2026

gotcha! Was there anything i did that i could do better so you didn't have to rewrite?

I was looking through various Go tailing options, because I noticed that your solution uses build tags for the inodeOf() func, which is better to avoid if you can, imo. I looked at various options - fileBeat, mTail, fsnotify - as I did a while back... But this time I also saw this video - Implementing 'tail -f' with Go - which explained a similar solution to what you had in this PR, except it used os.SameFile rather than inodes, which shunts the complexity of checking whether a file is the same as before onto the Go standard library.

I then had a go at implementing a simple Go tail program based on the video, and it turned out to be only about 100 lines. Then I just implemented it as a Bento input, really.

That said, this PR isn't all that far from the general approach, but what pushed me to rewrite was that it appears overly complex - or, in general, would be difficult to review.

The things that are difficult to review (imo):

Concurrency synchronisation

We have: a WaitGroup, a couple of mutexes, and closing of channels. All of these things can be used to coordinate concurrent code, but using them all makes the code complicated.

In the approach in #645 - it separates the Bento input impl, from the tail - and we use a channel to communicate, and Bento's context to signal shutdown. (It might be that we do need to make changes in #645 because the fti.Close() isn't waiting for the watch goroutine to finish before we call fti.tail.file.Close())

Difficult to understand code

I felt the approach was difficult to understand - the flow is hard to trace - a lot of functions all accessing the shared state and doing similar things (checking for rotation/truncation, draining data, and updating position) in different combinations, which made it hard to see what actually happens in any given scenario.

Litany of small things

There would be quite a few small things that would come out of code review - such as:

  • TestStreamingFileInput_FilePositionStruct - this test is just checking that struct field assignment works.
  • FilePosition struct - has json tags - but is never un/marshalled to json
  • Tracking the position manually rather than using t.file.Seek(0, io.SeekCurrent)

would you like me to close this?

Yes, I think so - thanks for the effort on this. Again, this PR's approach isn't all that dissimilar from #645, and it's an input that people want and have asked for in the past!
