Unverified Commit ed40405f authored by Kubernetes Prow Robot's avatar Kubernetes Prow Robot Committed by GitHub
Browse files

Merge pull request #1023 from yverbin/602-nodeport-trafficpolicy

602 nodeport trafficpolicy
parents 0311cec7 0d4d475a
......@@ -30,7 +30,6 @@ import (
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
......@@ -163,12 +162,6 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
services = sc.filterByServiceType(services)
}
// get the ip addresses of all the nodes and cache them for this run
nodeTargets, err := sc.extractNodeTargets()
if err != nil {
return nil, err
}
endpoints := []*endpoint.Endpoint{}
for _, svc := range services {
......@@ -180,7 +173,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
continue
}
svcEndpoints := sc.endpoints(svc, nodeTargets)
svcEndpoints := sc.endpoints(svc)
// process legacy annotations if no endpoints were returned and compatibility mode is enabled.
if len(svcEndpoints) == 0 && sc.compatibility != "" {
......@@ -189,7 +182,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
// apply template if none of the above is found
if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
sEndpoints, err := sc.endpointsFromTemplate(svc, nodeTargets)
sEndpoints, err := sc.endpointsFromTemplate(svc)
if err != nil {
return nil, err
}
......@@ -282,7 +275,7 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
return endpoints
}
func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service, nodeTargets endpoint.Targets) ([]*endpoint.Endpoint, error) {
func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) {
var endpoints []*endpoint.Endpoint
// Process the whole template string
......@@ -295,21 +288,21 @@ func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service, nodeTargets endp
providerSpecific := getProviderSpecificAnnotations(svc.Annotations)
hostnameList := strings.Split(strings.Replace(buf.String(), " ", "", -1), ",")
for _, hostname := range hostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets, providerSpecific)...)
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific)...)
}
return endpoints, nil
}
// endpointsFromService extracts the endpoints from a service object
func (sc *serviceSource) endpoints(svc *v1.Service, nodeTargets endpoint.Targets) []*endpoint.Endpoint {
func (sc *serviceSource) endpoints(svc *v1.Service) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint
// Skip endpoints if we do not want entries from annotations
if !sc.ignoreHostnameAnnotation {
providerSpecific := getProviderSpecificAnnotations(svc.Annotations)
hostnameList := getHostnamesFromAnnotations(svc.Annotations)
for _, hostname := range hostnameList {
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, nodeTargets, providerSpecific)...)
endpoints = append(endpoints, sc.generateEndpoints(svc, hostname, providerSpecific)...)
}
}
return endpoints
......@@ -365,7 +358,7 @@ func (sc *serviceSource) setResourceLabel(service *v1.Service, endpoints []*endp
}
}
func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, nodeTargets endpoint.Targets, providerSpecific endpoint.ProviderSpecific) []*endpoint.Endpoint {
func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, providerSpecific endpoint.ProviderSpecific) []*endpoint.Endpoint {
hostname = strings.TrimSuffix(hostname, ".")
ttl, err := getTTLFromAnnotations(svc.Annotations)
if err != nil {
......@@ -405,8 +398,12 @@ func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string, nod
}
case v1.ServiceTypeNodePort:
// add the nodeTargets and extract an SRV endpoint
targets = append(targets, nodeTargets...)
endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, nodeTargets, hostname, ttl)...)
targets, err = sc.extractNodePortTargets(svc)
if err != nil {
log.Errorf("Unable to extract targets from service %s/%s error: %v", svc.Namespace, svc.Name, err)
return endpoints
}
endpoints = append(endpoints, sc.extractNodePortEndpoints(svc, targets, hostname, ttl)...)
}
for _, t := range targets {
......@@ -451,20 +448,44 @@ func extractLoadBalancerTargets(svc *v1.Service) endpoint.Targets {
return targets
}
func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) {
func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
var (
internalIPs endpoint.Targets
externalIPs endpoint.Targets
nodes []*v1.Node
err error
)
nodes, err := sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
if errors.IsForbidden(err) {
// Return an empty list because it makes sense to continue and try other sources.
log.Debugf("Unable to list nodes (Forbidden), returning empty list of targets (NodePort services will be skipped)")
return endpoint.Targets{}, nil
switch svc.Spec.ExternalTrafficPolicy {
case v1.ServiceExternalTrafficPolicyTypeLocal:
labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil {
return nil, err
}
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil, err
}
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
return nil, err
}
for _, v := range pods {
if v.Status.Phase == v1.PodRunning {
node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
if err != nil {
log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
continue
}
nodes = append(nodes, node)
}
}
default:
nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
return nil, err
}
return nil, err
}
for _, node := range nodes {
......
......@@ -1282,6 +1282,7 @@ func TestNodePortServices(t *testing.T) {
svcNamespace string
svcName string
svcType v1.ServiceType
svcTrafficPolicy v1.ServiceExternalTrafficPolicyType
compatibility string
fqdnTemplate string
ignoreHostnameAnnotation bool
......@@ -1291,6 +1292,9 @@ func TestNodePortServices(t *testing.T) {
expected []*endpoint.Endpoint
expectError bool
nodes []*v1.Node
podnames []string
nodeIndex []int
phases []v1.PodPhase
}{
{
"annotated NodePort services return an endpoint with IP addresses of the cluster's nodes",
......@@ -1299,6 +1303,7 @@ func TestNodePortServices(t *testing.T) {
"testing",
"foo",
v1.ServiceTypeNodePort,
v1.ServiceExternalTrafficPolicyTypeCluster,
"",
"",
false,
......@@ -1333,6 +1338,9 @@ func TestNodePortServices(t *testing.T) {
},
},
}},
[]string{},
[]int{},
[]v1.PodPhase{},
},
{
"hostname annotated NodePort services are ignored",
......@@ -1341,6 +1349,7 @@ func TestNodePortServices(t *testing.T) {
"testing",
"foo",
v1.ServiceTypeNodePort,
v1.ServiceExternalTrafficPolicyTypeCluster,
"",
"",
true,
......@@ -1372,6 +1381,9 @@ func TestNodePortServices(t *testing.T) {
},
},
}},
[]string{},
[]int{},
[]v1.PodPhase{},
},
{
"non-annotated NodePort services with set fqdnTemplate return an endpoint with target IP",
......@@ -1380,6 +1392,7 @@ func TestNodePortServices(t *testing.T) {
"testing",
"foo",
v1.ServiceTypeNodePort,
v1.ServiceExternalTrafficPolicyTypeCluster,
"",
"{{.Name}}.bar.example.com",
false,
......@@ -1412,6 +1425,9 @@ func TestNodePortServices(t *testing.T) {
},
},
}},
[]string{},
[]int{},
[]v1.PodPhase{},
},
{
"annotated NodePort services return an endpoint with IP addresses of the private cluster's nodes",
......@@ -1420,6 +1436,7 @@ func TestNodePortServices(t *testing.T) {
"testing",
"foo",
v1.ServiceTypeNodePort,
v1.ServiceExternalTrafficPolicyTypeCluster,
"",
"",
false,
......@@ -1452,6 +1469,55 @@ func TestNodePortServices(t *testing.T) {
},
},
}},
[]string{},
[]int{},
[]v1.PodPhase{},
},
{
"annotated NodePort services with ExternalTrafficPolicy=Local return an endpoint with IP addresses of the cluster's nodes where pods is running only",
"",
"",
"testing",
"foo",
v1.ServiceTypeNodePort,
v1.ServiceExternalTrafficPolicyTypeLocal,
"",
"",
false,
map[string]string{},
map[string]string{
hostnameAnnotationKey: "foo.example.org.",
},
nil,
[]*endpoint.Endpoint{
{DNSName: "_30192._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV},
{DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.2"}, RecordType: endpoint.RecordTypeA},
},
false,
[]*v1.Node{{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.1"},
{Type: v1.NodeInternalIP, Address: "10.0.1.1"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.2"},
{Type: v1.NodeInternalIP, Address: "10.0.1.2"},
},
},
}},
[]string{"master-0"},
[]int{1},
[]v1.PodPhase{v1.PodRunning},
},
} {
t.Run(tc.title, func(t *testing.T) {
......@@ -1465,10 +1531,34 @@ func TestNodePortServices(t *testing.T) {
}
}
// Create pods
for i, podname := range tc.podnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{},
Hostname: podname,
NodeName: tc.nodes[tc.nodeIndex[i]].Name,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: podname,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.PodStatus{
Phase: tc.phases[i],
},
}
_, err := kubernetes.CoreV1().Pods(tc.svcNamespace).Create(pod)
require.NoError(t, err)
}
// Create a service to test against
service := &v1.Service{
Spec: v1.ServiceSpec{
Type: tc.svcType,
Type: tc.svcType,
ExternalTrafficPolicy: tc.svcTrafficPolicy,
Ports: []v1.ServicePort{
{
NodePort: 30192,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment