diff --git a/chan_io.go b/chan_io.go index fb59584..3c9d41c 100644 --- a/chan_io.go +++ b/chan_io.go @@ -22,7 +22,7 @@ func WriteIntoDelayed[T any](ch chan<- T, delay time.Duration, values ...T) { // WriteIntoWriter reads all values from ch and writes them via fmt.Fprintln to all writers func WriteIntoWriter[T any](ch <-chan T, writers ...io.Writer) { w := io.MultiWriter(writers...) - Each(ch, func(value T) { + EachSuccessive(ch, func(value T) { fmt.Fprintln(w, value) }) } diff --git a/each.go b/each.go new file mode 100644 index 0000000..8c7503f --- /dev/null +++ b/each.go @@ -0,0 +1,34 @@ +package channel + +import "sync" + +// Each consumes all values and calls f for each of them. +// It blocks until source is closed +func Each[T any](source <-chan T, f func(T)) { + EachWithRunner(source, getDefaultRunner(), f) +} + +// Each consumes all values and calls f for each of them. +// It blocks until source is closed +func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) { + wg := &sync.WaitGroup{} + + for value := range source { + value := value + wg.Add(1) + runner.Run(func() { + defer wg.Done() + f(value) + }) + } + + wg.Wait() +} + +// EachSuccessive consumes all values and calls f for each of them. +// It blocks until source is closed +func EachSuccessive[T any](source <-chan T, f func(T)) { + for value := range source { + f(value) + } +} diff --git a/find.go b/find.go index a840bbf..e516949 100644 --- a/find.go +++ b/find.go @@ -17,23 +17,6 @@ func FindFirstAndCancel[T any](source <-chan T, cancel context.CancelFunc) *T { return nil } -func FindFirstAndFlush[T any](source <-chan T) *T { - for v := range source { - go Flush(source) - return &v - } - return nil -} - -func FindFirstAndCancelFlush[T any](source <-chan T, cancel context.CancelFunc) *T { - for v := range source { - cancel() - go Flush(source) - return &v - } - return nil -} - func FindLast[T any](source <-chan T) *T { var last *T = new(T) found := false @@ -53,7 +36,3 @@ func FindLast[T any](source <-chan T) *T { func HasAny[T any](source <-chan T) bool { return FindFirst(source) != nil } - -func HasAnyAndFlush[T any](source <-chan T) bool { - return FindFirstAndFlush(source) != nil -} diff --git a/tee.go b/tee.go new file mode 100644 index 0000000..60c5cd0 --- /dev/null +++ b/tee.go @@ -0,0 +1,37 @@ +package channel + +// Tee returns 2 channels which both receive all values from source. +// It's basically a copy function for channels +func Tee[T any](source <-chan T) (<-chan T, <-chan T) { + outs := TeeMany(source, 2) + return outs[0], outs[1] +} + +// TeeMany returns a given amount of channels which all receive all values from source. +// It's basically a copy function for channels +func TeeMany[T any](source <-chan T, amount int) []<-chan T { + outputs := make([]chan T, amount) + for i := range outputs { + outputs[i] = make(chan T, cap(source)) + } + + go func() { + defer func() { + for _, out := range outputs { + close(out) + } + }() + + for value := range source { + for _, out := range outputs { + out <- value + } + } + }() + + readOnlyOutputs := make([]<-chan T, 0, amount) + for _, out := range outputs { + readOnlyOutputs = append(readOnlyOutputs, out) + } + return readOnlyOutputs +} diff --git a/to.go b/to.go index a8a7d80..43ad408 100644 --- a/to.go +++ b/to.go @@ -5,7 +5,7 @@ import "container/list" // ToSlice returns a slice containing all values read from ch func ToSlice[T any](ch <-chan T) []T { s := make([]T, 0, cap(ch)) - Each(ch, func(value T) { s = append(s, value) }) + EachSuccessive(ch, func(value T) { s = append(s, value) }) return s } @@ -14,7 +14,7 @@ func ToSlice[T any](ch <-chan T) []T { // Nil pointers are ignored. func ToSliceDeref[T any](ch <-chan *T) []T { s := make([]T, 0, cap(ch)) - Each(ch, func(value *T) { + EachSuccessive(ch, func(value *T) { if value != nil { s = append(s, *value) } @@ -25,7 +25,7 @@ func ToSliceDeref[T any](ch <-chan *T) []T { // ToList returns a list.List containing all values read from ch func ToList[T any](ch <-chan T) *list.List { l := list.New() - Each(ch, func(value T) { l.PushBack(value) }) + EachSuccessive(ch, func(value T) { l.PushBack(value) }) return l } @@ -54,7 +54,7 @@ func ToMapWithRunner[T any, K comparable, V any](ch <-chan T, runner Runner, f f // The map key-value pairs are determined by f func ToMapSuccessive[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { m := map[K]V{} - Each(ch, func(value T) { + EachSuccessive(ch, func(value T) { k, v := f(value) m[k] = v }) diff --git a/utils.go b/utils.go index 8dda8cd..1bd3227 100644 --- a/utils.go +++ b/utils.go @@ -13,54 +13,3 @@ func determineBufferSize[T any](channels []<-chan T) int { } return maxBufSize } - -// Flush consumes all values and discards them immediately. -// It blocks until all sources are closed -func Flush[T any](sources ...<-chan T) { - for range Merge(sources...) { - } -} - -// Each consumes all values and calls f for each of them. -// It blocks until source is closed -func Each[T any](source <-chan T, f func(T)) { - for value := range source { - f(value) - } -} - -// Tee returns 2 channels which both receive all values from source. -// It's basically a copy function for channels -func Tee[T any](source <-chan T) (<-chan T, <-chan T) { - outs := TeeMany(source, 2) - return outs[0], outs[1] -} - -// TeeMany returns a given amount of channels which all receive all values from source. -// It's basically a copy function for channels -func TeeMany[T any](source <-chan T, amount int) []<-chan T { - outputs := make([]chan T, amount) - for i := range outputs { - outputs[i] = make(chan T, cap(source)) - } - - go func() { - defer func() { - for _, out := range outputs { - close(out) - } - }() - - for value := range source { - for _, out := range outputs { - out <- value - } - } - }() - - readOnlyOutputs := make([]<-chan T, 0, amount) - for _, out := range outputs { - readOnlyOutputs = append(readOnlyOutputs, out) - } - return readOnlyOutputs -}