티스토리 뷰

반응형

코루틴과 멀티스레딩 중 어떤게 더 속도 측면에서 빠를지 궁금해서 간단한 클래스와 이를 테스트 했던 것들을 공유하고자 한다.

 

# 가상 요구사항

우선 여러개의 파일들을 비동기적으로 어딘가 보내야하는 상황을 가상 시나리오로 생각해보자.

 

이때, 다음과 같은 요구사항을 충족해야한다.

  • 속도가 너무 느리면 안 된다. 
  • 여러개의 파일들을 보내다가 중간에 전송을 실패하더라도 전체 파일전송에는 영향을 미치지 않아야한다.
  • 중간에 파일 전송에 실패하면 재전송 시도를 한다.

 

해당 요구사항을 충족하기 위해 코루틴을 활용한 구현체와, 스레드풀을 활용한 구현체 두 가지를 만들고 성능 테스트를 진행할 것이다.

 

# 테스트 클래스

AsyncFileSender (Interface)

interface AsyncFileSender {
    fun sendAllFiles(files: List<String>)
}

내부 구현으로는 파일을 1000개 받으면 각각 100개씩 청크로 분할하여 각기 다른 스레드에서 파일 전송을 진행할 것이다.

 

파일은 단순히 1부터 시작하여 1씩 증가하는 문자열 리스트로 구성될 것이다.

 

# 코루틴을 활용한 구현체

먼저 코루틴을 사용하려면 다음 의존성을 추가해야한다.

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.1")
}

 

코루틴의 async 고차함수를 활용하여 모두 메인스레드 하나에서 동작하도록 구현했다.

 

AsyncFileSenderCoroutineImple

class AsyncFileSenderCoroutineImpl(
    private val chunkSize: Int = 100,
    private val maxRetries: Int = 3
): AsyncFileSender {
    private val successFiles = mutableListOf<String>()
    private val failedFiles = mutableListOf<String>()

    // 파일 전송은 50ms 소요되며, 10%확률로 실패하여 예외를 던진다.
    private suspend fun sendFile(file: String): String {
        delay(50)
        if (Random.nextDouble() < 0.1) {
            throw RuntimeException("Error while sending file: $file")
        }
        return file
    }

    // 실패를 하면 최대 3번까지 리트라이를 한다.
    // 실패한 파일들은 별도의 리스트에 저장한다.
    private suspend fun attemptToSendFile(file: String, chunkIndex: Int, fileIndex: Int) {
        repeat(maxRetries) { attempt ->
            try {
                val sentFile = sendFile(file)
                successFiles.add(sentFile)
                return
            } catch (e: RuntimeException) {
                if (attempt == maxRetries - 1) {
                    logFailureToSend(file, chunkIndex, fileIndex, attempt + 1, e.message)
                    failedFiles.add(file)
                }
            }
        }
    }

    // 파일을 받으면 청크로 분할하여 코루틴을 생성한다.
    override fun sendAllFiles(files: List<String>) = runBlocking {
        files.chunked(chunkSize).mapIndexed { chunkIndex, chunk ->
            async {
                chunk.forEachIndexed { fileIndex, file ->
                    attemptToSendFile(file, chunkIndex, fileIndex)
                }
            }
        }.awaitAll()
        reportAndCleanUp()
    }

    private fun logFailureToSend(file: String, chunkIndex: Int, fileIndex: Int, attempts: Int, errorMessage: String?) {
        log("Failed to send file $file at chunk $chunkIndex, position $fileIndex after $attempts attempts: $errorMessage")
    }

    private fun reportAndCleanUp() {
        log("Number of successfully sent files: ${successFiles.size}")
        log("Number of failed files: ${failedFiles.size}")
        failedFiles.forEach { log("Failed file: $it") }

        successFiles.clear()
        failedFiles.clear()
    }
}

 

# 스레드 풀을 활용한 구현체

스레드 풀은 디폴트 200개로 설정했다.

 

AsyncFileSenderThreadImpl

class AsyncFileSenderThreadImpl(
    private val chunkSize: Int = 100,
    private val maxRetries: Int = 3
): AsyncFileSender {
    // 여러 스레드가 접근하므로 공유자원을 동기화해준다.
    private val successFiles = Collections.synchronizedList(mutableListOf<String>())
    private val failedFiles = Collections.synchronizedList(mutableListOf<String>())

    private fun sendFile(file: String): String {
        // 코루틴과 다르게 50ms 대기를 Thread.sleep()으로 해준다.
        Thread.sleep(50)

        if (Random.nextDouble() < 0.1) {
            throw RuntimeException("Error while sending file: $file")
        }

        return file
    }

    private fun attemptToSendFile(file: String, chunkIndex: Int, fileIndex: Int) {
        repeat(maxRetries) { attempt ->
            try {
                val sentFile = sendFile(file)
                successFiles.add(sentFile)
                return
            } catch (e: RuntimeException) {
                if (attempt == maxRetries - 1) {
                    logFailureToSend(file, chunkIndex, fileIndex, attempt + 1, e.message)
                    failedFiles.add(file)
                }
            }
        }
    }

    // 스레드 풀을 생성하여 각각 job을 제출해준다.
    override fun sendAllFiles(files: List<String>) {
        val executor = Executors.newFixedThreadPool(200)
        files.chunked(chunkSize).forEachIndexed { chunkIndex, chunk ->
            chunk.forEachIndexed { fileIndex, file ->
                executor.submit { attemptToSendFile(file, chunkIndex, fileIndex) }
            }
        }
        executor.shutdown()
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
        } catch (e: InterruptedException) {
            log("File sending interrupted: ${e.message}")
        }
        reportAndCleanUp()
    }

    private fun logFailureToSend(file: String, chunkIndex: Int, fileIndex: Int, attempts: Int, errorMessage: String?) {
        log("Failed to send file $file at chunk $chunkIndex, position $fileIndex after $attempts attempts: $errorMessage")
    }

    private fun reportAndCleanUp() {
        log("Number of successfully sent files: ${successFiles.size}")
        log("Number of failed files: ${failedFiles.size}")
        failedFiles.forEach { log("Failed file: $it") }

        successFiles.clear()
        failedFiles.clear()
    }
}

 

