Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -11080,10 +11080,10 @@ def go_deps():
name = "io_storj_drpc",
build_file_proto_mode = "disable_global",
importpath = "storj.io/drpc",
sha256 = "089481375939240f2fd5c2caa87ecde398bb2a9fb7d7998a2cee8ad0624a7ea1",
strip_prefix = "github.com/cockroachdb/drpc@v0.0.0-20260406142218-6c77a9e470d3",
sha256 = "a10c1417ed565a0e63f47d08beeb484a3fd01bf350f923d9c9ac89ee449c9f10",
strip_prefix = "github.com/cockroachdb/drpc@v0.0.0-20260514062819-a5d8e9502652",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20260406142218-6c77a9e470d3.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20260514062819-a5d8e9502652.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/crlfmt/com_github_cockroachdb_crlfmt-v0.0.0-20221214225007-b2fc5c302548.zip": "fedc01bdd6d964da0425d5eaac8efadc951e78e13f102292cc0774197f09ab63",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/crlib/com_github_cockroachdb_crlib-v0.0.0-20251122031428-fe658a2dbda1.zip": "fa361e52b072ce18ac1d103e2556851906477361b5fe688745d29b1efabff3cb",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/datadriven/com_github_cockroachdb_datadriven-v1.0.3-0.20251123150250-ddff6747b112.zip": "6c1ae8a9550d3a92f85d7371e5c0caf922dc0ff1425e299359814b6a9a587f14",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20260406142218-6c77a9e470d3.zip": "089481375939240f2fd5c2caa87ecde398bb2a9fb7d7998a2cee8ad0624a7ea1",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/drpc/com_github_cockroachdb_drpc-v0.0.0-20260514062819-a5d8e9502652.zip": "a10c1417ed565a0e63f47d08beeb484a3fd01bf350f923d9c9ac89ee449c9f10",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/errors/com_github_cockroachdb_errors-v1.12.1-0.20251010171200-64801262cd6f.zip": "4df66cc44791d4290071696abf179dc6df7b94b4cb5d29a20f39c6bf522c60ee",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/go-test-teamcity/com_github_cockroachdb_go_test_teamcity-v0.0.0-20191211140407-cff980ad0a55.zip": "bac30148e525b79d004da84d16453ddd2d5cd20528e9187f1d7dac708335674b",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gogoproto/com_github_cockroachdb_gogoproto-v1.3.3-0.20241216150617-2358cdb156a1.zip": "bf052c9a7f9e23fb3ec7e9f3b7201cfc264c18ed6da0d662952d276dbc339003",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ replace github.com/docker/docker => github.com/moby/moby v25.0.14+incompatible

replace github.com/gogo/protobuf => github.com/cockroachdb/gogoproto v1.3.3-0.20241216150617-2358cdb156a1

replace storj.io/drpc => github.com/cockroachdb/drpc v0.0.0-20260406142218-6c77a9e470d3
replace storj.io/drpc => github.com/cockroachdb/drpc v0.0.0-20260514062819-a5d8e9502652

// Note: This forked dependency adds a commit that opens up some
// private APIs to enable us to make some perf improvements to
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,8 @@ github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:z
github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
github.com/cockroachdb/datadriven v1.0.3-0.20251123150250-ddff6747b112 h1:T1++5Vt0/4/IWZ1mHmUYl7fhQnz50QhNWIY+ITvLLIM=
github.com/cockroachdb/datadriven v1.0.3-0.20251123150250-ddff6747b112/go.mod h1:jsaKMvD3RBCATk1/jbUZM8C9idWBJME9+VRZ5+Liq1g=
github.com/cockroachdb/drpc v0.0.0-20260406142218-6c77a9e470d3 h1:Tn7JOQhDXUACzk732VOFSlh9jOlTYEcpZS23MHwyorI=
github.com/cockroachdb/drpc v0.0.0-20260406142218-6c77a9e470d3/go.mod h1:jeOpcnHh3/TvQ7K2kYXyLUMOMoMpuxp5vtdmtNPVW+8=
github.com/cockroachdb/drpc v0.0.0-20260514062819-a5d8e9502652 h1:0UxS8TskEe5J1uiDgt/GWSRdYt3vf9EYamJW0a/wMw8=
github.com/cockroachdb/drpc v0.0.0-20260514062819-a5d8e9502652/go.mod h1:WPZ/jH+vcb2j1rHZHk0mUG9QWT9gxM8ilEsaSZu0zbg=
github.com/cockroachdb/errors v1.12.1-0.20251010171200-64801262cd6f h1:lUmJxzb2/ukuRIvKTaNkvuj5LwlX4u/KxnI3zmx1SSw=
github.com/cockroachdb/errors v1.12.1-0.20251010171200-64801262cd6f/go.mod h1:SvzfYNNBshAVbZ8wzNc/UPK3w1vf0dKDUP41ucAIf7g=
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 h1:YqzBA7tf8Gv8Oz0BbBsPenqkyjiohS7EUIwi7p1QJCU=
Expand Down
9 changes: 2 additions & 7 deletions pkg/server/apiinternal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "apiinternal",
srcs = [
"api_internal.go",
"api_internal_admin.go",
"api_internal_status.go",
"api_internal_ts.go",
],
srcs = ["api_internal.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/server/apiinternal",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -20,9 +15,9 @@ go_library(
"//pkg/ts/tspb",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
"@com_github_gorilla_mux//:mux",
"@com_github_gorilla_schema//:schema",
"@io_storj_drpc//:drpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
Expand Down
162 changes: 79 additions & 83 deletions pkg/server/apiinternal/api_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"github.com/gorilla/schema"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// httpMethod represents HTTP methods supported by the API.
type httpMethod string

// Supported HTTP methods for the internal API.
const (
GET httpMethod = http.MethodGet
POST httpMethod = http.MethodPost
"storj.io/drpc"
)

var decoder = func() *schema.Decoder {
Expand All @@ -43,39 +34,30 @@ var decoder = func() *schema.Decoder {
return d
}()

// route defines a REST endpoint with its handler and HTTP method.
type route struct {
method httpMethod
path string
handler http.HandlerFunc
}

// apiInternalServer provides REST endpoints that proxy to RPC services. It
// serves as a bridge between HTTP REST clients and internal RPC services.
type apiInternalServer struct {
mux *mux.Router
status serverpb.RPCStatusClient
admin serverpb.RPCAdminClient
timeseries tspb.RPCTimeSeriesClient
mux *mux.Router
}

// NewAPIInternalServer creates a new REST API server that proxies to internal
// RPC services. It establishes connections to the RPC services and registers
// all REST endpoints.
// all REST endpoints. Routes are auto-generated from google.api.http proto
// annotations via protoc-gen-go-drpc.
func NewAPIInternalServer(
ctx context.Context, nd rpcbase.NodeDialer, localNodeID roachpb.NodeID, useDRPC bool,
) (*apiInternalServer, error) {
status, err := serverpb.DialStatusClient(nd, ctx, localNodeID, useDRPC)
statusClient, err := serverpb.DialStatusClient(nd, ctx, localNodeID, useDRPC)
if err != nil {
return nil, err
}

admin, err := serverpb.DialAdminClient(nd, ctx, localNodeID, useDRPC)
adminClient, err := serverpb.DialAdminClient(nd, ctx, localNodeID, useDRPC)
if err != nil {
return nil, err
}

timeseries, err := rpcbase.DialRPCClient(
tsClient, err := rpcbase.DialRPCClient(
nd,
ctx,
localNodeID,
Expand All @@ -87,83 +69,97 @@ func NewAPIInternalServer(
if err != nil {
return nil, err
}
r := &apiInternalServer{
status: status,
admin: admin,
timeseries: timeseries,
mux: mux.NewRouter(),
}

r.registerStatusRoutes()
r.registerAdminRoutes()
r.registerTimeSeriesRoutes()
r := &apiInternalServer{mux: mux.NewRouter()}
r.registerRoutes(serverpb.DRPCStatusGatewayRoutes(statusClient))
r.registerRoutes(serverpb.DRPCAdminGatewayRoutes(adminClient))
r.registerRoutes(tspb.DRPCTimeSeriesGatewayRoutes(tsClient))

return r, nil
}

// ServeHTTP implements http.Handler interface
// ServeHTTP implements http.Handler interface.
func (r *apiInternalServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.mux.ServeHTTP(w, req)
}

// createHandler creates an HTTP handler function that proxies requests to the
// given RPC method.
func createHandler[TReq, TResp protoutil.Message](
rpcMethod func(context.Context, TReq) (TResp, error),
) http.HandlerFunc {
var zero TReq
msgName := proto.MessageName(zero)
msgType := proto.MessageType(msgName)
if msgType == nil {
panic(errors.AssertionFailedf("failed to determine request protobuf type: %s", msgName))
}
return func(w http.ResponseWriter, req *http.Request) {
newReq := reflect.New(msgType.Elem()).Interface().(TReq)
if err := executeRPC(w, req, rpcMethod, newReq); err != nil {
ctx := req.Context()
apiutil.WriteHTTPError(ctx, w, req, err)
}
func (r *apiInternalServer) registerRoutes(routes []drpc.HTTPRoute) {
for _, route := range routes {
r.mux.HandleFunc(route.Path, createHandlerFromRoute(route.Handler)).
Methods(route.Method)
}
}

// executeRPC is a generic function that handles the common pattern of:
// 1. Decoding HTTP request parameters (query string for GET, body for POST)
// 2. Forwarding HTTP auth information to the RPC context
// 3. Calling the RPC method
// 4. Writing the response back to the HTTP client
// createHandlerFromRoute creates an HTTP handler from a drpc.HTTPRoute.Handler,
// which is typed as any but holds a func(context.Context, *TReq) (*TResp, error).
// Type safety is guaranteed by the generated DRPC*GatewayRoutes functions; the
// reflection-based dispatch here lets a single helper serve every RPC without
// generating per-method handler code.
//
// This eliminates boilerplate code across all endpoint handlers.
func executeRPC[TReq, TResp protoutil.Message](
w http.ResponseWriter,
req *http.Request,
rpcMethod func(context.Context, TReq) (TResp, error),
rpcReq TReq,
) error {
ctx := req.Context()
ctx = authserver.ForwardHTTPAuthInfoToRPCCalls(ctx, req)
ctx = rpc.MarkDRPCGatewayRequest(ctx)

if err := decoder.Decode(rpcReq, req.URL.Query()); err != nil {
return err
}
if err := decodePathVars(rpcReq, mux.Vars(req)); err != nil {
return err
// As a concrete example, take the Nodes RPC. The handler stored on the route is
// the bound method
//
// statusClient.Nodes: func(context.Context, *serverpb.NodesRequest) (*serverpb.NodesResponse, error)
//
// For an incoming `GET /_status/nodes?...` request, this function:
//
// 1. Allocates a fresh *serverpb.NodesRequest via reflection.
// 2. Populates it from the URL query string and path vars (and the body
// for POST requests).
// 3. Invokes statusClient.Nodes(ctx, req) via fn.Call.
// 4. Writes the *serverpb.NodesResponse (or error) back to the HTTP client.
func createHandlerFromRoute(rpcMethod any) http.HandlerFunc {
fn := reflect.ValueOf(rpcMethod)
fnType := fn.Type()
if fnType.Kind() != reflect.Func || fnType.NumIn() != 2 || fnType.NumOut() != 2 {
panic(errors.AssertionFailedf(
"expected func(context.Context, *TReq) (*TResp, error), got %T", rpcMethod))
}
// For POST requests, decode the request body (JSON or protobuf)
if req.Method == http.MethodPost {
if err := apiutil.DecodeRequest(req, rpcReq); err != nil {
return status.Errorf(codes.InvalidArgument, "failed to decode request body: %v", err)
// fnType.In(1) is the request pointer (e.g. *NodesRequest); strip the
// pointer so reqType is the struct itself, which is what reflect.New takes
// to allocate a fresh request value per HTTP request below.
reqType := fnType.In(1).Elem()
return func(w http.ResponseWriter, req *http.Request) {
// Allocate a zero request value (e.g. &NodesRequest{}) to populate
// from the HTTP request.
newReq := reflect.New(reqType)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comments around the reflection calls so the code will be easy to follow/read?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

rpcReq := newReq.Interface().(protoutil.Message)
ctx := req.Context()
ctx = authserver.ForwardHTTPAuthInfoToRPCCalls(ctx, req)
ctx = rpc.MarkDRPCGatewayRequest(ctx)

if err := decoder.Decode(rpcReq, req.URL.Query()); err != nil {
apiutil.WriteHTTPError(ctx, w, req, err)
return
}
if err := decodePathVars(rpcReq, mux.Vars(req)); err != nil {
apiutil.WriteHTTPError(ctx, w, req, err)
return
}
// For POST requests, decode the request body (JSON or protobuf).
if req.Method == http.MethodPost {
if err := apiutil.DecodeRequest(req, rpcReq); err != nil {
apiutil.WriteHTTPError(ctx, w, req,
status.Errorf(codes.InvalidArgument, "failed to decode request body: %v", err))
return
}
}
}

resp, err := rpcMethod(ctx, rpcReq)
if err != nil {
return err
// Invoke the RPC, e.g. statusClient.Nodes(ctx, req); results are
// the (*NodesResponse, error) return values.
results := fn.Call([]reflect.Value{reflect.ValueOf(ctx), newReq})
if !results[1].IsNil() {
apiutil.WriteHTTPError(ctx, w, req, results[1].Interface().(error))
return
}
resp := results[0].Interface().(protoutil.Message)
if err := apiutil.WriteResponse(ctx, w, req, http.StatusOK, resp); err != nil {
apiutil.WriteHTTPError(ctx, w, req, err)
}
}
return apiutil.WriteResponse(ctx, w, req, http.StatusOK, resp)
}

func decodePathVars[TReq protoutil.Message](rpcReq TReq, vars map[string]string) error {
func decodePathVars(rpcReq any, vars map[string]string) error {
pathParams := make(url.Values)
for k, v := range vars {
pathParams[k] = []string{v}
Expand Down
59 changes: 0 additions & 59 deletions pkg/server/apiinternal/api_internal_admin.go

This file was deleted.

Loading
Loading