utils.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package stackdriver
  14. import (
  15. "context"
  16. "encoding/base64"
  17. "encoding/json"
  18. "fmt"
  19. "sync"
  20. "time"
  21. "k8s.io/apimachinery/pkg/util/wait"
  22. "k8s.io/kubernetes/test/e2e/framework"
  23. e2elog "k8s.io/kubernetes/test/e2e/framework/log"
  24. "k8s.io/kubernetes/test/e2e/instrumentation/logging/utils"
  25. "golang.org/x/oauth2/google"
  26. sd "google.golang.org/api/logging/v2beta1"
  27. pubsub "google.golang.org/api/pubsub/v1"
  28. )
  29. const (
  30. // The amount of time to wait for Stackdriver Logging
  31. // sink to become operational
  32. sinkStartupTimeout = 10 * time.Minute
  33. // The limit on the number of messages to pull from PubSub
  34. maxPullLogMessages = 100 * 1000
  35. // maxQueueSize is the limit on the number of messages in the single queue.
  36. maxQueueSize = 10 * 1000
  37. // PubSub topic with log entries polling interval
  38. sdLoggingPollInterval = 100 * time.Millisecond
  39. // The parallelism level of polling logs process.
  40. sdLoggingPollParallelism = 10
  41. // The limit on the number of stackdriver sinks that can be created within one project.
  42. stackdriverSinkCountLimit = 90
  43. )
  44. type logProviderScope int
  45. const (
  46. podsScope logProviderScope = iota
  47. eventsScope
  48. systemScope
  49. )
  50. var _ utils.LogProvider = &sdLogProvider{}
  51. type sdLogProvider struct {
  52. sdService *sd.Service
  53. pubsubService *pubsub.Service
  54. framework *framework.Framework
  55. topic *pubsub.Topic
  56. subscription *pubsub.Subscription
  57. logSink *sd.LogSink
  58. pollingStopChannel chan struct{}
  59. pollingWG *sync.WaitGroup
  60. queueCollection utils.LogsQueueCollection
  61. scope logProviderScope
  62. }
  63. func newSdLogProvider(f *framework.Framework, scope logProviderScope) (*sdLogProvider, error) {
  64. ctx := context.Background()
  65. hc, err := google.DefaultClient(ctx, sd.CloudPlatformScope)
  66. sdService, err := sd.New(hc)
  67. if err != nil {
  68. return nil, err
  69. }
  70. err = ensureProjectHasSinkCapacity(sdService.Projects.Sinks, framework.TestContext.CloudConfig.ProjectID)
  71. if err != nil {
  72. return nil, err
  73. }
  74. pubsubService, err := pubsub.New(hc)
  75. if err != nil {
  76. return nil, err
  77. }
  78. provider := &sdLogProvider{
  79. scope: scope,
  80. sdService: sdService,
  81. pubsubService: pubsubService,
  82. framework: f,
  83. pollingStopChannel: make(chan struct{}),
  84. pollingWG: &sync.WaitGroup{},
  85. queueCollection: utils.NewLogsQueueCollection(maxQueueSize),
  86. }
  87. return provider, nil
  88. }
  89. func ensureProjectHasSinkCapacity(sinksService *sd.ProjectsSinksService, projectID string) error {
  90. listResponse, err := listSinks(sinksService, projectID)
  91. if err != nil {
  92. return err
  93. }
  94. if len(listResponse.Sinks) >= stackdriverSinkCountLimit {
  95. e2elog.Logf("Reached Stackdriver sink limit. Deleting all sinks")
  96. deleteSinks(sinksService, projectID, listResponse.Sinks)
  97. }
  98. return nil
  99. }
  100. func listSinks(sinksService *sd.ProjectsSinksService, projectID string) (*sd.ListSinksResponse, error) {
  101. projectDst := fmt.Sprintf("projects/%s", projectID)
  102. listResponse, err := sinksService.List(projectDst).PageSize(stackdriverSinkCountLimit).Do()
  103. if err != nil {
  104. return nil, fmt.Errorf("failed to list Stackdriver Logging sinks: %v", err)
  105. }
  106. return listResponse, nil
  107. }
  108. func deleteSinks(sinksService *sd.ProjectsSinksService, projectID string, sinks []*sd.LogSink) {
  109. for _, sink := range sinks {
  110. sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, sink.Name)
  111. if _, err := sinksService.Delete(sinkNameID).Do(); err != nil {
  112. e2elog.Logf("Failed to delete LogSink: %v", err)
  113. }
  114. }
  115. }
  116. func (p *sdLogProvider) Init() error {
  117. projectID := framework.TestContext.CloudConfig.ProjectID
  118. nsName := p.framework.Namespace.Name
  119. topic, err := p.createPubSubTopic(projectID, nsName)
  120. if err != nil {
  121. return fmt.Errorf("failed to create PubSub topic: %v", err)
  122. }
  123. p.topic = topic
  124. subs, err := p.createPubSubSubscription(projectID, nsName, topic.Name)
  125. if err != nil {
  126. return fmt.Errorf("failed to create PubSub subscription: %v", err)
  127. }
  128. p.subscription = subs
  129. logSink, err := p.createSink(projectID, nsName, topic.Name)
  130. if err != nil {
  131. return fmt.Errorf("failed to create Stackdriver Logging sink: %v", err)
  132. }
  133. p.logSink = logSink
  134. if err = p.authorizeSink(); err != nil {
  135. return fmt.Errorf("failed to authorize log sink: %v", err)
  136. }
  137. if err = p.waitSinkInit(); err != nil {
  138. return fmt.Errorf("failed to wait for sink to become operational: %v", err)
  139. }
  140. p.startPollingLogs()
  141. return nil
  142. }
  143. func (p *sdLogProvider) Cleanup() {
  144. close(p.pollingStopChannel)
  145. p.pollingWG.Wait()
  146. if p.logSink != nil {
  147. projectID := framework.TestContext.CloudConfig.ProjectID
  148. sinkNameID := fmt.Sprintf("projects/%s/sinks/%s", projectID, p.logSink.Name)
  149. sinksService := p.sdService.Projects.Sinks
  150. if _, err := sinksService.Delete(sinkNameID).Do(); err != nil {
  151. e2elog.Logf("Failed to delete LogSink: %v", err)
  152. }
  153. }
  154. if p.subscription != nil {
  155. subsService := p.pubsubService.Projects.Subscriptions
  156. if _, err := subsService.Delete(p.subscription.Name).Do(); err != nil {
  157. e2elog.Logf("Failed to delete PubSub subscription: %v", err)
  158. }
  159. }
  160. if p.topic != nil {
  161. topicsService := p.pubsubService.Projects.Topics
  162. if _, err := topicsService.Delete(p.topic.Name).Do(); err != nil {
  163. e2elog.Logf("Failed to delete PubSub topic: %v", err)
  164. }
  165. }
  166. }
  167. func (p *sdLogProvider) ReadEntries(name string) []utils.LogEntry {
  168. return p.queueCollection.Pop(name)
  169. }
  170. func (p *sdLogProvider) LoggingAgentName() string {
  171. return "fluentd-gcp"
  172. }
  173. func (p *sdLogProvider) createPubSubTopic(projectID, topicName string) (*pubsub.Topic, error) {
  174. topicFullName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName)
  175. topic := &pubsub.Topic{
  176. Name: topicFullName,
  177. }
  178. return p.pubsubService.Projects.Topics.Create(topicFullName, topic).Do()
  179. }
  180. func (p *sdLogProvider) createPubSubSubscription(projectID, subsName, topicName string) (*pubsub.Subscription, error) {
  181. subsFullName := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subsName)
  182. subs := &pubsub.Subscription{
  183. Name: subsFullName,
  184. Topic: topicName,
  185. }
  186. return p.pubsubService.Projects.Subscriptions.Create(subsFullName, subs).Do()
  187. }
  188. func (p *sdLogProvider) createSink(projectID, sinkName, topicName string) (*sd.LogSink, error) {
  189. filter, err := p.buildFilter()
  190. if err != nil {
  191. return nil, err
  192. }
  193. e2elog.Logf("Using the following filter for log entries: %s", filter)
  194. sink := &sd.LogSink{
  195. Name: sinkName,
  196. Destination: fmt.Sprintf("pubsub.googleapis.com/%s", topicName),
  197. Filter: filter,
  198. }
  199. projectDst := fmt.Sprintf("projects/%s", projectID)
  200. return p.sdService.Projects.Sinks.Create(projectDst, sink).Do()
  201. }
  202. func (p *sdLogProvider) buildFilter() (string, error) {
  203. switch p.scope {
  204. case podsScope:
  205. return fmt.Sprintf("resource.type=\"container\" AND resource.labels.namespace_id=\"%s\"",
  206. p.framework.Namespace.Name), nil
  207. case eventsScope:
  208. return fmt.Sprintf("resource.type=\"gke_cluster\" AND jsonPayload.metadata.namespace=\"%s\"",
  209. p.framework.Namespace.Name), nil
  210. case systemScope:
  211. // TODO(instrumentation): Filter logs from the current project only.
  212. return "resource.type=\"gce_instance\"", nil
  213. }
  214. return "", fmt.Errorf("Unknown log provider scope: %v", p.scope)
  215. }
  216. func (p *sdLogProvider) authorizeSink() error {
  217. topicsService := p.pubsubService.Projects.Topics
  218. policy, err := topicsService.GetIamPolicy(p.topic.Name).Do()
  219. if err != nil {
  220. return err
  221. }
  222. binding := &pubsub.Binding{
  223. Role: "roles/pubsub.publisher",
  224. Members: []string{p.logSink.WriterIdentity},
  225. }
  226. policy.Bindings = append(policy.Bindings, binding)
  227. req := &pubsub.SetIamPolicyRequest{Policy: policy}
  228. if _, err = topicsService.SetIamPolicy(p.topic.Name, req).Do(); err != nil {
  229. return err
  230. }
  231. return nil
  232. }
  233. func (p *sdLogProvider) waitSinkInit() error {
  234. e2elog.Logf("Waiting for log sink to become operational")
  235. return wait.Poll(1*time.Second, sinkStartupTimeout, func() (bool, error) {
  236. err := publish(p.pubsubService, p.topic, "embrace eternity")
  237. if err != nil {
  238. e2elog.Logf("Failed to push message to PubSub due to %v", err)
  239. }
  240. messages, err := pullAndAck(p.pubsubService, p.subscription)
  241. if err != nil {
  242. e2elog.Logf("Failed to pull messages from PubSub due to %v", err)
  243. return false, nil
  244. }
  245. if len(messages) > 0 {
  246. e2elog.Logf("Sink %s is operational", p.logSink.Name)
  247. return true, nil
  248. }
  249. return false, nil
  250. })
  251. }
  252. func (p *sdLogProvider) startPollingLogs() {
  253. for i := 0; i < sdLoggingPollParallelism; i++ {
  254. p.pollingWG.Add(1)
  255. go func() {
  256. defer p.pollingWG.Done()
  257. wait.PollUntil(sdLoggingPollInterval, func() (bool, error) {
  258. p.pollLogsOnce()
  259. return false, nil
  260. }, p.pollingStopChannel)
  261. }()
  262. }
  263. }
  264. func (p *sdLogProvider) pollLogsOnce() {
  265. messages, err := pullAndAck(p.pubsubService, p.subscription)
  266. if err != nil {
  267. e2elog.Logf("Failed to pull messages from PubSub due to %v", err)
  268. return
  269. }
  270. for _, msg := range messages {
  271. logEntryEncoded, err := base64.StdEncoding.DecodeString(msg.Message.Data)
  272. if err != nil {
  273. e2elog.Logf("Got a message from pubsub that is not base64-encoded: %s", msg.Message.Data)
  274. continue
  275. }
  276. var sdLogEntry sd.LogEntry
  277. if err := json.Unmarshal(logEntryEncoded, &sdLogEntry); err != nil {
  278. e2elog.Logf("Failed to decode a pubsub message '%s': %v", logEntryEncoded, err)
  279. continue
  280. }
  281. name, ok := p.tryGetName(sdLogEntry)
  282. if !ok {
  283. e2elog.Logf("Received LogEntry with unexpected resource type: %s", sdLogEntry.Resource.Type)
  284. continue
  285. }
  286. logEntry, err := convertLogEntry(sdLogEntry)
  287. if err != nil {
  288. e2elog.Logf("Failed to parse Stackdriver LogEntry: %v", err)
  289. continue
  290. }
  291. p.queueCollection.Push(name, logEntry)
  292. }
  293. }
  294. func (p *sdLogProvider) tryGetName(sdLogEntry sd.LogEntry) (string, bool) {
  295. switch sdLogEntry.Resource.Type {
  296. case "container":
  297. return sdLogEntry.Resource.Labels["pod_id"], true
  298. case "gke_cluster":
  299. return "", true
  300. case "gce_instance":
  301. return sdLogEntry.Resource.Labels["instance_id"], true
  302. }
  303. return "", false
  304. }
  305. func convertLogEntry(sdLogEntry sd.LogEntry) (entry utils.LogEntry, err error) {
  306. entry = utils.LogEntry{LogName: sdLogEntry.LogName}
  307. entry.Location = sdLogEntry.Resource.Labels["location"]
  308. if sdLogEntry.TextPayload != "" {
  309. entry.TextPayload = sdLogEntry.TextPayload
  310. return
  311. }
  312. bytes, err := sdLogEntry.JsonPayload.MarshalJSON()
  313. if err != nil {
  314. err = fmt.Errorf("Failed to get jsonPayload from LogEntry %v", sdLogEntry)
  315. return
  316. }
  317. var jsonObject map[string]interface{}
  318. err = json.Unmarshal(bytes, &jsonObject)
  319. if err != nil {
  320. err = fmt.Errorf("Failed to deserialize jsonPayload as json object %s", string(bytes[:]))
  321. return
  322. }
  323. entry.JSONPayload = jsonObject
  324. return
  325. }
  326. func pullAndAck(service *pubsub.Service, subs *pubsub.Subscription) ([]*pubsub.ReceivedMessage, error) {
  327. subsService := service.Projects.Subscriptions
  328. req := &pubsub.PullRequest{
  329. ReturnImmediately: true,
  330. MaxMessages: maxPullLogMessages,
  331. }
  332. resp, err := subsService.Pull(subs.Name, req).Do()
  333. if err != nil {
  334. return nil, err
  335. }
  336. var ids []string
  337. for _, msg := range resp.ReceivedMessages {
  338. ids = append(ids, msg.AckId)
  339. }
  340. if len(ids) > 0 {
  341. ackReq := &pubsub.AcknowledgeRequest{AckIds: ids}
  342. if _, err = subsService.Acknowledge(subs.Name, ackReq).Do(); err != nil {
  343. e2elog.Logf("Failed to ack poll: %v", err)
  344. }
  345. }
  346. return resp.ReceivedMessages, nil
  347. }
  348. func publish(service *pubsub.Service, topic *pubsub.Topic, msg string) error {
  349. topicsService := service.Projects.Topics
  350. req := &pubsub.PublishRequest{
  351. Messages: []*pubsub.PubsubMessage{
  352. {
  353. Data: base64.StdEncoding.EncodeToString([]byte(msg)),
  354. },
  355. },
  356. }
  357. _, err := topicsService.Publish(topic.Name, req).Do()
  358. return err
  359. }
  360. func withLogProviderForScope(f *framework.Framework, scope logProviderScope, fun func(*sdLogProvider)) {
  361. p, err := newSdLogProvider(f, scope)
  362. framework.ExpectNoError(err, "Failed to create Stackdriver logs provider")
  363. err = p.Init()
  364. defer p.Cleanup()
  365. framework.ExpectNoError(err, "Failed to init Stackdriver logs provider")
  366. err = utils.EnsureLoggingAgentDeployment(f, p.LoggingAgentName())
  367. framework.ExpectNoError(err, "Logging agents deployed incorrectly")
  368. fun(p)
  369. }