perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name.
ret := make(map[string]dataAndErrTuple) // Return result, a map from resource name to either resource data or error.
for _, r := range resp.resources {
- result, err := rType.Decoder.Decode(AnyProto{
- TypeURL: r.GetTypeUrl(),
- Value: r.GetValue(),
- }, *opts)
+ result, err := rType.Decoder.Decode(NewAnyProto(r), *opts)
// Name field of the result is left unpopulated only when resource
// deserialization fails.
}
const (
- defaultTestWatchExpiryTimeout = 100 * time.Millisecond
- defaultTestTimeout = 5 * time.Second
- defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
+ defaultTestTimeout = 5 * time.Second
+ defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
// listenerResourceTypeName represents the transport agnostic name for the
// listener resource.
listenerResourceTypeName = "ListenerResource"
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
-func (listenerDecoder) Decode(resource AnyProto, _ DecodeOptions) (*DecodeResult, error) {
- rProto := &anypb.Any{
- TypeUrl: resource.TypeURL,
- Value: resource.Value,
- }
- name, listener, err := unmarshalListenerResource(rProto)
+func (listenerDecoder) Decode(resource *AnyProto, _ DecodeOptions) (*DecodeResult, error) {
+ name, listener, err := unmarshalListenerResource(resource.ToAny())
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
package xdsclient
+import "google.golang.org/protobuf/types/known/anypb"
+
// ResourceType wraps all resource-type specific functionality. Each supported
// resource type needs to provide an implementation of the Decoder.
type ResourceType struct {
//
// If unmarshalling or validation fails, it returns a non-nil error.
// Otherwise, returns a fully populated DecodeResult.
- Decode(resource AnyProto, options DecodeOptions) (*DecodeResult, error)
+ Decode(resource *AnyProto, options DecodeOptions) (*DecodeResult, error)
}
// AnyProto contains the type URL and serialized proto data of an xDS resource.
type AnyProto struct {
- TypeURL string
- Value []byte
+ typeURL string
+ value []byte
+}
+
+// NewAnyProto creates an AnyProto from an anypb.Any. Must be called with a
+// non-nil argument.
+func NewAnyProto(a *anypb.Any) *AnyProto {
+ return &AnyProto{
+ typeURL: a.TypeUrl,
+ value: a.Value,
+ }
+}
+
+// ToAny converts an AnyProto to an anypb.Any. Never returns nil.
+func (a *AnyProto) ToAny() *anypb.Any {
+ return &anypb.Any{
+ TypeUrl: a.typeURL,
+ Value: a.value,
+ }
}
// DecodeOptions wraps the options required by ResourceType implementations for
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
-func (listenerDecoder) Decode(resource xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
- rProto := &anypb.Any{
- TypeUrl: resource.TypeURL,
- Value: resource.Value,
- }
- name, listener, err := unmarshalListenerResource(rProto)
+func (listenerDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
+ name, listener, err := unmarshalListenerResource(resource.ToAny())
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
return lw
}
-func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
- handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
+func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
+ handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}
ldsResourceName string
listenerWatcher *listenerWatcher
listenerUpdateRecvd bool
- currentListener xdsresource.ListenerUpdate
+ currentListener *xdsresource.ListenerUpdate
rdsResourceName string
routeConfigWatcher *routeConfigWatcher
}
// Only executed in the context of a serializer callback.
-func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate) {
+func (r *xdsResolver) onListenerResourceUpdate(update *xdsresource.ListenerUpdate) {
if r.logger.V(2) {
r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update))
}
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
+ "google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)
// the listenerWrapper.
type XDSClient interface {
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
+ WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
BootstrapConfig() *bootstrap.Config
}
name string
}
-func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
+func (lw *ldsWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
defer onDone()
if lw.parent.closed.HasFired() {
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
return
}
if lw.logger.V(2) {
- lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
+ lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update)
}
l := lw.parent
- ilc := update.Resource.InboundListenerCfg
+ ilc := update.InboundListenerCfg
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
// check is done here instead of at the XDSClient layer because of the
return endpoint
}
-// ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType.
-var ResourceTypeMapForTesting map[string]any
-
// UnknownCSMLabels are TelemetryLabels emitted from CDS if CSM Telemetry Label
// data is not present in the CDS Resource.
var UnknownCSMLabels = map[string]string{
import (
"context"
- v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients/lrsclient"
+ "google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
+
+ v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)
// XDSClient is a full fledged gRPC client which queries a set of discovery APIs
// the watcher is canceled. Callers need to handle this case.
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
+ // WatchResourceV2 matches the API of the external xdsclient interface.
+ // Once all users of xdsclient have been moved to this watch API, we can
+ // remove the WatchResource API above, and rename this to WatchResource.
+ WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
+
ReportLoad(*bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context))
BootstrapConfig() *bootstrap.Config
Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, Locality: clients.Locality{Region: node.Locality.Region, Zone: node.Locality.Zone, SubZone: node.Locality.SubZone}, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
Authorities: map[string]xdsclient.Authority{},
ResourceTypes: map[string]xdsclient.ResourceType{
- version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)},
version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
Authorities: map[string]xdsclient.Authority{"auth1": {XDSServers: []xdsclient.ServerConfig{expTopLevelS}}, "auth2": {XDSServers: []xdsclient.ServerConfig{expAuth2S}}},
ResourceTypes: map[string]xdsclient.ResourceType{
- version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gSCfgMap)},
version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
Authorities: map[string]xdsclient.Authority{},
ResourceTypes: map[string]xdsclient.ResourceType{
- version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)},
version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
Node: clients.Node{ID: node.GetId(), Cluster: node.GetCluster(), Metadata: node.Metadata, UserAgentName: node.UserAgentName, UserAgentVersion: node.GetUserAgentVersion()},
Authorities: map[string]xdsclient.Authority{},
ResourceTypes: map[string]xdsclient.ResourceType{
- version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(c)},
+ version.V3ListenerURL: {TypeURL: version.V3ListenerURL, TypeName: xdsresource.ListenerResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewListenerResourceTypeDecoder(c)},
version.V3RouteConfigURL: {TypeURL: version.V3RouteConfigURL, TypeName: xdsresource.RouteConfigTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder()},
version.V3ClusterURL: {TypeURL: version.V3ClusterURL, TypeName: xdsresource.ClusterResourceTypeName, AllResourcesRequiredInSotW: true, Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(c, gServerCfgMap)},
version.V3EndpointsURL: {TypeURL: version.V3EndpointsURL, TypeName: xdsresource.EndpointsResourceTypeName, AllResourcesRequiredInSotW: false, Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder()},
package xdsclient
import (
+ "google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)
func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) {
return c.XDSClient.WatchResource(rType.TypeURL(), resourceName, xdsresource.GenericResourceWatcher(watcher))
}
+
+func (c *clientImpl) WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func()) {
+ return c.XDSClient.WatchResource(typeURL, resourceName, watcher)
+}
type noopListenerWatcher struct{}
-func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
onDone()
}
TypeURL: version.V3ListenerURL,
TypeName: xdsresource.ListenerResourceTypeName,
AllResourcesRequiredInSotW: true,
- Decoder: xdsresource.NewGenericListenerResourceTypeDecoder(config),
+ Decoder: xdsresource.NewListenerResourceTypeDecoder(config),
},
version.V3RouteConfigURL: {
TypeURL: version.V3RouteConfigURL,
// Verify the update received by the watcher.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: routeConfigName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the update received by the watcher.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: routeConfigName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the update received by the watcher.
wantListenerUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: routeConfigName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: "rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
type noopListenerWatcher struct{}
-func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (noopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
onDone()
}
func (noopListenerWatcher) ResourceError(_ error, onDone func()) {
}
type listenerUpdateErrTuple struct {
- update xdsresource.ListenerUpdate
+ update *xdsresource.ListenerUpdate
err error
}
return &listenerWatcher{updateCh: testutils.NewChannel()}
}
-func (lw *listenerWatcher) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
- lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
+func (lw *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
+ lw.updateCh.Send(listenerUpdateErrTuple{update: update})
onDone()
}
return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(size)}
}
-func (lw *listenerWatcherMultiple) ResourceChanged(update *xdsresource.ListenerResourceData, onDone func()) {
- lw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
+func (lw *listenerWatcherMultiple) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
+ lw.updateCh.Send(listenerUpdateErrTuple{update: update})
onDone()
}
updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"),
notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsName),
wantUpdate: listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"),
notWatchedResource: e2e.DefaultClientListener("unsubscribed-lds-resource", rdsNameNewStyle),
wantUpdate: listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsNameNewStyle,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
watchedResource: e2e.DefaultClientListener(ldsName, rdsName),
updatedWatchedResource: e2e.DefaultClientListener(ldsName, "new-rds-resource"),
wantUpdateV1: listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
},
wantUpdateV2: listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: "new-rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
watchedResource: e2e.DefaultClientListener(ldsNameNewStyle, rdsNameNewStyle),
updatedWatchedResource: e2e.DefaultClientListener(ldsNameNewStyle, "new-rds-resource"),
wantUpdateV1: listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsNameNewStyle,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
},
wantUpdateV2: listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: "new-rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// resources returned differ only in the resource name. Therefore the
// expected update is the same for all the watchers.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the contents of the received update.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the contents of the received update.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// resources returned differ only in the resource name. Therefore the
// expected update is the same for both watchers.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
t.Fatal(err)
}
wantUpdate = listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: "new-rds-resource",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the contents of the received update for existing watch.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the contents of the received update.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify that the watcher watching the good resource receives a good
// update.
wantUpdate := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the contents of the received update for first watcher.
wantUpdate1 := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
// Verify the contents of the received update for the second watcher.
wantUpdate2 := listenerUpdateErrTuple{
- update: xdsresource.ListenerUpdate{
+ update: &xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
desc string
resourceName string
managementServerResponse *v3discoverypb.DiscoveryResponse
- wantUpdate xdsresource.ListenerUpdate
+ wantUpdate *xdsresource.ListenerUpdate
wantErr string
wantGenericXDSConfig []*v3statuspb.ClientConfig_GenericXdsConfig
}{
VersionInfo: "1",
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)},
},
- wantUpdate: xdsresource.ListenerUpdate{
+ wantUpdate: &xdsresource.ListenerUpdate{
RouteConfigName: "route-configuration-name",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
VersionInfo: "1",
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
},
- wantUpdate: xdsresource.ListenerUpdate{
+ wantUpdate: &xdsresource.ListenerUpdate{
RouteConfigName: "route-configuration-name",
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
package xdsresource
import (
+ "bytes"
"fmt"
- "google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/xds/bootstrap"
- xdsclient "google.golang.org/grpc/internal/xds/clients/xdsclient"
+ "google.golang.org/grpc/internal/xds/clients/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
- "google.golang.org/protobuf/proto"
- "google.golang.org/protobuf/types/known/anypb"
)
-const (
- // ListenerResourceTypeName represents the transport agnostic name for the
- // listener resource.
- ListenerResourceTypeName = "ListenerResource"
-)
+// ListenerResourceTypeName is a human friendly name for the listener resource.
+const ListenerResourceTypeName = "ListenerResource"
+
+// listenerResourceDecoder is an implementation of the xdsclient.Decoder
+// interface for listener resources.
+type listenerResourceDecoder struct {
+ bootstrapConfig *bootstrap.Config
+}
-var (
- // Compile time interface checks.
- _ Type = listenerResourceType{}
-
- // Singleton instantiation of the resource type implementation.
- listenerType = listenerResourceType{
- resourceTypeState: resourceTypeState{
- typeURL: version.V3ListenerURL,
- typeName: ListenerResourceTypeName,
- allResourcesRequiredInSotW: true,
- },
+func (d *listenerResourceDecoder) Decode(resource *xdsclient.AnyProto, _ xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
+ name, listener, err := unmarshalListenerResource(resource.ToAny())
+ if name == "" {
+ // Name is unset only when protobuf deserialization fails.
+ return nil, err
+ }
+ if err != nil {
+ // Protobuf deserialization succeeded, but resource validation failed.
+ return &xdsclient.DecodeResult{
+ Name: name,
+ Resource: &ListenerResourceData{Resource: ListenerUpdate{}},
+ }, err
+ }
+
+ // Perform extra validation here.
+ if err := listenerValidator(d.bootstrapConfig, listener); err != nil {
+ return &xdsclient.DecodeResult{
+ Name: name,
+ Resource: &ListenerResourceData{Resource: ListenerUpdate{}},
+ }, err
}
-)
-// listenerResourceType provides the resource-type specific functionality for a
-// Listener resource.
-//
-// Implements the Type interface.
-type listenerResourceType struct {
- resourceTypeState
+ return &xdsclient.DecodeResult{
+ Name: name,
+ Resource: &ListenerResourceData{Resource: listener},
+ }, nil
}
func securityConfigValidator(bc *bootstrap.Config, sc *SecurityConfig) error {
})
}
-// Decode deserializes and validates an xDS resource serialized inside the
-// provided `Any` proto, as received from the xDS management server.
-func (listenerResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
- name, listener, err := unmarshalListenerResource(resource)
- switch {
- case name == "":
- // Name is unset only when protobuf deserialization fails.
- return nil, err
- case err != nil:
- // Protobuf deserialization succeeded, but resource validation failed.
- return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err
- }
-
- // Perform extra validation here.
- if err := listenerValidator(opts.BootstrapConfig, listener); err != nil {
- return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err
- }
-
- return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: listener}}, nil
-}
-
-// ListenerResourceData wraps the configuration of a Listener resource as
-// received from the management server.
-//
-// Implements the ResourceData interface.
+// ListenerResourceData is an implementation of the xdsclient.ResourceData
+// interface for listener resources.
type ListenerResourceData struct {
- ResourceData
-
- // TODO: We have always stored update structs by value. See if this can be
- // switched to a pointer?
Resource ListenerUpdate
}
-// RawEqual returns true if other is equal to l.
-func (l *ListenerResourceData) RawEqual(other ResourceData) bool {
- if l == nil && other == nil {
- return true
- }
- if (l == nil) != (other == nil) {
+// Equal returns true if other is equal to l.
+func (l *ListenerResourceData) Equal(other xdsclient.ResourceData) bool {
+ if other == nil {
return false
}
- return proto.Equal(l.Resource.Raw, other.Raw())
-}
-
-// ToJSON returns a JSON string representation of the resource data.
-func (l *ListenerResourceData) ToJSON() string {
- return pretty.ToJSON(l.Resource)
+ return bytes.Equal(l.Bytes(), other.Bytes())
}
-// Raw returns the underlying raw protobuf form of the listener resource.
-func (l *ListenerResourceData) Raw() *anypb.Any {
- return l.Resource.Raw
+// Bytes returns the protobuf serialized bytes of the listener resource proto.
+func (l *ListenerResourceData) Bytes() []byte {
+ return l.Resource.Raw.GetValue()
}
// ListenerWatcher wraps the callbacks to be invoked for different
// contains an exhaustive list of what method is invoked under what conditions.
type ListenerWatcher interface {
// ResourceChanged indicates a new version of the resource is available.
- ResourceChanged(resource *ListenerResourceData, done func())
+ ResourceChanged(resource *ListenerUpdate, done func())
// ResourceError indicates an error occurred while trying to fetch or
// decode the associated resource. The previous version of the resource
watcher ListenerWatcher
}
-func (d *delegatingListenerWatcher) ResourceChanged(data ResourceData, onDone func()) {
+func (d *delegatingListenerWatcher) ResourceChanged(data xdsclient.ResourceData, onDone func()) {
l := data.(*ListenerResourceData)
- d.watcher.ResourceChanged(l, onDone)
+ d.watcher.ResourceChanged(&l.Resource, onDone)
}
func (d *delegatingListenerWatcher) ResourceError(err error, onDone func()) {
d.watcher.ResourceError(err, onDone)
// WatchListener uses xDS to discover the configuration associated with the
// provided listener resource name.
-func WatchListener(p Producer, name string, w ListenerWatcher) (cancel func()) {
- delegator := &delegatingListenerWatcher{watcher: w}
- return p.WatchResource(listenerType, name, delegator)
+func WatchListener(p ProducerV2, name string, w ListenerWatcher) (cancel func()) {
+ return p.WatchResourceV2(version.V3ListenerURL, name, &delegatingListenerWatcher{watcher: w})
}
-// NewGenericListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps
+// NewListenerResourceTypeDecoder returns a xdsclient.Decoder that wraps
// the xdsresource.listenerType.
-func NewGenericListenerResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder {
- return &GenericResourceTypeDecoder{ResourceType: listenerType, BootstrapConfig: bc}
+func NewListenerResourceTypeDecoder(bc *bootstrap.Config) xdsclient.Decoder {
+ return &listenerResourceDecoder{bootstrapConfig: bc}
}
import (
"fmt"
- xdsinternal "google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients/xdsclient"
- "google.golang.org/grpc/internal/xds/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/anypb"
)
-func init() {
- xdsinternal.ResourceTypeMapForTesting = make(map[string]any)
- xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL] = listenerType
- xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = routeConfigType
- xdsinternal.ResourceTypeMapForTesting[version.V3ClusterURL] = clusterType
- xdsinternal.ResourceTypeMapForTesting[version.V3EndpointsURL] = endpointsType
-}
-
// Producer contains a single method to discover resource configuration from a
// remote management server using xDS APIs.
//
WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func())
}
+// ProducerV2 is like Producer, but uses the external xdsclient API.
+//
+// Once all resource type implementations have been migrated to use the external
+// xdsclient API, this interface will be renamed to Producer and the existing
+// Producer interface will be deleted.
+type ProducerV2 interface {
+ WatchResourceV2(typeURL, resourceName string, watcher xdsclient.ResourceWatcher) (cancel func())
+}
+
// ResourceWatcher is notified of the resource updates and errors that are
// received by the xDS client from the management server.
//
// Decode deserialize and validate resource bytes of an xDS resource received
// from the xDS management server.
-func (gd *GenericResourceTypeDecoder) Decode(resource xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
- rProto := &anypb.Any{
- TypeUrl: resource.TypeURL,
- Value: resource.Value,
- }
+func (gd *GenericResourceTypeDecoder) Decode(resource *xdsclient.AnyProto, gOpts xdsclient.DecodeOptions) (*xdsclient.DecodeResult, error) {
opts := &DecodeOptions{BootstrapConfig: gd.BootstrapConfig}
if gOpts.ServerConfig != nil {
opts.ServerConfig = gd.ServerConfigMap[*gOpts.ServerConfig]
}
- result, err := gd.ResourceType.Decode(opts, rProto)
+ result, err := gd.ResourceType.Decode(opts, resource.ToAny())
if result == nil {
return nil, err
}
type nopListenerWatcher struct{}
-func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (nopListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
onDone()
}
func (nopListenerWatcher) ResourceError(_ error, onDone func()) {
}
}
-func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerResourceData, onDone func()) {
+func (w *blockingListenerWatcher) ResourceChanged(_ *xdsresource.ListenerUpdate, onDone func()) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) ResourceError(_ error, onDone func()) {