commit 969bd519486e7cc90239eb311e472a9d71bbb389 Author: Tordarus Date: Wed Feb 9 12:10:02 2022 +0100 initial commit diff --git a/chan_io.go b/chan_io.go new file mode 100644 index 0000000..5a382ea --- /dev/null +++ b/chan_io.go @@ -0,0 +1,29 @@ +package channel + +import ( + "fmt" + "io" + "time" +) + +// WriteInto writes all given values into the channel ch. +// Is is a shorthand for Forward(ch, AsChan(values...)) +func WriteInto[T any](ch chan<- T, values ...T) { + Forward(ch, AsChan(values...)) +} + +// WriteIntoDelayed writes all given values into the channel ch. +// It sleeps after every write for the given amount of time. +// It is a shorthand for Forward(ch, AsChanDelayed(time, values...)) +func WriteIntoDelayed[T any](ch chan<- T, delay time.Duration, values ...T) { + Forward(ch, AsChanDelayed(delay, values...)) +} + +// WriteIntoWriter reads all values from ch and writes them via fmt.Fprintln to all writers +func WriteIntoWriter[T any](ch <-chan T, writers ...io.Writer) { + for value := range ch { + for _, w := range writers { + fmt.Fprintln(w, value) + } + } +} diff --git a/forward.go b/forward.go new file mode 100644 index 0000000..5808504 --- /dev/null +++ b/forward.go @@ -0,0 +1,11 @@ +package channel + +// Forward reads all values from all sources and sends them to target. +// It blocks until all values are forwarded and the out channel was closed. +// Use with go keyword for non-blocking behavior +func Forward[T any](target chan<- T, sources ...<-chan T) { + for value := range Merge(sources...) { + target <- value + } + close(target) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..f355ce5 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.tordarus.net/Tordarus/channel + +go 1.18 diff --git a/group.go b/group.go new file mode 100644 index 0000000..4078259 --- /dev/null +++ b/group.go @@ -0,0 +1,94 @@ +package channel + +import ( + "time" +) + +// GroupByTime groups all incoming values from source using the grouper function +// and sends them to the returned channel after the given amount of time. +// This can useful for summing or averaging values from a channel on a fixed interval +func GroupByTime[T, G any](source <-chan T, duration time.Duration, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + ticker := time.NewTicker(duration) + defer ticker.Stop() + + current := *new(G) + changed := false + for { + select { + case value, ok := <-source: + if !ok { + if changed { + out <- current + } + return + } + current = grouper(current, value) + changed = true + case <-ticker.C: + group := current + out <- group + current = *new(G) + changed = false + } + } + }() + + return out +} + +// GroupByAmount groups all incoming values from source using the grouper function +// and sends them to the returned channel after the given amount of values were grouped. +// This can useful for summing or averaging values from a channel for a given amount of values +func GroupByAmount[T, G any](source <-chan T, amount int, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + current := *new(G) + currentAmount := 0 + for value := range source { + currentAmount++ + current = grouper(current, value) + + if currentAmount%amount == 0 { + group := current + out <- group + current = *new(G) + } + } + }() + + return out +} + +// GroupByValue groups all incoming values from source using the grouper function +// and sends them to the returned channel after valueFunc returns true for a given value. +// This can useful for summing or averaging values from a channel for a given amount of values +func GroupByValue[T, G any](source <-chan T, valueFunc func(T) bool, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + current := *new(G) + currentAmount := 0 + for value := range source { + currentAmount++ + current = grouper(current, value) + + if valueFunc(value) { + group := current + out <- group + current = *new(G) + } + } + }() + + return out +} diff --git a/limited_runner.go b/limited_runner.go new file mode 100644 index 0000000..75e4630 --- /dev/null +++ b/limited_runner.go @@ -0,0 +1,28 @@ +package channel + +// LimitedRunner is a Runner which runs its methods +// in a pre-defined amount of routines +type LimitedRunner struct { + limiter chan struct{} +} + +var _ Runner = &LimitedRunner{} + +// NewLimitedRunner returns a new LimitedRunner with the given amount +// of allowed routines +func NewLimitedRunner(routineLimit int) *LimitedRunner { + return &LimitedRunner{ + limiter: make(chan struct{}, routineLimit), + } +} + +// Run blocks if the limit is currently exceeded. +// It blocks until a routine becomes available again. +// For non-blocking behavior, use go syntax +func (r *LimitedRunner) Run(f func()) { + r.limiter <- struct{}{} + go func() { + f() + <-r.limiter + }() +} diff --git a/map.go b/map.go new file mode 100644 index 0000000..250c10e --- /dev/null +++ b/map.go @@ -0,0 +1,55 @@ +package channel + +// MapParallel applies mapper to all I's coming from in and sends their return values to out while conserving input order. +// All mappings will be done as concurrently as possible +func MapParallel[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { + return MapParallelWithRunner(source, NewUnlimitedRunner(), mapper) +} + +// MapParallelWithRunner behaves like MapParallel but uses runner to spawn its routines +func MapParallelWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) (out <-chan O) { + outputChannel := make(chan O, cap(source)) + outchannels := make(chan chan O, cap(source)) + + // start routine for each incoming value + go func(in <-chan I, outchannels chan chan O) { + defer close(outchannels) + for inputValue := range in { + inputValue := inputValue + outCh := make(chan O) + outchannels <- outCh + + runner.Run(func() { + defer close(outCh) + outCh <- mapper(inputValue) + }) + } + }(source, outchannels) + + // gather all results in incoming order + go func(out chan<- O, outchannels chan chan O) { + defer close(out) + for ch := range outchannels { + for outputValue := range ch { + out <- outputValue + } + } + }(outputChannel, outchannels) + + return outputChannel +} + +// Map applies mapper to all I's coming from in and sends their return values to out while conserving input order. +// All mappings will be done successively +func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) + + go func() { + defer close(out) + for value := range source { + out <- mapper(value) + } + }() + + return out +} diff --git a/merge.go b/merge.go new file mode 100644 index 0000000..a0a5661 --- /dev/null +++ b/merge.go @@ -0,0 +1,28 @@ +package channel + +import "sync" + +// Merge returns a channel in which all values of all incoming channels are sent to. +// The values will be sent in the same order as they are received +func Merge[T any](channels ...<-chan T) <-chan T { + out := make(chan T, determineBufferSize(channels)) + + var wg sync.WaitGroup + wg.Add(len(channels)) + + for _, ch := range channels { + go func(ch <-chan T) { + for v := range ch { + out <- v + } + wg.Done() + }(ch) + } + + go func() { + wg.Wait() + close(out) + }() + + return out +} diff --git a/of.go b/of.go new file mode 100644 index 0000000..3395250 --- /dev/null +++ b/of.go @@ -0,0 +1,60 @@ +package channel + +// Of returns a channel containing all values +func Of[T any](values ...T) <-chan T { + return AsChanDelayed(0, values...) +} + +// OfDelayed behaves like Of but with a pre-defined delay between each value +func OfDelayed[T any](delay time.Duration, values ...T) <-chan T { + out := make(chan T, len(values)) + + go func(out chan T, values []T) { + for i, value := range values { + out <- value + if i < len(values)-1 { + time.Sleep(delay) + } + } + close(out) + }(out, values) + + return out +} + +// OfDelayedFunc behaves like OfDelayed but accepts a function to determine the delay +func OfDelayedFunc[T any](delayFunc func(value T) time.Duration, values ...T) <-chan T { + out := make(chan T, len(values)) + + go func(out chan T, values []T) { + for i, value := range values { + out <- value + if i < len(values)-1 { + time.Sleep(delayFunc(value)) + } + } + close(out) + }(out, values) + + return out +} + +// OfFunc returns a channel containing the return values of successively calling f +// It closes the channel as soon as ctx is done +func OfFunc[T any](ctx context.Context, buffer int, f func() T) <-chan T { + out := make(chan T, buffer) + + go func() { + defer close(out) + + for ctx.Err() == nil { + select { + case out <- f(): + case <-ctx.Done(): + return + } + } + }() + + return out +} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..7de9e36 --- /dev/null +++ b/runner.go @@ -0,0 +1,8 @@ +package channel + +// Runner is any runnable environment +type Runner interface { + // Run runs f in the Runners environment + // It might be blocking or non-blocking depending on Runners implementation + Run(f func()) +} diff --git a/timeout.go b/timeout.go new file mode 100644 index 0000000..806037a --- /dev/null +++ b/timeout.go @@ -0,0 +1,30 @@ +package channel + +import "time" + +// CloseOnTimeout returns a channel which receives all values from all sources in order of arrival. +// If no source is sending a value in the given timeout duration, the channel will be closed +func CloseOnTimeout[T any](timeout time.Duration, sources ...<-chan T) <-chan T { + input := Merge(sources...) + output := make(chan T, cap(input)) + + go func() { + defer close(output) + + for { + timer := time.NewTimer(timeout) + + select { + case value, ok := <-input: + if !ok { + return + } + output <- value + case <-timer.C: + return + } + } + }() + + return output +} diff --git a/unlimited_runner.go b/unlimited_runner.go new file mode 100644 index 0000000..58e05c3 --- /dev/null +++ b/unlimited_runner.go @@ -0,0 +1,19 @@ +package channel + +// UnlimitedRunner is a Runner which runs each method +// in its own routine +type UnlimitedRunner struct { +} + +var _ Runner = &UnlimitedRunner{} + +// NewUnlimitedRunner returns a new LimitedRunner with the given amount +// of allowed routines +func NewUnlimitedRunner() *UnlimitedRunner { + return &UnlimitedRunner{} +} + +// Run always returns immediately +func (r *UnlimitedRunner) Run(f func()) { + go f() +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..43a3baa --- /dev/null +++ b/utils.go @@ -0,0 +1,18 @@ +package channel + +import ( + "context" + "time" +) + +func determineBufferSize[T any](channels []<-chan T) int { + if len(channels) == 0 { + return 0 + } + + bufSize := 0 + for _, ch := range channels { + bufSize += cap(ch) + } + return bufSize / len(channels) +}