fixed naming schemes for Each methods

This commit is contained in:
milarin 2023-03-04 11:24:42 +01:00
parent 169f978e14
commit f0a869b409
6 changed files with 76 additions and 77 deletions

View File

@ -22,7 +22,7 @@ func WriteIntoDelayed[T any](ch chan<- T, delay time.Duration, values ...T) {
// WriteIntoWriter reads all values from ch and writes them via fmt.Fprintln to all writers // WriteIntoWriter reads all values from ch and writes them via fmt.Fprintln to all writers
func WriteIntoWriter[T any](ch <-chan T, writers ...io.Writer) { func WriteIntoWriter[T any](ch <-chan T, writers ...io.Writer) {
w := io.MultiWriter(writers...) w := io.MultiWriter(writers...)
Each(ch, func(value T) { EachSuccessive(ch, func(value T) {
fmt.Fprintln(w, value) fmt.Fprintln(w, value)
}) })
} }

34
each.go Normal file
View File

@ -0,0 +1,34 @@
package channel
import "sync"
// Each consumes all values and calls f for each of them.
// It blocks until source is closed
func Each[T any](source <-chan T, f func(T)) {
EachWithRunner(source, getDefaultRunner(), f)
}
// Each consumes all values and calls f for each of them.
// It blocks until source is closed
func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) {
wg := &sync.WaitGroup{}
for value := range source {
value := value
wg.Add(1)
runner.Run(func() {
defer wg.Done()
f(value)
})
}
wg.Wait()
}
// EachSuccessive consumes all values and calls f for each of them.
// It blocks until source is closed
func EachSuccessive[T any](source <-chan T, f func(T)) {
for value := range source {
f(value)
}
}

21
find.go
View File

@ -17,23 +17,6 @@ func FindFirstAndCancel[T any](source <-chan T, cancel context.CancelFunc) *T {
return nil return nil
} }
func FindFirstAndFlush[T any](source <-chan T) *T {
for v := range source {
go Flush(source)
return &v
}
return nil
}
func FindFirstAndCancelFlush[T any](source <-chan T, cancel context.CancelFunc) *T {
for v := range source {
cancel()
go Flush(source)
return &v
}
return nil
}
func FindLast[T any](source <-chan T) *T { func FindLast[T any](source <-chan T) *T {
var last *T = new(T) var last *T = new(T)
found := false found := false
@ -53,7 +36,3 @@ func FindLast[T any](source <-chan T) *T {
func HasAny[T any](source <-chan T) bool { func HasAny[T any](source <-chan T) bool {
return FindFirst(source) != nil return FindFirst(source) != nil
} }
func HasAnyAndFlush[T any](source <-chan T) bool {
return FindFirstAndFlush(source) != nil
}

37
tee.go Normal file
View File

@ -0,0 +1,37 @@
package channel
// Tee returns 2 channels which both receive all values from source.
// It's basically a copy function for channels
func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
outs := TeeMany(source, 2)
return outs[0], outs[1]
}
// TeeMany returns a given amount of channels which all receive all values from source.
// It's basically a copy function for channels
func TeeMany[T any](source <-chan T, amount int) []<-chan T {
outputs := make([]chan T, amount)
for i := range outputs {
outputs[i] = make(chan T, cap(source))
}
go func() {
defer func() {
for _, out := range outputs {
close(out)
}
}()
for value := range source {
for _, out := range outputs {
out <- value
}
}
}()
readOnlyOutputs := make([]<-chan T, 0, amount)
for _, out := range outputs {
readOnlyOutputs = append(readOnlyOutputs, out)
}
return readOnlyOutputs
}

8
to.go
View File

@ -5,7 +5,7 @@ import "container/list"
// ToSlice returns a slice containing all values read from ch // ToSlice returns a slice containing all values read from ch
func ToSlice[T any](ch <-chan T) []T { func ToSlice[T any](ch <-chan T) []T {
s := make([]T, 0, cap(ch)) s := make([]T, 0, cap(ch))
Each(ch, func(value T) { s = append(s, value) }) EachSuccessive(ch, func(value T) { s = append(s, value) })
return s return s
} }
@ -14,7 +14,7 @@ func ToSlice[T any](ch <-chan T) []T {
// Nil pointers are ignored. // Nil pointers are ignored.
func ToSliceDeref[T any](ch <-chan *T) []T { func ToSliceDeref[T any](ch <-chan *T) []T {
s := make([]T, 0, cap(ch)) s := make([]T, 0, cap(ch))
Each(ch, func(value *T) { EachSuccessive(ch, func(value *T) {
if value != nil { if value != nil {
s = append(s, *value) s = append(s, *value)
} }
@ -25,7 +25,7 @@ func ToSliceDeref[T any](ch <-chan *T) []T {
// ToList returns a list.List containing all values read from ch // ToList returns a list.List containing all values read from ch
func ToList[T any](ch <-chan T) *list.List { func ToList[T any](ch <-chan T) *list.List {
l := list.New() l := list.New()
Each(ch, func(value T) { l.PushBack(value) }) EachSuccessive(ch, func(value T) { l.PushBack(value) })
return l return l
} }
@ -54,7 +54,7 @@ func ToMapWithRunner[T any, K comparable, V any](ch <-chan T, runner Runner, f f
// The map key-value pairs are determined by f // The map key-value pairs are determined by f
func ToMapSuccessive[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { func ToMapSuccessive[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V {
m := map[K]V{} m := map[K]V{}
Each(ch, func(value T) { EachSuccessive(ch, func(value T) {
k, v := f(value) k, v := f(value)
m[k] = v m[k] = v
}) })

View File

@ -13,54 +13,3 @@ func determineBufferSize[T any](channels []<-chan T) int {
} }
return maxBufSize return maxBufSize
} }
// Flush consumes all values and discards them immediately.
// It blocks until all sources are closed
func Flush[T any](sources ...<-chan T) {
for range Merge(sources...) {
}
}
// Each consumes all values and calls f for each of them.
// It blocks until source is closed
func Each[T any](source <-chan T, f func(T)) {
for value := range source {
f(value)
}
}
// Tee returns 2 channels which both receive all values from source.
// It's basically a copy function for channels
func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
outs := TeeMany(source, 2)
return outs[0], outs[1]
}
// TeeMany returns a given amount of channels which all receive all values from source.
// It's basically a copy function for channels
func TeeMany[T any](source <-chan T, amount int) []<-chan T {
outputs := make([]chan T, amount)
for i := range outputs {
outputs[i] = make(chan T, cap(source))
}
go func() {
defer func() {
for _, out := range outputs {
close(out)
}
}()
for value := range source {
for _, out := range outputs {
out <- value
}
}
}()
readOnlyOutputs := make([]<-chan T, 0, amount)
for _, out := range outputs {
readOnlyOutputs = append(readOnlyOutputs, out)
}
return readOnlyOutputs
}