Kim-Baek 개발자 이야기

Kotlin 코루틴 심화 | Flow, Channel로 데이터 스트림 다루기 본문

개발/java basic

Kotlin 코루틴 심화 | Flow, Channel로 데이터 스트림 다루기

김백개발자 2026. 1. 1. 21:58
반응형

이 글을 읽으면: 단일 값이 아닌 연속된 데이터를 안전하고 효율적으로 처리하는 Flow와 Channel을 마스터할 수 있습니다. 실시간 데이터, 이벤트 스트림, 반응형 프로그래밍을 실전 예제로 완벽하게 배워보세요.


📌 목차

  1. 들어가며 - 왜 Flow가 필요할까?
  2. Flow 기본 개념 - 흐르는 데이터
  3. Flow 연산자 - map, filter, collect
  4. StateFlow와 SharedFlow
  5. Channel - 코루틴 간 통신
  6. 실전 반응형 패턴
  7. 마무리 - 다음 편 예고

들어가며 - 왜 Flow가 필요할까?

실제 프로젝트에서 겪은 문제

2024년 10월, 실시간 채팅 앱 개발

요구사항:
- 서버에서 실시간으로 메시지 수신
- 사용자 타이핑 상태 업데이트
- 접속자 수 실시간 표시
→ "연속된 데이터"를 어떻게 처리하지?

시도한 방법들:
1. suspend 함수 반복 호출 → 비효율적
2. Callback → 콜백 지옥
3. RxJava → 너무 복잡함

해결책: Kotlin Flow → 간단하고 직관적!

suspend vs Flow 차이

비유로 이해하기

suspend 함수 = 택배
- 한 번 주문하면 한 개 받음
- 다시 받으려면 재주문

Flow = 구독 서비스
- 한 번 구독하면 계속 받음
- 우유 배달, 신문 배달처럼

코드로 보기

// suspend - 단일 값
suspend fun fetchUser(): User {
    delay(1000)
    return User("규철")
}

// Flow - 연속된 값
fun getMessages(): Flow<Message> = flow {
    repeat(5) {
        delay(1000)
        emit(Message("메시지 $it"))  // 값 발행
    }
}

fun main() = runBlocking {
    // suspend - 한 번만 받음
    val user = fetchUser()
    println(user)
    
    // Flow - 계속 받음
    getMessages().collect { message ->
        println(message)
    }
}
// 출력:
// User(규철)
// Message(메시지 0)
// (1초 후)
// Message(메시지 1)
// (1초 후)
// ...

Flow가 해결하는 것들

1. 연속된 데이터 처리

예시:
- 실시간 주식 시세
- GPS 위치 추적
- 센서 데이터
- 채팅 메시지

2. 백프레셔 (Backpressure)

문제: 데이터가 너무 빨리 와서 처리 못 함

Flow의 해결:
- 느린 소비자(Collector)에 맞춰서 생산
- 메모리 터지는 거 방지

3. 취소와 예외 처리

fun main() = runBlocking {
    val job = launch {
        getMessages()
            .catch { e -> println("에러: $e") }  // 예외 처리
            .collect { println(it) }
    }
    
    delay(2500)
    job.cancel()  // 취소 가능!
}

Flow 기본 - 흐르는 데이터

Flow 만들기

방법 1: flow 빌더

fun simpleFlow(): Flow<Int> = flow {
    println("Flow 시작")
    for (i in 1..3) {
        delay(1000)
        emit(i)  // 값 발행
    }
    println("Flow 끝")
}

fun main() = runBlocking {
    println("collect 시작")
    simpleFlow().collect { value ->
        println("받은 값: $value")
    }
    println("collect 끝")
}
// 출력:
// collect 시작
// Flow 시작
// (1초 후) 받은 값: 1
// (1초 후) 받은 값: 2
// (1초 후) 받은 값: 3
// Flow 끝
// collect 끝

방법 2: flowOf - 정해진 값들

fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5).collect { value ->
        println(value)
    }
}
// 출력: 1 2 3 4 5 (즉시)

방법 3: asFlow - 컬렉션을 Flow로

fun main() = runBlocking {
    listOf("A", "B", "C")
        .asFlow()
        .collect { println(it) }
}
// 출력: A B C

Cold Stream - 구독할 때만 시작

Flow는 Cold Stream이다

비유: Netflix

- 재생 버튼(collect) 누르기 전엔 아무 일도 안 일어남
- 재생 버튼 누르면 그때 시작
- 여러 명이 보면 각자 처음부터 시작

코드로 확인

fun coldFlow(): Flow<Int> = flow {
    println("Flow 시작!")
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val flow = coldFlow()
    
    println("구독자 1")
    flow.collect { println("구독자1: $it") }
    
    println("\n구독자 2")
    flow.collect { println("구독자2: $it") }
}
// 출력:
// 구독자 1
// Flow 시작!  ← 첫 번째 collect에서 실행
// 구독자1: 1
// 구독자1: 2
//
// 구독자 2
// Flow 시작!  ← 두 번째 collect에서 다시 실행
// 구독자2: 1
// 구독자2: 2

실전 예제: 실시간 주식 시세

data class StockPrice(val symbol: String, val price: Double)

fun getStockPrices(symbol: String): Flow<StockPrice> = flow {
    println("[$symbol] 시세 구독 시작")
    
    repeat(10) {
        delay(1000)
        val price = (10000..20000).random().toDouble()
        emit(StockPrice(symbol, price))
    }
}

fun main() = runBlocking {
    println("=== 삼성전자 시세 구독 ===")
    getStockPrices("005930")
        .collect { stock ->
            println("${stock.symbol}: ${stock.price}원")
        }
}
// 출력:
// === 삼성전자 시세 구독 ===
// [005930] 시세 구독 시작
// 005930: 15234.0원
// (1초 후)
// 005930: 18921.0원
// ...

Flow 연산자 - map, filter, collect

변환 - map

"각 값을 다른 형태로 바꾸기"

fun main() = runBlocking {
    (1..5).asFlow()
        .map { it * it }  // 제곱
        .collect { println(it) }
}
// 출력: 1, 4, 9, 16, 25

실전 예제: API 응답 변환

data class UserDto(val id: Long, val name: String, val email: String)
data class User(val id: Long, val name: String)

fun fetchUsers(): Flow<UserDto> = flow {
    emit(UserDto(1, "규철", "user1@example.com"))
    emit(UserDto(2, "영희", "user2@example.com"))
}

fun main() = runBlocking {
    fetchUsers()
        .map { dto -> User(dto.id, dto.name) }  // DTO → Entity
        .collect { user -> println(user) }
}
// 출력:
// User(id=1, name=규철)
// User(id=2, name=영희)

필터링 - filter

"조건에 맞는 것만 통과"

fun main() = runBlocking {
    (1..10).asFlow()
        .filter { it % 2 == 0 }  // 짝수만
        .collect { println(it) }
}
// 출력: 2, 4, 6, 8, 10

실전 예제: 중요한 알림만

data class Notification(val level: String, val message: String)

fun getNotifications(): Flow<Notification> = flow {
    emit(Notification("INFO", "로그인 성공"))
    emit(Notification("ERROR", "결제 실패"))
    emit(Notification("INFO", "프로필 업데이트"))
    emit(Notification("ERROR", "네트워크 오류"))
}

fun main() = runBlocking {
    getNotifications()
        .filter { it.level == "ERROR" }  // 에러만
        .collect { println("[중요] ${it.message}") }
}
// 출력:
// [중요] 결제 실패
// [중요] 네트워크 오류

결합 - combine, zip

combine - 최신 값들을 조합

fun main() = runBlocking {
    val numbers = (1..3).asFlow().onEach { delay(300) }
    val strings = flowOf("A", "B", "C", "D").onEach { delay(400) }
    
    numbers.combine(strings) { num, str ->
        "$num$str"
    }.collect { println(it) }
}
// 출력:
// 1A
// 2A
// 2B
// 3B
// 3C
// 3D

비유: 온도계 + 습도계

온도 변화: 20° → 21° → 22°
습도 변화: 60% → 65%

combine:
(20°, 60%) → (21°, 60%) → (21°, 65%) → (22°, 65%)
→ 최신 값끼리 계속 조합

zip - 순서대로 쌍 만들기

fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val strings = flowOf("A", "B", "C")
    
    numbers.zip(strings) { num, str ->
        "$num$str"
    }.collect { println(it) }
}
// 출력:
// 1A
// 2B
// 3C

시간 제어 - debounce, throttle

debounce - 연속 입력 무시

비유: 엘리베이터 문 닫기 버튼

- 막 누르면 → 마지막 한 번만 인정
- 검색창 타이핑 → 0.5초 멈추면 그때 검색
fun searchQuery(): Flow<String> = flow {
    emit("k")
    delay(100)
    emit("ko")
    delay(100)
    emit("kot")
    delay(600)  // 0.5초 이상 멈춤
    emit("kotlin")
}

