-
Notifications
You must be signed in to change notification settings - Fork 46
[DX-3050] add chip fanout router #2504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Tofel
wants to merge
8
commits into
main
Choose a base branch
from
dx-3050-chip-fanout-router
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
6be930e
initial implementation of chip router
Tofel e824a22
add chip fanout router
Tofel b5242de
revert dep update
Tofel f9a4559
clean go mod, CR fixes
Tofel 0690aff
Merge remote-tracking branch 'origin/main' into dx-3050-chip-fanout-r…
Tofel 10062ac
bump deps
Tofel 8387697
idempotent subscriber registration; do not use local framework replac…
Tofel 86da0ce
Merge remote-tracking branch 'origin/main' into dx-3050-chip-fanout-r…
Tofel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| # Chip Router | ||
|
|
||
| `chiprouter` is a small CTF component that owns the fixed ChIP ingress port and fans incoming telemetry out to registered downstream subscribers. | ||
|
|
||
| It exists to keep the local CRE topology simple: | ||
| - Chainlink nodes always publish to a single ingress owner on `50051` | ||
| - lightweight test sinks subscribe behind the router | ||
| - real ChIP / Beholder subscribes behind the same router | ||
|
|
||
| That removes the old split where some tests bound ingress directly while others started real ChIP. | ||
|
|
||
| ## Ports | ||
|
|
||
| The component exposes: | ||
| - admin HTTP: `50050` | ||
| - ingress gRPC: `50051` | ||
|
|
||
| In the local CRE topology, real ChIP / Beholder typically subscribes downstream on `50053`. | ||
|
|
||
| ## Image Contract | ||
|
|
||
| The component runs whatever image is provided in `chip_router.image`. | ||
|
|
||
| The expected local CRE convention is: | ||
| - env TOMLs use a local alias such as `chip-router:<commit-sha>` | ||
| - setup/pull logic is responsible for making that alias exist locally | ||
| - remote ECR image names stay in setup/pull config and are retagged locally to the alias | ||
|
|
||
| ## Runtime Behavior | ||
|
|
||
| The router: | ||
| - exposes a health endpoint on `/health` | ||
| - accepts subscriber registration over its admin API | ||
| - forwards published ChIP ingress requests to all registered subscribers | ||
| - is best-effort per subscriber, so one failing downstream does not block others | ||
|
|
||
| Host-based downstream subscribers should register host-reachable endpoints. In local CRE, host-local sink endpoints are normalized to the Docker host gateway before registration. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| - Remove default value for compatibility testing's `buildcmd` param | ||
| - Add `CHiP router` component to fanout Beholder events |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| FROM golang:1.25.3 AS builder | ||
|
|
||
| WORKDIR /src | ||
|
|
||
| COPY go.mod go.sum ./ | ||
| RUN go mod download | ||
|
|
||
| COPY . . | ||
|
|
||
| ARG TARGETOS=linux | ||
| ARG TARGETARCH=amd64 | ||
| ARG CTF_LOG_LEVEL=info | ||
| ENV CTF_LOG_LEVEL=${CTF_LOG_LEVEL} | ||
|
|
||
| RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -trimpath -ldflags="-s -w" -o /out/chip-router ./cmd/chip-router | ||
|
|
||
| FROM gcr.io/distroless/static-debian12:nonroot | ||
|
|
||
| COPY --from=builder /out/chip-router /chip-router | ||
|
|
||
| EXPOSE 50050 50051 | ||
|
|
||
| ENTRYPOINT ["/chip-router"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,205 @@ | ||
| package chiprouter | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "net/http" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/docker/docker/api/types/container" | ||
| "github.com/docker/go-connections/nat" | ||
| "github.com/smartcontractkit/chainlink-testing-framework/framework" | ||
| tc "github.com/testcontainers/testcontainers-go" | ||
| tcwait "github.com/testcontainers/testcontainers-go/wait" | ||
| ) | ||
|
|
||
| const ( | ||
| DefaultGRPCPort = 50051 | ||
| DefaultAdminPort = 50050 | ||
| DefaultBeholderGRPCPort = 50053 | ||
| adminPathHealth = "/health" | ||
| ) | ||
|
|
||
| type Input struct { | ||
| Image string `toml:"image" comment:"Chip router Docker image"` | ||
| GRPCPort int `toml:"grpc_port" comment:"Chip router gRPC host/container port"` | ||
| AdminPort int `toml:"admin_port" comment:"Chip router admin HTTP host/container port"` | ||
| ContainerName string `toml:"container_name" comment:"Docker container name"` | ||
| PullImage bool `toml:"pull_image" comment:"Whether to pull Chip router image or not"` | ||
| LogLevel string `toml:"log_level" comment:"Chip router log level (trace, debug, info, warn, error)"` | ||
| Out *Output `toml:"out" comment:"Chip router output"` | ||
| } | ||
|
|
||
| type Output struct { | ||
| UseCache bool `toml:"use_cache" comment:"Whether to reuse cached output"` | ||
| ContainerName string `toml:"container_name" comment:"Docker container name"` | ||
| ExternalGRPCURL string `toml:"grpc_external_url" comment:"Host-reachable gRPC endpoint"` | ||
| InternalGRPCURL string `toml:"grpc_internal_url" comment:"Docker-network gRPC endpoint"` | ||
| ExternalAdminURL string `toml:"admin_external_url" comment:"Host-reachable admin endpoint"` | ||
| InternalAdminURL string `toml:"admin_internal_url" comment:"Docker-network admin endpoint"` | ||
| } | ||
|
|
||
| type registerSubscriberRequest struct { | ||
| Name string `json:"name"` | ||
| Endpoint string `json:"endpoint"` | ||
| } | ||
|
|
||
| type registerSubscriberResponse struct { | ||
| ID string `json:"id"` | ||
| } | ||
|
|
||
| type HealthResponse struct { | ||
| } | ||
|
|
||
| func defaults(in *Input) { | ||
| if in.GRPCPort == 0 { | ||
| in.GRPCPort = DefaultGRPCPort | ||
| } | ||
| if in.AdminPort == 0 { | ||
| in.AdminPort = DefaultAdminPort | ||
| } | ||
| if in.ContainerName == "" { | ||
| in.ContainerName = framework.DefaultTCName("chip-router") | ||
| } | ||
| } | ||
|
|
||
| func New(in *Input) (*Output, error) { | ||
| return NewWithContext(context.Background(), in) | ||
| } | ||
|
|
||
| func NewWithContext(ctx context.Context, in *Input) (*Output, error) { | ||
| if in.Out != nil && in.Out.UseCache { | ||
| return in.Out, nil | ||
| } | ||
|
|
||
| if strings.TrimSpace(in.Image) == "" { | ||
| return nil, fmt.Errorf("chip router image must be provided") | ||
| } | ||
|
|
||
| defaults(in) | ||
|
|
||
| grpcPort := fmt.Sprintf("%d/tcp", in.GRPCPort) | ||
| adminPort := fmt.Sprintf("%d/tcp", in.AdminPort) | ||
|
|
||
| req := tc.ContainerRequest{ | ||
| Name: in.ContainerName, | ||
| Image: in.Image, | ||
| AlwaysPullImage: in.PullImage, | ||
| Labels: framework.DefaultTCLabels(), | ||
| Networks: []string{framework.DefaultNetworkName}, | ||
| NetworkAliases: map[string][]string{ | ||
| framework.DefaultNetworkName: {in.ContainerName}, | ||
| }, | ||
| ExposedPorts: []string{grpcPort, adminPort}, | ||
| Env: map[string]string{ | ||
| "CHIP_ROUTER_GRPC_ADDR": fmt.Sprintf("0.0.0.0:%d", in.GRPCPort), | ||
| "CHIP_ROUTER_ADMIN_ADDR": fmt.Sprintf("0.0.0.0:%d", in.AdminPort), | ||
| "CTF_LOG_LEVEL": in.LogLevel, | ||
| }, | ||
| HostConfigModifier: func(h *container.HostConfig) { | ||
| h.PortBindings = framework.MapTheSamePort(grpcPort, adminPort) | ||
| h.ExtraHosts = append(h.ExtraHosts, "host.docker.internal:host-gateway") | ||
| }, | ||
| WaitingFor: tcwait.ForAll( | ||
| tcwait.ForListeningPort(nat.Port(grpcPort)).WithPollInterval(200*time.Millisecond), | ||
| tcwait.ForHTTP(adminPathHealth). | ||
| WithPort(nat.Port(adminPort)). | ||
| WithStartupTimeout(1*time.Minute). | ||
| WithPollInterval(200*time.Millisecond), | ||
| ), | ||
| } | ||
|
|
||
| c, err := tc.GenericContainer(ctx, tc.GenericContainerRequest{ | ||
| ContainerRequest: req, | ||
| Started: true, | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| host, err := framework.GetHostWithContext(ctx, c) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| out := &Output{ | ||
| UseCache: true, | ||
| ContainerName: in.ContainerName, | ||
| ExternalGRPCURL: fmt.Sprintf("%s:%d", host, in.GRPCPort), | ||
| InternalGRPCURL: fmt.Sprintf("%s:%d", in.ContainerName, in.GRPCPort), | ||
| ExternalAdminURL: fmt.Sprintf("http://%s:%d", host, in.AdminPort), | ||
| InternalAdminURL: fmt.Sprintf("http://%s:%d", in.ContainerName, in.AdminPort), | ||
| } | ||
| in.Out = out | ||
| return out, nil | ||
| } | ||
|
|
||
| func Health(ctx context.Context, adminURL string) (*HealthResponse, error) { | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimRight(adminURL, "/")+adminPathHealth, nil) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| resp, err := http.DefaultClient.Do(req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer resp.Body.Close() | ||
| if resp.StatusCode != http.StatusOK { | ||
| return nil, fmt.Errorf("chip router health request failed with status %s", resp.Status) | ||
| } | ||
| var out HealthResponse | ||
| if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { | ||
| return nil, err | ||
| } | ||
| return &out, nil | ||
| } | ||
|
|
||
| func RegisterSubscriber(ctx context.Context, adminURL, name, endpoint string) (string, error) { | ||
| body, err := json.Marshal(registerSubscriberRequest{Name: name, Endpoint: endpoint}) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(adminURL, "/")+"/subscribers", bytes.NewReader(body)) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| req.Header.Set("Content-Type", "application/json") | ||
| resp, err := http.DefaultClient.Do(req) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| defer resp.Body.Close() | ||
| if resp.StatusCode != http.StatusOK { | ||
| return "", fmt.Errorf("chip router register request failed with status %s", resp.Status) | ||
| } | ||
| var out registerSubscriberResponse | ||
| if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { | ||
| return "", err | ||
| } | ||
| if strings.TrimSpace(out.ID) == "" { | ||
| return "", fmt.Errorf("chip router register response missing subscriber id") | ||
| } | ||
| return out.ID, nil | ||
| } | ||
|
|
||
| func UnregisterSubscriber(ctx context.Context, adminURL, id string) error { | ||
| if strings.TrimSpace(id) == "" { | ||
| return nil | ||
| } | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodDelete, strings.TrimRight(adminURL, "/")+"/subscribers/"+id, nil) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| resp, err := http.DefaultClient.Do(req) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer resp.Body.Close() | ||
| if resp.StatusCode != http.StatusNoContent { | ||
| return fmt.Errorf("chip router unregister request failed with status %s", resp.Status) | ||
| } | ||
| return nil | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.