channels/channels.go

137 lines
3.1 KiB
Go
Raw Normal View History

2025-05-07 19:23:51 -04:00
package channels
import (
"context"
"errors"
"sync"
)
// ErrChannelClosed is the sentinel error reported by TrySelectFromChannel when
// the receive shows that the source channel has been closed.
var ErrChannelClosed = errors.New("channel is closed (returned !ok)")
// ChannelPipelineWorkFunction is the per-item callback accepted by
// ProcessChannelThroughFunction: it receives one value of type T and returns
// the value of type T that is emitted in its place.
type ChannelPipelineWorkFunction[T any] func(T) T
// ProcessChannelThroughFunction starts a goroutine that receives every value
// from c, passes it through workCallback, and sends the result on the returned
// channel. The returned channel has the same capacity as c.
//
// The goroutine stops, and the returned channel is closed, as soon as a
// receive or a send fails: that is, when c is closed or ctx is done.
func ProcessChannelThroughFunction[T any](ctx context.Context, c <-chan T, workCallback ChannelPipelineWorkFunction[T]) <-chan T {
	out := make(chan T, cap(c))

	go func() {
		defer close(out)

		for {
			item, recvErr := TrySelectFromChannel(ctx, c)
			if recvErr != nil {
				return
			}

			sendErr := TryAddToChannel(ctx, out, workCallback(item))
			if sendErr != nil {
				return
			}
		}
	}()

	return out
}
// MergeChannelsWithContext fans every value received from the given input
// channels into a single output channel created with capacity bufferSize.
//
// The output channel is closed once every forwarding goroutine has finished,
// which happens when its input channel is closed or when ctx is done. Values
// from different inputs are interleaved in no particular order.
//
// One forwarding goroutine is launched per input channel. This is not
// typically a problem with a small number of channels, or when the channels'
// producers are not very busy, but it is worth keeping in mind when merging
// many busy channels.
func MergeChannelsWithContext[T any](ctx context.Context, bufferSize int, channels ...<-chan T) <-chan T {
	results := make(chan T, bufferSize)
	go func() {
		defer close(results)

		var wg sync.WaitGroup
		for _, c := range channels {
			wg.Add(1)
			// Pass the channel as an argument rather than capturing the range
			// variable: in modules targeting Go versions before 1.22 the loop
			// variable is shared across iterations, so a capturing closure
			// could end up reading from the wrong (typically the last) channel.
			go func(src <-chan T) {
				defer wg.Done()
				for {
					t, err := TrySelectFromChannel(ctx, src)
					if err != nil {
						return
					}
					if err := TryAddToChannel(ctx, results, t); err != nil {
						return
					}
				}
			}(c)
		}

		// Only close results (via the defer) after every forwarder has stopped
		// sending on it.
		wg.Wait()
	}()
	return results
}
// MergeChannels is MergeChannelsWithContext bound to context.Background(): the
// merged channel has capacity bufferSize and forwarding is never interrupted
// by context cancellation.
func MergeChannels[T any](bufferSize int, channels ...<-chan T) <-chan T {
	background := context.Background()
	merged := MergeChannelsWithContext(background, bufferSize, channels...)
	return merged
}
// TryAddToChannel sends t on c, giving up if ctx is done first.
//
// It blocks until either the send succeeds, in which case it returns nil, or
// ctx is done, in which case it returns ctx.Err() (context.Canceled or
// context.DeadlineExceeded). If the send can proceed and ctx is done at the
// same moment, select picks one of the two cases at random.
func TryAddToChannel[T any](ctx context.Context, c chan<- T, t T) error {
	select {
	case <-ctx.Done():
		// Report the actual reason the context ended, so a cancellation is not
		// misreported as an expired deadline.
		return ctx.Err()
	case c <- t:
		return nil
	}
}
// TrySelectFromChannel receives one value from c, giving up if ctx is done
// first.
//
// It blocks until one of the following happens:
//   - a value is received: the value and a nil error are returned;
//   - c is closed: the zero value of T and ErrChannelClosed are returned;
//   - ctx is done: the zero value of T and ctx.Err() (context.Canceled or
//     context.DeadlineExceeded) are returned.
//
// If a receive can proceed and ctx is done at the same moment, select picks
// one of the two cases at random.
func TrySelectFromChannel[T any](ctx context.Context, c <-chan T) (T, error) {
	select {
	case <-ctx.Done():
		// Report the actual reason the context ended, so a cancellation is not
		// misreported as an expired deadline.
		var zero T
		return zero, ctx.Err()
	case t, ok := <-c:
		if !ok {
			// A receive from a closed channel yields the zero value of T.
			return t, ErrChannelClosed
		}
		return t, nil
	}
}
// ChannelToSliceWithContext drains c into a slice, in receive order.
//
// Collection stops at the first failed receive, i.e. when c is closed or ctx
// is done, and whatever was gathered up to that point is returned. The slice
// is pre-sized to cap(c) and is never nil.
func ChannelToSliceWithContext[T any](ctx context.Context, c <-chan T) []T {
	collected := make([]T, 0, cap(c))
	for {
		item, err := TrySelectFromChannel(ctx, c)
		if err != nil {
			return collected
		}
		collected = append(collected, item)
	}
}
// ChannelToSlice is ChannelToSliceWithContext bound to context.Background():
// it keeps collecting values from c until c is closed.
func ChannelToSlice[T any](c <-chan T) []T {
	background := context.Background()
	collected := ChannelToSliceWithContext(background, c)
	return collected
}
// SliceToChannelWithContext returns a channel of capacity bufferSize onto
// which a background goroutine sends the elements of s, in order.
//
// The channel is closed after the last element has been sent, or earlier if a
// send fails because ctx is done.
func SliceToChannelWithContext[T any](ctx context.Context, s []T, bufferSize int) <-chan T {
	out := make(chan T, bufferSize)

	go func() {
		defer close(out)

		for i := range s {
			if TryAddToChannel(ctx, out, s[i]) != nil {
				return
			}
		}
	}()

	return out
}
// SliceToChannel is SliceToChannelWithContext bound to context.Background():
// the elements of s are sent, in order, on a channel of capacity bufferSize.
func SliceToChannel[T any](s []T, bufferSize int) <-chan T {
	background := context.Background()
	out := SliceToChannelWithContext(background, s, bufferSize)
	return out
}
// CopyChannelWithContext duplicates the stream of values arriving on c onto
// two new channels, each created with the same capacity as c.
//
// A background goroutine receives each value from c and sends it first on a,
// then on b. It stops at the first failed receive or send, i.e. when c is
// closed or ctx is done, and then closes a followed by b.
func CopyChannelWithContext[T any](ctx context.Context, c <-chan T) (a, b <-chan T) {
	size := cap(c)
	first := make(chan T, size)
	second := make(chan T, size)

	go func() {
		// Deferred calls run last-in-first-out, so first is closed before second.
		defer close(second)
		defer close(first)

		for {
			item, err := TrySelectFromChannel(ctx, c)
			if err != nil {
				return
			}
			if err = TryAddToChannel(ctx, first, item); err != nil {
				return
			}
			if err = TryAddToChannel(ctx, second, item); err != nil {
				return
			}
		}
	}()

	return first, second
}