Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 10 additions & 21 deletions examples/fib_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
from streamz import Stream
import asyncio
from tornado.platform.asyncio import AsyncIOMainLoop
AsyncIOMainLoop().install()
from streamz import Stream


source = Stream()
source = Stream(asynchronous=True)
s = source.sliding_window(2).map(sum)
L = s.sink_to_list() # store result in a list

s.rate_limit(0.5).sink(source.emit) # pipe output back to input
s.rate_limit(1.0).sink(lambda x: print(L)) # print state of L every second

source.emit(0) # seed with initial values
source.emit(1)

L = s.sink_to_list() # store result in a list

def run_asyncio_loop():
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second
s.rate_limit('500ms').connect(source) # pipe output back to input

source.emit(1) # seed with initial value, does not block thread due to Future return

run_asyncio_loop()
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, asyncio.CancelledError):
pass
11 changes: 6 additions & 5 deletions examples/fib_thread.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from streamz import Stream
from tornado.ioloop import IOLoop

source = Stream()
s = source.sliding_window(2).map(sum)
L = s.sink_to_list() # store result in a list
L = s.sink_to_list() # store result in a list

s.rate_limit('500ms').sink(source.emit) # pipe output back to input
s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second
s.rate_limit('500ms').connect(source) # pipe output back to input

source.emit(0) # seed with initial values
source.emit(1)
try:
source.emit(1) # seed with initial value, blocks thread due to cycle in stream
except KeyboardInterrupt:
pass
12 changes: 7 additions & 5 deletions examples/fib_tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

source = Stream(asynchronous=True)
s = source.sliding_window(2).map(sum)
L = s.sink_to_list() # store result in a list
L = s.sink_to_list() # store result in a list

s.rate_limit('500ms').sink(source.emit) # pipe output back to input
s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second
s.rate_limit('500ms').connect(source) # pipe output back to input

source.emit(0) # seed with initial values
source.emit(1)
source.emit(1) # seed with initial value, does not block thread due to Future return

IOLoop.current().start()
try:
IOLoop.current().start()
except KeyboardInterrupt:
pass
7 changes: 6 additions & 1 deletion examples/network_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@
)

s.start()
time.sleep(600)

try:
while True:
time.sleep(600)
except KeyboardInterrupt:
pass
32 changes: 14 additions & 18 deletions examples/scrape.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
from __future__ import print_function

from time import sleep
import sys

from BeautifulSoup import BeautifulSoup # Python 2 only, sorry.
from urllib.parse import urlparse

import requests
from streamz import Stream
import toolz
import urlparse
from bs4 import BeautifulSoup

from streamz import Stream


def links_of_page((content, page)):
uri = urlparse.urlparse(page)
def links_of_page(content_page):
(content, page) = content_page
uri = urlparse(page)
domain = '%s://%s' % (uri.scheme, uri.netloc)
try:
soup = BeautifulSoup(content)
soup = BeautifulSoup(content, features="html.parser")
except:
return []
else:
links = [link.get('href') for link in soup.findAll('a')]
links = [link.get('href') for link in soup.find_all('a')]
return [domain + link
for link in links
if link
Expand All @@ -41,8 +38,8 @@ def topk_dict(d, k=10):
.map(lambda x: x.content))
links = (content.zip(pages)
.map(links_of_page)
.concat())
links.sink(source.emit)
.flatten())
links.connect(source)

"""
from nltk.corpus import stopwords
Expand All @@ -60,8 +57,7 @@ def topk_dict(d, k=10):
"""

if len(sys.argv) > 1:
source.emit(sys.argv[1])



#
try:
source.emit(sys.argv[1])
except KeyboardInterrupt:
pass
5 changes: 4 additions & 1 deletion streamz/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,10 @@ class flatten(Stream):
def update(self, x, who=None, metadata=None):
L = []
items = chain(x)
item = next(items)
try:
item = next(items)
except StopIteration:
return L
for item_next in items:
y = self._emit(item)
item = item_next
Expand Down
6 changes: 5 additions & 1 deletion streamz/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import queue
import os
import time
from inspect import isawaitable

