channel/filter.go

40 lines
981 B
Go
Raw Permalink Normal View History

2022-07-05 21:30:19 +02:00
package channel
2024-05-14 20:26:58 +02:00
// FilterSuccessive returns a channel that yields, in the order they were
// received, the values from source for which filter returns true.
//
// The values are tested one after another on a single goroutine started by
// this call. The returned channel is created with the same buffer capacity as
// source and is closed once source has been closed and fully drained.
//
// The goroutine blocks while the returned channel is full, so it only ends if
// the caller keeps receiving until the channel is closed.
func FilterSuccessive[T any](source <-chan T, filter func(T) bool) <-chan T {
	out := make(chan T, cap(source))
	go func() {
		// Closing out is what terminates a consumer's range loop.
		defer close(out)
		for value := range source {
			if filter(value) {
				out <- value
			}
		}
	}()
	return out
}
2024-05-14 20:26:58 +02:00
func Filter[T any](source <-chan T, filter func(T) bool) <-chan T {
return FilterPreserveOrderWithRunner(source, getDefaultRunner(), filter)
}
// FilterPreserveOrderWithRunner returns a channel carrying the values from
// source for which filter returns true.
//
// It is built as a three-stage pipeline: MapPreserveOrderWithRunner (which is
// handed runner) pairs every value with the result of calling filter on it,
// FilterSuccessive discards the pairs whose result is false, and MapSuccessive
// strips the pairing again so that only the original values are emitted.
//
// NOTE(review): judging by its name, MapPreserveOrderWithRunner keeps results
// in source order while runner schedules the filter calls — confirm against
// its definition.
func FilterPreserveOrderWithRunner[T any](source <-chan T, runner Runner, filter func(T) bool) <-chan T {
	// verdict couples a value with the outcome of calling filter on it.
	type verdict[V any] struct {
		Item V
		Keep bool
	}

	judge := func(item T) verdict[T] {
		return verdict[T]{Item: item, Keep: filter(item)}
	}
	accepted := func(v verdict[T]) bool { return v.Keep }
	unwrap := func(v verdict[T]) T { return v.Item }

	judged := MapPreserveOrderWithRunner(source, runner, judge)
	return MapSuccessive(FilterSuccessive(judged, accepted), unwrap)
}