kube_docker_client.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package libdocker
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/base64"
  18. "encoding/json"
  19. "fmt"
  20. "io"
  21. "io/ioutil"
  22. "regexp"
  23. "sync"
  24. "time"
  25. "k8s.io/klog"
  26. dockertypes "github.com/docker/docker/api/types"
  27. dockercontainer "github.com/docker/docker/api/types/container"
  28. dockerimagetypes "github.com/docker/docker/api/types/image"
  29. dockerapi "github.com/docker/docker/client"
  30. dockermessage "github.com/docker/docker/pkg/jsonmessage"
  31. dockerstdcopy "github.com/docker/docker/pkg/stdcopy"
  32. )
  33. // kubeDockerClient is a wrapped layer of docker client for kubelet internal use. This layer is added to:
  34. // 1) Redirect stream for exec and attach operations.
  35. // 2) Wrap the context in this layer to make the Interface cleaner.
  36. type kubeDockerClient struct {
  37. // timeout is the timeout of short running docker operations.
  38. timeout time.Duration
  39. // If no pulling progress is made before imagePullProgressDeadline, the image pulling will be cancelled.
  40. // Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
  41. // between progress updates.
  42. imagePullProgressDeadline time.Duration
  43. client *dockerapi.Client
  44. }
  45. // Make sure that kubeDockerClient implemented the Interface.
  46. var _ Interface = &kubeDockerClient{}
  // Docker operations fall into two categories by running time:
  // * Long running operations: they may run for an arbitrarily long time, usually depending
  // on factors outside of our control. These are PullImage, Logs, StartExec and AttachToContainer.
  // * Non-long running operations: given the maximum load of the system they are expected to
  // finish in a short, predictable time. These are all other operations.
  // kubeDockerClient only applies a timeout to non-long running operations.
  const (
  	// defaultTimeout is the timeout applied to short running docker operations when none
  	// is configured. It is deliberately one second short of 2 minutes so that timeouts
  	// caused by this constant are recognizable.
  	defaultTimeout = 2*time.Minute - time.Second

  	// defaultShmSize is the ShmSize in bytes (64 MiB) used when none is specified.
  	defaultShmSize = int64(64 << 20)

  	// defaultImagePullingProgressReportInterval is how often image pulling progress is reported.
  	defaultImagePullingProgressReportInterval = 10 * time.Second
  )
  63. // newKubeDockerClient creates an kubeDockerClient from an existing docker client. If requestTimeout is 0,
  64. // defaultTimeout will be applied.
  65. func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout, imagePullProgressDeadline time.Duration) Interface {
  66. if requestTimeout == 0 {
  67. requestTimeout = defaultTimeout
  68. }
  69. k := &kubeDockerClient{
  70. client: dockerClient,
  71. timeout: requestTimeout,
  72. imagePullProgressDeadline: imagePullProgressDeadline,
  73. }
  74. // Notice that this assumes that docker is running before kubelet is started.
  75. ctx, cancel := k.getTimeoutContext()
  76. defer cancel()
  77. dockerClient.NegotiateAPIVersion(ctx)
  78. return k
  79. }
  80. func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
  81. ctx, cancel := d.getTimeoutContext()
  82. defer cancel()
  83. containers, err := d.client.ContainerList(ctx, options)
  84. if ctxErr := contextError(ctx); ctxErr != nil {
  85. return nil, ctxErr
  86. }
  87. if err != nil {
  88. return nil, err
  89. }
  90. return containers, nil
  91. }
  92. func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
  93. ctx, cancel := d.getTimeoutContext()
  94. defer cancel()
  95. containerJSON, err := d.client.ContainerInspect(ctx, id)
  96. if ctxErr := contextError(ctx); ctxErr != nil {
  97. return nil, ctxErr
  98. }
  99. if err != nil {
  100. return nil, err
  101. }
  102. return &containerJSON, nil
  103. }
  104. // InspectContainerWithSize is currently only used for Windows container stats
  105. func (d *kubeDockerClient) InspectContainerWithSize(id string) (*dockertypes.ContainerJSON, error) {
  106. ctx, cancel := d.getTimeoutContext()
  107. defer cancel()
  108. // Inspects the container including the fields SizeRw and SizeRootFs.
  109. containerJSON, _, err := d.client.ContainerInspectWithRaw(ctx, id, true)
  110. if ctxErr := contextError(ctx); ctxErr != nil {
  111. return nil, ctxErr
  112. }
  113. if err != nil {
  114. return nil, err
  115. }
  116. return &containerJSON, nil
  117. }
  118. func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
  119. ctx, cancel := d.getTimeoutContext()
  120. defer cancel()
  121. // we provide an explicit default shm size as to not depend on docker daemon.
  122. // TODO: evaluate exposing this as a knob in the API
  123. if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
  124. opts.HostConfig.ShmSize = defaultShmSize
  125. }
  126. createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
  127. if ctxErr := contextError(ctx); ctxErr != nil {
  128. return nil, ctxErr
  129. }
  130. if err != nil {
  131. return nil, err
  132. }
  133. return &createResp, nil
  134. }
  135. func (d *kubeDockerClient) StartContainer(id string) error {
  136. ctx, cancel := d.getTimeoutContext()
  137. defer cancel()
  138. err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})
  139. if ctxErr := contextError(ctx); ctxErr != nil {
  140. return ctxErr
  141. }
  142. return err
  143. }
  144. // Stopping an already stopped container will not cause an error in dockerapi.
  145. func (d *kubeDockerClient) StopContainer(id string, timeout time.Duration) error {
  146. ctx, cancel := d.getCustomTimeoutContext(timeout)
  147. defer cancel()
  148. err := d.client.ContainerStop(ctx, id, &timeout)
  149. if ctxErr := contextError(ctx); ctxErr != nil {
  150. return ctxErr
  151. }
  152. return err
  153. }
  154. func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
  155. ctx, cancel := d.getTimeoutContext()
  156. defer cancel()
  157. err := d.client.ContainerRemove(ctx, id, opts)
  158. if ctxErr := contextError(ctx); ctxErr != nil {
  159. return ctxErr
  160. }
  161. return err
  162. }
  163. func (d *kubeDockerClient) UpdateContainerResources(id string, updateConfig dockercontainer.UpdateConfig) error {
  164. ctx, cancel := d.getTimeoutContext()
  165. defer cancel()
  166. _, err := d.client.ContainerUpdate(ctx, id, updateConfig)
  167. if ctxErr := contextError(ctx); ctxErr != nil {
  168. return ctxErr
  169. }
  170. return err
  171. }
  172. func (d *kubeDockerClient) inspectImageRaw(ref string) (*dockertypes.ImageInspect, error) {
  173. ctx, cancel := d.getTimeoutContext()
  174. defer cancel()
  175. resp, _, err := d.client.ImageInspectWithRaw(ctx, ref)
  176. if ctxErr := contextError(ctx); ctxErr != nil {
  177. return nil, ctxErr
  178. }
  179. if err != nil {
  180. if dockerapi.IsErrNotFound(err) {
  181. err = ImageNotFoundError{ID: ref}
  182. }
  183. return nil, err
  184. }
  185. return &resp, nil
  186. }
  187. func (d *kubeDockerClient) InspectImageByID(imageID string) (*dockertypes.ImageInspect, error) {
  188. resp, err := d.inspectImageRaw(imageID)
  189. if err != nil {
  190. return nil, err
  191. }
  192. if !matchImageIDOnly(*resp, imageID) {
  193. return nil, ImageNotFoundError{ID: imageID}
  194. }
  195. return resp, nil
  196. }
  197. func (d *kubeDockerClient) InspectImageByRef(imageRef string) (*dockertypes.ImageInspect, error) {
  198. resp, err := d.inspectImageRaw(imageRef)
  199. if err != nil {
  200. return nil, err
  201. }
  202. if !matchImageTagOrSHA(*resp, imageRef) {
  203. return nil, ImageNotFoundError{ID: imageRef}
  204. }
  205. return resp, nil
  206. }
  207. func (d *kubeDockerClient) ImageHistory(id string) ([]dockerimagetypes.HistoryResponseItem, error) {
  208. ctx, cancel := d.getTimeoutContext()
  209. defer cancel()
  210. resp, err := d.client.ImageHistory(ctx, id)
  211. if ctxErr := contextError(ctx); ctxErr != nil {
  212. return nil, ctxErr
  213. }
  214. return resp, err
  215. }
  216. func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.ImageSummary, error) {
  217. ctx, cancel := d.getTimeoutContext()
  218. defer cancel()
  219. images, err := d.client.ImageList(ctx, opts)
  220. if ctxErr := contextError(ctx); ctxErr != nil {
  221. return nil, ctxErr
  222. }
  223. if err != nil {
  224. return nil, err
  225. }
  226. return images, nil
  227. }
  228. func base64EncodeAuth(auth dockertypes.AuthConfig) (string, error) {
  229. var buf bytes.Buffer
  230. if err := json.NewEncoder(&buf).Encode(auth); err != nil {
  231. return "", err
  232. }
  233. return base64.URLEncoding.EncodeToString(buf.Bytes()), nil
  234. }
  235. // progress is a wrapper of dockermessage.JSONMessage with a lock protecting it.
  236. type progress struct {
  237. sync.RWMutex
  238. // message stores the latest docker json message.
  239. message *dockermessage.JSONMessage
  240. // timestamp of the latest update.
  241. timestamp time.Time
  242. }
  243. func newProgress() *progress {
  244. return &progress{timestamp: time.Now()}
  245. }
  246. func (p *progress) set(msg *dockermessage.JSONMessage) {
  247. p.Lock()
  248. defer p.Unlock()
  249. p.message = msg
  250. p.timestamp = time.Now()
  251. }
  252. func (p *progress) get() (string, time.Time) {
  253. p.RLock()
  254. defer p.RUnlock()
  255. if p.message == nil {
  256. return "No progress", p.timestamp
  257. }
  258. // The following code is based on JSONMessage.Display
  259. var prefix string
  260. if p.message.ID != "" {
  261. prefix = fmt.Sprintf("%s: ", p.message.ID)
  262. }
  263. if p.message.Progress == nil {
  264. return fmt.Sprintf("%s%s", prefix, p.message.Status), p.timestamp
  265. }
  266. return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()), p.timestamp
  267. }
  268. // progressReporter keeps the newest image pulling progress and periodically report the newest progress.
  269. type progressReporter struct {
  270. *progress
  271. image string
  272. cancel context.CancelFunc
  273. stopCh chan struct{}
  274. imagePullProgressDeadline time.Duration
  275. }
  276. // newProgressReporter creates a new progressReporter for specific image with specified reporting interval
  277. func newProgressReporter(image string, cancel context.CancelFunc, imagePullProgressDeadline time.Duration) *progressReporter {
  278. return &progressReporter{
  279. progress: newProgress(),
  280. image: image,
  281. cancel: cancel,
  282. stopCh: make(chan struct{}),
  283. imagePullProgressDeadline: imagePullProgressDeadline,
  284. }
  285. }
  286. // start starts the progressReporter
  287. func (p *progressReporter) start() {
  288. go func() {
  289. ticker := time.NewTicker(defaultImagePullingProgressReportInterval)
  290. defer ticker.Stop()
  291. for {
  292. // TODO(random-liu): Report as events.
  293. select {
  294. case <-ticker.C:
  295. progress, timestamp := p.progress.get()
  296. // If there is no progress for p.imagePullProgressDeadline, cancel the operation.
  297. if time.Since(timestamp) > p.imagePullProgressDeadline {
  298. klog.Errorf("Cancel pulling image %q because of no progress for %v, latest progress: %q", p.image, p.imagePullProgressDeadline, progress)
  299. p.cancel()
  300. return
  301. }
  302. klog.V(2).Infof("Pulling image %q: %q", p.image, progress)
  303. case <-p.stopCh:
  304. progress, _ := p.progress.get()
  305. klog.V(2).Infof("Stop pulling image %q: %q", p.image, progress)
  306. return
  307. }
  308. }
  309. }()
  310. }
  311. // stop stops the progressReporter
  312. func (p *progressReporter) stop() {
  313. close(p.stopCh)
  314. }
  315. func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error {
  316. // RegistryAuth is the base64 encoded credentials for the registry
  317. base64Auth, err := base64EncodeAuth(auth)
  318. if err != nil {
  319. return err
  320. }
  321. opts.RegistryAuth = base64Auth
  322. ctx, cancel := d.getCancelableContext()
  323. defer cancel()
  324. resp, err := d.client.ImagePull(ctx, image, opts)
  325. if err != nil {
  326. return err
  327. }
  328. defer resp.Close()
  329. reporter := newProgressReporter(image, cancel, d.imagePullProgressDeadline)
  330. reporter.start()
  331. defer reporter.stop()
  332. decoder := json.NewDecoder(resp)
  333. for {
  334. var msg dockermessage.JSONMessage
  335. err := decoder.Decode(&msg)
  336. if err == io.EOF {
  337. break
  338. }
  339. if err != nil {
  340. return err
  341. }
  342. if msg.Error != nil {
  343. return msg.Error
  344. }
  345. reporter.set(&msg)
  346. }
  347. return nil
  348. }
  349. func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
  350. ctx, cancel := d.getTimeoutContext()
  351. defer cancel()
  352. resp, err := d.client.ImageRemove(ctx, image, opts)
  353. if ctxErr := contextError(ctx); ctxErr != nil {
  354. return nil, ctxErr
  355. }
  356. if dockerapi.IsErrNotFound(err) {
  357. return nil, ImageNotFoundError{ID: image}
  358. }
  359. return resp, err
  360. }
  361. func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
  362. ctx, cancel := d.getCancelableContext()
  363. defer cancel()
  364. resp, err := d.client.ContainerLogs(ctx, id, opts)
  365. if ctxErr := contextError(ctx); ctxErr != nil {
  366. return ctxErr
  367. }
  368. if err != nil {
  369. return err
  370. }
  371. defer resp.Close()
  372. return d.redirectResponseToOutputStream(sopts.RawTerminal, sopts.OutputStream, sopts.ErrorStream, resp)
  373. }
  374. func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
  375. ctx, cancel := d.getTimeoutContext()
  376. defer cancel()
  377. resp, err := d.client.ServerVersion(ctx)
  378. if ctxErr := contextError(ctx); ctxErr != nil {
  379. return nil, ctxErr
  380. }
  381. if err != nil {
  382. return nil, err
  383. }
  384. return &resp, nil
  385. }
  386. func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
  387. ctx, cancel := d.getTimeoutContext()
  388. defer cancel()
  389. resp, err := d.client.Info(ctx)
  390. if ctxErr := contextError(ctx); ctxErr != nil {
  391. return nil, ctxErr
  392. }
  393. if err != nil {
  394. return nil, err
  395. }
  396. return &resp, nil
  397. }
  398. // TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
  399. func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.IDResponse, error) {
  400. ctx, cancel := d.getTimeoutContext()
  401. defer cancel()
  402. resp, err := d.client.ContainerExecCreate(ctx, id, opts)
  403. if ctxErr := contextError(ctx); ctxErr != nil {
  404. return nil, ctxErr
  405. }
  406. if err != nil {
  407. return nil, err
  408. }
  409. return &resp, nil
  410. }
  411. func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
  412. ctx, cancel := d.getCancelableContext()
  413. defer cancel()
  414. if opts.Detach {
  415. err := d.client.ContainerExecStart(ctx, startExec, opts)
  416. if ctxErr := contextError(ctx); ctxErr != nil {
  417. return ctxErr
  418. }
  419. return err
  420. }
  421. resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecStartCheck{
  422. Detach: opts.Detach,
  423. Tty: opts.Tty,
  424. })
  425. if ctxErr := contextError(ctx); ctxErr != nil {
  426. return ctxErr
  427. }
  428. if err != nil {
  429. return err
  430. }
  431. defer resp.Close()
  432. if sopts.ExecStarted != nil {
  433. // Send a message to the channel indicating that the exec has started. This is needed so
  434. // interactive execs can handle resizing correctly - the request to resize the TTY has to happen
  435. // after the call to d.client.ContainerExecAttach, and because d.holdHijackedConnection below
  436. // blocks, we use sopts.ExecStarted to signal the caller that it's ok to resize.
  437. sopts.ExecStarted <- struct{}{}
  438. }
  439. return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
  440. }
  441. func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
  442. ctx, cancel := d.getTimeoutContext()
  443. defer cancel()
  444. resp, err := d.client.ContainerExecInspect(ctx, id)
  445. if ctxErr := contextError(ctx); ctxErr != nil {
  446. return nil, ctxErr
  447. }
  448. if err != nil {
  449. return nil, err
  450. }
  451. return &resp, nil
  452. }
  453. func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
  454. ctx, cancel := d.getCancelableContext()
  455. defer cancel()
  456. resp, err := d.client.ContainerAttach(ctx, id, opts)
  457. if ctxErr := contextError(ctx); ctxErr != nil {
  458. return ctxErr
  459. }
  460. if err != nil {
  461. return err
  462. }
  463. defer resp.Close()
  464. return d.holdHijackedConnection(sopts.RawTerminal, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
  465. }
  466. func (d *kubeDockerClient) ResizeExecTTY(id string, height, width uint) error {
  467. ctx, cancel := d.getCancelableContext()
  468. defer cancel()
  469. return d.client.ContainerExecResize(ctx, id, dockertypes.ResizeOptions{
  470. Height: height,
  471. Width: width,
  472. })
  473. }
  474. func (d *kubeDockerClient) ResizeContainerTTY(id string, height, width uint) error {
  475. ctx, cancel := d.getCancelableContext()
  476. defer cancel()
  477. return d.client.ContainerResize(ctx, id, dockertypes.ResizeOptions{
  478. Height: height,
  479. Width: width,
  480. })
  481. }
  482. // GetContainerStats is currently only used for Windows container stats
  483. func (d *kubeDockerClient) GetContainerStats(id string) (*dockertypes.StatsJSON, error) {
  484. ctx, cancel := d.getCancelableContext()
  485. defer cancel()
  486. response, err := d.client.ContainerStats(ctx, id, false)
  487. if err != nil {
  488. return nil, err
  489. }
  490. dec := json.NewDecoder(response.Body)
  491. var stats dockertypes.StatsJSON
  492. err = dec.Decode(&stats)
  493. if err != nil {
  494. return nil, err
  495. }
  496. defer response.Body.Close()
  497. return &stats, nil
  498. }
  499. // redirectResponseToOutputStream redirect the response stream to stdout and stderr. When tty is true, all stream will
  500. // only be redirected to stdout.
  501. func (d *kubeDockerClient) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error {
  502. if outputStream == nil {
  503. outputStream = ioutil.Discard
  504. }
  505. if errorStream == nil {
  506. errorStream = ioutil.Discard
  507. }
  508. var err error
  509. if tty {
  510. _, err = io.Copy(outputStream, resp)
  511. } else {
  512. _, err = dockerstdcopy.StdCopy(outputStream, errorStream, resp)
  513. }
  514. return err
  515. }
  516. // holdHijackedConnection hold the HijackedResponse, redirect the inputStream to the connection, and redirect the response
  517. // stream to stdout and stderr. NOTE: If needed, we could also add context in this function.
  518. func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error {
  519. receiveStdout := make(chan error)
  520. if outputStream != nil || errorStream != nil {
  521. go func() {
  522. receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader)
  523. }()
  524. }
  525. stdinDone := make(chan struct{})
  526. go func() {
  527. if inputStream != nil {
  528. io.Copy(resp.Conn, inputStream)
  529. }
  530. resp.CloseWrite()
  531. close(stdinDone)
  532. }()
  533. select {
  534. case err := <-receiveStdout:
  535. return err
  536. case <-stdinDone:
  537. if outputStream != nil || errorStream != nil {
  538. return <-receiveStdout
  539. }
  540. }
  541. return nil
  542. }
  543. // getCancelableContext returns a new cancelable context. For long running requests without timeout, we use cancelable
  544. // context to avoid potential resource leak, although the current implementation shouldn't leak resource.
  545. func (d *kubeDockerClient) getCancelableContext() (context.Context, context.CancelFunc) {
  546. return context.WithCancel(context.Background())
  547. }
  548. // getTimeoutContext returns a new context with default request timeout
  549. func (d *kubeDockerClient) getTimeoutContext() (context.Context, context.CancelFunc) {
  550. return context.WithTimeout(context.Background(), d.timeout)
  551. }
  552. // getCustomTimeoutContext returns a new context with a specific request timeout
  553. func (d *kubeDockerClient) getCustomTimeoutContext(timeout time.Duration) (context.Context, context.CancelFunc) {
  554. // Pick the larger of the two
  555. if d.timeout > timeout {
  556. timeout = d.timeout
  557. }
  558. return context.WithTimeout(context.Background(), timeout)
  559. }
  560. // contextError checks the context, and returns error if the context is timeout.
  561. func contextError(ctx context.Context) error {
  562. if ctx.Err() == context.DeadlineExceeded {
  563. return operationTimeout{err: ctx.Err()}
  564. }
  565. return ctx.Err()
  566. }
  // StreamOptions are the options used to configure the stream redirection.
  type StreamOptions struct {
  	// RawTerminal selects tty mode: the response stream is copied to OutputStream only
  	// instead of being split into stdout and stderr.
  	RawTerminal bool
  	// InputStream, when set, is copied to the hijacked connection.
  	InputStream io.Reader
  	// OutputStream receives the stdout part of the response stream.
  	OutputStream io.Writer
  	// ErrorStream receives the stderr part of the response stream.
  	ErrorStream io.Writer
  	// ExecStarted, when set, is sent a value once StartExec has attached to the exec.
  	ExecStarted chan struct{}
  }
  // operationTimeout is the error returned when a docker operation times out. It wraps
  // the underlying context error.
  type operationTimeout struct {
  	err error
  }

  // Error implements the error interface, prefixing the wrapped error.
  func (e operationTimeout) Error() string {
  	cause := e.err
  	return fmt.Sprintf("operation timeout: %v", cause)
  }
  // containerNotFoundErrorRegx is the regexp of the container not found error message:
  // the text "No such container: " followed by one or more lowercase letters or digits.
  var containerNotFoundErrorRegx = regexp.MustCompile(`No such container: [0-9a-z]+`)

  // IsContainerNotFoundError checks whether the error is a container not found error by
  // matching its message against containerNotFoundErrorRegx. A nil error is never a
  // container not found error.
  func IsContainerNotFoundError(err error) bool {
  	// Guard against nil: calling Error() on a nil error would panic.
  	if err == nil {
  		return false
  	}
  	return containerNotFoundErrorRegx.MatchString(err.Error())
  }
  // ImageNotFoundError is the error returned by InspectImage when the image is not found.
  // It is exported so that dockershim can inject the error for testing.
  type ImageNotFoundError struct {
  	// ID is the image ID or reference that could not be found.
  	ID string
  }

  // Error implements the error interface; the image ID is quoted in the message.
  func (e ImageNotFoundError) Error() string {
  	missing := e.ID
  	return fmt.Sprintf("no such image: %q", missing)
  }
  596. // IsImageNotFoundError checks whether the error is image not found error. This is exposed
  597. // to share with dockershim.
  598. func IsImageNotFoundError(err error) bool {
  599. _, ok := err.(ImageNotFoundError)
  600. return ok
  601. }