from tornado import gen
import weakref

Expand Down Expand Up @@ -252,7 +254,9 @@ async def handle_stream(self, stream, address):
while not self.source.stopped:
try:
data = await stream.read_until(self.source.delimiter)
await self.source._emit(data)
result = self.source._emit(data)
if isawaitable(result):
await result
except StreamClosedError:
break

Expand Down
11 changes: 11 additions & 0 deletions streamz/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,17 @@ def test_flatten(iterators):
assert L == [1, 2, 3, 4, 5, 6, 7, 8]


def test_flatten_empty():
    """An empty batch passing through ``flatten`` must be a no-op,
    not an error, and must not disturb surrounding batches."""
    source = Stream()
    L = source.flatten().sink_to_list()

    # Emit a non-empty batch, then an empty one, then another non-empty
    # batch; only the actual items should arrive downstream.
    for batch in ([1, 2], [], [3, 4]):
        source.emit(batch)

    assert L == [1, 2, 3, 4]


def test_unique():
source = Stream()
L = source.unique().sink_to_list()
Expand Down
33 changes: 32 additions & 1 deletion streamz/tests/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from flaky import flaky
import pytest
from streamz import Source
from streamz.utils_test import wait_for, await_for, gen_test
from streamz.utils_test import free_port, wait_for, await_for, gen_test
import socket


Expand Down Expand Up @@ -47,6 +47,37 @@ def test_tcp():
sock2.close()


@flaky(max_runs=3, min_passes=1)
def test_tcp_word_count_example():
    """End-to-end check of the TCP word-count pipeline.

    Sends newline-delimited packets over real sockets and verifies the
    running ``frequencies()`` output, including the regression where a
    second packet from the same socket was dropped because the socket
    handler died after the first emit.
    """
    port = free_port()
    s = Source.from_tcp(port)
    out = s.map(bytes.split).flatten().frequencies().sink_to_list()
    s.start()
    # Wait until the TCP server is actually listening before connecting.
    wait_for(lambda: s.server is not None, 2, period=0.02)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect(("localhost", port))
        sock.send(b'data\n')

    # NOTE: backslash continuation instead of the parenthesized
    # multi-item ``with (a as x, b as y):`` form, which is only valid
    # syntax on Python 3.10+ and would be a SyntaxError on older
    # interpreters.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock, \
            socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock2:
        sock.connect(("localhost", port))
        sock2.connect(("localhost", port))
        sock.send(b'data\n')
        # regression test a bug in from_tcp where a second packet from
        # the same socket is dropped due to the socket handler dying
        sock.send(b'data\n')
        sock2.send(b'data2\n')

    expected = [{b"data": 1}, {b"data": 2}, {b"data": 3},
                {b"data": 3, b"data2": 1}]

    def fail_func():
        # Re-assert on timeout so the failure message shows the diff.
        assert out == expected

    wait_for(lambda: out == expected, 2, fail_func=fail_func, period=0.01)



@flaky(max_runs=3, min_passes=1)
@gen_test(timeout=60)
def test_tcp_async():
Expand Down
8 changes: 8 additions & 0 deletions streamz/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import shutil
import socket
import tempfile
from time import time, sleep

Expand All @@ -14,6 +15,13 @@
from .core import _io_loops, Stream


def free_port():
    """Return a TCP port number that is currently unused on localhost.

    Binds a throwaway socket to port 0 so the OS assigns an unused port,
    then closes the socket and returns the chosen port number.

    NOTE: inherently racy — another process may claim the port between
    this call returning and the caller binding it; callers (tests) should
    tolerate occasional failures (e.g. via ``@flaky``).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # SO_REUSEADDR must be set *before* bind() to affect the bind;
        # the original set it afterwards, where it was a no-op.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('localhost', 0))
        return s.getsockname()[1]


@contextmanager
def tmpfile(extension=''):
extension = '.' + extension.lstrip('.')
Expand Down
Loading