1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- /*
- Copyright (c) 2014 VMware, Inc. All Rights Reserved.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package progress
- import "sync"
- type Aggregator struct {
- downstream Sinker
- upstream chan (<-chan Report)
- done chan struct{}
- w sync.WaitGroup
- }
- func NewAggregator(s Sinker) *Aggregator {
- a := &Aggregator{
- downstream: s,
- upstream: make(chan (<-chan Report)),
- done: make(chan struct{}),
- }
- a.w.Add(1)
- go a.loop()
- return a
- }
- func (a *Aggregator) loop() {
- defer a.w.Done()
- dch := a.downstream.Sink()
- defer close(dch)
- for {
- select {
- case uch := <-a.upstream:
- // Drain upstream channel
- for e := range uch {
- dch <- e
- }
- case <-a.done:
- return
- }
- }
- }
- func (a *Aggregator) Sink() chan<- Report {
- ch := make(chan Report)
- a.upstream <- ch
- return ch
- }
- // Done marks the aggregator as done. No more calls to Sink() may be made and
- // the downstream progress report channel will be closed when Done() returns.
- func (a *Aggregator) Done() {
- close(a.done)
- a.w.Wait()
- }
|