From cce05401f2046c0cf6e8ed8bd55d1ab9a4a585ad Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Tue, 10 Mar 2026 02:45:21 -0400 Subject: [PATCH 1/5] init commit --- .../data/v2/stub/BigtableChannelPrimer.java | 15 ++-- .../v2/stub/EnhancedBigtableStubSettings.java | 52 ++++++++--- .../data/v2/stub/NoOpChannelPrimer.java | 12 ++- .../BigtableTransportChannelProvider.java | 90 +++++++++++++++++-- .../bigtable/gaxx/grpc/ChannelPrimer.java | 14 ++- .../gaxx/grpc/DirectAccessChecker.java | 26 ++++++ .../gaxx/grpc/UnaryDirectAccessChecker.java | 58 ++++++++++++ 7 files changed, 228 insertions(+), 39 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/DirectAccessChecker.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 3b2a169910..24ce44607a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -26,6 +26,7 @@ import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer; import io.grpc.CallCredentials; import io.grpc.CallOptions; +import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.Deadline; import io.grpc.ManagedChannel; @@ -88,21 +89,21 @@ static BigtableChannelPrimer create( } @Override - public void primeChannel(ManagedChannel managedChannel) { + public void primeChannel(Channel channel) { try { - primeChannelUnsafe(managedChannel); + primeChannelUnsafe(channel); } catch (IOException | RuntimeException e) { LOG.log(Level.WARNING, "Unexpected error while trying to prime a channel", e); } } - private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException { - sendPrimeRequestsBlocking(managedChannel); + private void primeChannelUnsafe(Channel channel) throws IOException { + sendPrimeRequestsBlocking(channel); } - private void sendPrimeRequestsBlocking(ManagedChannel managedChannel) { + private void sendPrimeRequestsBlocking(Channel channel) { try { - sendPrimeRequestsAsync(managedChannel).get(1, TimeUnit.MINUTES); + sendPrimeRequestsAsync(channel).get(1, TimeUnit.MINUTES); } catch (Throwable e) { // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping // channels if the new @@ -111,7 +112,7 @@ private void sendPrimeRequestsBlocking(ManagedChannel managedChannel) { } } - public ApiFuture sendPrimeRequestsAsync(ManagedChannel managedChannel) { + public ApiFuture sendPrimeRequestsAsync(Channel managedChannel) { ClientCall clientCall = managedChannel.newCall( BigtableGrpc.getPingAndWarmMethod(), diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 1a416d51e4..cfb7a2ab47 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -242,6 +242,25 @@ public ClientOperationSettings getPerOpSettings() { return perOpSettings; } + /** Applies common pool, message size, and keep-alive settings to the provided builder. */ + private static InstantiatingGrpcChannelProvider.Builder commonTraits( + InstantiatingGrpcChannelProvider.Builder builder) { + return builder + .setChannelPoolSettings( + ChannelPoolSettings.builder() + .setInitialChannelCount(10) + .setMinRpcsPerChannel(1) + // Keep it conservative as we scale the channel size every 1min + // and delta is 2 channels. + .setMaxRpcsPerChannel(25) + .setPreemptiveRefreshEnabled(true) + .build()) + .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) + .setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval + .setKeepAliveTimeout( + Duration.ofSeconds(10)); // wait this long before considering the connection dead + } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { InstantiatingGrpcChannelProvider.Builder grpcTransportProviderBuilder = @@ -261,22 +280,27 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); } } - return grpcTransportProviderBuilder - .setChannelPoolSettings( - ChannelPoolSettings.builder() - .setInitialChannelCount(10) - .setMinRpcsPerChannel(1) - // Keep it conservative as we scale the channel size every 1min - // and delta is 2 channels. - .setMaxRpcsPerChannel(25) - .setPreemptiveRefreshEnabled(true) - .build()) - .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) - .setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval - .setKeepAliveTimeout( - Duration.ofSeconds(10)); // wait this long before considering the connection dead + return commonTraits(grpcTransportProviderBuilder); } + /** Applies Direct Access traits (DirectPath & ALTS) to an existing builder. */ + public static InstantiatingGrpcChannelProvider.Builder applyDirectAccessTraits( + InstantiatingGrpcChannelProvider.Builder builder) { + + builder + .setAttemptDirectPathXds() + .setAttemptDirectPath(true) + .setAllowNonDefaultServiceAccount(true); + + if (!DIRECT_PATH_BOUND_TOKEN_DISABLED) { + builder.setAllowHardBoundTokenTypes( + Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); + } + + return builder; + } + + @SuppressWarnings("WeakerAccess") public static TransportChannelProvider defaultTransportChannelProvider() { return defaultGrpcTransportProviderBuilder().build(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java index 3cb98d9dee..86c564bcc0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/NoOpChannelPrimer.java @@ -16,11 +16,11 @@ package com.google.cloud.bigtable.data.v2.stub; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; -import com.google.api.core.SettableApiFuture; import com.google.bigtable.v2.PingAndWarmResponse; import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer; -import io.grpc.ManagedChannel; +import io.grpc.Channel; @InternalApi public class NoOpChannelPrimer implements ChannelPrimer { @@ -31,14 +31,12 @@ static NoOpChannelPrimer create() { private NoOpChannelPrimer() {} @Override - public void primeChannel(ManagedChannel channel) { + public void primeChannel(Channel channel) { // No op } @Override - public ApiFuture sendPrimeRequestsAsync(ManagedChannel channel) { - SettableApiFuture future = SettableApiFuture.create(); - future.set(PingAndWarmResponse.getDefaultInstance()); - return future; + public ApiFuture sendPrimeRequestsAsync(Channel channel) { + return ApiFutures.immediateFuture(PingAndWarmResponse.getDefaultInstance()); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java index a893ba8218..a81b511909 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java @@ -24,12 +24,16 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.Credentials; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.common.base.Preconditions; +import io.grpc.Channel; import io.grpc.ManagedChannel; import java.io.IOException; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; /** @@ -38,20 +42,25 @@ */ @InternalApi public final class BigtableTransportChannelProvider implements TransportChannelProvider { + private static final Logger LOG = + Logger.getLogger(BigtableTransportChannelProvider.class.getName()); private final InstantiatingGrpcChannelProvider delegate; private final ChannelPrimer channelPrimer; @Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer; @Nullable private final ScheduledExecutorService backgroundExecutor; + @Nullable private final Map headers; private BigtableTransportChannelProvider( InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, ChannelPrimer channelPrimer, ChannelPoolMetricsTracer channelPoolMetricsTracer, - ScheduledExecutorService backgroundExecutor) { + ScheduledExecutorService backgroundExecutor, + @Nullable Map headers) { delegate = Preconditions.checkNotNull(instantiatingGrpcChannelProvider); this.channelPrimer = channelPrimer; this.channelPoolMetricsTracer = channelPoolMetricsTracer; this.backgroundExecutor = backgroundExecutor; + this.headers = headers; } @Override @@ -76,7 +85,7 @@ public BigtableTransportChannelProvider withExecutor(Executor executor) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withExecutor(executor); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); } @Override @@ -89,7 +98,7 @@ public TransportChannelProvider withBackgroundExecutor(ScheduledExecutorService InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withBackgroundExecutor(executor); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, executor); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, executor, headers); } @Override @@ -102,7 +111,7 @@ public BigtableTransportChannelProvider withHeaders(Map headers) InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withHeaders(headers); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); } @Override @@ -115,7 +124,7 @@ public TransportChannelProvider withEndpoint(String endpoint) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withEndpoint(endpoint); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); } @Deprecated @@ -130,12 +139,73 @@ public TransportChannelProvider withPoolSize(int size) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withPoolSize(size); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); + } + + // We need this for direct access checker. + private Map updateFeatureFlags( + Map originalHeaders, boolean isDirectAccessEligible) { + if (originalHeaders == null) { + return java.util.Collections.emptyMap(); + } + java.util.Map newHeaders = new java.util.HashMap<>(originalHeaders); + String encodedFlags = newHeaders.get("bigtable-features"); + + if (encodedFlags != null) { + try { + byte[] decoded = java.util.Base64.getUrlDecoder().decode(encodedFlags); + com.google.bigtable.v2.FeatureFlags flags = + com.google.bigtable.v2.FeatureFlags.parseFrom(decoded); + + com.google.bigtable.v2.FeatureFlags updatedFlags = + flags.toBuilder() + .setDirectAccessRequested(isDirectAccessEligible) + .setTrafficDirectorEnabled(isDirectAccessEligible) + .build(); + + newHeaders.put( + "bigtable-features", + java.util.Base64.getUrlEncoder().encodeToString(updatedFlags.toByteArray())); + } catch (Exception e) { + // use original headers + } + } + return newHeaders; } /** Expected to only be called once when BigtableClientContext is created */ @Override public TransportChannel getTransportChannel() throws IOException { + Map directAccessEligibleHeaders = updateFeatureFlags(this.headers, true); + + InstantiatingGrpcChannelProvider.Builder directAccessProvider = + EnhancedBigtableStubSettings.applyDirectAccessTraits(delegate.toBuilder()) + .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)); + + InstantiatingGrpcChannelProvider directAccessProviderWithHeaders = + (InstantiatingGrpcChannelProvider) + directAccessProvider.build().withHeaders(directAccessEligibleHeaders); + GrpcTransportChannel directAccessTransportChannel = + (GrpcTransportChannel) directAccessProviderWithHeaders.getTransportChannel(); + Channel maybeDirectAccessChannel = directAccessTransportChannel.getChannel(); + DirectAccessChecker directAccessChecker = UnaryDirectAccessChecker.create(channelPrimer); + boolean isDirectAccessEligible = false; + + try { + isDirectAccessEligible = directAccessChecker.check(maybeDirectAccessChannel); + } catch (Exception e) { + LOG.log(Level.INFO, "Client is not direct access eligible, using standard transport.", e); + } + + InstantiatingGrpcChannelProvider selectedProvider; + + if (isDirectAccessEligible) { + selectedProvider = directAccessProviderWithHeaders; + } else { + Map fallbackHeaders = updateFeatureFlags(this.headers, false); + selectedProvider = (InstantiatingGrpcChannelProvider) delegate.withHeaders(fallbackHeaders); + } + // This provider's main purpose is to replace the default GAX ChannelPool // with a custom BigtableChannelPool, reusing the delegate's configuration. @@ -143,7 +213,9 @@ public TransportChannel getTransportChannel() throws IOException { // We achieve this by configuring our delegate to not use its own pooling // (by setting pool size to 1) and then calling getTransportChannel() on it. InstantiatingGrpcChannelProvider singleChannelProvider = - delegate.toBuilder().setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)).build(); + selectedProvider.toBuilder() + .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)) + .build(); ChannelFactory channelFactory = () -> { @@ -187,7 +259,7 @@ public TransportChannelProvider withCredentials(Credentials credentials) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, ); } /** Creates a BigtableTransportChannelProvider. */ @@ -200,6 +272,6 @@ public static BigtableTransportChannelProvider create( instantiatingGrpcChannelProvider, channelPrimer, outstandingRpcsMetricTracker, - backgroundExecutor); + backgroundExecutor, null); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java index ea7cc70175..6234b4410c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java @@ -18,11 +18,21 @@ import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; import com.google.bigtable.v2.PingAndWarmResponse; +import io.grpc.Channel; import io.grpc.ManagedChannel; @InternalApi("For internal use by google-cloud-java clients only") public interface ChannelPrimer { - void primeChannel(ManagedChannel channel); + @Deprecated + default void primeChannel(ManagedChannel channel) { + primeChannel((Channel) channel); + } - ApiFuture sendPrimeRequestsAsync(ManagedChannel channel); + void primeChannel(Channel channel); + + @Deprecated + default ApiFuture sendPrimeRequestsAsync(ManagedChannel channel) { + return sendPrimeRequestsAsync((Channel) channel); + } + ApiFuture sendPrimeRequestsAsync(Channel channel); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/DirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/DirectAccessChecker.java new file mode 100644 index 0000000000..5ab0109b25 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/DirectAccessChecker.java @@ -0,0 +1,26 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.grpc; + +import com.google.api.core.InternalApi; +import io.grpc.Channel; + +@InternalApi +/* Evaluates whether a given channel supports Direct Access. */ +public interface DirectAccessChecker { + /// Performs a request on the provided channel to check for Direct Access eligibility. + boolean check(Channel channel ); +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java new file mode 100644 index 0000000000..138dbd245c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java @@ -0,0 +1,58 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.gaxx.grpc; + +import com.google.api.core.InternalApi; +import com.google.bigtable.v2.PeerInfo; +import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor; +import io.grpc.Channel; +import io.grpc.ClientInterceptors; + +import java.util.Optional; + +/** + * Evaluates whether a given channel has Direct Access (DirectPath) routing + * by executing a RPC and inspecting the response headers. + */ +@InternalApi +public class UnaryDirectAccessChecker implements DirectAccessChecker { + + private final ChannelPrimer channelPrimer; + + private UnaryDirectAccessChecker(ChannelPrimer channelPrimer) { + this.channelPrimer = channelPrimer; + } + + public static UnaryDirectAccessChecker create(ChannelPrimer channelPrimer) { + return new UnaryDirectAccessChecker(channelPrimer); + } + + @Override + public boolean check(Channel channel) { + MetadataExtractorInterceptor interceptor = new MetadataExtractorInterceptor(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); + channelPrimer.primeChannel(interceptedChannel); + + // Extract the sideband data populated by the interceptor + MetadataExtractorInterceptor.SidebandData sidebandData = interceptor.getSidebandData(); + + return Optional.of(sidebandData) + .map(MetadataExtractorInterceptor.SidebandData::getPeerInfo) + .map(PeerInfo::getTransportType) + .map(type -> type == PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) + .orElse(false); + } +} \ No newline at end of file From 068cdf211897a9525774ee24fe049b6da3016826 Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Tue, 10 Mar 2026 16:41:46 -0400 Subject: [PATCH 2/5] fix --- .../bigtable/data/v2/stub/BigtableChannelPrimer.java | 2 +- .../gaxx/grpc/BigtableTransportChannelProvider.java | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 24ce44607a..78ee4e497d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -29,10 +29,10 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.Deadline; -import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; + import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java index a81b511909..7a41e98f97 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java @@ -206,6 +206,12 @@ public TransportChannel getTransportChannel() throws IOException { selectedProvider = (InstantiatingGrpcChannelProvider) delegate.withHeaders(fallbackHeaders); } + + if (maybeDirectAccessChannel instanceof ManagedChannel) { + // call shutdown + ((ManagedChannel) maybeDirectAccessChannel).shutdownNow(); + } + // This provider's main purpose is to replace the default GAX ChannelPool // with a custom BigtableChannelPool, reusing the delegate's configuration. @@ -259,7 +265,7 @@ public TransportChannelProvider withCredentials(Credentials credentials) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, ); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); } /** Creates a BigtableTransportChannelProvider. */ From f7a3a5bdeb9641f13b0b92f0ead70bdef80673d8 Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Thu, 12 Mar 2026 03:15:32 -0400 Subject: [PATCH 3/5] fix --- .../data/v2/internal/csm/Metrics.java | 4 + .../data/v2/internal/csm/MetricsImpl.java | 11 ++ .../DefaultDirectPathCompatibleTracer.java | 42 ++++++ .../tracers/DirectPathCompatibleTracer.java | 39 +++++ .../v2/stub/BigtableChannelFactory.java} | 12 +- .../data/v2/stub/BigtableClientContext.java | 3 +- .../data/v2/stub/DirectAccessChecker.java | 34 +++++ .../v2/stub/EnhancedBigtableStubSettings.java | 8 +- .../v2/stub/MetadataExtractorInterceptor.java | 34 +++++ .../v2/stub/UnaryDirectAccessChecker.java | 84 +++++++++++ .../gaxx/grpc/BigtableChannelPool.java | 8 +- .../BigtableTransportChannelProvider.java | 118 ++++++--------- .../gaxx/grpc/UnaryDirectAccessChecker.java | 58 -------- .../v2/stub/UnaryDirectAccessCheckerTest.java | 134 ++++++++++++++++++ .../gaxx/grpc/BigtableChannelPoolTest.java | 4 +- 15 files changed, 444 insertions(+), 149 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java rename google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/{gaxx/grpc/DirectAccessChecker.java => data/v2/stub/BigtableChannelFactory.java} (68%) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java delete mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/Metrics.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/Metrics.java index 7df665c673..823458e0a4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/Metrics.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/Metrics.java @@ -18,6 +18,7 @@ import com.google.api.gax.tracing.ApiTracerFactory; import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; import io.grpc.ManagedChannelBuilder; import java.io.Closeable; import java.io.IOException; @@ -31,6 +32,9 @@ public interface Metrics extends Closeable { @Nullable ChannelPoolMetricsTracer getChannelPoolMetricsTracer(); + @Nullable + DirectPathCompatibleTracer getDirectPathCompatibleTracer(); + void start(); @Override diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java index f0efac7e96..5d4c8d0e58 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java @@ -31,6 +31,8 @@ import com.google.cloud.bigtable.data.v2.internal.csm.tracers.BuiltinMetricsTracerFactory; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.CompositeTracerFactory; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DefaultDirectPathCompatibleTracer; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.Pacemaker; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -67,6 +69,7 @@ public class MetricsImpl implements Metrics, Closeable { @Nullable private final GrpcOpenTelemetry grpcOtel; @Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer; + @Nullable private final DirectPathCompatibleTracer directPathCompatibleTracer; @Nullable private final Pacemaker pacemaker; private final List> tasks = new ArrayList<>(); @@ -94,6 +97,7 @@ public MetricsImpl( this.internalRecorder = metricRegistry.newRecorderRegistry(internalOtel.getMeterProvider()); this.pacemaker = new Pacemaker(internalRecorder, clientInfo, "background"); this.channelPoolMetricsTracer = new ChannelPoolMetricsTracer(internalRecorder, clientInfo); + this.directPathCompatibleTracer = new DefaultDirectPathCompatibleTracer(clientInfo, internalRecorder); this.grpcOtel = GrpcOpenTelemetry.newBuilder() .sdk(internalOtel) @@ -109,6 +113,7 @@ public MetricsImpl( this.grpcOtel = null; this.pacemaker = null; this.channelPoolMetricsTracer = null; + this.directPathCompatibleTracer = null; } if (userOtel != null) { @@ -171,6 +176,12 @@ public ChannelPoolMetricsTracer getChannelPoolMetricsTracer() { return channelPoolMetricsTracer; } + @Nullable + @Override + public DirectPathCompatibleTracer getDirectPathCompatibleTracer() { + return directPathCompatibleTracer; + } + public static OpenTelemetrySdk createBuiltinOtel( MetricRegistry metricRegistry, ClientInfo clientInfo, diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java new file mode 100644 index 0000000000..041b226ab2 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java @@ -0,0 +1,42 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.internal.csm.tracers; + +import com.google.api.core.InternalApi; +import com.google.cloud.bigtable.data.v2.internal.csm.MetricRegistry; +import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo; + +@InternalApi +public class DefaultDirectPathCompatibleTracer implements DirectPathCompatibleTracer { + private final ClientInfo clientInfo; + private final MetricRegistry.RecorderRegistry recorder; + + public DefaultDirectPathCompatibleTracer( + ClientInfo clientInfo, MetricRegistry.RecorderRegistry recorder) { + this.clientInfo = clientInfo; + this.recorder = recorder; + } + + @Override + public void recordSuccess(String ipPreference) { + recorder.dpCompatGuage.recordSuccess(clientInfo, ipPreference); + } + + @Override + public void recordFailure(String reason) { + recorder.dpCompatGuage.recordFailure(clientInfo, reason); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java new file mode 100644 index 0000000000..0a385494cf --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.internal.csm.tracers; + +import com.google.api.core.InternalApi; + +/** + * Interface for recording DirectPath/DirectAccess eligibility metrics. + */ +@InternalApi +public interface DirectPathCompatibleTracer { + + /** + * Records that the environment is eligible and successfully connected via DirectPath. + * + * @param ipPreference The IP preference used (e.g., "ipv6"). + */ + void recordSuccess(String ipPreference); + + /** + * Records that the environment is not eligible or failed to connect via DirectPath. + * + * @param reason The reason for the failure (e.g., "routing_check_failed"). + */ + void recordFailure(String reason); +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/DirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java similarity index 68% rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/DirectAccessChecker.java rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java index 5ab0109b25..a4ce11b58a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/DirectAccessChecker.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java @@ -13,14 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.cloud.bigtable.gaxx.grpc; +package com.google.cloud.bigtable.data.v2.stub; import com.google.api.core.InternalApi; -import io.grpc.Channel; +import io.grpc.ManagedChannel; + +import java.io.IOException; @InternalApi -/* Evaluates whether a given channel supports Direct Access. */ -public interface DirectAccessChecker { - /// Performs a request on the provided channel to check for Direct Access eligibility. - boolean check(Channel channel ); +public interface BigtableChannelFactory { + ManagedChannel createSingleChannel() throws IOException; } \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java index 2828d67f43..dc559206cb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java @@ -166,7 +166,8 @@ public static BigtableClientContext create( transportProvider.build(), channelPrimer, metrics.getChannelPoolMetricsTracer(), - backgroundExecutor); + backgroundExecutor, metrics.getDirectPathCompatibleTracer() + ); builder.setTransportChannelProvider(btTransportProvider); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java new file mode 100644 index 0000000000..281ab7b48f --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java @@ -0,0 +1,34 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.InternalApi; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; +import io.grpc.Channel; + +import javax.annotation.Nullable; + +@InternalApi +/* Evaluates whether a given channel supports Direct Access. */ +public interface DirectAccessChecker { + /** + * Evaluates if Direct Access is available by creating a test channel. + * + * @param channelFactory A factory to create the test channel + * @return true if the channel is eligible for Direct Access + */ + boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer); +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index cfb7a2ab47..c0fecdd5cc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -633,12 +633,16 @@ private Builder() { perOpSettings = new ClientOperationSettings.Builder(); + // Note: RouteLookup evaluates and returns directpath targets + // only if Traffic Director sends the request (with grpc as target type) + // For GFE/CFE, sending setDirectAccessRequested + // is fine as GFE/CFE sends with gslb target type featureFlags = FeatureFlags.newBuilder() .setReverseScans(true) .setLastScannedRowResponses(true) - .setDirectAccessRequested(DIRECT_PATH_ENABLED) - .setTrafficDirectorEnabled(DIRECT_PATH_ENABLED) + .setDirectAccessRequested(true) + .setTrafficDirectorEnabled(true) .setPeerInfo(true); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java index 7d21a5f498..ba5794cf15 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java @@ -29,10 +29,16 @@ import io.grpc.ClientInterceptors; import io.grpc.ForwardingClientCall; import io.grpc.ForwardingClientCallListener; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.alts.AltsContextUtil; + +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.time.Duration; import java.util.Base64; import java.util.regex.Matcher; @@ -85,6 +91,12 @@ public SidebandData getSidebandData() { } public static class SidebandData { + public enum IpProtocol { + IPV4, + IPV6, + UNKNOWN + } + private static final CallOptions.Key KEY = CallOptions.Key.create("bigtable-sideband"); @@ -105,6 +117,7 @@ public static SidebandData from(CallOptions callOptions) { @Nullable private volatile ResponseParams responseParams; @Nullable private volatile PeerInfo peerInfo; @Nullable private volatile Duration gfeTiming; + @Nullable private volatile IpProtocol ipProtocol; @Nullable public ResponseParams getResponseParams() { @@ -121,16 +134,23 @@ public Duration getGfeTiming() { return gfeTiming; } + @Nullable + public IpProtocol getIpProtocol() { + return ipProtocol; + } + private void reset() { responseParams = null; peerInfo = null; gfeTiming = null; + ipProtocol = IpProtocol.UNKNOWN; } void onResponseHeaders(Metadata md, Attributes attributes) { responseParams = extractResponseParams(md); gfeTiming = extractGfeLatency(md); peerInfo = extractPeerInfo(md, gfeTiming, attributes); + ipProtocol = extractIpProtocol(attributes); } void onClose(Status status, Metadata trailers) { @@ -139,6 +159,20 @@ void onClose(Status status, Metadata trailers) { } } + @Nullable + private static IpProtocol extractIpProtocol(Attributes attributes) { + SocketAddress remoteAddr = attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + if (remoteAddr instanceof InetSocketAddress) { + InetSocketAddress inetAddr = (InetSocketAddress) remoteAddr; + if (inetAddr.getAddress() instanceof Inet4Address) { + return IpProtocol.IPV4; + } else if (inetAddr.getAddress() instanceof Inet6Address) { + return IpProtocol.IPV6; + } + } + return IpProtocol.UNKNOWN; + } + @Nullable private static Duration extractGfeLatency(Metadata metadata) { String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java new file mode 100644 index 0000000000..edb842235f --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java @@ -0,0 +1,84 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.InternalApi; +import com.google.bigtable.v2.PeerInfo; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; +import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer; +import io.grpc.Channel; +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; + +import javax.annotation.Nullable; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Evaluates whether a given channel has Direct Access (DirectPath) routing + * by executing a RPC and inspecting the response headers. + */ +@InternalApi +public class UnaryDirectAccessChecker implements DirectAccessChecker { + private static final Logger LOG = Logger.getLogger(UnaryDirectAccessChecker.class.getName()); + private final ChannelPrimer channelPrimer; + + private UnaryDirectAccessChecker(ChannelPrimer channelPrimer) { + this.channelPrimer = channelPrimer; + } + + public static UnaryDirectAccessChecker create(ChannelPrimer channelPrimer) { + return new UnaryDirectAccessChecker(channelPrimer); + } + + @Override + public boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer) { + ManagedChannel channel = null; + try { + channel = channelFactory.createSingleChannel(); + MetadataExtractorInterceptor interceptor = new MetadataExtractorInterceptor(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); + channelPrimer.primeChannel(interceptedChannel); + + // Extract the sideband data populated by the interceptor + MetadataExtractorInterceptor.SidebandData sidebandData = interceptor.getSidebandData(); + + boolean isEligible = Optional.ofNullable(sidebandData) + .map(MetadataExtractorInterceptor.SidebandData::getPeerInfo) + .map(PeerInfo::getTransportType) + .map(type -> type == PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) + .orElse(false); + + if (isEligible && tracer != null) { + String ipProtocolStr = Optional.ofNullable(sidebandData) + .map(MetadataExtractorInterceptor.SidebandData::getIpProtocol) + .map(String::valueOf) + .map(String::toLowerCase) + .orElse("unknown"); + tracer.recordSuccess(ipProtocolStr); + } + return isEligible; + } catch (Exception e) { + LOG.log(Level.FINE, "Failed to evaluate direct access eligibility.", e); + return false; + } finally { + if (channel != null) { + channel.shutdownNow(); + } + } + } +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java index 6bbfba1398..4bcaab902d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java @@ -16,8 +16,8 @@ package com.google.cloud.bigtable.gaxx.grpc; import com.google.api.core.InternalApi; -import com.google.api.gax.grpc.ChannelFactory; import com.google.bigtable.v2.PeerInfo; +import com.google.cloud.bigtable.data.v2.stub.BigtableChannelFactory; import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor; import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult; import com.google.common.annotations.VisibleForTesting; @@ -67,7 +67,7 @@ public class BigtableChannelPool extends ManagedChannel implements BigtableChann private static final java.time.Duration REFRESH_PERIOD = java.time.Duration.ofMinutes(50); private final BigtableChannelPoolSettings settings; - private final ChannelFactory channelFactory; + private final BigtableChannelFactory channelFactory; private final ChannelPrimer channelPrimer; private final ScheduledExecutorService executor; @@ -83,7 +83,7 @@ public class BigtableChannelPool extends ManagedChannel implements BigtableChann public static BigtableChannelPool create( BigtableChannelPoolSettings settings, - ChannelFactory channelFactory, + BigtableChannelFactory channelFactory, ChannelPrimer channelPrimer, ScheduledExecutorService backgroundExecutor) throws IOException { @@ -100,7 +100,7 @@ public static BigtableChannelPool create( @VisibleForTesting BigtableChannelPool( BigtableChannelPoolSettings settings, - ChannelFactory channelFactory, + BigtableChannelFactory channelFactory, ChannelPrimer channelPrimer, ScheduledExecutorService executor) throws IOException { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java index 7a41e98f97..972e12cb2f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java @@ -23,8 +23,12 @@ import com.google.api.gax.rpc.TransportChannel; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.Credentials; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; +import com.google.cloud.bigtable.data.v2.stub.BigtableChannelFactory; +import com.google.cloud.bigtable.data.v2.stub.DirectAccessChecker; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.data.v2.stub.UnaryDirectAccessChecker; import com.google.common.base.Preconditions; import io.grpc.Channel; import io.grpc.ManagedChannel; @@ -48,19 +52,18 @@ public final class BigtableTransportChannelProvider implements TransportChannelP private final ChannelPrimer channelPrimer; @Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer; @Nullable private final ScheduledExecutorService backgroundExecutor; - @Nullable private final Map headers; + DirectPathCompatibleTracer directPathCompatibleTracer; private BigtableTransportChannelProvider( InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, ChannelPrimer channelPrimer, ChannelPoolMetricsTracer channelPoolMetricsTracer, - ScheduledExecutorService backgroundExecutor, - @Nullable Map headers) { + ScheduledExecutorService backgroundExecutor, DirectPathCompatibleTracer directPathCompatibleTracer ) { delegate = Preconditions.checkNotNull(instantiatingGrpcChannelProvider); this.channelPrimer = channelPrimer; this.channelPoolMetricsTracer = channelPoolMetricsTracer; this.backgroundExecutor = backgroundExecutor; - this.headers = headers; + this.directPathCompatibleTracer = directPathCompatibleTracer; } @Override @@ -85,7 +88,7 @@ public BigtableTransportChannelProvider withExecutor(Executor executor) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withExecutor(executor); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); } @Override @@ -98,7 +101,7 @@ public TransportChannelProvider withBackgroundExecutor(ScheduledExecutorService InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withBackgroundExecutor(executor); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, executor, headers); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, executor, directPathCompatibleTracer); } @Override @@ -111,7 +114,7 @@ public BigtableTransportChannelProvider withHeaders(Map headers) InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withHeaders(headers); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); } @Override @@ -124,7 +127,7 @@ public TransportChannelProvider withEndpoint(String endpoint) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withEndpoint(endpoint); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); } @Deprecated @@ -139,77 +142,39 @@ public TransportChannelProvider withPoolSize(int size) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withPoolSize(size); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); } // We need this for direct access checker. - private Map updateFeatureFlags( - Map originalHeaders, boolean isDirectAccessEligible) { - if (originalHeaders == null) { - return java.util.Collections.emptyMap(); - } - java.util.Map newHeaders = new java.util.HashMap<>(originalHeaders); - String encodedFlags = newHeaders.get("bigtable-features"); - - if (encodedFlags != null) { - try { - byte[] decoded = java.util.Base64.getUrlDecoder().decode(encodedFlags); - com.google.bigtable.v2.FeatureFlags flags = - com.google.bigtable.v2.FeatureFlags.parseFrom(decoded); - - com.google.bigtable.v2.FeatureFlags updatedFlags = - flags.toBuilder() - .setDirectAccessRequested(isDirectAccessEligible) - .setTrafficDirectorEnabled(isDirectAccessEligible) - .build(); - - newHeaders.put( - "bigtable-features", - java.util.Base64.getUrlEncoder().encodeToString(updatedFlags.toByteArray())); - } catch (Exception e) { - // use original headers - } - } - return newHeaders; - } /** Expected to only be called once when BigtableClientContext is created */ @Override public TransportChannel getTransportChannel() throws IOException { - Map directAccessEligibleHeaders = updateFeatureFlags(this.headers, true); - - InstantiatingGrpcChannelProvider.Builder directAccessProvider = + InstantiatingGrpcChannelProvider directAccessProvider = EnhancedBigtableStubSettings.applyDirectAccessTraits(delegate.toBuilder()) - .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)); - - InstantiatingGrpcChannelProvider directAccessProviderWithHeaders = - (InstantiatingGrpcChannelProvider) - directAccessProvider.build().withHeaders(directAccessEligibleHeaders); - GrpcTransportChannel directAccessTransportChannel = - (GrpcTransportChannel) directAccessProviderWithHeaders.getTransportChannel(); - Channel maybeDirectAccessChannel = directAccessTransportChannel.getChannel(); + .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)).build(); + + BigtableChannelFactory maybeDirectAccessChannelFactory = () -> { + GrpcTransportChannel channel = + (GrpcTransportChannel) directAccessProvider.getTransportChannel(); + return (ManagedChannel) channel.getChannel(); + }; + DirectAccessChecker directAccessChecker = UnaryDirectAccessChecker.create(channelPrimer); boolean isDirectAccessEligible = false; try { - isDirectAccessEligible = directAccessChecker.check(maybeDirectAccessChannel); + isDirectAccessEligible = directAccessChecker.check(maybeDirectAccessChannelFactory, directPathCompatibleTracer ); } catch (Exception e) { - LOG.log(Level.INFO, "Client is not direct access eligible, using standard transport.", e); + LOG.log(Level.FINE, "Client is not direct access eligible, using standard transport.", e); } InstantiatingGrpcChannelProvider selectedProvider; if (isDirectAccessEligible) { - selectedProvider = directAccessProviderWithHeaders; + selectedProvider = directAccessProvider; } else { - Map fallbackHeaders = updateFeatureFlags(this.headers, false); - selectedProvider = (InstantiatingGrpcChannelProvider) delegate.withHeaders(fallbackHeaders); - } - - - if (maybeDirectAccessChannel instanceof ManagedChannel) { - // call shutdown - ((ManagedChannel) maybeDirectAccessChannel).shutdownNow(); + selectedProvider = delegate; } // This provider's main purpose is to replace the default GAX ChannelPool @@ -223,16 +188,16 @@ public TransportChannel getTransportChannel() throws IOException { .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)) .build(); - ChannelFactory channelFactory = - () -> { - try { - GrpcTransportChannel channel = - (GrpcTransportChannel) singleChannelProvider.getTransportChannel(); - return (ManagedChannel) channel.getChannel(); - } catch (IOException e) { - throw new java.io.UncheckedIOException(e); - } - }; + BigtableChannelFactory channelFactory = + () -> { + try { + GrpcTransportChannel channel = + (GrpcTransportChannel) singleChannelProvider.getTransportChannel(); + return (ManagedChannel) channel.getChannel(); + } catch (IOException e) { + throw new java.io.UncheckedIOException(e); + } + }; BigtableChannelPoolSettings btPoolSettings = BigtableChannelPoolSettings.copyFrom(delegate.getChannelPoolSettings()); @@ -265,19 +230,20 @@ public TransportChannelProvider withCredentials(Credentials credentials) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, headers); + newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); } /** Creates a BigtableTransportChannelProvider. */ public static BigtableTransportChannelProvider create( - InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, - ChannelPrimer channelPrimer, - ChannelPoolMetricsTracer outstandingRpcsMetricTracker, - ScheduledExecutorService backgroundExecutor) { + InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, + ChannelPrimer channelPrimer, + ChannelPoolMetricsTracer outstandingRpcsMetricTracker, + ScheduledExecutorService backgroundExecutor, + DirectPathCompatibleTracer directPathCompatibleTracer) { return new BigtableTransportChannelProvider( instantiatingGrpcChannelProvider, channelPrimer, outstandingRpcsMetricTracker, - backgroundExecutor, null); + backgroundExecutor, directPathCompatibleTracer); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java deleted file mode 100644 index 138dbd245c..0000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/UnaryDirectAccessChecker.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2026 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.gaxx.grpc; - -import com.google.api.core.InternalApi; -import com.google.bigtable.v2.PeerInfo; -import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor; -import io.grpc.Channel; -import io.grpc.ClientInterceptors; - -import java.util.Optional; - -/** - * Evaluates whether a given channel has Direct Access (DirectPath) routing - * by executing a RPC and inspecting the response headers. - */ -@InternalApi -public class UnaryDirectAccessChecker implements DirectAccessChecker { - - private final ChannelPrimer channelPrimer; - - private UnaryDirectAccessChecker(ChannelPrimer channelPrimer) { - this.channelPrimer = channelPrimer; - } - - public static UnaryDirectAccessChecker create(ChannelPrimer channelPrimer) { - return new UnaryDirectAccessChecker(channelPrimer); - } - - @Override - public boolean check(Channel channel) { - MetadataExtractorInterceptor interceptor = new MetadataExtractorInterceptor(); - Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); - channelPrimer.primeChannel(interceptedChannel); - - // Extract the sideband data populated by the interceptor - MetadataExtractorInterceptor.SidebandData sidebandData = interceptor.getSidebandData(); - - return Optional.of(sidebandData) - .map(MetadataExtractorInterceptor.SidebandData::getPeerInfo) - .map(PeerInfo::getTransportType) - .map(type -> type == PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) - .orElse(false); - } -} \ No newline at end of file diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java new file mode 100644 index 0000000000..5894565898 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java @@ -0,0 +1,134 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.google.bigtable.v2.PeerInfo; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; +import com.google.cloud.bigtable.gaxx.grpc.ChannelPrimer; +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class UnaryDirectAccessCheckerTest { + + @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + + @Mock private ChannelPrimer mockChannelPrimer; + @Mock private BigtableChannelFactory mockChannelFactory; + @Mock private DirectPathCompatibleTracer mockTracer; + @Mock private ManagedChannel mockChannel; + @Mock private MetadataExtractorInterceptor.SidebandData mockSidebandData; + + private UnaryDirectAccessChecker checker; + + @Before + public void setUp() throws Exception { + checker = UnaryDirectAccessChecker.create(mockChannelPrimer); + when(mockChannelFactory.createSingleChannel()).thenReturn(mockChannel); + } + + @Test + public void testEligibleForDirectAccess() { + PeerInfo peerInfo = PeerInfo.newBuilder() + .setTransportType(PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) + .build(); + when(mockSidebandData.getPeerInfo()).thenReturn(peerInfo); + + when(mockSidebandData.getIpProtocol()).thenReturn(MetadataExtractorInterceptor.SidebandData.IpProtocol.IPV6); + + try (MockedConstruction ignored = + mockConstruction( + MetadataExtractorInterceptor.class, + (mock, context) -> { + when(mock.getSidebandData()).thenReturn(mockSidebandData); + })) { + + boolean isEligible = checker.check(mockChannelFactory, mockTracer); + + assertThat(isEligible).isFalse(); + verify(mockChannelPrimer).primeChannel(any(Channel.class)); + verify(mockTracer).recordSuccess("ipv6"); + verify(mockChannel).shutdownNow(); + } + } + + @Test + public void testNotEligibleProxiedRouting() { + // 1. Setup sideband data to simulate standard CloudPath routing + PeerInfo peerInfo = PeerInfo.newBuilder() + .setTransportType(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH) + .build(); + when(mockSidebandData.getPeerInfo()).thenReturn(peerInfo); + + try (MockedConstruction mocked = + mockConstruction( + MetadataExtractorInterceptor.class, + (mock, context) -> { + when(mock.getSidebandData()).thenReturn(mockSidebandData); + })) { + + boolean isEligible = checker.check(mockChannelFactory, mockTracer); + + assertThat(isEligible).isFalse(); + verifyNoInteractions(mockTracer); + verify(mockChannel).shutdownNow(); + } + } + + @Test + public void testMissingSidebandData() { + // Interceptor failed to capture anything (returns null) + try (MockedConstruction mocked = + mockConstruction( + MetadataExtractorInterceptor.class, + (mock, context) -> { + when(mock.getSidebandData()).thenReturn(null); + })) { + + boolean isEligible = checker.check(mockChannelFactory, mockTracer); + + assertThat(isEligible).isFalse(); + verifyNoInteractions(mockTracer); + verify(mockChannel).shutdownNow(); + } + } + + @Test + public void testExceptionSafetyAndCleanup() { + doThrow(new RuntimeException("Simulated primer failure")) + .when(mockChannelPrimer) + .primeChannel(any(Channel.class)); + + boolean isEligible = checker.check(mockChannelFactory, mockTracer); + + assertThat(isEligible).isFalse(); + verifyNoInteractions(mockTracer); + verify(mockChannel).shutdownNow(); + } +} \ No newline at end of file diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolTest.java index d1059c0362..1cd98a92c4 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPoolTest.java @@ -19,7 +19,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; -import com.google.api.gax.grpc.ChannelFactory; +import com.google.cloud.bigtable.data.v2.stub.BigtableChannelFactory; import com.google.common.collect.Iterables; import io.grpc.CallOptions; import io.grpc.ClientCall; @@ -48,7 +48,7 @@ public class BigtableChannelPoolTest { @Rule public final MockitoRule mockito = MockitoJUnit.rule(); - @Mock private ChannelFactory mockChannelFactory; + @Mock private BigtableChannelFactory mockChannelFactory; @Mock private ChannelPrimer mockChannelPrimer; @Mock private ManagedChannel mockChannel; @Mock private ClientCall mockClientCall; From 24994738f26b8ecc1321533c405c68efb1c07aef Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Thu, 12 Mar 2026 12:02:27 -0400 Subject: [PATCH 4/5] fix --- .../cloud/bigtable/data/v2/stub/BigtableChannelFactory.java | 2 +- .../google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java | 2 +- .../cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java | 2 +- .../bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java index a4ce11b58a..80244b05ea 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java @@ -23,4 +23,4 @@ @InternalApi public interface BigtableChannelFactory { ManagedChannel createSingleChannel() throws IOException; -} \ No newline at end of file +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java index 281ab7b48f..b919d380bd 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java @@ -31,4 +31,4 @@ public interface DirectAccessChecker { * @return true if the channel is eligible for Direct Access */ boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer); -} \ No newline at end of file +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java index edb842235f..0c8469ee58 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java @@ -81,4 +81,4 @@ public boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPath } } } -} \ No newline at end of file +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java index 5894565898..4684c2a72f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java @@ -131,4 +131,4 @@ public void testExceptionSafetyAndCleanup() { verifyNoInteractions(mockTracer); verify(mockChannel).shutdownNow(); } -} \ No newline at end of file +} From e1334644e2fcae54aebd95faada69240842b6e58 Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Thu, 12 Mar 2026 12:13:39 -0400 Subject: [PATCH 5/5] fix --- .../data/v2/internal/csm/MetricsImpl.java | 3 +- .../DefaultDirectPathCompatibleTracer.java | 30 +-- .../tracers/DirectPathCompatibleTracer.java | 30 ++- .../data/v2/stub/BigtableChannelFactory.java | 3 +- .../data/v2/stub/BigtableChannelPrimer.java | 1 - .../data/v2/stub/BigtableClientContext.java | 4 +- .../data/v2/stub/DirectAccessChecker.java | 16 +- .../v2/stub/EnhancedBigtableStubSettings.java | 39 ++-- .../v2/stub/MetadataExtractorInterceptor.java | 1 - .../v2/stub/UnaryDirectAccessChecker.java | 90 ++++----- .../BigtableTransportChannelProvider.java | 99 ++++++---- .../bigtable/gaxx/grpc/ChannelPrimer.java | 1 + .../v2/stub/UnaryDirectAccessCheckerTest.java | 179 +++++++++--------- 13 files changed, 261 insertions(+), 235 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java index 5d4c8d0e58..5c384a55fb 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/MetricsImpl.java @@ -97,7 +97,8 @@ public MetricsImpl( this.internalRecorder = metricRegistry.newRecorderRegistry(internalOtel.getMeterProvider()); this.pacemaker = new Pacemaker(internalRecorder, clientInfo, "background"); this.channelPoolMetricsTracer = new ChannelPoolMetricsTracer(internalRecorder, clientInfo); - this.directPathCompatibleTracer = new DefaultDirectPathCompatibleTracer(clientInfo, internalRecorder); + this.directPathCompatibleTracer = + new DefaultDirectPathCompatibleTracer(clientInfo, internalRecorder); this.grpcOtel = GrpcOpenTelemetry.newBuilder() .sdk(internalOtel) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java index 041b226ab2..465e147868 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DefaultDirectPathCompatibleTracer.java @@ -21,22 +21,22 @@ @InternalApi public class DefaultDirectPathCompatibleTracer implements DirectPathCompatibleTracer { - private final ClientInfo clientInfo; - private final MetricRegistry.RecorderRegistry recorder; + private final ClientInfo clientInfo; + private final MetricRegistry.RecorderRegistry recorder; - public DefaultDirectPathCompatibleTracer( - ClientInfo clientInfo, MetricRegistry.RecorderRegistry recorder) { - this.clientInfo = clientInfo; - this.recorder = recorder; - } + public DefaultDirectPathCompatibleTracer( + ClientInfo clientInfo, MetricRegistry.RecorderRegistry recorder) { + this.clientInfo = clientInfo; + this.recorder = recorder; + } - @Override - public void recordSuccess(String ipPreference) { - recorder.dpCompatGuage.recordSuccess(clientInfo, ipPreference); - } + @Override + public void recordSuccess(String ipPreference) { + recorder.dpCompatGuage.recordSuccess(clientInfo, ipPreference); + } - @Override - public void recordFailure(String reason) { - recorder.dpCompatGuage.recordFailure(clientInfo, reason); - } + @Override + public void recordFailure(String reason) { + recorder.dpCompatGuage.recordFailure(clientInfo, reason); + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java index 0a385494cf..4776785dc2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/DirectPathCompatibleTracer.java @@ -17,23 +17,21 @@ import com.google.api.core.InternalApi; -/** - * Interface for recording DirectPath/DirectAccess eligibility metrics. - */ +/** Interface for recording DirectPath/DirectAccess eligibility metrics. */ @InternalApi public interface DirectPathCompatibleTracer { - /** - * Records that the environment is eligible and successfully connected via DirectPath. - * - * @param ipPreference The IP preference used (e.g., "ipv6"). - */ - void recordSuccess(String ipPreference); + /** + * Records that the environment is eligible and successfully connected via DirectPath. + * + * @param ipPreference The IP preference used (e.g., "ipv6"). + */ + void recordSuccess(String ipPreference); - /** - * Records that the environment is not eligible or failed to connect via DirectPath. - * - * @param reason The reason for the failure (e.g., "routing_check_failed"). - */ - void recordFailure(String reason); -} \ No newline at end of file + /** + * Records that the environment is not eligible or failed to connect via DirectPath. + * + * @param reason The reason for the failure (e.g., "routing_check_failed"). + */ + void recordFailure(String reason); +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java index 80244b05ea..ce814e3073 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelFactory.java @@ -17,10 +17,9 @@ import com.google.api.core.InternalApi; import io.grpc.ManagedChannel; - import java.io.IOException; @InternalApi public interface BigtableChannelFactory { - ManagedChannel createSingleChannel() throws IOException; + ManagedChannel createSingleChannel() throws IOException; } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java index 78ee4e497d..6706fb6a5c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java @@ -32,7 +32,6 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; - import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java index dc559206cb..9f21939b10 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java @@ -166,8 +166,8 @@ public static BigtableClientContext create( transportProvider.build(), channelPrimer, metrics.getChannelPoolMetricsTracer(), - backgroundExecutor, metrics.getDirectPathCompatibleTracer() - ); + backgroundExecutor, + metrics.getDirectPathCompatibleTracer()); builder.setTransportChannelProvider(btTransportProvider); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java index b919d380bd..9c5d66c559 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/DirectAccessChecker.java @@ -17,18 +17,16 @@ import com.google.api.core.InternalApi; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; -import io.grpc.Channel; - import javax.annotation.Nullable; @InternalApi /* Evaluates whether a given channel supports Direct Access. */ public interface DirectAccessChecker { - /** - * Evaluates if Direct Access is available by creating a test channel. - * - * @param channelFactory A factory to create the test channel - * @return true if the channel is eligible for Direct Access - */ - boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer); + /** + * Evaluates if Direct Access is available by creating a test channel. + * + * @param channelFactory A factory to create the test channel + * @return true if the channel is eligible for Direct Access + */ + boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index c0fecdd5cc..752b2ab786 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -244,21 +244,21 @@ public ClientOperationSettings getPerOpSettings() { /** Applies common pool, message size, and keep-alive settings to the provided builder. */ private static InstantiatingGrpcChannelProvider.Builder commonTraits( - InstantiatingGrpcChannelProvider.Builder builder) { + InstantiatingGrpcChannelProvider.Builder builder) { return builder - .setChannelPoolSettings( - ChannelPoolSettings.builder() - .setInitialChannelCount(10) - .setMinRpcsPerChannel(1) - // Keep it conservative as we scale the channel size every 1min - // and delta is 2 channels. - .setMaxRpcsPerChannel(25) - .setPreemptiveRefreshEnabled(true) - .build()) - .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) - .setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval - .setKeepAliveTimeout( - Duration.ofSeconds(10)); // wait this long before considering the connection dead + .setChannelPoolSettings( + ChannelPoolSettings.builder() + .setInitialChannelCount(10) + .setMinRpcsPerChannel(1) + // Keep it conservative as we scale the channel size every 1min + // and delta is 2 channels. + .setMaxRpcsPerChannel(25) + .setPreemptiveRefreshEnabled(true) + .build()) + .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) + .setKeepAliveTime(Duration.ofSeconds(30)) // sends ping in this interval + .setKeepAliveTimeout( + Duration.ofSeconds(10)); // wait this long before considering the connection dead } /** Returns a builder for the default ChannelProvider for this service. */ @@ -285,22 +285,21 @@ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProvi /** Applies Direct Access traits (DirectPath & ALTS) to an existing builder. */ public static InstantiatingGrpcChannelProvider.Builder applyDirectAccessTraits( - InstantiatingGrpcChannelProvider.Builder builder) { + InstantiatingGrpcChannelProvider.Builder builder) { builder - .setAttemptDirectPathXds() - .setAttemptDirectPath(true) - .setAllowNonDefaultServiceAccount(true); + .setAttemptDirectPathXds() + .setAttemptDirectPath(true) + .setAllowNonDefaultServiceAccount(true); if (!DIRECT_PATH_BOUND_TOKEN_DISABLED) { builder.setAllowHardBoundTokenTypes( - Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); + Collections.singletonList(InstantiatingGrpcChannelProvider.HardBoundTokenTypes.ALTS)); } return builder; } - @SuppressWarnings("WeakerAccess") public static TransportChannelProvider defaultTransportChannelProvider() { return defaultGrpcTransportProviderBuilder().build(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java index ba5794cf15..6baa5577e0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java @@ -34,7 +34,6 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.alts.AltsContextUtil; - import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetSocketAddress; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java index 0c8469ee58..9549fbe403 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessChecker.java @@ -22,63 +22,65 @@ import io.grpc.Channel; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; - -import javax.annotation.Nullable; import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** - * Evaluates whether a given channel has Direct Access (DirectPath) routing - * by executing a RPC and inspecting the response headers. + * Evaluates whether a given channel has Direct Access (DirectPath) routing by executing a RPC and + * inspecting the response headers. */ @InternalApi public class UnaryDirectAccessChecker implements DirectAccessChecker { - private static final Logger LOG = Logger.getLogger(UnaryDirectAccessChecker.class.getName()); - private final ChannelPrimer channelPrimer; + private static final Logger LOG = Logger.getLogger(UnaryDirectAccessChecker.class.getName()); + private final ChannelPrimer channelPrimer; - private UnaryDirectAccessChecker(ChannelPrimer channelPrimer) { - this.channelPrimer = channelPrimer; - } + private UnaryDirectAccessChecker(ChannelPrimer channelPrimer) { + this.channelPrimer = channelPrimer; + } - public static UnaryDirectAccessChecker create(ChannelPrimer channelPrimer) { - return new UnaryDirectAccessChecker(channelPrimer); - } + public static UnaryDirectAccessChecker create(ChannelPrimer channelPrimer) { + return new UnaryDirectAccessChecker(channelPrimer); + } - @Override - public boolean check(BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer) { - ManagedChannel channel = null; - try { - channel = channelFactory.createSingleChannel(); - MetadataExtractorInterceptor interceptor = new MetadataExtractorInterceptor(); - Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); - channelPrimer.primeChannel(interceptedChannel); + @Override + public boolean check( + BigtableChannelFactory channelFactory, @Nullable DirectPathCompatibleTracer tracer) { + ManagedChannel channel = null; + try { + channel = channelFactory.createSingleChannel(); + MetadataExtractorInterceptor interceptor = new MetadataExtractorInterceptor(); + Channel interceptedChannel = ClientInterceptors.intercept(channel, interceptor); + channelPrimer.primeChannel(interceptedChannel); - // Extract the sideband data populated by the interceptor - MetadataExtractorInterceptor.SidebandData sidebandData = interceptor.getSidebandData(); + // Extract the sideband data populated by the interceptor + MetadataExtractorInterceptor.SidebandData sidebandData = interceptor.getSidebandData(); - boolean isEligible = Optional.ofNullable(sidebandData) - .map(MetadataExtractorInterceptor.SidebandData::getPeerInfo) - .map(PeerInfo::getTransportType) - .map(type -> type == PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) - .orElse(false); + boolean isEligible = + Optional.ofNullable(sidebandData) + .map(MetadataExtractorInterceptor.SidebandData::getPeerInfo) + .map(PeerInfo::getTransportType) + .map(type -> type == PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) + .orElse(false); - if (isEligible && tracer != null) { - String ipProtocolStr = Optional.ofNullable(sidebandData) - .map(MetadataExtractorInterceptor.SidebandData::getIpProtocol) - .map(String::valueOf) - .map(String::toLowerCase) - .orElse("unknown"); - tracer.recordSuccess(ipProtocolStr); - } - return isEligible; - } catch (Exception e) { - LOG.log(Level.FINE, "Failed to evaluate direct access eligibility.", e); - return false; - } finally { - if (channel != null) { - channel.shutdownNow(); - } - } + if (isEligible && tracer != null) { + String ipProtocolStr = + Optional.ofNullable(sidebandData) + .map(MetadataExtractorInterceptor.SidebandData::getIpProtocol) + .map(String::valueOf) + .map(String::toLowerCase) + .orElse("unknown"); + tracer.recordSuccess(ipProtocolStr); + } + return isEligible; + } catch (Exception e) { + LOG.log(Level.FINE, "Failed to evaluate direct access eligibility.", e); + return false; + } finally { + if (channel != null) { + channel.shutdownNow(); + } } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java index 972e12cb2f..01c1cc12f7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableTransportChannelProvider.java @@ -16,21 +16,19 @@ package com.google.cloud.bigtable.gaxx.grpc; import com.google.api.core.InternalApi; -import com.google.api.gax.grpc.ChannelFactory; import com.google.api.gax.grpc.ChannelPoolSettings; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.TransportChannel; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.Credentials; +import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer; import com.google.cloud.bigtable.data.v2.internal.csm.tracers.DirectPathCompatibleTracer; import com.google.cloud.bigtable.data.v2.stub.BigtableChannelFactory; import com.google.cloud.bigtable.data.v2.stub.DirectAccessChecker; -import com.google.cloud.bigtable.data.v2.internal.csm.tracers.ChannelPoolMetricsTracer; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; import com.google.cloud.bigtable.data.v2.stub.UnaryDirectAccessChecker; import com.google.common.base.Preconditions; -import io.grpc.Channel; import io.grpc.ManagedChannel; import java.io.IOException; import java.util.Map; @@ -47,7 +45,7 @@ @InternalApi public final class BigtableTransportChannelProvider implements TransportChannelProvider { private static final Logger LOG = - Logger.getLogger(BigtableTransportChannelProvider.class.getName()); + Logger.getLogger(BigtableTransportChannelProvider.class.getName()); private final InstantiatingGrpcChannelProvider delegate; private final ChannelPrimer channelPrimer; @Nullable private final ChannelPoolMetricsTracer channelPoolMetricsTracer; @@ -58,7 +56,8 @@ private BigtableTransportChannelProvider( InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, ChannelPrimer channelPrimer, ChannelPoolMetricsTracer channelPoolMetricsTracer, - ScheduledExecutorService backgroundExecutor, DirectPathCompatibleTracer directPathCompatibleTracer ) { + ScheduledExecutorService backgroundExecutor, + DirectPathCompatibleTracer directPathCompatibleTracer) { delegate = Preconditions.checkNotNull(instantiatingGrpcChannelProvider); this.channelPrimer = channelPrimer; this.channelPoolMetricsTracer = channelPoolMetricsTracer; @@ -88,7 +87,11 @@ public BigtableTransportChannelProvider withExecutor(Executor executor) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withExecutor(executor); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); + newChannelProvider, + channelPrimer, + channelPoolMetricsTracer, + backgroundExecutor, + directPathCompatibleTracer); } @Override @@ -101,7 +104,11 @@ public TransportChannelProvider withBackgroundExecutor(ScheduledExecutorService InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withBackgroundExecutor(executor); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, executor, directPathCompatibleTracer); + newChannelProvider, + channelPrimer, + channelPoolMetricsTracer, + executor, + directPathCompatibleTracer); } @Override @@ -114,7 +121,11 @@ public BigtableTransportChannelProvider withHeaders(Map headers) InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withHeaders(headers); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); + newChannelProvider, + channelPrimer, + channelPoolMetricsTracer, + backgroundExecutor, + directPathCompatibleTracer); } @Override @@ -127,7 +138,11 @@ public TransportChannelProvider withEndpoint(String endpoint) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withEndpoint(endpoint); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); + newChannelProvider, + channelPrimer, + channelPoolMetricsTracer, + backgroundExecutor, + directPathCompatibleTracer); } @Deprecated @@ -142,7 +157,11 @@ public TransportChannelProvider withPoolSize(int size) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withPoolSize(size); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); + newChannelProvider, + channelPrimer, + channelPoolMetricsTracer, + backgroundExecutor, + directPathCompatibleTracer); } // We need this for direct access checker. @@ -151,20 +170,23 @@ public TransportChannelProvider withPoolSize(int size) { @Override public TransportChannel getTransportChannel() throws IOException { InstantiatingGrpcChannelProvider directAccessProvider = - EnhancedBigtableStubSettings.applyDirectAccessTraits(delegate.toBuilder()) - .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)).build(); + EnhancedBigtableStubSettings.applyDirectAccessTraits(delegate.toBuilder()) + .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)) + .build(); - BigtableChannelFactory maybeDirectAccessChannelFactory = () -> { - GrpcTransportChannel channel = + BigtableChannelFactory maybeDirectAccessChannelFactory = + () -> { + GrpcTransportChannel channel = (GrpcTransportChannel) directAccessProvider.getTransportChannel(); - return (ManagedChannel) channel.getChannel(); - }; + return (ManagedChannel) channel.getChannel(); + }; DirectAccessChecker directAccessChecker = UnaryDirectAccessChecker.create(channelPrimer); boolean isDirectAccessEligible = false; try { - isDirectAccessEligible = directAccessChecker.check(maybeDirectAccessChannelFactory, directPathCompatibleTracer ); + isDirectAccessEligible = + directAccessChecker.check(maybeDirectAccessChannelFactory, directPathCompatibleTracer); } catch (Exception e) { LOG.log(Level.FINE, "Client is not direct access eligible, using standard transport.", e); } @@ -184,20 +206,20 @@ public TransportChannel getTransportChannel() throws IOException { // We achieve this by configuring our delegate to not use its own pooling // (by setting pool size to 1) and then calling getTransportChannel() on it. InstantiatingGrpcChannelProvider singleChannelProvider = - selectedProvider.toBuilder() - .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)) - .build(); + selectedProvider.toBuilder() + .setChannelPoolSettings(ChannelPoolSettings.staticallySized(1)) + .build(); BigtableChannelFactory channelFactory = - () -> { - try { - GrpcTransportChannel channel = - (GrpcTransportChannel) singleChannelProvider.getTransportChannel(); - return (ManagedChannel) channel.getChannel(); - } catch (IOException e) { - throw new java.io.UncheckedIOException(e); - } - }; + () -> { + try { + GrpcTransportChannel channel = + (GrpcTransportChannel) singleChannelProvider.getTransportChannel(); + return (ManagedChannel) channel.getChannel(); + } catch (IOException e) { + throw new java.io.UncheckedIOException(e); + } + }; BigtableChannelPoolSettings btPoolSettings = BigtableChannelPoolSettings.copyFrom(delegate.getChannelPoolSettings()); @@ -230,20 +252,25 @@ public TransportChannelProvider withCredentials(Credentials credentials) { InstantiatingGrpcChannelProvider newChannelProvider = (InstantiatingGrpcChannelProvider) delegate.withCredentials(credentials); return new BigtableTransportChannelProvider( - newChannelProvider, channelPrimer, channelPoolMetricsTracer, backgroundExecutor, directPathCompatibleTracer); + newChannelProvider, + channelPrimer, + channelPoolMetricsTracer, + backgroundExecutor, + directPathCompatibleTracer); } /** Creates a BigtableTransportChannelProvider. */ public static BigtableTransportChannelProvider create( - InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, - ChannelPrimer channelPrimer, - ChannelPoolMetricsTracer outstandingRpcsMetricTracker, - ScheduledExecutorService backgroundExecutor, - DirectPathCompatibleTracer directPathCompatibleTracer) { + InstantiatingGrpcChannelProvider instantiatingGrpcChannelProvider, + ChannelPrimer channelPrimer, + ChannelPoolMetricsTracer outstandingRpcsMetricTracker, + ScheduledExecutorService backgroundExecutor, + DirectPathCompatibleTracer directPathCompatibleTracer) { return new BigtableTransportChannelProvider( instantiatingGrpcChannelProvider, channelPrimer, outstandingRpcsMetricTracker, - backgroundExecutor, directPathCompatibleTracer); + backgroundExecutor, + directPathCompatibleTracer); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java index 6234b4410c..b9fb28bcb6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/ChannelPrimer.java @@ -34,5 +34,6 @@ default void primeChannel(ManagedChannel channel) { default ApiFuture sendPrimeRequestsAsync(ManagedChannel channel) { return sendPrimeRequestsAsync((Channel) channel); } + ApiFuture sendPrimeRequestsAsync(Channel channel); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java index 4684c2a72f..275c6313ce 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/UnaryDirectAccessCheckerTest.java @@ -37,98 +37,101 @@ @RunWith(JUnit4.class) public class UnaryDirectAccessCheckerTest { - @Rule public final MockitoRule mockito = MockitoJUnit.rule(); - - @Mock private ChannelPrimer mockChannelPrimer; - @Mock private BigtableChannelFactory mockChannelFactory; - @Mock private DirectPathCompatibleTracer mockTracer; - @Mock private ManagedChannel mockChannel; - @Mock private MetadataExtractorInterceptor.SidebandData mockSidebandData; - - private UnaryDirectAccessChecker checker; - - @Before - public void setUp() throws Exception { - checker = UnaryDirectAccessChecker.create(mockChannelPrimer); - when(mockChannelFactory.createSingleChannel()).thenReturn(mockChannel); - } - - @Test - public void testEligibleForDirectAccess() { - PeerInfo peerInfo = PeerInfo.newBuilder() - .setTransportType(PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) - .build(); - when(mockSidebandData.getPeerInfo()).thenReturn(peerInfo); - - when(mockSidebandData.getIpProtocol()).thenReturn(MetadataExtractorInterceptor.SidebandData.IpProtocol.IPV6); - - try (MockedConstruction ignored = - mockConstruction( - MetadataExtractorInterceptor.class, - (mock, context) -> { - when(mock.getSidebandData()).thenReturn(mockSidebandData); - })) { - - boolean isEligible = checker.check(mockChannelFactory, mockTracer); - - assertThat(isEligible).isFalse(); - verify(mockChannelPrimer).primeChannel(any(Channel.class)); - verify(mockTracer).recordSuccess("ipv6"); - verify(mockChannel).shutdownNow(); - } + @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + + @Mock private ChannelPrimer mockChannelPrimer; + @Mock private BigtableChannelFactory mockChannelFactory; + @Mock private DirectPathCompatibleTracer mockTracer; + @Mock private ManagedChannel mockChannel; + @Mock private MetadataExtractorInterceptor.SidebandData mockSidebandData; + + private UnaryDirectAccessChecker checker; + + @Before + public void setUp() throws Exception { + checker = UnaryDirectAccessChecker.create(mockChannelPrimer); + when(mockChannelFactory.createSingleChannel()).thenReturn(mockChannel); + } + + @Test + public void testEligibleForDirectAccess() { + PeerInfo peerInfo = + PeerInfo.newBuilder() + .setTransportType(PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS) + .build(); + when(mockSidebandData.getPeerInfo()).thenReturn(peerInfo); + + when(mockSidebandData.getIpProtocol()) + .thenReturn(MetadataExtractorInterceptor.SidebandData.IpProtocol.IPV6); + + try (MockedConstruction ignored = + mockConstruction( + MetadataExtractorInterceptor.class, + (mock, context) -> { + when(mock.getSidebandData()).thenReturn(mockSidebandData); + })) { + + boolean isEligible = checker.check(mockChannelFactory, mockTracer); + + assertThat(isEligible).isFalse(); + verify(mockChannelPrimer).primeChannel(any(Channel.class)); + verify(mockTracer).recordSuccess("ipv6"); + verify(mockChannel).shutdownNow(); } - - @Test - public void testNotEligibleProxiedRouting() { - // 1. Setup sideband data to simulate standard CloudPath routing - PeerInfo peerInfo = PeerInfo.newBuilder() - .setTransportType(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH) - .build(); - when(mockSidebandData.getPeerInfo()).thenReturn(peerInfo); - - try (MockedConstruction mocked = - mockConstruction( - MetadataExtractorInterceptor.class, - (mock, context) -> { - when(mock.getSidebandData()).thenReturn(mockSidebandData); - })) { - - boolean isEligible = checker.check(mockChannelFactory, mockTracer); - - assertThat(isEligible).isFalse(); - verifyNoInteractions(mockTracer); - verify(mockChannel).shutdownNow(); - } + } + + @Test + public void testNotEligibleProxiedRouting() { + // 1. Setup sideband data to simulate standard CloudPath routing + PeerInfo peerInfo = + PeerInfo.newBuilder() + .setTransportType(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH) + .build(); + when(mockSidebandData.getPeerInfo()).thenReturn(peerInfo); + + try (MockedConstruction mocked = + mockConstruction( + MetadataExtractorInterceptor.class, + (mock, context) -> { + when(mock.getSidebandData()).thenReturn(mockSidebandData); + })) { + + boolean isEligible = checker.check(mockChannelFactory, mockTracer); + + assertThat(isEligible).isFalse(); + verifyNoInteractions(mockTracer); + verify(mockChannel).shutdownNow(); } - - @Test - public void testMissingSidebandData() { - // Interceptor failed to capture anything (returns null) - try (MockedConstruction mocked = - mockConstruction( - MetadataExtractorInterceptor.class, - (mock, context) -> { - when(mock.getSidebandData()).thenReturn(null); - })) { - - boolean isEligible = checker.check(mockChannelFactory, mockTracer); - - assertThat(isEligible).isFalse(); - verifyNoInteractions(mockTracer); - verify(mockChannel).shutdownNow(); - } + } + + @Test + public void testMissingSidebandData() { + // Interceptor failed to capture anything (returns null) + try (MockedConstruction mocked = + mockConstruction( + MetadataExtractorInterceptor.class, + (mock, context) -> { + when(mock.getSidebandData()).thenReturn(null); + })) { + + boolean isEligible = checker.check(mockChannelFactory, mockTracer); + + assertThat(isEligible).isFalse(); + verifyNoInteractions(mockTracer); + verify(mockChannel).shutdownNow(); } + } - @Test - public void testExceptionSafetyAndCleanup() { - doThrow(new RuntimeException("Simulated primer failure")) - .when(mockChannelPrimer) - .primeChannel(any(Channel.class)); + @Test + public void testExceptionSafetyAndCleanup() { + doThrow(new RuntimeException("Simulated primer failure")) + .when(mockChannelPrimer) + .primeChannel(any(Channel.class)); - boolean isEligible = checker.check(mockChannelFactory, mockTracer); + boolean isEligible = checker.check(mockChannelFactory, mockTracer); - assertThat(isEligible).isFalse(); - verifyNoInteractions(mockTracer); - verify(mockChannel).shutdownNow(); - } + assertThat(isEligible).isFalse(); + verifyNoInteractions(mockTracer); + verify(mockChannel).shutdownNow(); + } }