Problem Statement
Implement a fan-in function that merges N input channels into a single output channel. Close the output channel only when all inputs are closed.
Implement a fan-in function that merges N input channels into a single output channel. Close the output channel only when all inputs are closed.
package fanin
import "sync"
func Merge[T any](channels ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
// Start a goroutine for each input channel
for _, ch := range channels {
wg.Add(1)
go func(c <-chan T) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
// Close output when all inputs are done
go func() {
wg.Wait()
close(out)
}()
return out
}func main() {
// Create input channels
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
// Start producers
go func() {
for i := 0; i < 5; i++ {
ch1 <- i * 10
}
close(ch1)
}()
go func() {
for i := 0; i < 5; i++ {
ch2 <- i * 100
}
close(ch2)
}()
go func() {
for i := 0; i < 5; i++ {
ch3 <- i * 1000
}
close(ch3)
}()
// Merge and consume
merged := fanin.Merge(ch1, ch2, ch3)
for val := range merged {
fmt.Println(val)
}
}func MergeWithCancel[T any](ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan T) {
defer wg.Done()
for {
select {
case val, ok := <-c:
if !ok {
return
}
select {
case out <- val:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}func FanOut[T any](input <-chan T, n int) []<-chan T {
outputs := make([]chan T, n)
for i := range outputs {
outputs[i] = make(chan T)
}
go func() {
defer func() {
for _, ch := range outputs {
close(ch)
}
}()
i := 0
for val := range input {
outputs[i] <- val
i = (i + 1) % n
}
}()
result := make([]<-chan T, n)
for i, ch := range outputs {
result[i] = ch
}
return result
}