log() 함수

fun now(): LocalDateTime = LocalDateTime.now()
fun log(message: String) = println("${now()}:${Thread.currentThread()}:$message")

 

 

# 성능 테스트

fun main() {
    var start = System.currentTimeMillis()
    val sender1 = AsyncFileSenderCoroutineImpl()
    val files = (1..100000).map { it.toString() }
    sender1.sendAllFiles(files)
    var end = System.currentTimeMillis()
    var executedTimeSeconds = (end - start) / 1000.0
    log("Coroutine Sender executed time: $executedTimeSeconds seconds")

    start = System.currentTimeMillis()
    val sender2 = AsyncFileSenderThreadImpl()
    sender2.sendAllFiles(files)
    end = System.currentTimeMillis()
    executedTimeSeconds = (end - start) / 1000.0
    log("Thread Sender executed time: $executedTimeSeconds seconds")
}

 

 

이제 파일 건수별 몇초가 걸리는지 테스트를 해보자.

 

다음 환경에서 수행했음을 다시 한번 상기하자.

  • 청크 사이즈 = 100
  • 스레드 풀 = 200

 

1000건

 

코루틴 : 6.5초

멀티스레드 : 0.3초

 

1만건

코루틴 : 6초

멀티스레드 : 2초

 

5만건

코루틴 : 6.6초

멀티스레드 : 10초

 

 

10만건

코루틴 : 7초

멀티스레드 : 20초

 

20만건

코루틴 : 7초

멀티스레드 : 40초

 

 

코루틴은 더 많은 건수에서 월등히 좋은 성능을 보였다.

코루틴 100만건

 

코루틴 1000만건
코루틴 1억건

 

스레드 풀 1000개로 테스트 해보자.

스레드 풀 1000개, 10만건

 

스레드 풀 1000개, 100만건

스레드 풀을 1000개로 설정해도 수행시간이 코루틴 마냥 줄어들지 않았는데, 아무래도 하드웨어 프로세서 개수를 전부 1000개로 활용을 못 해서와 100만건을 청크 100으로 나누면 1,000,000 / 100 = 10,000 으로 쓰레드가 계속 밀려서 그런 것 같다.

 

혹시라도 공유자원을 동기화 하기위해서 락을 걸기때문일까? 싶어서 successFiles와 failedFiles에 추가하는 부분을 제거하고 돌려봤는데,

달라지지 않았다.

 

왜 코루틴이 멀티스레드 보다 더 빠른 성능을 보이는 걸까?

 

정답일지는 모르겠지만 GPT는 다음과 같이 답변을 해줬다.

컨텍스트 스위칭이 코루틴이 더 효율적이여서 그런것 같다.

 

여담으로 코루틴의 경우 청크사이즈를 10으로 바꿨을 경우 100일때 보다 더 좋은 성능을 냈다.

청크사이즈를 1로 하니 OOM이 떠서 청크사이즈를 줄일수록 (더 많은 코루틴을 만들 수 있을 수록) 성능이 좋아지지만 하드웨어 자원을 그 만큼 많이 잡아먹는 것 같다.

 

# 그럼 코루틴이 무조건 좋은가?

본문의 테스트를 보면 코루틴이 스레드 풀 보다 월등히 좋은 것으로 나왔지만, 단점도 있다.

코루틴은 위 코드는 메인스레드 하나로만 돌리는데, 만약 프로그램 내부에서 해당 메인스레드를 블록하면 아예 멈춰버리기 때문에 코틀린 공식문서에서는 멀티스레딩과 코루틴을 적절히 섞어서 사용하도록 (스레드에 코루틴을 던져주는, Dispatcher를 활용해서) 권장하고 있다.

 

override fun sendAllFiles(files: List<String>) = runBlocking {
    files.chunked(chunkSize).mapIndexed { chunkIndex, chunk ->
        CoroutineScope(Dispatchers.IO).launch {
            chunk.forEachIndexed { fileIndex, file ->
                attemptToSendFile(file, chunkIndex, fileIndex)
            }
        }
    }.joinAll()
    reportAndCleanUp()
}

Dispatchers.IO를 통해 위와 같이 코드를 작성하면 스레드 풀의 스레드에게 코루틴을 전달(Dispatch)해서 비동기적으로 수행할 수 있다. 이러면 훨씬 안정적이다.

 

사실 파일 전송 같은 경우는 1000건이 넘어가는 경우는 거의 없을 것이므로 만건 이상 테스트 부터는 단순 호기심에 진행해봤는데 흥미로운 결과가 나왔다.

 

위의 가상 요구사항을 충족하려면 멀티스레딩으로 구현하는 것이 가장 적합해보인다. (1000건 1초내외) 뭐 코루틴을 섞어도 괜찮을 것 같다.

 

반응형
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크