Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,16 @@ Pulsing 提供基于 Actor System 的 LLM 推理服务:

```bash
# 启动 Router (OpenAI 兼容 API)
pulsing actor --type router --addr 0.0.0.0:8000 --http_port 8080 --model_name my-llm
pulsing actor router --addr 0.0.0.0:8000 --http_port 8080 --model_name my-llm

# 启动 Transformers Worker
pulsing actor --type transformers --model gpt2 --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000
pulsing actor transformers --model gpt2 --addr 0.0.0.0:8001 --seeds 127.0.0.1:8000

# 启动 vLLM Worker
pulsing actor --type vllm --model Qwen/Qwen2.5-0.5B --addr 0.0.0.0:8002 --seeds 127.0.0.1:8000
pulsing actor vllm --model Qwen/Qwen2.5-0.5B --addr 0.0.0.0:8002 --seeds 127.0.0.1:8000

# 启动 vLLM Worker (macOS Metal 支持)
pulsing actor vllm --model Qwen/Qwen3-0.6B --mlx_device gpu --metal_memory_fraction 0.8 --addr 0.0.0.0:8002 --seeds 127.0.0.1:8000

# 运行基准测试
pulsing bench --tokenizer_name gpt2 --url http://localhost:8080
Expand Down
50 changes: 42 additions & 8 deletions bindings/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,37 +608,71 @@ impl Actor for PythonActorWrapper {
// Actor start hook: forwards start-up to the Python handler's optional `on_start`
// method, running the call on the Python executor.
//
// Returns `Ok(())` when the handler has no `on_start` attribute or the call succeeds.
//
// NOTE(review): this span appears to be a rendered diff, with removed and added
// lines back to back and no +/- markers (two `Python::with_gil` openers, two
// "Python executor error" `map_err` lines). It does not compile as shown;
// confirm against the real file before editing.
async fn on_start(&mut self, ctx: &mut ActorContext) -> anyhow::Result<()> {
// Clone the Python references under the GIL so the `move` closure below can own them.
let handler = Python::with_gil(|py| self.handler.clone_ref(py));
// Copy the actor id out of the context so it can be moved into the closure.
let actor_id = *ctx.id();
let event_loop = Python::with_gil(|py| self.event_loop.clone_ref(py));

python_executor()
.execute(move || {
// Presumably the pre-change opener (closure without an explicit return type).
Python::with_gil(|py| {
// Presumably the post-change opener: the closure returns `PyResult<()>` so `?` can be used inside.
Python::with_gil(|py| -> PyResult<()> {
// `on_start` is optional: only call it when the attribute lookup succeeds.
if handler.getattr(py, "on_start").is_ok() {
let py_actor_id = PyActorId { inner: actor_id };
// Presumably the pre-change behavior: a failing `on_start` is only logged as a warning.
if let Err(e) = handler.call_method1(py, "on_start", (py_actor_id,)) {
tracing::warn!("Python actor on_start error: {:?}", e);
// Presumably the post-change behavior: a failing `on_start` is propagated with `?`.
let result = handler.call_method1(py, "on_start", (py_actor_id,))?;

// Check whether the returned value is a coroutine; if so, wait for it to finish.
let asyncio = py.import("asyncio")?;
let is_coro = asyncio
.call_method1("iscoroutine", (&result,))?
.extract::<bool>()?;

if is_coro {
// Schedule the coroutine on the stored event loop and block on the returned
// future's `result()`.
// NOTE(review): this blocks the executor thread until the coroutine finishes;
// presumably the event loop runs on a different thread, otherwise this would
// never complete. Confirm.
let run_coroutine_threadsafe =
asyncio.getattr("run_coroutine_threadsafe")?;
let future = run_coroutine_threadsafe.call1((&result, &event_loop))?;
future.call_method0("result")?;
}

Ok(())
} else {
Ok(())
}
})
})
.await
// Presumably the pre-change tail: only the executor error is mapped.
.map_err(|e| anyhow::anyhow!("Python executor error: {:?}", e))
// Presumably the post-change tail: unwrap the executor result with `?`, then map
// the inner Python error separately.
.map_err(|e| anyhow::anyhow!("Python executor error: {:?}", e))?
.map_err(|e| anyhow::anyhow!("Python on_start error: {:?}", e))
}

// Actor stop hook: forwards shutdown to the Python handler's optional `on_stop`
// method, running the call on the Python executor.
//
// Returns `Ok(())` when the handler has no `on_stop` attribute or the call succeeds.
//
// NOTE(review): this span appears to be a rendered diff, with removed and added
// lines back to back and no +/- markers (two `Python::with_gil` openers, two
// "Python executor error" `map_err` lines). It does not compile as shown;
// confirm against the real file before editing.
async fn on_stop(&mut self, _ctx: &mut ActorContext) -> anyhow::Result<()> {
// Clone the Python references under the GIL so the `move` closure below can own them.
let handler = Python::with_gil(|py| self.handler.clone_ref(py));
let event_loop = Python::with_gil(|py| self.event_loop.clone_ref(py));

python_executor()
.execute(move || {
// Presumably the pre-change opener (closure without an explicit return type).
Python::with_gil(|py| {
// Presumably the post-change opener: the closure returns `PyResult<()>` so `?` can be used inside.
Python::with_gil(|py| -> PyResult<()> {
// `on_stop` is optional: only call it when the attribute lookup succeeds.
if handler.getattr(py, "on_stop").is_ok() {
// Presumably the pre-change behavior: a failing `on_stop` is only logged as a warning.
if let Err(e) = handler.call_method0(py, "on_stop") {
tracing::warn!("Python actor on_stop error: {:?}", e);
// Presumably the post-change behavior: a failing `on_stop` is propagated with `?`.
let result = handler.call_method0(py, "on_stop")?;

// Check whether the returned value is a coroutine; if so, wait for it to finish.
let asyncio = py.import("asyncio")?;
let is_coro = asyncio
.call_method1("iscoroutine", (&result,))?
.extract::<bool>()?;

if is_coro {
// Schedule the coroutine on the stored event loop and block on the returned
// future's `result()`.
// NOTE(review): this blocks the executor thread until the coroutine finishes;
// presumably the event loop is still running on another thread at stop time.
// Confirm.
let run_coroutine_threadsafe =
asyncio.getattr("run_coroutine_threadsafe")?;
let future = run_coroutine_threadsafe.call1((&result, &event_loop))?;
future.call_method0("result")?;
}

Ok(())
} else {
Ok(())
}
})
})
.await
// Presumably the pre-change tail: only the executor error is mapped.
.map_err(|e| anyhow::anyhow!("Python executor error: {:?}", e))
// Presumably the post-change tail: unwrap the executor result with `?`, then map
// the inner Python error separately.
.map_err(|e| anyhow::anyhow!("Python executor error: {:?}", e))?
.map_err(|e| anyhow::anyhow!("Python on_stop error: {:?}", e))
}

async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result<Message> {
Expand Down
61 changes: 50 additions & 11 deletions python/pulsing/actors/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@
is_chat: bool,
) -> web.StreamResponse:
created = int(time.time())
response = web.StreamResponse(
stream_response = web.StreamResponse(
headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"}
)
await response.prepare(request)
await stream_response.prepare(request)

obj_type = "chat.completion.chunk" if is_chat else "text_completion"

Expand All @@ -250,13 +250,50 @@
"GenerateStreamRequest",
{"prompt": prompt, "max_new_tokens": max_tokens},
)
response = await worker_ref.ask(req_msg)
reader = response.stream_reader()
stream_message = await worker_ref.ask(req_msg)

# 检查返回的是否是流式消息
if not stream_message.is_stream:
# 如果不是流式消息,可能是错误消息
error_data = stream_message.to_json()
error_msg = error_data.get("error", "Unknown error")
await stream_response.write(
f"data: {json.dumps({'error': error_msg})}\n\n".encode()
)
await stream_response.write(b"data: [DONE]\n\n")
return stream_response

reader = stream_message.stream_reader()

async for chunk_bytes in reader:
try:
chunk = json.loads(chunk_bytes)
finish_reason = chunk.get("finish_reason")
text = chunk.get("text", "")

# 检查是否结束
if finish_reason:
# 发送最后的 chunk(如果有文本)
if text:
data = {
"id": request_id,
"object": obj_type,
"created": created,
"model": model or self.model_name,
"choices": [
{"index": 0, "finish_reason": finish_reason}
],
}
if is_chat:
data["choices"][0]["delta"] = {"content": text}
else:
data["choices"][0]["text"] = text
await stream_response.write(
f"data: {json.dumps(data)}\n\n".encode()
)
break

# 只发送非空文本
if text:
data = {
"id": request_id,
Expand All @@ -269,13 +306,15 @@
data["choices"][0]["delta"] = {"content": text}
else:
data["choices"][0]["text"] = text
await response.write(f"data: {json.dumps(data)}\n\n".encode())
if chunk.get("finish_reason"):
break
await stream_response.write(
f"data: {json.dumps(data)}\n\n".encode()
)
except json.JSONDecodeError:
continue
except Exception as e:
await response.write(f"data: {json.dumps({'error': str(e)})}\n\n".encode())
await stream_response.write(
f"data: {json.dumps({'error': str(e)})}\n\n".encode()
)

final = {
"id": request_id,
Expand All @@ -288,9 +327,9 @@
final["choices"][0]["delta"] = {}
else:
final["choices"][0]["text"] = ""
await response.write(f"data: {json.dumps(final)}\n\n".encode())
await response.write(b"data: [DONE]\n\n")
return response
await stream_response.write(f"data: {json.dumps(final)}\n\n".encode())
await stream_response.write(b"data: [DONE]\n\n")
return stream_response


async def start_router(
Expand Down
Loading
Loading