utils.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package stackdriver
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "fmt"
  19. "sync"
  20. "time"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. "k8s.io/kubernetes/test/e2e/framework"
  23. "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
  24. "golang.org/x/oauth2/google"
  25. sd "google.golang.org/api/logging/v2beta1"
  26. "google.golang.org/api/option"
  27. pubsub "google.golang.org/api/pubsub/v1"
  28. )
  // Tunables for the Stackdriver-backed log provider.
  const (
  	// sinkStartupTimeout bounds how long waitSinkInit keeps probing before it
  	// gives up on the Stackdriver Logging sink becoming operational.
  	sinkStartupTimeout = 10 * time.Minute

  	// sdLoggingPollInterval is the delay between consecutive pulls made by each
  	// polling goroutine.
  	sdLoggingPollInterval = 100 * time.Millisecond
  	// sdLoggingPollParallelism is how many polling goroutines startPollingLogs runs.
  	sdLoggingPollParallelism = 10

  	// maxPullLogMessages is the MaxMessages value sent with every PubSub pull.
  	maxPullLogMessages = 100000
  	// maxQueueSize limits the number of messages held in a single queue of the
  	// provider's queue collection.
  	maxQueueSize = 10000

  	// stackdriverSinkCountLimit is the number of Stackdriver sinks a single
  	// project may hold; it is also used as the page size when listing sinks.
  	stackdriverSinkCountLimit = 90
  )
  // logProviderScope selects which log entries a provider collects; buildFilter
  // turns it into the Stackdriver Logging filter installed on the sink.
  type logProviderScope int

  // Supported scopes.
  const (
  	// podsScope: "container" entries from the test namespace.
  	podsScope logProviderScope = 0
  	// eventsScope: "gke_cluster" entries whose jsonPayload.metadata.namespace is the test namespace.
  	eventsScope logProviderScope = 1
  	// systemScope: all "gce_instance" entries.
  	systemScope logProviderScope = 2
  )
  50. var _ utils.LogProvider = &sdLogProvider{}
  51. type sdLogProvider struct {
  52. sdService *sd.Service
  53. pubsubService *pubsub.Service
  54. framework *framework.Framework
  55. topic *pubsub.Topic
  56. subscription *pubsub.Subscription
  57. logSink *sd.LogSink
  58. pollingStopChannel chan struct{}
  59. pollingWG *sync.WaitGroup
  60. queueCollection utils.LogsQueueCollection
  61. scope logProviderScope
  62. }
  63. func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogProvider, error) {
  64. ctx := context.Background()
  65. hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope)
  66. framework.ExpectNoError(err)
  67. sdService, err := sd.NewService(ctx, option.WithHTTPClient(hc))
  68. if err != nil {
  69. return nil, err
  70. }
  71. err = ensureProjectHasSinkCapacity(sdService.Projects.Sinks, framework.TestContext.CloudConfig.ProjectID)
  72. if err != nil {
  73. return nil, err
  74. }
  75. pubsubService, err := pubsub.NewService(ctx, option.WithHTTPClient(hc))
  76. if err != nil {
  77. return nil, err
  78. }
  79. provider := &sdLogProvider{
  80. scope: scope,
  81. sdService: sdService,
  82. pubsubService: pubsubService,
  83. framework: f,
  84. pollingStopChannel: make(chan struct{}),
  85. pollingWG: &sync.WaitGroup{},
  86. queueCollection: utils.NewLogsQueueCollection(maxQueueSize),
  87. }
  88. return provider, nil
  89. }
  90. func ensureProjectHasSinkCapacity(sinksService *sd.ProjectsSinksService, projectID string) error {
  91. listResponse, err := listSinks(sinksService, projectID)
  92. if err != nil {
  93. return err
  94. }
  95. if len(listResponse.Sinks) >= stackdriverSinkCountLimit {
  96. framework.Logf("Reached Stackdriver sink limit. Deleting all sinks")
  97. deleteSinks(sinksService, projectID, listResponse.Sinks)
  98. }
  99. return nil
  100. }
  101. func listSinks(sinksService *sd.ProjectsSinksService, projectID string) (*sd.ListSinksResponse, error) {
  102. projectDst := fmt.Sprintf("projects/%s", projectID)
  103. listResponse, err := sinksService.List(projectDst).PageSize(stackdriverSinkCountLimit).Do()
  104. if err != nil {
  105. return nil, fmt.Errorf("failed to list Stackdriver Logging sinks: %v", err)
  106. }
  107. return listResponse, nil
  108. }
  109. func deleteSinks(sinksService *sd.ProjectsSinksService, projectID string, sinks []*sd.LogSink) {
  110. for _, sink := range sinks {
  111. sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, sink.Name)
  112. if _, err := sinksService.Delete(sinkNameID).Do(); err != nil {
  113. framework.Logf("Failed to delete LogSink: %v", err)
  114. }
  115. }
  116. }
  117. func (p *sdLogProvider) Init() error {
  118. projectID := framework.TestContext.CloudConfig.ProjectID
  119. nsName := p.framework.Namespace.Name
  120. topic, err := p.createPubSubTopic(projectID, nsName)
  121. if err != nil {
  122. return fmt.Errorf("failed to create PubSub topic: %v", err)
  123. }
  124. p.topic = topic
  125. subs, err := p.createPubSubSubscription(projectID, nsName, topic.Name)
  126. if err != nil {
  127. return fmt.Errorf("failed to create PubSub subscription: %v", err)
  128. }
  129. p.subscription = subs
  130. logSink, err := p.createSink(projectID, nsName, topic.Name)
  131. if err != nil {
  132. return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
  133. }
  134. p.logSink = logSink
  135. if err = p.authorizeSink(); err != nil {
  136. return fmt.Errorf("failed to authorize log sink: %v", err)
  137. }
  138. if err = p.waitSinkInit(); err != nil {
  139. return fmt.Errorf("failed to wait for sink to become operational: %v", err)
  140. }
  141. p.startPollingLogs()
  142. return nil
  143. }
  144. func (p *sdLogProvider) Cleanup() {
  145. close(p.pollingStopChannel)
  146. p.pollingWG.Wait()
  147. if p.logSink != nil {
  148. projectID := framework.TestContext.CloudConfig.ProjectID
  149. sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, p.logSink.Name)
  150. sinksService := p.sdService.Projects.Sinks
  151. if _, err := sinksService.Delete(sinkNameID).Do(); err != nil {
  152. framework.Logf("Failed to delete LogSink: %v", err)
  153. }
  154. }
  155. if p.subscription != nil {
  156. subsService := p.pubsubService.Projects.Subscriptions
  157. if _, err := subsService.Delete(p.subscription.Name).Do(); err != nil {
  158. framework.Logf("Failed to delete PubSub subscription: %v", err)
  159. }
  160. }
  161. if p.topic != nil {
  162. topicsService := p.pubsubService.Projects.Topics
  163. if _, err := topicsService.Delete(p.topic.Name).Do(); err != nil {
  164. framework.Logf("Failed to delete PubSub topic: %v", err)
  165. }
  166. }
  167. }
  168. func (p *sdLogProvider) ReadEntries(name string) []utils.LogEntry {
  169. return p.queueCollection.Pop(name)
  170. }
  171. func (p *sdLogProvider) LoggingAgentName() string {
  172. return "fluentd-gcp"
  173. }
  174. func (p *sdLogProvider) createPubSubTopic(projectID, topicName string) (*pubsub.Topic, error) {
  175. topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName)
  176. topic := &pubsub.Topic{
  177. Name: topicFullName,
  178. }
  179. return p.pubsubService.Projects.Topics.Create(topicFullName, topic).Do()
  180. }
  181. func (p *sdLogProvider) createPubSubSubscription(projectID, subsName, topicName string) (*pubsub.Subscription, error) {
  182. subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subsName)
  183. subs := &pubsub.Subscription{
  184. Name: subsFullName,
  185. Topic: topicName,
  186. }
  187. return p.pubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
  188. }
  189. func (p *sdLogProvider) createSink(projectID, sinkName, topicName string) (*sd.LogSink, error) {
  190. filter, err := p.buildFilter()
  191. if err != nil {
  192. return nil, err
  193. }
  194. framework.Logf("Using the following filter for log entries: %s", filter)
  195. sink := &sd.LogSink{
  196. Name: sinkName,
  197. Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
  198. Filter: filter,
  199. }
  200. projectDst := fmt.Sprintf("projects/%s", projectID)
  201. return p.sdService.Projects.Sinks.Create(projectDst, sink).Do()
  202. }
  203. func (p *sdLogProvider) buildFilter() (string, error) {
  204. switch p.scope {
  205. case podsScope:
  206. return fmt.Sprintf("resource.type=\"container\" AND resource.labels.namespace_id=\"%s\"",
  207. p.framework.Namespace.Name), nil
  208. case eventsScope:
  209. return fmt.Sprintf("resource.type=\"gke_cluster\" AND jsonPayload.metadata.namespace=\"%s\"",
  210. p.framework.Namespace.Name), nil
  211. case systemScope:
  212. // TODO(instrumentation): Filter logs from the current project only.
  213. return "resource.type=\"gce_instance\"", nil
  214. }
  215. return "", fmt.Errorf("Unknown log provider scope: %v", p.scope)
  216. }
  217. func (p *sdLogProvider) authorizeSink() error {
  218. topicsService := p.pubsubService.Projects.Topics
  219. policy, err := topicsService.GetIamPolicy(p.topic.Name).Do()
  220. if err != nil {
  221. return err
  222. }
  223. binding := &pubsub.Binding{
  224. Role: "roles/pubsub.publisher",
  225. Members: []string{p.logSink.WriterIdentity},
  226. }
  227. policy.Bindings = append(policy.Bindings, binding)
  228. req := &pubsub.SetIamPolicyRequest{Policy: policy}
  229. if _, err = topicsService.SetIamPolicy(p.topic.Name, req).Do(); err != nil {
  230. return err
  231. }
  232. return nil
  233. }
  234. func (p *sdLogProvider) waitSinkInit() error {
  235. framework.Logf("Waiting for log sink to become operational")
  236. return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
  237. err := publish(p.pubsubService, p.topic, "embrace eternity")
  238. if err != nil {
  239. framework.Logf("Failed to push message to PubSub due to %v", err)
  240. }
  241. messages, err := pullAndAck(p.pubsubService, p.subscription)
  242. if err != nil {
  243. framework.Logf("Failed to pull messages from PubSub due to %v", err)
  244. return false, nil
  245. }
  246. if len(messages) > 0 {
  247. framework.Logf("Sink %s is operational", p.logSink.Name)
  248. return true, nil
  249. }
  250. return false, nil
  251. })
  252. }
  253. func (p *sdLogProvider) startPollingLogs() {
  254. for i := 0; i < sdLoggingPollParallelism; i++ {
  255. p.pollingWG.Add(1)
  256. go func() {
  257. defer p.pollingWG.Done()
  258. wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
  259. p.pollLogsOnce()
  260. return false, nil
  261. }, p.pollingStopChannel)
  262. }()
  263. }
  264. }
  265. func (p *sdLogProvider) pollLogsOnce() {
  266. messages, err := pullAndAck(p.pubsubService, p.subscription)
  267. if err != nil {
  268. framework.Logf("Failed to pull messages from PubSub due to %v", err)
  269. return
  270. }
  271. for _, msg := range messages {
  272. logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
  273. if err != nil {
  274. framework.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
  275. continue
  276. }
  277. var sdLogEntry sd.LogEntry
  278. if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
  279. framework.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
  280. continue
  281. }
  282. name, ok := p.tryGetName(sdLogEntry)
  283. if !ok {
  284. framework.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
  285. continue
  286. }
  287. logEntry, err := convertLogEntry(sdLogEntry)
  288. if err != nil {
  289. framework.Logf("Failed to parse Stackdriver LogEntry: %v", err)
  290. continue
  291. }
  292. p.queueCollection.Push(name, logEntry)
  293. }
  294. }
  295. func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) {
  296. switch sdLogEntry.Resource.Type {
  297. case "container":
  298. return sdLogEntry.Resource.Labels["pod_id"], true
  299. case "gke_cluster":
  300. return "", true
  301. case "gce_instance":
  302. return sdLogEntry.Resource.Labels["instance_id"], true
  303. }
  304. return "", false
  305. }
  306. func convertLogEntry(sdLogEntry sd.LogEntry) (entry utils.LogEntry, err error) {
  307. entry = utils.LogEntry{LogName: sdLogEntry.LogName}
  308. entry.Location = sdLogEntry.Resource.Labels["location"]
  309. if sdLogEntry.TextPayload != "" {
  310. entry.TextPayload = sdLogEntry.TextPayload
  311. return
  312. }
  313. bytes, err := sdLogEntry.JsonPayload.MarshalJSON()
  314. if err != nil {
  315. err = fmt.Errorf("Failed to get jsonPayload from LogEntry %v", sdLogEntry)
  316. return
  317. }
  318. var jsonObject map[string]interface{}
  319. err = json.Unmarshal(bytes, &jsonObject)
  320. if err != nil {
  321. err = fmt.Errorf("Failed to deserialize jsonPayload as json object %s", string(bytes[:]))
  322. return
  323. }
  324. entry.JSONPayload = jsonObject
  325. return
  326. }
  327. func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) {
  328. subsService := service.Projects.Subscriptions
  329. req := &pubsub.PullRequest{
  330. ReturnImmediately: true,
  331. MaxMessages: maxPullLogMessages,
  332. }
  333. resp, err := subsService.Pull(subs.Name, req).Do()
  334. if err != nil {
  335. return nil, err
  336. }
  337. var ids []string
  338. for _, msg := range resp.ReceivedMessages {
  339. ids = append(ids, msg.AckId)
  340. }
  341. if len(ids) > 0 {
  342. ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
  343. if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil {
  344. framework.Logf("Failed to ack poll: %v", err)
  345. }
  346. }
  347. return resp.ReceivedMessages, nil
  348. }
  349. func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error {
  350. topicsService := service.Projects.Topics
  351. req := &pubsub.PublishRequest{
  352. Messages: []*pubsub.PubsubMessage{
  353. {
  354. Data: base64.StdEncoding.EncodeToString([]byte(msg)),
  355. },
  356. },
  357. }
  358. _, err := topicsService.Publish(topic.Name, req).Do()
  359. return err
  360. }
  361. func withLogProviderForScope(f *framework.Framework, scope logProviderScope, fun func(*sdLogProvider)) {
  362. p, err := newSdLogProvider(f, scope)
  363. framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
  364. err = p.Init()
  365. defer p.Cleanup()
  366. framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
  367. err = utils.EnsureLoggingAgentDeployment(f, p.LoggingAgentName())
  368. framework.ExpectNoError(err, "Logging agents deployed incorrectly")
  369. fun(p)
  370. }