fun main() = runBlocking {
    searchQuery()
        .debounce(500)  // 500ms 동안 입력 없으면 발행
        .collect { query ->
            println("검색: $query")
        }
}
// 출력:
// 검색: kot
// 검색: kotlin

실전 활용: 검색창

fun userInput(): Flow<String> = flow {
    val inputs = listOf("a", "an", "and", "andr", "andro", "android")
    inputs.forEach { input ->
        emit(input)
        delay(200)  // 사용자 타이핑
    }
}

fun main() = runBlocking {
    userInput()
        .debounce(500)  // 0.5초 멈추면 검색
        .collect { query ->
            println("API 호출: $query")
        }
}
// 출력:
// API 호출: android  (마지막 것만)

StateFlow와 SharedFlow

StateFlow - 상태 관리

"현재 상태를 항상 가지고 있는 Flow"

비유: 온도계 디스플레이

- 항상 현재 온도 표시
- 새로운 사람이 봐도 현재 온도 알 수 있음
- 온도 변경되면 모두에게 알림

기본 사용법

import kotlinx.coroutines.flow.MutableStateFlow

fun main() = runBlocking {
    val stateFlow = MutableStateFlow(0)  // 초기값 0
    
    // 구독자 1
    launch {
        stateFlow.collect { value ->
            println("구독자1: $value")
        }
    }
    
    delay(100)
    
    // 구독자 2 (나중에 구독해도 현재 값 받음)
    launch {
        stateFlow.collect { value ->
            println("구독자2: $value")
        }
    }
    
    delay(100)
    stateFlow.value = 1  // 값 변경
    delay(100)
    stateFlow.value = 2
    
    delay(100)
}
// 출력:
// 구독자1: 0  ← 초기값
// 구독자2: 0  ← 나중에 구독해도 현재값 받음
// 구독자1: 1
// 구독자2: 1
// 구독자1: 2
// 구독자2: 2

실전 예제: 로그인 상태 관리

sealed class AuthState {
    object Loading : AuthState()
    data class LoggedIn(val userName: String) : AuthState()
    object LoggedOut : AuthState()
}

class AuthViewModel {
    private val _authState = MutableStateFlow<AuthState>(AuthState.LoggedOut)
    val authState: StateFlow<AuthState> = _authState
    
    suspend fun login(username: String, password: String) {
        _authState.value = AuthState.Loading
        
        delay(1000)  // API 호출 시뮬레이션
        
        if (password == "1234") {
            _authState.value = AuthState.LoggedIn(username)
        } else {
            _authState.value = AuthState.LoggedOut
        }
    }
    
    fun logout() {
        _authState.value = AuthState.LoggedOut
    }
}

fun main() = runBlocking {
    val viewModel = AuthViewModel()
    
    // UI에서 상태 관찰
    launch {
        viewModel.authState.collect { state ->
            when (state) {
                AuthState.Loading -> println("로그인 중...")
                is AuthState.LoggedIn -> println("환영합니다, ${state.userName}님!")
                AuthState.LoggedOut -> println("로그아웃 상태")
            }
        }
    }
    
    delay(100)
    viewModel.login("규철", "1234")
    
    delay(2000)
    viewModel.logout()
    
    delay(100)
}
// 출력:
// 로그아웃 상태
// 로그인 중...
// 환영합니다, 규철님!
// 로그아웃 상태

SharedFlow - 이벤트 브로드캐스팅

"여러 구독자에게 동시에 알림"

비유: 라디오 방송

- 여러 사람이 동시에 들음
- 늦게 튠 하면 이전 내용 못 들음 (기본)
- replay로 최근 N개는 다시 들을 수 있음

기본 사용법

import kotlinx.coroutines.flow.MutableSharedFlow

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<String>()
    
    // 구독자 1
    launch {
        sharedFlow.collect { value ->
            println("구독자1: $value")
        }
    }
    
    // 구독자 2
    launch {
        sharedFlow.collect { value ->
            println("구독자2: $value")
        }
    }
    
    delay(100)
    sharedFlow.emit("메시지 1")
    delay(100)
    sharedFlow.emit("메시지 2")
    
    delay(100)
}
// 출력:
// 구독자1: 메시지 1
// 구독자2: 메시지 1
// 구독자1: 메시지 2
// 구독자2: 메시지 2

replay - 최근 N개 재생

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<String>(replay = 2)  // 최근 2개 저장
    
    sharedFlow.emit("메시지 1")
    sharedFlow.emit("메시지 2")
    sharedFlow.emit("메시지 3")
    
    // 이제 구독 시작 (늦었지만 최근 2개는 받음)
    launch {
        sharedFlow.collect { value ->
            println("늦은 구독자: $value")
        }
    }
    
    delay(100)
}
// 출력:
// 늦은 구독자: 메시지 2
// 늦은 구독자: 메시지 3

StateFlow vs SharedFlow

StateFlow SharedFlow

용도 상태 관리 이벤트 전달
초기값 필수 선택
현재값 항상 있음 없음
replay 1개 (현재값) 설정 가능
비유 온도계 라디오

Channel - 코루틴 간 통신

Channel이 뭐길래?

"코루틴 간 데이터 주고받는 파이프"

비유: 컨베이어 벨트

- 한쪽에서 물건 올림 (send)
- 다른 쪽에서 물건 받음 (receive)
- 물건 쌓이면 대기 (버퍼)

Flow vs Channel 차이

Flow Channel

방향 단방향 (생산자→소비자) 양방향 가능
Cold/Hot Cold Hot
버퍼 없음 있음
비유 수도꼭지 우편함

기본 사용법

import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>()
    
    // 생산자
    launch {
        for (x in 1..5) {
            println("보내기: $x")
            channel.send(x)
        }
        channel.close()  // 더 이상 안 보냄
    }
    
    // 소비자
    for (y in channel) {
        println("받기: $y")
        delay(1000)  // 천천히 처리
    }
}
// 출력:
// 보내기: 1
// 받기: 1
// 보내기: 2
// (1초 후)
// 받기: 2
// ...

버퍼링

버퍼 없음 (기본) - Rendezvous

fun main() = runBlocking {
    val channel = Channel<Int>()  // 버퍼 0
    
    launch {
        repeat(5) {
            println("보내기 시작: $it")
            channel.send(it)  // 받을 때까지 대기!
            println("보내기 완료: $it")
        }
        channel.close()
    }
    
    delay(1000)  // 1초 기다림
    
    for (value in channel) {
        println("받기: $value")
        delay(500)
    }
}
// 출력:
// 보내기 시작: 0
// (1초 후, 받는 쪽 시작)
// 받기: 0
// 보내기 완료: 0
// 보내기 시작: 1
// ...

버퍼 있음

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 3)  // 버퍼 3개
    
    launch {
        repeat(5) {
            println("보내기: $it")
            channel.send(it)  // 버퍼 공간 있으면 즉시 완료
        }
        channel.close()
    }
    
    delay(2000)  // 2초 기다림
    
    for (value in channel) {
        println("받기: $value")
    }
}
// 출력:
// 보내기: 0  ← 즉시
// 보내기: 1  ← 즉시
// 보내기: 2  ← 즉시
// 보내기: 3  ← 버퍼 가득, 대기
// (2초 후)
// 받기: 0
// 보내기: 4
// 받기: 1
// ...

실전 예제: 생산자-소비자 패턴

data class Task(val id: Int, val name: String)

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100)
    }
}

fun CoroutineScope.processTasks(channel: ReceiveChannel<Int>) = launch {
    for (task in channel) {
        println("[Worker] 작업 $task 처리 중...")
        delay(300)  // 작업 시간
        println("[Worker] 작업 $task 완료")
    }
}

fun main() = runBlocking {
    val numbers = produceNumbers()
    val worker = processTasks(numbers)
    
    delay(2000)
    numbers.cancel()
    worker.join()
}
// 출력:
// [Worker] 작업 1 처리 중...
// [Worker] 작업 1 완료
// [Worker] 작업 2 처리 중...
// [Worker] 작업 2 완료
// ...

실전 패턴 모음

패턴 1: 실시간 검색

fun searchFlow(query: Flow<String>): Flow<List<String>> {
    return query
        .debounce(300)  // 0.3초 대기
        .filter { it.length >= 2 }  // 2글자 이상
        .distinctUntilChanged()  // 같은 검색어 중복 제거
        .flatMapLatest { searchQuery ->  // 최신 검색만
            flow {
                println("검색 중: $searchQuery")
                delay(500)  // API 호출
                emit(listOf("${searchQuery}1", "${searchQuery}2"))
            }
        }
}

fun main() = runBlocking {
    val queryFlow = MutableSharedFlow<String>()
    
    launch {
        searchFlow(queryFlow).collect { results ->
            println("결과: $results")
        }
    }
    
    delay(100)
    queryFlow.emit("k")
    delay(100)
    queryFlow.emit("ko")
    delay(100)
    queryFlow.emit("kot")
    delay(500)  // 입력 멈춤
    
    delay(1000)
}
// 출력:
// 검색 중: kot
// 결과: [kot1, kot2]

패턴 2: 페이징

data class Page<T>(val items: List<T>, val nextPage: Int?)

fun loadPages(): Flow<Page<String>> = flow {
    var page = 1
    
    while (page <= 5) {
        println("페이지 $page 로딩 중...")
        delay(1000)
        
        val items = List(10) { "아이템 ${(page-1)*10 + it + 1}" }
        val nextPage = if (page < 5) page + 1 else null
        
        emit(Page(items, nextPage))
        page++
    }
}

fun main() = runBlocking {
    loadPages()
        .onEach { page ->
            println("받은 아이템: ${page.items.size}개")
            page.items.take(3).forEach { println("  - $it") }
        }
        .collect()
}
// 출력:
// 페이지 1 로딩 중...
// 받은 아이템: 10개
//   - 아이템 1
//   - 아이템 2
//   - 아이템 3
// 페이지 2 로딩 중...
// ...

패턴 3: 여러 소스 결합

data class WeatherData(val temp: Int, val humidity: Int, val windSpeed: Int)

fun temperatureFlow(): Flow<Int> = flow {
    while (true) {
        emit((15..30).random())
        delay(1000)
    }
}

fun humidityFlow(): Flow<Int> = flow {
    while (true) {
        emit((40..80).random())
        delay(1500)
    }
}

fun windSpeedFlow(): Flow<Int> = flow {
    while (true) {
        emit((0..20).random())
        delay(2000)
    }
}

fun main() = runBlocking {
    combine(
        temperatureFlow(),
        humidityFlow(),
        windSpeedFlow()
    ) { temp, humidity, wind ->
        WeatherData(temp, humidity, wind)
    }
        .take(5)
        .collect { data ->
            println("날씨: 온도 ${data.temp}°, 습도 ${data.humidity}%, 풍속 ${data.windSpeed}m/s")
        }
}

패턴 4: 재시도 로직

fun <T> Flow<T>.retryWhen(
    predicate: suspend (cause: Throwable, attempt: Long) -> Boolean
): Flow<T> = flow {
    var attempt = 0L
    
    do {
        val shallRetry = try {
            collect { emit(it) }
            false
        } catch (e: Throwable) {
            predicate(e, attempt++)
        }
    } while (shallRetry)
}

// 사용
fun unstableApi(): Flow<String> = flow {
    val shouldFail = (1..3).random() != 3
    
    if (shouldFail) {
        throw Exception("네트워크 오류")
    }
    
    emit("성공!")
}

fun main() = runBlocking {
    unstableApi()
        .retryWhen { cause, attempt ->
            println("시도 $attempt 실패: ${cause.message}")
            delay(1000 * (attempt + 1))
            attempt < 3
        }
        .catch { e -> println("최종 실패: ${e.message}") }
        .collect { println(it) }
}

마무리 - 다음 편 예고

오늘 배운 것 ✅

  • Flow 기본 - 연속된 데이터 스트림
  • Flow 연산자 - map, filter, combine, debounce
  • StateFlow - 상태 관리 (항상 현재값 유지)
  • SharedFlow - 이벤트 브로드캐스팅
  • Channel - 코루틴 간 통신
  • 실전 패턴 - 검색, 페이징, 재시도

다음 편에서 배울 것 📚

16편: Spring Boot와 Kotlin | 실전 백엔드 개발

  • Spring Boot + Kotlin 설정
  • Controller, Service, Repository
  • JPA with Kotlin
  • 코루틴으로 비동기 API
  • 실전 REST API 구현

핵심 정리

Flow vs suspend

suspend Flow

반환 단일 값 연속된 값
용도 API 한 번 호출 실시간 데이터
비유 택배 구독 서비스

StateFlow vs SharedFlow

StateFlow SharedFlow

초기값 필수 선택
현재값 있음 없음
용도 상태 이벤트
비유 온도계 라디오

💬 댓글로 알려주세요!

  • Flow를 어디에 활용하고 계신가요?
  • StateFlow vs SharedFlow 중 뭘 더 많이 쓰시나요?
  • 이 글이 도움이 되셨나요?
반응형
Comments