utils.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. /*
  2. Copyright 2019 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package endpointslice
  14. import (
  15. "fmt"
  16. "reflect"
  17. "time"
  18. corev1 "k8s.io/api/core/v1"
  19. discovery "k8s.io/api/discovery/v1beta1"
  20. apiequality "k8s.io/apimachinery/pkg/api/equality"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime/schema"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/client-go/tools/cache"
  25. "k8s.io/klog"
  26. podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  27. api "k8s.io/kubernetes/pkg/apis/core"
  28. "k8s.io/kubernetes/pkg/apis/discovery/validation"
  29. endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
  30. utilnet "k8s.io/utils/net"
  31. )
  32. // podEndpointChanged returns true if the results of podToEndpoint are different
  33. // for the pods passed to this function.
  34. func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
  35. endpoint1 := podToEndpoint(pod1, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})
  36. endpoint2 := podToEndpoint(pod2, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}})
  37. endpoint1.TargetRef.ResourceVersion = ""
  38. endpoint2.TargetRef.ResourceVersion = ""
  39. return !reflect.DeepEqual(endpoint1, endpoint2)
  40. }
  41. // podToEndpoint returns an Endpoint object generated from a Pod and Node.
  42. func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service) discovery.Endpoint {
  43. // Build out topology information. This is currently limited to hostname,
  44. // zone, and region, but this will be expanded in the future.
  45. topology := map[string]string{}
  46. if pod.Spec.NodeName != "" {
  47. topology["kubernetes.io/hostname"] = pod.Spec.NodeName
  48. }
  49. if node != nil {
  50. topologyLabels := []string{
  51. "topology.kubernetes.io/zone",
  52. "topology.kubernetes.io/region",
  53. }
  54. for _, topologyLabel := range topologyLabels {
  55. if node.Labels[topologyLabel] != "" {
  56. topology[topologyLabel] = node.Labels[topologyLabel]
  57. }
  58. }
  59. }
  60. ready := service.Spec.PublishNotReadyAddresses || podutil.IsPodReady(pod)
  61. ep := discovery.Endpoint{
  62. Addresses: getEndpointAddresses(pod.Status, service),
  63. Conditions: discovery.EndpointConditions{
  64. Ready: &ready,
  65. },
  66. Topology: topology,
  67. TargetRef: &corev1.ObjectReference{
  68. Kind: "Pod",
  69. Namespace: pod.ObjectMeta.Namespace,
  70. Name: pod.ObjectMeta.Name,
  71. UID: pod.ObjectMeta.UID,
  72. ResourceVersion: pod.ObjectMeta.ResourceVersion,
  73. },
  74. }
  75. if endpointutil.ShouldSetHostname(pod, service) {
  76. ep.Hostname = &pod.Spec.Hostname
  77. }
  78. return ep
  79. }
  80. // getEndpointPorts returns a list of EndpointPorts generated from a Service
  81. // and Pod.
  82. func getEndpointPorts(service *corev1.Service, pod *corev1.Pod) []discovery.EndpointPort {
  83. endpointPorts := []discovery.EndpointPort{}
  84. // Allow headless service not to have ports.
  85. if len(service.Spec.Ports) == 0 && service.Spec.ClusterIP == api.ClusterIPNone {
  86. return endpointPorts
  87. }
  88. for i := range service.Spec.Ports {
  89. servicePort := &service.Spec.Ports[i]
  90. portName := servicePort.Name
  91. portProto := servicePort.Protocol
  92. portNum, err := podutil.FindPort(pod, servicePort)
  93. if err != nil {
  94. klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
  95. continue
  96. }
  97. i32PortNum := int32(portNum)
  98. endpointPorts = append(endpointPorts, discovery.EndpointPort{
  99. Name: &portName,
  100. Port: &i32PortNum,
  101. Protocol: &portProto,
  102. })
  103. }
  104. return endpointPorts
  105. }
  106. // getEndpointAddresses returns a list of addresses generated from a pod status.
  107. func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service) []string {
  108. addresses := []string{}
  109. for _, podIP := range podStatus.PodIPs {
  110. isIPv6PodIP := utilnet.IsIPv6String(podIP.IP)
  111. if isIPv6PodIP == isIPv6Service(service) {
  112. addresses = append(addresses, podIP.IP)
  113. }
  114. }
  115. return addresses
  116. }
  117. // isIPv6Service returns true if the Service uses IPv6 addresses.
  118. func isIPv6Service(service *corev1.Service) bool {
  119. // IPFamily is not guaranteed to be set, even in an IPv6 only cluster.
  120. return (service.Spec.IPFamily != nil && *service.Spec.IPFamily == corev1.IPv6Protocol) || utilnet.IsIPv6String(service.Spec.ClusterIP)
  121. }
  122. // endpointsEqualBeyondHash returns true if endpoints have equal attributes
  123. // but excludes equality checks that would have already been covered with
  124. // endpoint hashing (see hashEndpoint func for more info).
  125. func endpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
  126. if !apiequality.Semantic.DeepEqual(ep1.Topology, ep2.Topology) {
  127. return false
  128. }
  129. if boolPtrChanged(ep1.Conditions.Ready, ep2.Conditions.Ready) {
  130. return false
  131. }
  132. if objectRefPtrChanged(ep1.TargetRef, ep2.TargetRef) {
  133. return false
  134. }
  135. return true
  136. }
  137. // newEndpointSlice returns an EndpointSlice generated from a service and
  138. // endpointMeta.
  139. func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *discovery.EndpointSlice {
  140. gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
  141. ownerRef := metav1.NewControllerRef(service, gvk)
  142. return &discovery.EndpointSlice{
  143. ObjectMeta: metav1.ObjectMeta{
  144. Labels: map[string]string{
  145. discovery.LabelServiceName: service.Name,
  146. discovery.LabelManagedBy: controllerName,
  147. },
  148. GenerateName: getEndpointSlicePrefix(service.Name),
  149. OwnerReferences: []metav1.OwnerReference{*ownerRef},
  150. Namespace: service.Namespace,
  151. },
  152. Ports: endpointMeta.Ports,
  153. AddressType: endpointMeta.AddressType,
  154. Endpoints: []discovery.Endpoint{},
  155. }
  156. }
  157. // getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name.
  158. func getEndpointSlicePrefix(serviceName string) string {
  159. // use the dash (if the name isn't too long) to make the pod name a bit prettier
  160. prefix := fmt.Sprintf("%s-", serviceName)
  161. if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 {
  162. prefix = serviceName
  163. }
  164. return prefix
  165. }
  166. // boolPtrChanged returns true if a set of bool pointers have different values.
  167. func boolPtrChanged(ptr1, ptr2 *bool) bool {
  168. if (ptr1 == nil) != (ptr2 == nil) {
  169. return true
  170. }
  171. if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
  172. return true
  173. }
  174. return false
  175. }
  176. // objectRefPtrChanged returns true if a set of object ref pointers have
  177. // different values.
  178. func objectRefPtrChanged(ref1, ref2 *corev1.ObjectReference) bool {
  179. if (ref1 == nil) != (ref2 == nil) {
  180. return true
  181. }
  182. if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) {
  183. return true
  184. }
  185. return false
  186. }
  187. // getSliceToFill will return the EndpointSlice that will be closest to full
  188. // when numEndpoints are added. If no EndpointSlice can be found, a nil pointer
  189. // will be returned.
  190. func getSliceToFill(endpointSlices []*discovery.EndpointSlice, numEndpoints, maxEndpoints int) (slice *discovery.EndpointSlice) {
  191. closestDiff := maxEndpoints
  192. var closestSlice *discovery.EndpointSlice
  193. for _, endpointSlice := range endpointSlices {
  194. currentDiff := maxEndpoints - (numEndpoints + len(endpointSlice.Endpoints))
  195. if currentDiff >= 0 && currentDiff < closestDiff {
  196. closestDiff = currentDiff
  197. closestSlice = endpointSlice
  198. if closestDiff == 0 {
  199. return closestSlice
  200. }
  201. }
  202. }
  203. return closestSlice
  204. }
  205. // getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
  206. func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
  207. if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
  208. // Enqueue all the services that the pod used to be a member of.
  209. // This is the same thing we do when we add a pod.
  210. return endpointSlice
  211. }
  212. // If we reached here it means the pod was deleted but its final state is unrecorded.
  213. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  214. if !ok {
  215. utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
  216. return nil
  217. }
  218. endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
  219. if !ok {
  220. utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj))
  221. return nil
  222. }
  223. return endpointSlice
  224. }
  225. // addTriggerTimeAnnotation adds a triggerTime annotation to an EndpointSlice
  226. func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) {
  227. if endpointSlice.Annotations == nil {
  228. endpointSlice.Annotations = make(map[string]string)
  229. }
  230. if !triggerTime.IsZero() {
  231. endpointSlice.Annotations[corev1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
  232. } else { // No new trigger time, clear the annotation.
  233. delete(endpointSlice.Annotations, corev1.EndpointsLastChangeTriggerTime)
  234. }
  235. }
  236. // serviceControllerKey returns a controller key for a Service but derived from
  237. // an EndpointSlice.
  238. func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) {
  239. if endpointSlice == nil {
  240. return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()")
  241. }
  242. serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
  243. if !ok || serviceName == "" {
  244. return "", fmt.Errorf("EndpointSlice missing %s label", discovery.LabelServiceName)
  245. }
  246. return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil
  247. }
  248. // endpointSliceEndpointLen helps sort endpoint slices by the number of
  249. // endpoints they contain.
  250. type endpointSliceEndpointLen []*discovery.EndpointSlice
  251. func (sl endpointSliceEndpointLen) Len() int { return len(sl) }
  252. func (sl endpointSliceEndpointLen) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
  253. func (sl endpointSliceEndpointLen) Less(i, j int) bool {
  254. return len(sl[i].Endpoints) > len(sl[j].Endpoints)
  255. }