Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions __tests__/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,121 @@ test('works', () => {
expect(actual).toEqual([10, 20, 30, 40, 50])
})
})

test('works with pullable sampler', () => {
const actual = []

const samplerPulls = []
const samplerSubject = subject()
const sampler = (start, sink) => {
if (start !== 0) return
sink(0, (t, d) => {
if (t === 1) samplerPulls.push(d)
})
samplerSubject(0, (t, d) => {
if (t === 1) sink(1, d)
})
}

pipe(
fromIter([10, 20, 30]),
pullWhen(sampler),
)(0, (t, d) => {
if (t === 1) actual.push(d)
})

expect(actual).toEqual([])
samplerSubject(1)
for (let i = 1; i <= 3; i++) {
expect(samplerPulls).toEqual([10 * i])
samplerSubject(1, samplerPulls.pop())
}
expect(actual).toEqual([10, 20, 30])
})

test('does not pull sampler after it terminated', () => {
const actual = []

const sampler = (start, sink) => {
if (start !== 0) return
let terminated = false
sink(0, (t, d) => {
expect(terminated).toBeFalsy()
terminated = true
if (t !== 2) sink(2)
})
sink(1)
}

pipe(
fromIter([1, 2]),
pullWhen(sampler),
)(0, (t, d) => {
if (t === 1) actual.push(d)
})

expect(actual).toEqual([1])
})

test('terminates sampler when source terminated', () => {
const actual = []
let sourceTerminated = false
let samplerTerminated = false

const sampler = (start, sink) => {
if (start !== 0) return
sink(0, (t, d) => {
expect(samplerTerminated).toBeFalsy()
if (t === 2) samplerTerminated = true
sink(t, d)
})
sink(1)
}

let talkback
pipe(
fromIter([1, 2]),
pullWhen(sampler),
)(0, (t, d) => {
if (t === 0) talkback = d
if (t === 1) {
actual.push(d)
sourceTerminated = true
talkback(2)
}
})

expect(sourceTerminated).toBeTruthy()
expect(samplerTerminated).toBeTruthy()
expect(actual).toEqual([1])
})

test('terminates source when sampler terminated', () => {
const actual = []
let sourceTerminated = false
let samplerTerminated = false

const sampler = (start, sink) => {
if (start !== 0) return
sink(0, (t, d) => {
expect(samplerTerminated).toBeFalsy()
if (t === 1) {
samplerTerminated = true
sink(2)
} else sink(t, d)
})
sink(1)
}

pipe(
fromIter([1, 2]),
pullWhen(sampler),
)(0, (t, d) => {
if (t === 1) actual.push(d)
if (t === 2) sourceTerminated = true
})

expect(sourceTerminated).toBeTruthy()
expect(samplerTerminated).toBeTruthy()
expect(actual).toEqual([1])
})
20 changes: 13 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@ export default function pullWhen(sampler) {

let pullableTalkback
let samplerTalkback
let terminated = false

pullable(0, (type, data) => {
if (type === 0) {
pullableTalkback = data

sink(0, end => {
if (end !== 2) return

terminated = true
pullableTalkback(2)
samplerTalkback(2)
})
if (terminated) return

sampler(0, (type, data) => {
if (type === 0) {
samplerTalkback = data
Expand All @@ -21,27 +31,23 @@ export default function pullWhen(sampler) {
}

if (type === 2) {
terminated = true
pullableTalkback(2)
sink(2)
return
}
})

sink(0, end => {
if (end !== 2) return

pullableTalkback(2)
samplerTalkback(2)
})
return
}

if (type === 1) {
sink(1, data)
if (!terminated) samplerTalkback(1, data)
return
}

if (type === 2) {
terminated = true
samplerTalkback(2)
sink(2)
return
Expand Down