linux.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // +build linux
  2. /*
  3. Copyright 2015 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package bandwidth
  15. import (
  16. "bufio"
  17. "bytes"
  18. "encoding/hex"
  19. "fmt"
  20. "net"
  21. "strings"
  22. "k8s.io/apimachinery/pkg/api/resource"
  23. "k8s.io/apimachinery/pkg/util/sets"
  24. "k8s.io/utils/exec"
  25. "k8s.io/klog"
  26. )
  // tcShaper provides an implementation of the Shaper interface on Linux using the 'tc' tool.
  // In general, using this requires that the caller possesses the CAP_NET_ADMIN capability, though if you
  // do this within a container, it only requires the NS_CAPABLE capability for manipulations to that
  // container's network namespace.
  // Uses the hierarchical token bucket queuing discipline (htb), this requires Linux 2.4.20 or newer
  // or a custom kernel with that queuing discipline backported.
  type tcShaper struct {
  	// e creates the external commands (the tc invocations) issued by the shaper.
  	e exec.Interface
  	// iface is the name of the network device handed to tc after "dev".
  	iface string
  }
  37. // NewTCShaper makes a new tcShaper for the given interface
  38. func NewTCShaper(iface string) Shaper {
  39. shaper := &tcShaper{
  40. e: exec.New(),
  41. iface: iface,
  42. }
  43. return shaper
  44. }
  45. func (t *tcShaper) execAndLog(cmdStr string, args ...string) error {
  46. klog.V(6).Infof("Running: %s %s", cmdStr, strings.Join(args, " "))
  47. cmd := t.e.Command(cmdStr, args...)
  48. out, err := cmd.CombinedOutput()
  49. klog.V(6).Infof("Output from tc: %s", string(out))
  50. return err
  51. }
  52. func (t *tcShaper) nextClassID() (int, error) {
  53. data, err := t.e.Command("tc", "class", "show", "dev", t.iface).CombinedOutput()
  54. if err != nil {
  55. return -1, err
  56. }
  57. scanner := bufio.NewScanner(bytes.NewBuffer(data))
  58. classes := sets.String{}
  59. for scanner.Scan() {
  60. line := strings.TrimSpace(scanner.Text())
  61. // skip empty lines
  62. if len(line) == 0 {
  63. continue
  64. }
  65. parts := strings.Split(line, " ")
  66. // expected tc line:
  67. // class htb 1:1 root prio 0 rate 1000Kbit ceil 1000Kbit burst 1600b cburst 1600b
  68. if len(parts) != 14 {
  69. return -1, fmt.Errorf("unexpected output from tc: %s (%v)", scanner.Text(), parts)
  70. }
  71. classes.Insert(parts[2])
  72. }
  73. // Make sure it doesn't go forever
  74. for nextClass := 1; nextClass < 10000; nextClass++ {
  75. if !classes.Has(fmt.Sprintf("1:%d", nextClass)) {
  76. return nextClass, nil
  77. }
  78. }
  79. // This should really never happen
  80. return -1, fmt.Errorf("exhausted class space, please try again")
  81. }
  // hexCIDR converts a textual CIDR into the form "<hex ip>/<hex mask>".
  // Host bits are cleared, so 1.2.3.4/16 becomes 01020000/ffff0000.
  // An error is returned if cidr cannot be parsed.
  func hexCIDR(cidr string) (string, error) {
  	_, network, err := net.ParseCIDR(cidr)
  	if err != nil {
  		return "", err
  	}
  	// network.IP is the parsed address with the network mask already applied.
  	return hex.EncodeToString(network.IP) + "/" + network.Mask.String(), nil
  }
  // asciiCIDR converts a CIDR in "<hex ip>/<hex mask>" form back to text,
  // e.g. 01020000/ffff0000 becomes 1.2.0.0/16. It is the inverse of hexCIDR.
  // An error is returned unless the input contains exactly one "/" and both
  // halves are valid hex.
  func asciiCIDR(cidr string) (string, error) {
  	if strings.Count(cidr, "/") != 1 {
  		return "", fmt.Errorf("unexpected CIDR format: %s", cidr)
  	}
  	slash := strings.Index(cidr, "/")

  	rawIP, err := hex.DecodeString(cidr[:slash])
  	if err != nil {
  		return "", err
  	}
  	rawMask, err := hex.DecodeString(cidr[slash+1:])
  	if err != nil {
  		return "", err
  	}

  	// Only the number of leading one bits is used; the total bit count is dropped.
  	prefixLen, _ := net.IPMask(rawMask).Size()
  	return fmt.Sprintf("%s/%d", net.IP(rawIP).String(), prefixLen), nil
  }
  113. func (t *tcShaper) findCIDRClass(cidr string) (classAndHandleList [][]string, found bool, err error) {
  114. data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
  115. if err != nil {
  116. return classAndHandleList, false, err
  117. }
  118. hex, err := hexCIDR(cidr)
  119. if err != nil {
  120. return classAndHandleList, false, err
  121. }
  122. spec := fmt.Sprintf("match %s", hex)
  123. scanner := bufio.NewScanner(bytes.NewBuffer(data))
  124. filter := ""
  125. for scanner.Scan() {
  126. line := strings.TrimSpace(scanner.Text())
  127. if len(line) == 0 {
  128. continue
  129. }
  130. if strings.HasPrefix(line, "filter") {
  131. filter = line
  132. continue
  133. }
  134. if strings.Contains(line, spec) {
  135. parts := strings.Split(filter, " ")
  136. // expected tc line:
  137. // filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1
  138. if len(parts) != 19 {
  139. return classAndHandleList, false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts)
  140. }
  141. resultTmp := []string{parts[18], parts[9]}
  142. classAndHandleList = append(classAndHandleList, resultTmp)
  143. }
  144. }
  145. if len(classAndHandleList) > 0 {
  146. return classAndHandleList, true, nil
  147. }
  148. return classAndHandleList, false, nil
  149. }
  150. func makeKBitString(rsrc *resource.Quantity) string {
  151. return fmt.Sprintf("%dkbit", (rsrc.Value() / 1000))
  152. }
  153. func (t *tcShaper) makeNewClass(rate string) (int, error) {
  154. class, err := t.nextClassID()
  155. if err != nil {
  156. return -1, err
  157. }
  158. if err := t.execAndLog("tc", "class", "add",
  159. "dev", t.iface,
  160. "parent", "1:",
  161. "classid", fmt.Sprintf("1:%d", class),
  162. "htb", "rate", rate); err != nil {
  163. return -1, err
  164. }
  165. return class, nil
  166. }
  167. func (t *tcShaper) Limit(cidr string, upload, download *resource.Quantity) (err error) {
  168. var downloadClass, uploadClass int
  169. if download != nil {
  170. if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil {
  171. return err
  172. }
  173. if err := t.execAndLog("tc", "filter", "add",
  174. "dev", t.iface,
  175. "protocol", "ip",
  176. "parent", "1:0",
  177. "prio", "1", "u32",
  178. "match", "ip", "dst", cidr,
  179. "flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil {
  180. return err
  181. }
  182. }
  183. if upload != nil {
  184. if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil {
  185. return err
  186. }
  187. if err := t.execAndLog("tc", "filter", "add",
  188. "dev", t.iface,
  189. "protocol", "ip",
  190. "parent", "1:0",
  191. "prio", "1", "u32",
  192. "match", "ip", "src", cidr,
  193. "flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil {
  194. return err
  195. }
  196. }
  197. return nil
  198. }
  199. // tests to see if an interface exists, if it does, return true and the status line for the interface
  200. // returns false, "", <err> if an error occurs.
  201. func (t *tcShaper) interfaceExists() (bool, string, error) {
  202. data, err := t.e.Command("tc", "qdisc", "show", "dev", t.iface).CombinedOutput()
  203. if err != nil {
  204. return false, "", err
  205. }
  206. value := strings.TrimSpace(string(data))
  207. if len(value) == 0 {
  208. return false, "", nil
  209. }
  210. // Newer versions of tc and/or the kernel return the following instead of nothing:
  211. // qdisc noqueue 0: root refcnt 2
  212. fields := strings.Fields(value)
  213. if len(fields) > 1 && fields[1] == "noqueue" {
  214. return false, "", nil
  215. }
  216. return true, value, nil
  217. }
  218. func (t *tcShaper) ReconcileCIDR(cidr string, upload, download *resource.Quantity) error {
  219. _, found, err := t.findCIDRClass(cidr)
  220. if err != nil {
  221. return err
  222. }
  223. if !found {
  224. return t.Limit(cidr, upload, download)
  225. }
  226. // TODO: actually check bandwidth limits here
  227. return nil
  228. }
  229. func (t *tcShaper) ReconcileInterface() error {
  230. exists, output, err := t.interfaceExists()
  231. if err != nil {
  232. return err
  233. }
  234. if !exists {
  235. klog.V(4).Info("Didn't find bandwidth interface, creating")
  236. return t.initializeInterface()
  237. }
  238. fields := strings.Split(output, " ")
  239. if len(fields) < 12 || fields[1] != "htb" || fields[2] != "1:" {
  240. if err := t.deleteInterface(fields[2]); err != nil {
  241. return err
  242. }
  243. return t.initializeInterface()
  244. }
  245. return nil
  246. }
  247. func (t *tcShaper) initializeInterface() error {
  248. return t.execAndLog("tc", "qdisc", "add", "dev", t.iface, "root", "handle", "1:", "htb", "default", "30")
  249. }
  250. func (t *tcShaper) Reset(cidr string) error {
  251. classAndHandle, found, err := t.findCIDRClass(cidr)
  252. if err != nil {
  253. return err
  254. }
  255. if !found {
  256. return fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface)
  257. }
  258. for i := 0; i < len(classAndHandle); i++ {
  259. if err := t.execAndLog("tc", "filter", "del",
  260. "dev", t.iface,
  261. "parent", "1:",
  262. "proto", "ip",
  263. "prio", "1",
  264. "handle", classAndHandle[i][1], "u32"); err != nil {
  265. return err
  266. }
  267. if err := t.execAndLog("tc", "class", "del",
  268. "dev", t.iface,
  269. "parent", "1:",
  270. "classid", classAndHandle[i][0]); err != nil {
  271. return err
  272. }
  273. }
  274. return nil
  275. }
  276. func (t *tcShaper) deleteInterface(class string) error {
  277. return t.execAndLog("tc", "qdisc", "delete", "dev", t.iface, "root", "handle", class)
  278. }
  279. func (t *tcShaper) GetCIDRs() ([]string, error) {
  280. data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
  281. if err != nil {
  282. return nil, err
  283. }
  284. result := []string{}
  285. scanner := bufio.NewScanner(bytes.NewBuffer(data))
  286. for scanner.Scan() {
  287. line := strings.TrimSpace(scanner.Text())
  288. if len(line) == 0 {
  289. continue
  290. }
  291. if strings.Contains(line, "match") {
  292. parts := strings.Split(line, " ")
  293. // expected tc line:
  294. // match <cidr> at <number>
  295. if len(parts) != 4 {
  296. return nil, fmt.Errorf("unexpected output: %v", parts)
  297. }
  298. cidr, err := asciiCIDR(parts[1])
  299. if err != nil {
  300. return nil, err
  301. }
  302. result = append(result, cidr)
  303. }
  304. }
  305. return result, nil
  306. }