Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chimera/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ prost = "0.12"
prost-types = "0.12"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
nvml-wrapper = "0.9"

[build-dependencies]
prost-build = "0.12"
Expand Down
37 changes: 32 additions & 5 deletions chimera/agent/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use tonic::{transport::Server, Request, Response, Status};
use prost::Message;
use crate::hash::trusted_hash;
use crate::nvml::collect_metrics;
use super::super::proto::telemetry::{telemetry_server::{Telemetry, TelemetryServer}, TelemetryRequest, TelemetryReply};
use crate::nvml::{collect_metrics, TelemetryData};
use super::super::proto::telemetry::{telemetry_server::{Telemetry, TelemetryServer}, telemetry_client::TelemetryClient, TelemetryRequest, TelemetryReply, TelemetryRecord};
use tokio::time::{sleep, Duration};

pub mod telemetry {
tonic::include_proto!("telemetry");
Expand All @@ -14,10 +15,16 @@ pub struct TelemetrySvc;
// gRPC `Telemetry` service implementation for the agent.
//
// NOTE(review): this listing appears to be rendered from a diff and shows both
// the pre-change and the post-change form of several statements (two
// `into_inner()` calls, two `let hash`, two `let reply`). Only one of each pair
// can be live code — confirm against the actual file.
#[tonic::async_trait]
impl Telemetry for TelemetrySvc {
// Handles one `SendTelemetry` call: hashes the request payload with
// `trusted_hash` and replies with that hash plus a summary of metrics
// collected locally via `collect_metrics`. The request's `record` field is
// not read here.
async fn send_telemetry(&self, request: Request<TelemetryRequest>) -> Result<Response<TelemetryReply>, Status> {
let payload = request.into_inner().payload;
let hash = trusted_hash(&payload);
let req = request.into_inner();
let hash = trusted_hash(&req.payload);
// A failed collection is replaced by the type's `Default` value and the error
// is dropped, so the caller cannot tell a failure from a default reading.
let metrics = collect_metrics().unwrap_or_default();
let reply = TelemetryReply { hash, metrics };
// Post-change form: the structured metrics are flattened into a single
// space-separated `key:value` string for the reply's `metrics` field.
let reply = TelemetryReply {
hash,
metrics: format!(
"gpu:{} temp:{} ecc:{} mem:{}",
metrics.gpu_index, metrics.temperature, metrics.ecc_errors, metrics.memory_used
),
};
Ok(Response::new(reply))
}
}
Expand All @@ -28,3 +35,23 @@ pub async fn serve() -> Result<(), Box<dyn std::error::Error>> {
Server::builder().add_service(TelemetryServer::new(svc)).serve(addr).await?;
Ok(())
}

/// Periodically pushes GPU telemetry to the aggregator over gRPC.
///
/// The aggregator address (`host:port`) is read from the `AGGREGATOR_ADDR`
/// environment variable and falls back to `127.0.0.1:50051` when the variable
/// cannot be read. After connecting once, the loop sends a `TelemetryRequest`
/// with a `b"heartbeat"` payload, sleeps for one second, and repeats forever.
///
/// When `collect_metrics` fails, the heartbeat is still sent but `record` is
/// left as `None`, so the receiver is not handed an all-zero placeholder that
/// is indistinguishable from a genuine sample.
///
/// # Errors
///
/// Returns an error if the initial connection fails or if any
/// `send_telemetry` call fails; there is no reconnect or retry.
pub async fn run_client() -> Result<(), Box<dyn std::error::Error>> {
    let addr = std::env::var("AGGREGATOR_ADDR").unwrap_or_else(|_| "127.0.0.1:50051".into());
    let mut client = TelemetryClient::connect(format!("http://{}", addr)).await?;
    loop {
        // Only attach a record when a real reading was obtained.
        let record = collect_metrics()
            .ok()
            .map(|data: TelemetryData| TelemetryRecord {
                gpu_index: data.gpu_index,
                memory_used: data.memory_used,
                ecc_errors: data.ecc_errors,
                temperature: data.temperature,
            });
        let req = TelemetryRequest {
            payload: b"heartbeat".to_vec(),
            record,
        };
        // The reply carries nothing this loop needs; only failure matters.
        client.send_telemetry(req).await?;
        sleep(Duration::from_secs(1)).await;
    }
}
5 changes: 3 additions & 2 deletions chimera/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ mod grpc;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

let telemetry_task = grpc::serve().instrument(tracing::info_span!("grpc"));
tokio::try_join!(telemetry_task)?;
let telemetry_task = grpc::serve().instrument(tracing::info_span!("grpc_server"));
let client_task = grpc::run_client().instrument(tracing::info_span!("grpc_client"));
tokio::try_join!(telemetry_task, client_task)?;
Ok(())
}
19 changes: 17 additions & 2 deletions chimera/agent/src/nvml/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
use nvml_wrapper::NVML;
use std::error::Error;

pub fn collect_metrics() -> Result<String, Box<dyn Error>> {
/// Snapshot of metrics read from a single GPU.
///
/// The `Default` value is an all-zero snapshot; callers that use
/// `unwrap_or_default()` on a failed collection receive exactly that.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TelemetryData {
    /// Index of the GPU the snapshot was taken from.
    pub gpu_index: u32,
    /// Device memory in use (presumably bytes, as reported by NVML — confirm).
    pub memory_used: u64,
    /// ECC error count for the device.
    pub ecc_errors: u64,
    /// GPU temperature (presumably degrees Celsius, as reported by NVML — confirm).
    pub temperature: u32,
}

/// Reads a metrics snapshot from GPU index 0 through NVML.
///
/// NVML is initialised on every call. Any failure from initialisation, device
/// lookup, or an individual query is propagated to the caller.
pub fn collect_metrics() -> Result<TelemetryData, Box<dyn Error>> {
// NOTE(review): nvml-wrapper 0.7+ appears to name this type `Nvml`; confirm
// that `NVML` still resolves with the 0.9 dependency.
let nvml = NVML::init()?;
let device = nvml.device_by_index(0)?;
let memory = device.memory_info()?;
// NOTE(review): the next line looks like the pre-change return value left in
// by the diff rendering; it cannot coexist with the statements that follow.
Ok(format!("used:{}", memory.used))
// NOTE(review): nvml-wrapper documents `Device::temperature` as taking a
// `TemperatureSensor` (e.g. `TemperatureSensor::Gpu`), not an integer —
// confirm this call compiles.
let temperature = device.temperature(0)?;
// Always reports zero for now. NOTE(review): nvml-wrapper appears to expose
// `Device::total_ecc_errors`, which could replace this placeholder — confirm.
let ecc_errors = 0u64; // placeholder until NVML bindings available
Ok(TelemetryData {
gpu_index: 0,
memory_used: memory.used,
ecc_errors,
temperature,
})
}

#[cfg(test)]
Expand Down
54 changes: 34 additions & 20 deletions chimera/aggregator/cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
package main

import (
"context"
"net"
"context"
"net"
"sync"

pb "github.com/example/chimera/aggregator/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
pb "github.com/example/chimera/aggregator/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type server struct {
pb.UnimplementedTelemetryServer
log *zap.Logger
pb.UnimplementedTelemetryServer
log *zap.Logger
mu sync.Mutex
records []pb.TelemetryRecord
}

// SendTelemetry handles one telemetry request: it logs the payload size and
// whether a record was attached, appends any attached record to s.records
// under s.mu, and replies with a fixed acknowledgement.
//
// NOTE(review): this listing appears to be rendered from a diff; the first log
// call and the first return below look like the pre-change body and would make
// everything after them unreachable — confirm against the actual file.
func (s *server) SendTelemetry(ctx context.Context, in *pb.TelemetryRequest) (*pb.TelemetryReply, error) {
s.log.Info("received telemetry", zap.Int("size", len(in.GetPayload())))
return &pb.TelemetryReply{Hash: "ok", Metrics: ""}, nil
// rec is only a presence marker for the log line, not the record's contents.
rec := "nil"
if in.Record != nil {
rec = "gpu"
}
s.log.Info("received telemetry",
zap.Int("size", len(in.Payload)),
zap.String("record", rec))
if in.Record != nil {
// The record is copied by value into the slice. Nothing visible here ever
// trims s.records, so it grows by one entry per request that carries a record.
// NOTE(review): if pb.TelemetryRecord becomes a protoc-generated type, copying
// it by value may trip `go vet` copylocks — consider storing pointers.
s.mu.Lock()
s.records = append(s.records, *in.Record)
s.mu.Unlock()
}
// "stored" is returned even when the request carried no record.
return &pb.TelemetryReply{Hash: "ok", Metrics: "stored"}, nil
}

func main() {
logger, _ := zap.NewProduction()
lis, err := net.Listen("tcp", ":50051")
if err != nil {
logger.Fatal("failed to listen", zap.Error(err))
}
grpcServer := grpc.NewServer()
pb.RegisterTelemetryServer(grpcServer, &server{log: logger})
logger.Info("aggregator started", zap.String("addr", lis.Addr().String()))
if err := grpcServer.Serve(lis); err != nil {
logger.Fatal("serve failed", zap.Error(err))
}
logger, _ := zap.NewProduction()
lis, err := net.Listen("tcp", ":50051")
if err != nil {
logger.Fatal("failed to listen", zap.Error(err))
}
grpcServer := grpc.NewServer()
pb.RegisterTelemetryServer(grpcServer, &server{log: logger})
logger.Info("aggregator started", zap.String("addr", lis.Addr().String()))
if err := grpcServer.Serve(lis); err != nil {
logger.Fatal("serve failed", zap.Error(err))
}
}
40 changes: 37 additions & 3 deletions chimera/aggregator/cmd/server_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,41 @@
package main

