Kotlin

Kotlin Coroutine(코루틴) - Channel(채널)

SKaSha 2019. 12. 14. 00:23

채널은 넌블로킹 전송 개념으로 SendChannel과 ReceiveChannel 인터페이스를 이용해 값들의 스트림을 전송하는 방법을 제공합니다.

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        // 여기에 다량의 CPU 연산작업이나 비동기 로직을 둘 수 있다.
        for (x in 1..5) channel.send(x * x)
    }
    // 5개의 값을 채널로부터 받는다
    repeat(5) { println(channel.receive()) }
    println("Done!")

    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // 모두 보내고 닫기 명시
    }
    // for 루프를 사용해 끝까지 읽기
    for (element in channel) println(element)
    println("Done!")
}

송신자는 isFull 값이 true이면 일시 지연되고 수신자는 isEmpty가 true면 일시 지연됩니다.
close()를 이용하면 채널을 바로 받는 것이 아니라 닫겠다는 특수한 토큰을 보냅니다.
offer(element) 함수는 요소를 채널에 추가할수 있으면 넣고 꽉 찬 경우 false를 리턴합니다.
poll()함수는 요소를 반환하는데 채널이 비어있으면 null을 리턴합니다.

produce 생산자 소비자 패턴

// 생산자를 위한 함수 생성
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {
    var total: Int = 0
    for (x in 1..5) {
        total += x
        send(total)
    }
}

fun main() = runBlocking {
    val result = producer()  // 값의 생산
    result.consumeEach { print("$it ") } // 소비자 루틴 구성
}

버퍼를 가진 채널

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(3) // 버퍼 capacity 값을 준다
    val sender = launch(coroutineContext) { // 송신자 측
        repeat(10) {
            println("Sending $it")
            channel.send(it) // 지속적으로 보내다가 꽉 차면 일시 지연됨
        }
    }
    // 아무것도 안받고 1초가 잠시 기다린 후
    delay(1000)
    sender.cancel() // 송신자의 작업을 취소 한다.
}

채널에 버퍼를 지정해주면 지연 없이 여러개의 요소를 보낼 수 있게 됩니다.

select 표현식

fun main() = runBlocking {
    val routine1 = GlobalScope.produce {
        delay(Random().nextInt(1000).toLong())
        send("A")
    }
    val routine2 = GlobalScope.produce {
        delay(Random().nextInt(1000).toLong())
        send("B")
    }
    // 먼저 수행된 것을 받게 된다.
    val result = select<String> {
        routine1.onReceive { result -> result }
        routine2.onReceive { result -> result }
    }
    println("Result was $result")
}

다양한 채널에서 응답을 해야할때 각 채널의 실행 시간에 따라 결과가 달라질수 있는데, 이때 select을 이용하면 표현식을 통해 결과를 받을수 있습니다.

그 외 확장된 채널 자료형

RendezvousChannel

내부에 버퍼를 두지 않는 채널. send 동작은 receive가 일어나기전까지, recieve는 누군가 send하기 전까지는 일시 중단 상태입니다.

ArrayChannel

특정한 크기로 고정된 버퍼를 가진 채널. send는 해당 버퍼가 차기 전까지 지연없이 계속 보낼수 있고, recieve도 버퍼가 비기전까지 계속 받을수 있습니다.

LinkedListChannel

링크드리스트 구조이기떄문에 버퍼의 크기가 제한이 없습니다.
send가 무한히 지속될 경우 메모리 부족 오류가 발생됩니다.

ConflatedChannel

버퍼는 하나의 요소만 허용하지만 일시 지연 되지 않고 계속 send 할 수 있습니다.
하지만 기존의 값을 덮어 씌웁니다.

참고

서적 : Do it! 코틀린 프로그래밍