diff --git a/chimera/agent/Cargo.toml b/chimera/agent/Cargo.toml index 3f194902..3bcfcf84 100644 --- a/chimera/agent/Cargo.toml +++ b/chimera/agent/Cargo.toml @@ -11,6 +11,7 @@ prost = "0.12" prost-types = "0.12" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } +nvml-wrapper = "0.9" [build-dependencies] prost-build = "0.12" diff --git a/chimera/agent/src/grpc/mod.rs b/chimera/agent/src/grpc/mod.rs index 3de8a64c..8b18c817 100644 --- a/chimera/agent/src/grpc/mod.rs +++ b/chimera/agent/src/grpc/mod.rs @@ -1,8 +1,9 @@ use tonic::{transport::Server, Request, Response, Status}; use prost::Message; use crate::hash::trusted_hash; -use crate::nvml::collect_metrics; -use super::super::proto::telemetry::{telemetry_server::{Telemetry, TelemetryServer}, TelemetryRequest, TelemetryReply}; +use crate::nvml::{collect_metrics, TelemetryData}; +use super::super::proto::telemetry::{telemetry_server::{Telemetry, TelemetryServer}, telemetry_client::TelemetryClient, TelemetryRequest, TelemetryReply, TelemetryRecord}; +use tokio::time::{sleep, Duration}; pub mod telemetry { tonic::include_proto!("telemetry"); @@ -14,10 +15,16 @@ pub struct TelemetrySvc; #[tonic::async_trait] impl Telemetry for TelemetrySvc { async fn send_telemetry(&self, request: Request) -> Result, Status> { - let payload = request.into_inner().payload; - let hash = trusted_hash(&payload); + let req = request.into_inner(); + let hash = trusted_hash(&req.payload); let metrics = collect_metrics().unwrap_or_default(); - let reply = TelemetryReply { hash, metrics }; + let reply = TelemetryReply { + hash, + metrics: format!( + "gpu:{} temp:{} ecc:{} mem:{}", + metrics.gpu_index, metrics.temperature, metrics.ecc_errors, metrics.memory_used + ), + }; Ok(Response::new(reply)) } } @@ -28,3 +35,23 @@ pub async fn serve() -> Result<(), Box> { Server::builder().add_service(TelemetryServer::new(svc)).serve(addr).await?; Ok(()) } + +pub async fn run_client() -> Result<(), Box> { + let addr = std::env::var("AGGREGATOR_ADDR").unwrap_or_else(|_| "127.0.0.1:50051".into()); + let mut client = TelemetryClient::connect(format!("http://{}", addr)).await?; + loop { + let data: TelemetryData = collect_metrics().unwrap_or_default(); + let record = TelemetryRecord { + gpu_index: data.gpu_index, + memory_used: data.memory_used, + ecc_errors: data.ecc_errors, + temperature: data.temperature, + }; + let req = TelemetryRequest { + payload: b"heartbeat".to_vec(), + record: Some(record), + }; + let _ = client.send_telemetry(req).await?; + sleep(Duration::from_secs(1)).await; + } +} diff --git a/chimera/agent/src/main.rs b/chimera/agent/src/main.rs index fef275a7..055e6fe7 100644 --- a/chimera/agent/src/main.rs +++ b/chimera/agent/src/main.rs @@ -11,7 +11,8 @@ mod grpc; async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); - let telemetry_task = grpc::serve().instrument(tracing::info_span!("grpc")); - tokio::try_join!(telemetry_task)?; + let telemetry_task = grpc::serve().instrument(tracing::info_span!("grpc_server")); + let client_task = grpc::run_client().instrument(tracing::info_span!("grpc_client")); + tokio::try_join!(telemetry_task, client_task)?; Ok(()) } diff --git a/chimera/agent/src/nvml/mod.rs b/chimera/agent/src/nvml/mod.rs index 0c50fa22..a924c4c1 100644 --- a/chimera/agent/src/nvml/mod.rs +++ b/chimera/agent/src/nvml/mod.rs @@ -1,11 +1,26 @@ use nvml_wrapper::NVML; use std::error::Error; -pub fn collect_metrics() -> Result> { +#[derive(Default, Clone)] +pub struct TelemetryData { + pub gpu_index: u32, + pub memory_used: u64, + pub ecc_errors: u64, + pub temperature: u32, +} + +pub fn collect_metrics() -> Result> { let nvml = NVML::init()?; let device = nvml.device_by_index(0)?; let memory = device.memory_info()?; - Ok(format!("used:{}", memory.used)) + let temperature = device.temperature(0)?; + let ecc_errors = 0u64; // placeholder until NVML bindings available + Ok(TelemetryData { + gpu_index: 0, + memory_used: memory.used, + ecc_errors, + temperature, + }) } #[cfg(test)] diff --git a/chimera/aggregator/cmd/server.go b/chimera/aggregator/cmd/server.go index 6dc11661..e2ce6520 100644 --- a/chimera/aggregator/cmd/server.go +++ b/chimera/aggregator/cmd/server.go @@ -1,34 +1,48 @@ package main import ( - "context" - "net" + "context" + "net" + "sync" - pb "github.com/example/chimera/aggregator/proto" - "go.uber.org/zap" - "google.golang.org/grpc" + pb "github.com/example/chimera/aggregator/proto" + "go.uber.org/zap" + "google.golang.org/grpc" ) type server struct { - pb.UnimplementedTelemetryServer - log *zap.Logger + pb.UnimplementedTelemetryServer + log *zap.Logger + mu sync.Mutex + records []pb.TelemetryRecord } func (s *server) SendTelemetry(ctx context.Context, in *pb.TelemetryRequest) (*pb.TelemetryReply, error) { - s.log.Info("received telemetry", zap.Int("size", len(in.GetPayload()))) - return &pb.TelemetryReply{Hash: "ok", Metrics: ""}, nil + rec := "nil" + if in.Record != nil { + rec = "gpu" + } + s.log.Info("received telemetry", + zap.Int("size", len(in.Payload)), + zap.String("record", rec)) + if in.Record != nil { + s.mu.Lock() + s.records = append(s.records, *in.Record) + s.mu.Unlock() + } + return &pb.TelemetryReply{Hash: "ok", Metrics: "stored"}, nil } func main() { - logger, _ := zap.NewProduction() - lis, err := net.Listen("tcp", ":50051") - if err != nil { - logger.Fatal("failed to listen", zap.Error(err)) - } - grpcServer := grpc.NewServer() - pb.RegisterTelemetryServer(grpcServer, &server{log: logger}) - logger.Info("aggregator started", zap.String("addr", lis.Addr().String())) - if err := grpcServer.Serve(lis); err != nil { - logger.Fatal("serve failed", zap.Error(err)) - } + logger, _ := zap.NewProduction() + lis, err := net.Listen("tcp", ":50051") + if err != nil { + logger.Fatal("failed to listen", zap.Error(err)) + } + grpcServer := grpc.NewServer() + pb.RegisterTelemetryServer(grpcServer, &server{log: logger}) + logger.Info("aggregator started", zap.String("addr", lis.Addr().String())) + if err := grpcServer.Serve(lis); err != nil { + logger.Fatal("serve failed", zap.Error(err)) + } } diff --git a/chimera/aggregator/cmd/server_test.go b/chimera/aggregator/cmd/server_test.go index 2832e71b..bfc54706 100644 --- a/chimera/aggregator/cmd/server_test.go +++ b/chimera/aggregator/cmd/server_test.go @@ -1,7 +1,41 @@ package main -import "testing" +import ( + "context" + "net" + "testing" + "time" -func TestDummy(t *testing.T) { - // placeholder + pb "github.com/example/chimera/aggregator/proto" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +func TestSendTelemetry(t *testing.T) { + logger, _ := zap.NewDevelopment() + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + s := grpc.NewServer() + srv := &server{log: logger} + pb.RegisterTelemetryServer(s, srv) + go s.Serve(lis) + defer s.Stop() + + conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + client := pb.NewTelemetryClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err = client.SendTelemetry(ctx, &pb.TelemetryRequest{ + Payload: []byte("test"), + Record: &pb.TelemetryRecord{GpuIndex: 0}, + }) + if err != nil { + t.Fatalf("rpc failed: %v", err) + } } diff --git a/chimera/aggregator/proto/telemetry.pb.go b/chimera/aggregator/proto/telemetry.pb.go index 50002beb..d7df7086 100644 --- a/chimera/aggregator/proto/telemetry.pb.go +++ b/chimera/aggregator/proto/telemetry.pb.go @@ -1,62 +1,70 @@ package proto import ( - context "context" - codes "google.golang.org/grpc/codes" - grpc "google.golang.org/grpc" - status "google.golang.org/grpc/status" + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" ) type TelemetryRequest struct { - Payload []byte + Payload []byte + Record *TelemetryRecord } type TelemetryReply struct { - Hash string - Metrics string + Hash string + Metrics string +} + +type TelemetryRecord struct { + GpuIndex uint32 + MemoryUsed uint64 + EccErrors uint64 + Temperature uint32 } type TelemetryServer interface { - SendTelemetry(context.Context, *TelemetryRequest) (*TelemetryReply, error) + SendTelemetry(context.Context, *TelemetryRequest) (*TelemetryReply, error) } type UnimplementedTelemetryServer struct{} func (UnimplementedTelemetryServer) SendTelemetry(context.Context, *TelemetryRequest) (*TelemetryReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method SendTelemetry not implemented") + return nil, status.Errorf(codes.Unimplemented, "method SendTelemetry not implemented") } func RegisterTelemetryServer(s *grpc.Server, srv TelemetryServer) { - s.RegisterService(&_Telemetry_serviceDesc, srv) + s.RegisterService(&_Telemetry_serviceDesc, srv) } func _Telemetry_SendTelemetry_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TelemetryRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TelemetryServer).SendTelemetry(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/telemetry.Telemetry/SendTelemetry", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TelemetryServer).SendTelemetry(ctx, req.(*TelemetryRequest)) - } - return interceptor(ctx, in, info, handler) + in := new(TelemetryRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TelemetryServer).SendTelemetry(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/telemetry.Telemetry/SendTelemetry", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TelemetryServer).SendTelemetry(ctx, req.(*TelemetryRequest)) + } + return interceptor(ctx, in, info, handler) } var _Telemetry_serviceDesc = grpc.ServiceDesc{ - ServiceName: "telemetry.Telemetry", - HandlerType: (*TelemetryServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "SendTelemetry", - Handler: _Telemetry_SendTelemetry_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "telemetry.proto", + ServiceName: "telemetry.Telemetry", + HandlerType: (*TelemetryServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SendTelemetry", + Handler: _Telemetry_SendTelemetry_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "telemetry.proto", } diff --git a/chimera/proto/telemetry.proto b/chimera/proto/telemetry.proto index dc3c8ec4..e439fedd 100644 --- a/chimera/proto/telemetry.proto +++ b/chimera/proto/telemetry.proto @@ -7,9 +7,17 @@ service Telemetry { message TelemetryRequest { bytes payload = 1; + TelemetryRecord record = 2; } message TelemetryReply { string hash = 1; string metrics = 2; } + +message TelemetryRecord { + uint32 gpu_index = 1; + uint64 memory_used = 2; + uint64 ecc_errors = 3; + uint32 temperature = 4; +} diff --git a/chimera/scripts/integration-test.sh b/chimera/scripts/integration-test.sh index 01b32a18..ee5f2983 100644 --- a/chimera/scripts/integration-test.sh +++ b/chimera/scripts/integration-test.sh @@ -4,3 +4,5 @@ cd $(dirname $0)/.. docker-compose up -d --build sleep 10 docker-compose ps +docker-compose logs aggregator +docker-compose down