import "testing"
import (
"context"
"net"
"testing"
"time"

func TestDummy(t *testing.T) {
// placeholder
pb "github.com/example/chimera/aggregator/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
)

func TestSendTelemetry(t *testing.T) {
logger, _ := zap.NewDevelopment()
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
s := grpc.NewServer()
srv := &server{log: logger}
pb.RegisterTelemetryServer(s, srv)
go s.Serve(lis)
defer s.Stop()

conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}
defer conn.Close()
client := pb.NewTelemetryClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err = client.SendTelemetry(ctx, &pb.TelemetryRequest{
Payload: []byte("test"),
Record: &pb.TelemetryRecord{GpuIndex: 0},
})
if err != nil {
t.Fatalf("rpc failed: %v", err)
}
}
78 changes: 43 additions & 35 deletions chimera/aggregator/proto/telemetry.pb.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,70 @@
package proto

import (
context "context"
codes "google.golang.org/grpc/codes"
grpc "google.golang.org/grpc"
status "google.golang.org/grpc/status"
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)

type TelemetryRequest struct {
Payload []byte
Payload []byte
Record *TelemetryRecord
}

type TelemetryReply struct {
Hash string
Metrics string
Hash string
Metrics string
}

type TelemetryRecord struct {
GpuIndex uint32
MemoryUsed uint64
EccErrors uint64
Temperature uint32
}

type TelemetryServer interface {
SendTelemetry(context.Context, *TelemetryRequest) (*TelemetryReply, error)
SendTelemetry(context.Context, *TelemetryRequest) (*TelemetryReply, error)
}

type UnimplementedTelemetryServer struct{}

func (UnimplementedTelemetryServer) SendTelemetry(context.Context, *TelemetryRequest) (*TelemetryReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendTelemetry not implemented")
return nil, status.Errorf(codes.Unimplemented, "method SendTelemetry not implemented")
}

func RegisterTelemetryServer(s *grpc.Server, srv TelemetryServer) {
s.RegisterService(&_Telemetry_serviceDesc, srv)
s.RegisterService(&_Telemetry_serviceDesc, srv)
}

func _Telemetry_SendTelemetry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TelemetryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TelemetryServer).SendTelemetry(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/telemetry.Telemetry/SendTelemetry",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TelemetryServer).SendTelemetry(ctx, req.(*TelemetryRequest))
}
return interceptor(ctx, in, info, handler)
in := new(TelemetryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TelemetryServer).SendTelemetry(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/telemetry.Telemetry/SendTelemetry",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TelemetryServer).SendTelemetry(ctx, req.(*TelemetryRequest))
}
return interceptor(ctx, in, info, handler)
}

var _Telemetry_serviceDesc = grpc.ServiceDesc{
ServiceName: "telemetry.Telemetry",
HandlerType: (*TelemetryServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SendTelemetry",
Handler: _Telemetry_SendTelemetry_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "telemetry.proto",
ServiceName: "telemetry.Telemetry",
HandlerType: (*TelemetryServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SendTelemetry",
Handler: _Telemetry_SendTelemetry_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "telemetry.proto",
}
8 changes: 8 additions & 0 deletions chimera/proto/telemetry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@ service Telemetry {

// Request message for Telemetry.SendTelemetry.
message TelemetryRequest {
// Raw payload bytes; the agent's client sends the literal "heartbeat".
bytes payload = 1;
// Structured GPU metrics accompanying the payload; may be unset.
TelemetryRecord record = 2;
}

// Reply message for Telemetry.SendTelemetry.
message TelemetryReply {
// Hash string chosen by the responder: the agent's server returns a hash of
// the request payload, while the aggregator returns the fixed value "ok".
string hash = 1;
// Free-form status or metrics text chosen by the responder.
string metrics = 2;
}

// One GPU metrics sample, mirroring the agent's TelemetryData struct field for field.
message TelemetryRecord {
// Index of the GPU the sample was taken from.
uint32 gpu_index = 1;
// Device memory in use (presumably bytes — confirm against the agent's NVML source).
uint64 memory_used = 2;
// ECC error count.
uint64 ecc_errors = 3;
// GPU temperature (presumably degrees Celsius — confirm).
uint32 temperature = 4;
}
2 changes: 2 additions & 0 deletions chimera/scripts/integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ cd $(dirname $0)/..
docker-compose up -d --build
sleep 10
docker-compose ps
docker-compose logs aggregator
docker-compose down
Loading