Skip to content

Conversation

@mperktold
Copy link

With this change, whenever the input source emit, the sampler is pulled right after passing the data to the sink.
This enables things like pulling the source one second after it emitted an item, by simply creating a pullable asyncronous sampler.

The test case simulates such a behavior.

The sampler adds pulls to a list samplerPulls, and subscribes to a separate samplerSubject.

Whenever the pipe emits a value, the test takes an item out of samplerPulls to check if the sampler was pulled correctly.

Then it passes the item to ´samplerSubject` to cause the sampler to emit that value, which will result in another pull to the source.

src/index.js Outdated

if (type === 1) {
sink(1, data)
samplerTalkback(1, data)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before that line you should check if the source hasnt been unsubscribed. It's technically possible for a sink to unsubscribe after receiving that data and before samplerTalkback gets pulled.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks! I'll fix this soon.

Copy link
Author

@mperktold mperktold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure source and sampler are never called after termination. Also, there was a problem were the sink could be delivered data before it is greeted back, so when it tries to terminate the talkback, it would raise a reference error. The last test case added in this commit is an example were this probelm would occur. Reversing the order of subscribing to the puller and greeting back the sink fixed this.

Sorry I was trying to add a commit message.

Make sure source and sampler are never called after termination.

Also, there was a problem were the sink could be delivered data before it is greeted back, so when it tries to terminate the talkback, it would raise a reference error.
The last test case added in this commit is an example were this probelm would occur.
Reversing the order of subscribing to the puller and greeting back the sink fixed this.
@mperktold
Copy link
Author

mperktold commented Jun 15, 2019

I noticed that actually, it is the second last test case for which I needed to reverse the order of subscribing to the sampler and greeting back the sink, not the last one as I wrote in the commit message.

The pattern which caused the problem is as follows:

  1. The sampler immediately emits a value upon subscription, causing a pull of the source.
  2. The source is synchronous, and therefore immediately emits a value when pulled.
  3. The operator passes the value down to the sink.
  4. The sink tries to unsubscribe when given the first value.

A simple example that matches this pattern is the following:

pipe(
  fromIter([1, 2]),
  pullWhen(of(0)),
  take(1)
)

This failed previously because passing the first value would happen inside the subscribe call to the sampler, i.e. before passing the talkback to the sink.

Now the sink is greeted back before subscribing to the sampler, so this cannot happen anymore.

@mperktold
Copy link
Author

I dont't want to push you into merging this, but is there still something missing?

If so, please let me know.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants