12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363 |
- // Copyright 2014 Google Inc. All Rights Reserved.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // Manager of cAdvisor-monitored containers.
- package manager
- import (
- "flag"
- "fmt"
- "net/http"
- "os"
- "path"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/google/cadvisor/accelerators"
- "github.com/google/cadvisor/cache/memory"
- "github.com/google/cadvisor/collector"
- "github.com/google/cadvisor/container"
- "github.com/google/cadvisor/container/docker"
- "github.com/google/cadvisor/container/raw"
- "github.com/google/cadvisor/events"
- "github.com/google/cadvisor/fs"
- info "github.com/google/cadvisor/info/v1"
- "github.com/google/cadvisor/info/v2"
- "github.com/google/cadvisor/machine"
- "github.com/google/cadvisor/utils/oomparser"
- "github.com/google/cadvisor/utils/sysfs"
- "github.com/google/cadvisor/version"
- "github.com/google/cadvisor/watcher"
- "github.com/opencontainers/runc/libcontainer/cgroups"
- "k8s.io/klog"
- "k8s.io/utils/clock"
- )
// Command-line flags controlling manager-wide housekeeping cadence, event
// storage retention, and application-metric collection limits.
var (
	globalHousekeepingInterval = flag.Duration("global_housekeeping_interval", 1*time.Minute, "Interval between global housekeepings")
	updateMachineInfoInterval  = flag.Duration("update_machine_info_interval", 5*time.Minute, "Interval between machine info updates.")
	logCadvisorUsage           = flag.Bool("log_cadvisor_usage", false, "Whether to log the usage of the cAdvisor container")
	eventStorageAgeLimit       = flag.String("event_storage_age_limit", "default=24h", "Max length of time for which to store events (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is a duration. Default is applied to all non-specified event types")
	eventStorageEventLimit     = flag.String("event_storage_event_limit", "default=100000", "Max number of events to store (per type). Value is a comma separated list of key values, where the keys are event types (e.g.: creation, oom) or \"default\" and the value is an integer. Default is applied to all non-specified event types")

	applicationMetricsCountLimit = flag.Int("application_metrics_count_limit", 100, "Max number of application metrics to store (per container)")
)
// The Manager interface defines operations for starting a manager and getting
// container and machine information.
type Manager interface {
	// Start the manager. Calling other manager methods before this returns
	// may produce undefined behavior.
	Start() error

	// Stops the manager.
	Stop() error

	// Get information about a container.
	GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error)

	// Get V2 information about a container.
	// Recursive (subcontainer) requests are best-effort, and may return a partial result alongside an
	// error in the partial failure case.
	GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error)

	// Get information about all subcontainers of the specified container (includes self).
	SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error)

	// Gets all the Docker containers. Return is a map from full container name to ContainerInfo.
	AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error)

	// Gets information about a specific Docker container. The specified name is within the Docker namespace.
	DockerContainer(dockerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error)

	// Gets spec for all containers based on request options.
	GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error)

	// Gets summary stats for all containers based on request options.
	GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error)

	// Get info for all requested containers based on the request options.
	GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error)

	// Returns true if the named container exists.
	Exists(containerName string) bool

	// Get information about the machine.
	GetMachineInfo() (*info.MachineInfo, error)

	// Get version information about different components we depend on.
	GetVersionInfo() (*info.VersionInfo, error)

	// GetFsInfoByFsUUID returns the information of the device having the
	// specified filesystem uuid. If no such device with the UUID exists, this
	// function will return the fs.ErrNoSuchDevice error.
	GetFsInfoByFsUUID(uuid string) (v2.FsInfo, error)

	// Get filesystem information for the filesystem that contains the given directory
	GetDirFsInfo(dir string) (v2.FsInfo, error)

	// Get filesystem information for a given label.
	// Returns information for all global filesystems if label is empty.
	GetFsInfo(label string) ([]v2.FsInfo, error)

	// Get ps output for a container.
	GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error)

	// Get events streamed through passedChannel that fit the request.
	WatchForEvents(request *events.Request) (*events.EventChannel, error)

	// Get past events that have been detected and that fit the request.
	GetPastEvents(request *events.Request) ([]*info.Event, error)

	// CloseEventChannel closes the event channel identified by watch_id,
	// releasing the watcher registered by WatchForEvents.
	CloseEventChannel(watch_id int)

	// Get status information about docker.
	DockerInfo() (info.DockerStatus, error)

	// Get details about interesting docker images.
	DockerImages() ([]info.DockerImage, error)

	// Returns debugging information. Map of lines per category.
	DebugInfo() map[string][]string
}
- // New takes a memory storage and returns a new manager.
- func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, includedMetricsSet container.MetricSet, collectorHttpClient *http.Client, rawContainerCgroupPathPrefixWhiteList []string) (Manager, error) {
- if memoryCache == nil {
- return nil, fmt.Errorf("manager requires memory storage")
- }
- // Detect the container we are running on.
- selfContainer, err := cgroups.GetOwnCgroupPath("cpu")
- if err != nil {
- return nil, err
- }
- klog.V(2).Infof("cAdvisor running in container: %q", selfContainer)
- context := fs.Context{}
- if err := container.InitializeFSContext(&context); err != nil {
- return nil, err
- }
- fsInfo, err := fs.NewFsInfo(context)
- if err != nil {
- return nil, err
- }
- // If cAdvisor was started with host's rootfs mounted, assume that its running
- // in its own namespaces.
- inHostNamespace := false
- if _, err := os.Stat("/rootfs/proc"); os.IsNotExist(err) {
- inHostNamespace = true
- }
- // Register for new subcontainers.
- eventsChannel := make(chan watcher.ContainerEvent, 16)
- newManager := &manager{
- containers: make(map[namespacedContainerName]*containerData),
- quitChannels: make([]chan error, 0, 2),
- memoryCache: memoryCache,
- fsInfo: fsInfo,
- sysFs: sysfs,
- cadvisorContainer: selfContainer,
- inHostNamespace: inHostNamespace,
- startupTime: time.Now(),
- maxHousekeepingInterval: maxHousekeepingInterval,
- allowDynamicHousekeeping: allowDynamicHousekeeping,
- includedMetrics: includedMetricsSet,
- containerWatchers: []watcher.ContainerWatcher{},
- eventsChannel: eventsChannel,
- collectorHttpClient: collectorHttpClient,
- nvidiaManager: &accelerators.NvidiaManager{},
- rawContainerCgroupPathPrefixWhiteList: rawContainerCgroupPathPrefixWhiteList,
- }
- machineInfo, err := machine.Info(sysfs, fsInfo, inHostNamespace)
- if err != nil {
- return nil, err
- }
- newManager.machineInfo = *machineInfo
- klog.V(1).Infof("Machine: %+v", newManager.machineInfo)
- versionInfo, err := getVersionInfo()
- if err != nil {
- return nil, err
- }
- klog.V(1).Infof("Version: %+v", *versionInfo)
- newManager.eventHandler = events.NewEventManager(parseEventsStoragePolicy())
- return newManager, nil
- }
// A namespaced container name. Used as the key into manager.containers so
// that containers with the same short name in different runtimes (e.g. raw
// cgroups vs. Docker) do not collide.
type namespacedContainerName struct {
	// The namespace of the container. Can be empty for the root namespace.
	Namespace string

	// The name of the container in this namespace.
	Name string
}
// manager is the concrete Manager implementation. It tracks all monitored
// containers, caches machine info, and owns the background housekeeping
// goroutines started by Start.
type manager struct {
	// All monitored containers, keyed by namespaced name. Guarded by containersLock.
	containers     map[namespacedContainerName]*containerData
	containersLock sync.RWMutex
	// Cache backing all stats queries.
	memoryCache *memory.InMemoryCache
	fsInfo      fs.FsInfo
	sysFs       sysfs.SysFs
	machineMu   sync.RWMutex // protects machineInfo
	machineInfo info.MachineInfo
	// One channel per background goroutine; used by Stop to signal and await exit.
	quitChannels []chan error
	// Cgroup path of the container cAdvisor itself runs in.
	cadvisorContainer string
	inHostNamespace   bool
	eventHandler      events.EventManager
	startupTime       time.Time
	// Housekeeping tuning passed through to each containerData.
	maxHousekeepingInterval  time.Duration
	allowDynamicHousekeeping bool
	includedMetrics          container.MetricSet
	containerWatchers        []watcher.ContainerWatcher
	// Channel on which container watchers deliver add/remove events.
	eventsChannel       chan watcher.ContainerEvent
	collectorHttpClient *http.Client
	nvidiaManager       accelerators.AcceleratorManager
	// List of raw container cgroup path prefix whitelist.
	rawContainerCgroupPathPrefixWhiteList []string
}
- // Start the container manager.
- func (self *manager) Start() error {
- self.containerWatchers = container.InitializePlugins(self, self.fsInfo, self.includedMetrics)
- err := raw.Register(self, self.fsInfo, self.includedMetrics, self.rawContainerCgroupPathPrefixWhiteList)
- if err != nil {
- klog.Errorf("Registration of the raw container factory failed: %v", err)
- }
- rawWatcher, err := raw.NewRawContainerWatcher()
- if err != nil {
- return err
- }
- self.containerWatchers = append(self.containerWatchers, rawWatcher)
- // Watch for OOMs.
- err = self.watchForNewOoms()
- if err != nil {
- klog.Warningf("Could not configure a source for OOM detection, disabling OOM events: %v", err)
- }
- // If there are no factories, don't start any housekeeping and serve the information we do have.
- if !container.HasFactories() {
- return nil
- }
- // Setup collection of nvidia GPU metrics if any of them are attached to the machine.
- self.nvidiaManager.Setup()
- // Create root and then recover all containers.
- err = self.createContainer("/", watcher.Raw)
- if err != nil {
- return err
- }
- klog.V(2).Infof("Starting recovery of all containers")
- err = self.detectSubcontainers("/")
- if err != nil {
- return err
- }
- klog.V(2).Infof("Recovery completed")
- // Watch for new container.
- quitWatcher := make(chan error)
- err = self.watchForNewContainers(quitWatcher)
- if err != nil {
- return err
- }
- self.quitChannels = append(self.quitChannels, quitWatcher)
- // Look for new containers in the main housekeeping thread.
- quitGlobalHousekeeping := make(chan error)
- self.quitChannels = append(self.quitChannels, quitGlobalHousekeeping)
- go self.globalHousekeeping(quitGlobalHousekeeping)
- quitUpdateMachineInfo := make(chan error)
- self.quitChannels = append(self.quitChannels, quitUpdateMachineInfo)
- go self.updateMachineInfo(quitUpdateMachineInfo)
- return nil
- }
// Stop the manager: signal every background goroutine registered in
// quitChannels and wait for each to acknowledge, then tear down the nvidia
// manager. Returns the first error reported by a goroutine, if any.
func (self *manager) Stop() error {
	defer self.nvidiaManager.Destroy()
	// Stop and wait on all quit channels.
	for i, c := range self.quitChannels {
		// Send the exit signal and wait on the thread to exit (by closing the channel).
		c <- nil
		err := <-c
		if err != nil {
			// Remove the channels that quit successfully.
			// NOTE(review): slicing from i retains the channel that just
			// reported the error as well as the untried ones — confirm the
			// failing goroutine is expected to be retried on a later Stop.
			self.quitChannels = self.quitChannels[i:]
			return err
		}
	}
	self.quitChannels = make([]chan error, 0, 2)
	return nil
}
- func (self *manager) updateMachineInfo(quit chan error) {
- ticker := time.NewTicker(*updateMachineInfoInterval)
- for {
- select {
- case <-ticker.C:
- info, err := machine.Info(self.sysFs, self.fsInfo, self.inHostNamespace)
- if err != nil {
- klog.Errorf("Could not get machine info: %v", err)
- break
- }
- self.machineMu.Lock()
- self.machineInfo = *info
- self.machineMu.Unlock()
- klog.V(5).Infof("Update machine info: %+v", *info)
- case <-quit:
- ticker.Stop()
- quit <- nil
- return
- }
- }
- }
- func (self *manager) globalHousekeeping(quit chan error) {
- // Long housekeeping is either 100ms or half of the housekeeping interval.
- longHousekeeping := 100 * time.Millisecond
- if *globalHousekeepingInterval/2 < longHousekeeping {
- longHousekeeping = *globalHousekeepingInterval / 2
- }
- ticker := time.Tick(*globalHousekeepingInterval)
- for {
- select {
- case t := <-ticker:
- start := time.Now()
- // Check for new containers.
- err := self.detectSubcontainers("/")
- if err != nil {
- klog.Errorf("Failed to detect containers: %s", err)
- }
- // Log if housekeeping took too long.
- duration := time.Since(start)
- if duration >= longHousekeeping {
- klog.V(3).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
- }
- case <-quit:
- // Quit if asked to do so.
- quit <- nil
- klog.Infof("Exiting global housekeeping thread")
- return
- }
- }
- }
- func (self *manager) getContainerData(containerName string) (*containerData, error) {
- var cont *containerData
- var ok bool
- func() {
- self.containersLock.RLock()
- defer self.containersLock.RUnlock()
- // Ensure we have the container.
- cont, ok = self.containers[namespacedContainerName{
- Name: containerName,
- }]
- }()
- if !ok {
- return nil, fmt.Errorf("unknown container %q", containerName)
- }
- return cont, nil
- }
- func (self *manager) GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error) {
- conts, err := self.getRequestedContainers(containerName, options)
- if err != nil {
- return nil, err
- }
- var errs partialFailure
- stats := make(map[string]v2.DerivedStats)
- for name, cont := range conts {
- d, err := cont.DerivedStats()
- if err != nil {
- errs.append(name, "DerivedStats", err)
- }
- stats[name] = d
- }
- return stats, errs.OrNil()
- }
- func (self *manager) GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) {
- conts, err := self.getRequestedContainers(containerName, options)
- if err != nil {
- return nil, err
- }
- var errs partialFailure
- specs := make(map[string]v2.ContainerSpec)
- for name, cont := range conts {
- cinfo, err := cont.GetInfo(false)
- if err != nil {
- errs.append(name, "GetInfo", err)
- }
- spec := self.getV2Spec(cinfo)
- specs[name] = spec
- }
- return specs, errs.OrNil()
- }
// Get V2 container spec from v1 container info.
// The v1 spec is normalized first (see getAdjustedSpec) so "no memory limit"
// is reported as the machine's total memory capacity.
func (self *manager) getV2Spec(cinfo *containerInfo) v2.ContainerSpec {
	spec := self.getAdjustedSpec(cinfo)
	return v2.ContainerSpecFromV1(&spec, cinfo.Aliases, cinfo.Namespace)
}
- func (self *manager) getAdjustedSpec(cinfo *containerInfo) info.ContainerSpec {
- spec := cinfo.Spec
- // Set default value to an actual value
- if spec.HasMemory {
- // Memory.Limit is 0 means there's no limit
- if spec.Memory.Limit == 0 {
- self.machineMu.RLock()
- spec.Memory.Limit = uint64(self.machineInfo.MemoryCapacity)
- self.machineMu.RUnlock()
- }
- }
- return spec
- }
- func (self *manager) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
- cont, err := self.getContainerData(containerName)
- if err != nil {
- return nil, err
- }
- return self.containerDataToContainerInfo(cont, query)
- }
- func (self *manager) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
- containers, err := self.getRequestedContainers(containerName, options)
- if err != nil {
- return nil, err
- }
- var errs partialFailure
- var nilTime time.Time // Ignored.
- infos := make(map[string]v2.ContainerInfo, len(containers))
- for name, container := range containers {
- result := v2.ContainerInfo{}
- cinfo, err := container.GetInfo(false)
- if err != nil {
- errs.append(name, "GetInfo", err)
- infos[name] = result
- continue
- }
- result.Spec = self.getV2Spec(cinfo)
- stats, err := self.memoryCache.RecentStats(name, nilTime, nilTime, options.Count)
- if err != nil {
- errs.append(name, "RecentStats", err)
- infos[name] = result
- continue
- }
- result.Stats = v2.ContainerStatsFromV1(containerName, &cinfo.Spec, stats)
- infos[name] = result
- }
- return infos, errs.OrNil()
- }
- func (self *manager) containerDataToContainerInfo(cont *containerData, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
- // Get the info from the container.
- cinfo, err := cont.GetInfo(true)
- if err != nil {
- return nil, err
- }
- stats, err := self.memoryCache.RecentStats(cinfo.Name, query.Start, query.End, query.NumStats)
- if err != nil {
- return nil, err
- }
- // Make a copy of the info for the user.
- ret := &info.ContainerInfo{
- ContainerReference: cinfo.ContainerReference,
- Subcontainers: cinfo.Subcontainers,
- Spec: self.getAdjustedSpec(cinfo),
- Stats: stats,
- }
- return ret, nil
- }
- func (self *manager) getContainer(containerName string) (*containerData, error) {
- self.containersLock.RLock()
- defer self.containersLock.RUnlock()
- cont, ok := self.containers[namespacedContainerName{Name: containerName}]
- if !ok {
- return nil, fmt.Errorf("unknown container %q", containerName)
- }
- return cont, nil
- }
- func (self *manager) getSubcontainers(containerName string) map[string]*containerData {
- self.containersLock.RLock()
- defer self.containersLock.RUnlock()
- containersMap := make(map[string]*containerData, len(self.containers))
- // Get all the unique subcontainers of the specified container
- matchedName := path.Join(containerName, "/")
- for i := range self.containers {
- name := self.containers[i].info.Name
- if name == containerName || strings.HasPrefix(name, matchedName) {
- containersMap[self.containers[i].info.Name] = self.containers[i]
- }
- }
- return containersMap
- }
- func (self *manager) SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
- containersMap := self.getSubcontainers(containerName)
- containers := make([]*containerData, 0, len(containersMap))
- for _, cont := range containersMap {
- containers = append(containers, cont)
- }
- return self.containerDataSliceToContainerInfoSlice(containers, query)
- }
- func (self *manager) getAllDockerContainers() map[string]*containerData {
- self.containersLock.RLock()
- defer self.containersLock.RUnlock()
- containers := make(map[string]*containerData, len(self.containers))
- // Get containers in the Docker namespace.
- for name, cont := range self.containers {
- if name.Namespace == docker.DockerNamespace {
- containers[cont.info.Name] = cont
- }
- }
- return containers
- }
- func (self *manager) AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error) {
- containers := self.getAllDockerContainers()
- output := make(map[string]info.ContainerInfo, len(containers))
- for name, cont := range containers {
- inf, err := self.containerDataToContainerInfo(cont, query)
- if err != nil {
- // Ignore the error because of race condition and return best-effort result.
- if err == memory.ErrDataNotFound {
- klog.Warningf("Error getting data for container %s because of race condition", name)
- continue
- }
- return nil, err
- }
- output[name] = *inf
- }
- return output, nil
- }
- func (self *manager) getDockerContainer(containerName string) (*containerData, error) {
- self.containersLock.RLock()
- defer self.containersLock.RUnlock()
- // Check for the container in the Docker container namespace.
- cont, ok := self.containers[namespacedContainerName{
- Namespace: docker.DockerNamespace,
- Name: containerName,
- }]
- // Look for container by short prefix name if no exact match found.
- if !ok {
- for contName, c := range self.containers {
- if contName.Namespace == docker.DockerNamespace && strings.HasPrefix(contName.Name, containerName) {
- if cont == nil {
- cont = c
- } else {
- return nil, fmt.Errorf("unable to find container. Container %q is not unique", containerName)
- }
- }
- }
- if cont == nil {
- return nil, fmt.Errorf("unable to find Docker container %q", containerName)
- }
- }
- return cont, nil
- }
- func (self *manager) DockerContainer(containerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error) {
- container, err := self.getDockerContainer(containerName)
- if err != nil {
- return info.ContainerInfo{}, err
- }
- inf, err := self.containerDataToContainerInfo(container, query)
- if err != nil {
- return info.ContainerInfo{}, err
- }
- return *inf, nil
- }
- func (self *manager) containerDataSliceToContainerInfoSlice(containers []*containerData, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
- if len(containers) == 0 {
- return nil, fmt.Errorf("no containers found")
- }
- // Get the info for each container.
- output := make([]*info.ContainerInfo, 0, len(containers))
- for i := range containers {
- cinfo, err := self.containerDataToContainerInfo(containers[i], query)
- if err != nil {
- // Skip containers with errors, we try to degrade gracefully.
- continue
- }
- output = append(output, cinfo)
- }
- return output, nil
- }
- func (self *manager) GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error) {
- containers, err := self.getRequestedContainers(containerName, options)
- if err != nil {
- return nil, err
- }
- var errs partialFailure
- containersMap := make(map[string]*info.ContainerInfo)
- query := info.ContainerInfoRequest{
- NumStats: options.Count,
- }
- for name, data := range containers {
- info, err := self.containerDataToContainerInfo(data, &query)
- if err != nil {
- errs.append(name, "containerDataToContainerInfo", err)
- }
- containersMap[name] = info
- }
- return containersMap, errs.OrNil()
- }
- func (self *manager) getRequestedContainers(containerName string, options v2.RequestOptions) (map[string]*containerData, error) {
- containersMap := make(map[string]*containerData)
- switch options.IdType {
- case v2.TypeName:
- if options.Recursive == false {
- cont, err := self.getContainer(containerName)
- if err != nil {
- return containersMap, err
- }
- containersMap[cont.info.Name] = cont
- } else {
- containersMap = self.getSubcontainers(containerName)
- if len(containersMap) == 0 {
- return containersMap, fmt.Errorf("unknown container: %q", containerName)
- }
- }
- case v2.TypeDocker:
- if options.Recursive == false {
- containerName = strings.TrimPrefix(containerName, "/")
- cont, err := self.getDockerContainer(containerName)
- if err != nil {
- return containersMap, err
- }
- containersMap[cont.info.Name] = cont
- } else {
- if containerName != "/" {
- return containersMap, fmt.Errorf("invalid request for docker container %q with subcontainers", containerName)
- }
- containersMap = self.getAllDockerContainers()
- }
- default:
- return containersMap, fmt.Errorf("invalid request type %q", options.IdType)
- }
- if options.MaxAge != nil {
- // update stats for all containers in containersMap
- var waitGroup sync.WaitGroup
- waitGroup.Add(len(containersMap))
- for _, container := range containersMap {
- go func(cont *containerData) {
- cont.OnDemandHousekeeping(*options.MaxAge)
- waitGroup.Done()
- }(container)
- }
- waitGroup.Wait()
- }
- return containersMap, nil
- }
- func (self *manager) GetDirFsInfo(dir string) (v2.FsInfo, error) {
- device, err := self.fsInfo.GetDirFsDevice(dir)
- if err != nil {
- return v2.FsInfo{}, fmt.Errorf("failed to get device for dir %q: %v", dir, err)
- }
- return self.getFsInfoByDeviceName(device.Device)
- }
- func (self *manager) GetFsInfoByFsUUID(uuid string) (v2.FsInfo, error) {
- device, err := self.fsInfo.GetDeviceInfoByFsUUID(uuid)
- if err != nil {
- return v2.FsInfo{}, err
- }
- return self.getFsInfoByDeviceName(device.Device)
- }
- func (self *manager) GetFsInfo(label string) ([]v2.FsInfo, error) {
- var empty time.Time
- // Get latest data from filesystems hanging off root container.
- stats, err := self.memoryCache.RecentStats("/", empty, empty, 1)
- if err != nil {
- return nil, err
- }
- dev := ""
- if len(label) != 0 {
- dev, err = self.fsInfo.GetDeviceForLabel(label)
- if err != nil {
- return nil, err
- }
- }
- fsInfo := []v2.FsInfo{}
- for i := range stats[0].Filesystem {
- fs := stats[0].Filesystem[i]
- if len(label) != 0 && fs.Device != dev {
- continue
- }
- mountpoint, err := self.fsInfo.GetMountpointForDevice(fs.Device)
- if err != nil {
- return nil, err
- }
- labels, err := self.fsInfo.GetLabelsForDevice(fs.Device)
- if err != nil {
- return nil, err
- }
- fi := v2.FsInfo{
- Timestamp: stats[0].Timestamp,
- Device: fs.Device,
- Mountpoint: mountpoint,
- Capacity: fs.Limit,
- Usage: fs.Usage,
- Available: fs.Available,
- Labels: labels,
- }
- if fs.HasInodes {
- fi.Inodes = &fs.Inodes
- fi.InodesFree = &fs.InodesFree
- }
- fsInfo = append(fsInfo, fi)
- }
- return fsInfo, nil
- }
- func (m *manager) GetMachineInfo() (*info.MachineInfo, error) {
- m.machineMu.RLock()
- defer m.machineMu.RUnlock()
- // Copy and return the MachineInfo.
- return &m.machineInfo, nil
- }
// GetVersionInfo returns version information about the components cAdvisor
// depends on, recomputed on every call.
func (m *manager) GetVersionInfo() (*info.VersionInfo, error) {
	// TODO: Consider caching this and periodically updating. The VersionInfo may change if
	// the docker daemon is started after the cAdvisor client is created. Caching the value
	// would be helpful so we would be able to return the last known docker version if
	// docker was down at the time of a query.
	return getVersionInfo()
}
- func (m *manager) Exists(containerName string) bool {
- m.containersLock.Lock()
- defer m.containersLock.Unlock()
- namespacedName := namespacedContainerName{
- Name: containerName,
- }
- _, ok := m.containers[namespacedName]
- if ok {
- return true
- }
- return false
- }
- func (m *manager) GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error) {
- // override recursive. Only support single container listing.
- options.Recursive = false
- // override MaxAge. ProcessList does not require updated stats.
- options.MaxAge = nil
- conts, err := m.getRequestedContainers(containerName, options)
- if err != nil {
- return nil, err
- }
- if len(conts) != 1 {
- return nil, fmt.Errorf("Expected the request to match only one container")
- }
- // TODO(rjnagal): handle count? Only if we can do count by type (eg. top 5 cpu users)
- ps := []v2.ProcessInfo{}
- for _, cont := range conts {
- ps, err = cont.GetProcessList(m.cadvisorContainer, m.inHostNamespace)
- if err != nil {
- return nil, err
- }
- }
- return ps, nil
- }
- func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *containerData) error {
- for k, v := range collectorConfigs {
- configFile, err := cont.ReadFile(v, m.inHostNamespace)
- if err != nil {
- return fmt.Errorf("failed to read config file %q for config %q, container %q: %v", k, v, cont.info.Name, err)
- }
- klog.V(4).Infof("Got config from %q: %q", v, configFile)
- if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") {
- newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHttpClient)
- if err != nil {
- return fmt.Errorf("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
- }
- err = cont.collectorManager.RegisterCollector(newCollector)
- if err != nil {
- return fmt.Errorf("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err)
- }
- } else {
- newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHttpClient)
- if err != nil {
- return fmt.Errorf("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
- }
- err = cont.collectorManager.RegisterCollector(newCollector)
- if err != nil {
- return fmt.Errorf("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err)
- }
- }
- }
- return nil
- }
- // Enables overwriting an existing containerData/Handler object for a given containerName.
- // Can't use createContainer as it just returns if a given containerName has a handler already.
- // Ex: rkt handler will want to take priority over the raw handler, but the raw handler might be created first.
- // Only allow raw handler to be overridden
- func (m *manager) overrideContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
- m.containersLock.Lock()
- defer m.containersLock.Unlock()
- namespacedName := namespacedContainerName{
- Name: containerName,
- }
- if _, ok := m.containers[namespacedName]; ok {
- containerData := m.containers[namespacedName]
- if containerData.handler.Type() != container.ContainerTypeRaw {
- return nil
- }
- err := m.destroyContainerLocked(containerName)
- if err != nil {
- return fmt.Errorf("overrideContainer: failed to destroy containerData/handler for %v: %v", containerName, err)
- }
- }
- return m.createContainerLocked(containerName, watchSource)
- }
// Create a container.
// createContainer is the locking wrapper around createContainerLocked; it is
// safe to call from any goroutine.
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
	m.containersLock.Lock()
	defer m.containersLock.Unlock()

	return m.createContainerLocked(containerName, watchSource)
}
// createContainerLocked creates tracking state for containerName, registers
// its collectors, records a creation event, and starts housekeeping.
// It is a no-op when the container is already tracked or the handler factory
// rejects the name. Caller must hold m.containersLock.
func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
	namespacedName := namespacedContainerName{
		Name: containerName,
	}

	// Check that the container didn't already exist.
	if _, ok := m.containers[namespacedName]; ok {
		return nil
	}

	handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
	if err != nil {
		return err
	}
	if !accept {
		// ignoring this container.
		klog.V(4).Infof("ignoring container %q", containerName)
		return nil
	}
	collectorManager, err := collector.NewCollectorManager()
	if err != nil {
		return err
	}

	// Only log cAdvisor's own resource usage when the flag asks for it and
	// this is the cAdvisor container itself.
	logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
	cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
	if err != nil {
		return err
	}

	// GPU metric collection is best-effort: a failure here only degrades
	// metrics, it does not abort container creation.
	devicesCgroupPath, err := handler.GetCgroupPath("devices")
	if err != nil {
		klog.Warningf("Error getting devices cgroup path: %v", err)
	} else {
		cont.nvidiaCollector, err = m.nvidiaManager.GetCollector(devicesCgroupPath)
		if err != nil {
			klog.V(4).Infof("GPU metrics may be unavailable/incomplete for container %q: %v", cont.info.Name, err)
		}
	}

	// Add collectors. Registration failures are logged but do not abort.
	labels := handler.GetContainerLabels()
	collectorConfigs := collector.GetCollectorConfigs(labels)
	err = m.registerCollectors(collectorConfigs, cont)
	if err != nil {
		klog.Warningf("Failed to register collectors for %q: %v", containerName, err)
	}

	// Add the container name and all its aliases. The aliases must be within the namespace of the factory.
	m.containers[namespacedName] = cont
	for _, alias := range cont.info.Aliases {
		m.containers[namespacedContainerName{
			Namespace: cont.info.Namespace,
			Name:      alias,
		}] = cont
	}

	klog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)

	// Emit a creation event stamped with the container's creation time.
	// NOTE(review): on error below the container stays registered in
	// m.containers but is never started — presumably acceptable; confirm.
	contSpec, err := cont.handler.GetSpec()
	if err != nil {
		return err
	}
	contRef, err := cont.handler.ContainerReference()
	if err != nil {
		return err
	}
	newEvent := &info.Event{
		ContainerName: contRef.Name,
		Timestamp:     contSpec.CreationTime,
		EventType:     info.EventContainerCreation,
	}
	err = m.eventHandler.AddEvent(newEvent)
	if err != nil {
		return err
	}

	// Start the container's housekeeping.
	return cont.Start()
}
// destroyContainer is the locking wrapper around destroyContainerLocked; it
// is safe to call from any goroutine.
func (m *manager) destroyContainer(containerName string) error {
	m.containersLock.Lock()
	defer m.containersLock.Unlock()

	return m.destroyContainerLocked(containerName)
}
- func (m *manager) destroyContainerLocked(containerName string) error {
- namespacedName := namespacedContainerName{
- Name: containerName,
- }
- cont, ok := m.containers[namespacedName]
- if !ok {
- // Already destroyed, done.
- return nil
- }
- // Tell the container to stop.
- err := cont.Stop()
- if err != nil {
- return err
- }
- // Remove the container from our records (and all its aliases).
- delete(m.containers, namespacedName)
- for _, alias := range cont.info.Aliases {
- delete(m.containers, namespacedContainerName{
- Namespace: cont.info.Namespace,
- Name: alias,
- })
- }
- klog.V(3).Infof("Destroyed container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)
- contRef, err := cont.handler.ContainerReference()
- if err != nil {
- return err
- }
- newEvent := &info.Event{
- ContainerName: contRef.Name,
- Timestamp: time.Now(),
- EventType: info.EventContainerDeletion,
- }
- err = m.eventHandler.AddEvent(newEvent)
- if err != nil {
- return err
- }
- return nil
- }
// Detect all containers that have been added or deleted from the specified container.
// The lock is intentionally dropped around the (potentially slow) recursive
// listing and re-acquired afterwards, so the diff is computed against a map
// snapshot that may have changed in between.
func (m *manager) getContainersDiff(containerName string) (added []info.ContainerReference, removed []info.ContainerReference, err error) {
	// Get all subcontainers recursively.
	m.containersLock.RLock()
	cont, ok := m.containers[namespacedContainerName{
		Name: containerName,
	}]
	m.containersLock.RUnlock()
	if !ok {
		return nil, nil, fmt.Errorf("failed to find container %q while checking for new containers", containerName)
	}
	allContainers, err := cont.handler.ListContainers(container.ListRecursive)
	if err != nil {
		return nil, nil, err
	}
	// The listing is rooted below containerName, so add the root itself.
	allContainers = append(allContainers, info.ContainerReference{Name: containerName})

	m.containersLock.RLock()
	defer m.containersLock.RUnlock()

	// Determine which were added and which were removed.
	allContainersSet := make(map[string]*containerData)
	for name, d := range m.containers {
		// Only add the canonical name.
		if d.info.Name == name.Name {
			allContainersSet[name.Name] = d
		}
	}

	// Added containers
	for _, c := range allContainers {
		// Anything still left in allContainersSet after this loop was not
		// seen in the fresh listing, i.e. it was removed.
		delete(allContainersSet, c.Name)
		_, ok := m.containers[namespacedContainerName{
			Name: c.Name,
		}]
		if !ok {
			added = append(added, c)
		}
	}

	// Removed ones are no longer in the container listing.
	for _, d := range allContainersSet {
		removed = append(removed, d.info.ContainerReference)
	}
	return
}
- // Detect the existing subcontainers and reflect the setup here.
- func (m *manager) detectSubcontainers(containerName string) error {
- added, removed, err := m.getContainersDiff(containerName)
- if err != nil {
- return err
- }
- // Add the new containers.
- for _, cont := range added {
- err = m.createContainer(cont.Name, watcher.Raw)
- if err != nil {
- klog.Errorf("Failed to create existing container: %s: %s", cont.Name, err)
- }
- }
- // Remove the old containers.
- for _, cont := range removed {
- err = m.destroyContainer(cont.Name)
- if err != nil {
- klog.Errorf("Failed to destroy existing container: %s: %s", cont.Name, err)
- }
- }
- return nil
- }
- // Watches for new containers started in the system. Runs forever unless there is a setup error.
- func (self *manager) watchForNewContainers(quit chan error) error {
- for _, watcher := range self.containerWatchers {
- err := watcher.Start(self.eventsChannel)
- if err != nil {
- return err
- }
- }
- // There is a race between starting the watch and new container creation so we do a detection before we read new containers.
- err := self.detectSubcontainers("/")
- if err != nil {
- return err
- }
- // Listen to events from the container handler.
- go func() {
- for {
- select {
- case event := <-self.eventsChannel:
- switch {
- case event.EventType == watcher.ContainerAdd:
- switch event.WatchSource {
- // the Rkt and Raw watchers can race, and if Raw wins, we want Rkt to override and create a new handler for Rkt containers
- case watcher.Rkt:
- err = self.overrideContainer(event.Name, event.WatchSource)
- default:
- err = self.createContainer(event.Name, event.WatchSource)
- }
- case event.EventType == watcher.ContainerDelete:
- err = self.destroyContainer(event.Name)
- }
- if err != nil {
- klog.Warningf("Failed to process watch event %+v: %v", event, err)
- }
- case <-quit:
- var errs partialFailure
- // Stop processing events if asked to quit.
- for i, watcher := range self.containerWatchers {
- err := watcher.Stop()
- if err != nil {
- errs.append(fmt.Sprintf("watcher %d", i), "Stop", err)
- }
- }
- if len(errs) > 0 {
- quit <- errs
- } else {
- quit <- nil
- klog.Infof("Exiting thread watching subcontainers")
- return
- }
- }
- }
- }()
- return nil
- }
- func (self *manager) watchForNewOoms() error {
- klog.V(2).Infof("Started watching for new ooms in manager")
- outStream := make(chan *oomparser.OomInstance, 10)
- oomLog, err := oomparser.New()
- if err != nil {
- return err
- }
- go oomLog.StreamOoms(outStream)
- go func() {
- for oomInstance := range outStream {
- // Surface OOM and OOM kill events.
- newEvent := &info.Event{
- ContainerName: oomInstance.ContainerName,
- Timestamp: oomInstance.TimeOfDeath,
- EventType: info.EventOom,
- }
- err := self.eventHandler.AddEvent(newEvent)
- if err != nil {
- klog.Errorf("failed to add OOM event for %q: %v", oomInstance.ContainerName, err)
- }
- klog.V(3).Infof("Created an OOM event in container %q at %v", oomInstance.ContainerName, oomInstance.TimeOfDeath)
- newEvent = &info.Event{
- ContainerName: oomInstance.VictimContainerName,
- Timestamp: oomInstance.TimeOfDeath,
- EventType: info.EventOomKill,
- EventData: info.EventData{
- OomKill: &info.OomKillEventData{
- Pid: oomInstance.Pid,
- ProcessName: oomInstance.ProcessName,
- },
- },
- }
- err = self.eventHandler.AddEvent(newEvent)
- if err != nil {
- klog.Errorf("failed to add OOM kill event for %q: %v", oomInstance.ContainerName, err)
- }
- }
- }()
- return nil
- }
- // can be called by the api which will take events returned on the channel
- func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) {
- return self.eventHandler.WatchEvents(request)
- }
- // can be called by the api which will return all events satisfying the request
- func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, error) {
- return self.eventHandler.GetEvents(request)
- }
- // called by the api when a client is no longer listening to the channel
- func (self *manager) CloseEventChannel(watch_id int) {
- self.eventHandler.StopWatch(watch_id)
- }
- // Parses the events StoragePolicy from the flags.
- func parseEventsStoragePolicy() events.StoragePolicy {
- policy := events.DefaultStoragePolicy()
- // Parse max age.
- parts := strings.Split(*eventStorageAgeLimit, ",")
- for _, part := range parts {
- items := strings.Split(part, "=")
- if len(items) != 2 {
- klog.Warningf("Unknown event storage policy %q when parsing max age", part)
- continue
- }
- dur, err := time.ParseDuration(items[1])
- if err != nil {
- klog.Warningf("Unable to parse event max age duration %q: %v", items[1], err)
- continue
- }
- if items[0] == "default" {
- policy.DefaultMaxAge = dur
- continue
- }
- policy.PerTypeMaxAge[info.EventType(items[0])] = dur
- }
- // Parse max number.
- parts = strings.Split(*eventStorageEventLimit, ",")
- for _, part := range parts {
- items := strings.Split(part, "=")
- if len(items) != 2 {
- klog.Warningf("Unknown event storage policy %q when parsing max event limit", part)
- continue
- }
- val, err := strconv.Atoi(items[1])
- if err != nil {
- klog.Warningf("Unable to parse integer from %q: %v", items[1], err)
- continue
- }
- if items[0] == "default" {
- policy.DefaultMaxNumEvents = val
- continue
- }
- policy.PerTypeMaxNumEvents[info.EventType(items[0])] = val
- }
- return policy
- }
// DockerImages returns the list of Docker images, delegating to the docker
// package.
func (m *manager) DockerImages() ([]info.DockerImage, error) {
	return docker.Images()
}
// DockerInfo returns the Docker daemon status, delegating to the docker
// package.
func (m *manager) DockerInfo() (info.DockerStatus, error) {
	return docker.Status()
}
- func (m *manager) DebugInfo() map[string][]string {
- debugInfo := container.DebugInfo()
- // Get unique containers.
- var conts map[*containerData]struct{}
- func() {
- m.containersLock.RLock()
- defer m.containersLock.RUnlock()
- conts = make(map[*containerData]struct{}, len(m.containers))
- for _, c := range m.containers {
- conts[c] = struct{}{}
- }
- }()
- // List containers.
- lines := make([]string, 0, len(conts))
- for cont := range conts {
- lines = append(lines, cont.info.Name)
- if cont.info.Namespace != "" {
- lines = append(lines, fmt.Sprintf("\tNamespace: %s", cont.info.Namespace))
- }
- if len(cont.info.Aliases) != 0 {
- lines = append(lines, "\tAliases:")
- for _, alias := range cont.info.Aliases {
- lines = append(lines, fmt.Sprintf("\t\t%s", alias))
- }
- }
- }
- debugInfo["Managed containers"] = lines
- return debugInfo
- }
- func (self *manager) getFsInfoByDeviceName(deviceName string) (v2.FsInfo, error) {
- mountPoint, err := self.fsInfo.GetMountpointForDevice(deviceName)
- if err != nil {
- return v2.FsInfo{}, fmt.Errorf("failed to get mount point for device %q: %v", deviceName, err)
- }
- infos, err := self.GetFsInfo("")
- if err != nil {
- return v2.FsInfo{}, err
- }
- for _, info := range infos {
- if info.Mountpoint == mountPoint {
- return info, nil
- }
- }
- return v2.FsInfo{}, fmt.Errorf("cannot find filesystem info for device %q", deviceName)
- }
- func getVersionInfo() (*info.VersionInfo, error) {
- kernel_version := machine.KernelVersion()
- container_os := machine.ContainerOsVersion()
- docker_version, err := docker.VersionString()
- if err != nil {
- return nil, err
- }
- docker_api_version, err := docker.APIVersionString()
- if err != nil {
- return nil, err
- }
- return &info.VersionInfo{
- KernelVersion: kernel_version,
- ContainerOsVersion: container_os,
- DockerVersion: docker_version,
- DockerAPIVersion: docker_api_version,
- CadvisorVersion: version.Info["version"],
- CadvisorRevision: version.Info["revision"],
- }, nil
- }
- // Helper for accumulating partial failures.
- type partialFailure []string
- func (f *partialFailure) append(id, operation string, err error) {
- *f = append(*f, fmt.Sprintf("[%q: %s: %s]", id, operation, err))
- }
- func (f partialFailure) Error() string {
- return fmt.Sprintf("partial failures: %s", strings.Join(f, ", "))
- }
- func (f partialFailure) OrNil() error {
- if len(f) == 0 {
- return nil
- }
- return f
- }
|