Notice
Recent Posts
Recent Comments
Link
반응형
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- JavaScript
- 알고리즘정렬
- ElasticSearch
- 예제로 배우는 스프링 입문
- 스프링 핵심원리
- 이차전지관련주
- k8s
- Effective Java 3
- springboot
- effectivejava
- 알고리즘
- 클린아키텍처
- 스프링핵심원리
- 이펙티브 자바
- Sort
- 김영한
- 티스토리챌린지
- 자바스크립트
- java
- kubernetes
- 카카오
- 자바
- Spring
- 스프링부트
- 스프링
- 엘라스틱서치
- Kotlin
- 이펙티브자바
- 오블완
- Effective Java
Archives
- Today
- Total
Kim-Baek 개발자 이야기
Kotlin 코루틴 심화 | Flow, Channel로 데이터 스트림 다루기 본문
반응형
이 글을 읽으면: 단일 값이 아닌 연속된 데이터를 안전하고 효율적으로 처리하는 Flow와 Channel을 마스터할 수 있습니다. 실시간 데이터, 이벤트 스트림, 반응형 프로그래밍을 실전 예제로 완벽하게 배워보세요.
📌 목차
- 들어가며 - 왜 Flow가 필요할까?
- Flow 기본 개념 - 흐르는 데이터
- Flow 연산자 - map, filter, collect
- StateFlow와 SharedFlow
- Channel - 코루틴 간 통신
- 실전 반응형 패턴
- 마무리 - 다음 편 예고

들어가며 - 왜 Flow가 필요할까?
실제 프로젝트에서 겪은 문제
2024년 10월, 실시간 채팅 앱 개발
요구사항:
- 서버에서 실시간으로 메시지 수신
- 사용자 타이핑 상태 업데이트
- 접속자 수 실시간 표시
→ "연속된 데이터"를 어떻게 처리하지?
시도한 방법들:
1. suspend 함수 반복 호출 → 비효율적
2. Callback → 콜백 지옥
3. RxJava → 너무 복잡함
해결책: Kotlin Flow → 간단하고 직관적!
suspend vs Flow 차이
비유로 이해하기
suspend 함수 = 택배
- 한 번 주문하면 한 개 받음
- 다시 받으려면 재주문
Flow = 구독 서비스
- 한 번 구독하면 계속 받음
- 우유 배달, 신문 배달처럼
코드로 보기
// suspend - 단일 값
suspend fun fetchUser(): User {
delay(1000)
return User("규철")
}
// Flow - 연속된 값
fun getMessages(): Flow<Message> = flow {
repeat(5) {
delay(1000)
emit(Message("메시지 $it")) // 값 발행
}
}
fun main() = runBlocking {
// suspend - 한 번만 받음
val user = fetchUser()
println(user)
// Flow - 계속 받음
getMessages().collect { message ->
println(message)
}
}
// 출력:
// User(규철)
// Message(메시지 0)
// (1초 후)
// Message(메시지 1)
// (1초 후)
// ...
Flow가 해결하는 것들
1. 연속된 데이터 처리
예시:
- 실시간 주식 시세
- GPS 위치 추적
- 센서 데이터
- 채팅 메시지
2. 백프레셔 (Backpressure)
문제: 데이터가 너무 빨리 와서 처리 못 함
Flow의 해결:
- 느린 소비자(Collector)에 맞춰서 생산
- 메모리 터지는 거 방지
3. 취소와 예외 처리
fun main() = runBlocking {
val job = launch {
getMessages()
.catch { e -> println("에러: $e") } // 예외 처리
.collect { println(it) }
}
delay(2500)
job.cancel() // 취소 가능!
}
Flow 기본 - 흐르는 데이터
Flow 만들기
방법 1: flow 빌더
fun simpleFlow(): Flow<Int> = flow {
println("Flow 시작")
for (i in 1..3) {
delay(1000)
emit(i) // 값 발행
}
println("Flow 끝")
}
fun main() = runBlocking {
println("collect 시작")
simpleFlow().collect { value ->
println("받은 값: $value")
}
println("collect 끝")
}
// 출력:
// collect 시작
// Flow 시작
// (1초 후) 받은 값: 1
// (1초 후) 받은 값: 2
// (1초 후) 받은 값: 3
// Flow 끝
// collect 끝
방법 2: flowOf - 정해진 값들
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5).collect { value ->
println(value)
}
}
// 출력: 1 2 3 4 5 (즉시)
방법 3: asFlow - 컬렉션을 Flow로
fun main() = runBlocking {
listOf("A", "B", "C")
.asFlow()
.collect { println(it) }
}
// 출력: A B C
Cold Stream - 구독할 때만 시작
Flow는 Cold Stream이다
비유: Netflix
- 재생 버튼(collect) 누르기 전엔 아무 일도 안 일어남
- 재생 버튼 누르면 그때 시작
- 여러 명이 보면 각자 처음부터 시작
코드로 확인
fun coldFlow(): Flow<Int> = flow {
println("Flow 시작!")
emit(1)
emit(2)
}
fun main() = runBlocking {
val flow = coldFlow()
println("구독자 1")
flow.collect { println("구독자1: $it") }
println("\n구독자 2")
flow.collect { println("구독자2: $it") }
}
// 출력:
// 구독자 1
// Flow 시작! ← 첫 번째 collect에서 실행
// 구독자1: 1
// 구독자1: 2
//
// 구독자 2
// Flow 시작! ← 두 번째 collect에서 다시 실행
// 구독자2: 1
// 구독자2: 2
실전 예제: 실시간 주식 시세
data class StockPrice(val symbol: String, val price: Double)
fun getStockPrices(symbol: String): Flow<StockPrice> = flow {
println("[$symbol] 시세 구독 시작")
repeat(10) {
delay(1000)
val price = (10000..20000).random().toDouble()
emit(StockPrice(symbol, price))
}
}
fun main() = runBlocking {
println("=== 삼성전자 시세 구독 ===")
getStockPrices("005930")
.collect { stock ->
println("${stock.symbol}: ${stock.price}원")
}
}
// 출력:
// === 삼성전자 시세 구독 ===
// [005930] 시세 구독 시작
// 005930: 15234.0원
// (1초 후)
// 005930: 18921.0원
// ...
Flow 연산자 - map, filter, collect
변환 - map
"각 값을 다른 형태로 바꾸기"
fun main() = runBlocking {
(1..5).asFlow()
.map { it * it } // 제곱
.collect { println(it) }
}
// 출력: 1, 4, 9, 16, 25
실전 예제: API 응답 변환
data class UserDto(val id: Long, val name: String, val email: String)
data class User(val id: Long, val name: String)
fun fetchUsers(): Flow<UserDto> = flow {
emit(UserDto(1, "규철", "user1@example.com"))
emit(UserDto(2, "영희", "user2@example.com"))
}
fun main() = runBlocking {
fetchUsers()
.map { dto -> User(dto.id, dto.name) } // DTO → Entity
.collect { user -> println(user) }
}
// 출력:
// User(id=1, name=규철)
// User(id=2, name=영희)
필터링 - filter
"조건에 맞는 것만 통과"
fun main() = runBlocking {
(1..10).asFlow()
.filter { it % 2 == 0 } // 짝수만
.collect { println(it) }
}
// 출력: 2, 4, 6, 8, 10
실전 예제: 중요한 알림만
data class Notification(val level: String, val message: String)
fun getNotifications(): Flow<Notification> = flow {
emit(Notification("INFO", "로그인 성공"))
emit(Notification("ERROR", "결제 실패"))
emit(Notification("INFO", "프로필 업데이트"))
emit(Notification("ERROR", "네트워크 오류"))
}
fun main() = runBlocking {
getNotifications()
.filter { it.level == "ERROR" } // 에러만
.collect { println("[중요] ${it.message}") }
}
// 출력:
// [중요] 결제 실패
// [중요] 네트워크 오류
결합 - combine, zip
combine - 최신 값들을 조합
fun main() = runBlocking {
val numbers = (1..3).asFlow().onEach { delay(300) }
val strings = flowOf("A", "B", "C", "D").onEach { delay(400) }
numbers.combine(strings) { num, str ->
"$num$str"
}.collect { println(it) }
}
// 출력:
// 1A
// 2A
// 2B
// 3B
// 3C
// 3D
비유: 온도계 + 습도계
온도 변화: 20° → 21° → 22°
습도 변화: 60% → 65%
combine:
(20°, 60%) → (21°, 60%) → (21°, 65%) → (22°, 65%)
→ 최신 값끼리 계속 조합
zip - 순서대로 쌍 만들기
fun main() = runBlocking {
val numbers = (1..3).asFlow()
val strings = flowOf("A", "B", "C")
numbers.zip(strings) { num, str ->
"$num$str"
}.collect { println(it) }
}
// 출력:
// 1A
// 2B
// 3C
시간 제어 - debounce, throttle
debounce - 연속 입력 무시
비유: 엘리베이터 문 닫기 버튼
- 막 누르면 → 마지막 한 번만 인정
- 검색창 타이핑 → 0.5초 멈추면 그때 검색
fun searchQuery(): Flow<String> = flow {
emit("k")
delay(100)
emit("ko")
delay(100)
emit("kot")
delay(600) // 0.5초 이상 멈춤
emit("kotlin")
}
fun main() = runBlocking {
searchQuery()
.debounce(500) // 500ms 동안 입력 없으면 발행
.collect { query ->
println("검색: $query")
}
}
// 출력:
// 검색: kot
// 검색: kotlin
실전 활용: 검색창
fun userInput(): Flow<String> = flow {
val inputs = listOf("a", "an", "and", "andr", "andro", "android")
inputs.forEach { input ->
emit(input)
delay(200) // 사용자 타이핑
}
}
fun main() = runBlocking {
userInput()
.debounce(500) // 0.5초 멈추면 검색
.collect { query ->
println("API 호출: $query")
}
}
// 출력:
// API 호출: android (마지막 것만)
StateFlow와 SharedFlow
StateFlow - 상태 관리
"현재 상태를 항상 가지고 있는 Flow"
비유: 온도계 디스플레이
- 항상 현재 온도 표시
- 새로운 사람이 봐도 현재 온도 알 수 있음
- 온도 변경되면 모두에게 알림
기본 사용법
import kotlinx.coroutines.flow.MutableStateFlow
fun main() = runBlocking {
val stateFlow = MutableStateFlow(0) // 초기값 0
// 구독자 1
launch {
stateFlow.collect { value ->
println("구독자1: $value")
}
}
delay(100)
// 구독자 2 (나중에 구독해도 현재 값 받음)
launch {
stateFlow.collect { value ->
println("구독자2: $value")
}
}
delay(100)
stateFlow.value = 1 // 값 변경
delay(100)
stateFlow.value = 2
delay(100)
}
// 출력:
// 구독자1: 0 ← 초기값
// 구독자2: 0 ← 나중에 구독해도 현재값 받음
// 구독자1: 1
// 구독자2: 1
// 구독자1: 2
// 구독자2: 2
실전 예제: 로그인 상태 관리
sealed class AuthState {
object Loading : AuthState()
data class LoggedIn(val userName: String) : AuthState()
object LoggedOut : AuthState()
}
class AuthViewModel {
private val _authState = MutableStateFlow<AuthState>(AuthState.LoggedOut)
val authState: StateFlow<AuthState> = _authState
suspend fun login(username: String, password: String) {
_authState.value = AuthState.Loading
delay(1000) // API 호출 시뮬레이션
if (password == "1234") {
_authState.value = AuthState.LoggedIn(username)
} else {
_authState.value = AuthState.LoggedOut
}
}
fun logout() {
_authState.value = AuthState.LoggedOut
}
}
fun main() = runBlocking {
val viewModel = AuthViewModel()
// UI에서 상태 관찰
launch {
viewModel.authState.collect { state ->
when (state) {
AuthState.Loading -> println("로그인 중...")
is AuthState.LoggedIn -> println("환영합니다, ${state.userName}님!")
AuthState.LoggedOut -> println("로그아웃 상태")
}
}
}
delay(100)
viewModel.login("규철", "1234")
delay(2000)
viewModel.logout()
delay(100)
}
// 출력:
// 로그아웃 상태
// 로그인 중...
// 환영합니다, 규철님!
// 로그아웃 상태
SharedFlow - 이벤트 브로드캐스팅
"여러 구독자에게 동시에 알림"
비유: 라디오 방송
- 여러 사람이 동시에 들음
- 늦게 튠 하면 이전 내용 못 들음 (기본)
- replay로 최근 N개는 다시 들을 수 있음
기본 사용법
import kotlinx.coroutines.flow.MutableSharedFlow
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<String>()
// 구독자 1
launch {
sharedFlow.collect { value ->
println("구독자1: $value")
}
}
// 구독자 2
launch {
sharedFlow.collect { value ->
println("구독자2: $value")
}
}
delay(100)
sharedFlow.emit("메시지 1")
delay(100)
sharedFlow.emit("메시지 2")
delay(100)
}
// 출력:
// 구독자1: 메시지 1
// 구독자2: 메시지 1
// 구독자1: 메시지 2
// 구독자2: 메시지 2
replay - 최근 N개 재생
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<String>(replay = 2) // 최근 2개 저장
sharedFlow.emit("메시지 1")
sharedFlow.emit("메시지 2")
sharedFlow.emit("메시지 3")
// 이제 구독 시작 (늦었지만 최근 2개는 받음)
launch {
sharedFlow.collect { value ->
println("늦은 구독자: $value")
}
}
delay(100)
}
// 출력:
// 늦은 구독자: 메시지 2
// 늦은 구독자: 메시지 3
StateFlow vs SharedFlow
StateFlow SharedFlow
| 용도 | 상태 관리 | 이벤트 전달 |
| 초기값 | 필수 | 선택 |
| 현재값 | 항상 있음 | 없음 |
| replay | 1개 (현재값) | 설정 가능 |
| 비유 | 온도계 | 라디오 |
Channel - 코루틴 간 통신
Channel이 뭐길래?
"코루틴 간 데이터 주고받는 파이프"
비유: 컨베이어 벨트
- 한쪽에서 물건 올림 (send)
- 다른 쪽에서 물건 받음 (receive)
- 물건 쌓이면 대기 (버퍼)
Flow vs Channel 차이
Flow Channel
| 방향 | 단방향 (생산자→소비자) | 양방향 가능 |
| Cold/Hot | Cold | Hot |
| 버퍼 | 없음 | 있음 |
| 비유 | 수도꼭지 | 우편함 |
기본 사용법
import kotlinx.coroutines.channels.Channel
fun main() = runBlocking {
val channel = Channel<Int>()
// 생산자
launch {
for (x in 1..5) {
println("보내기: $x")
channel.send(x)
}
channel.close() // 더 이상 안 보냄
}
// 소비자
for (y in channel) {
println("받기: $y")
delay(1000) // 천천히 처리
}
}
// 출력:
// 보내기: 1
// 받기: 1
// 보내기: 2
// (1초 후)
// 받기: 2
// ...
버퍼링
버퍼 없음 (기본) - Rendezvous
fun main() = runBlocking {
val channel = Channel<Int>() // 버퍼 0
launch {
repeat(5) {
println("보내기 시작: $it")
channel.send(it) // 받을 때까지 대기!
println("보내기 완료: $it")
}
channel.close()
}
delay(1000) // 1초 기다림
for (value in channel) {
println("받기: $value")
delay(500)
}
}
// 출력:
// 보내기 시작: 0
// (1초 후, 받는 쪽 시작)
// 받기: 0
// 보내기 완료: 0
// 보내기 시작: 1
// ...
버퍼 있음
fun main() = runBlocking {
val channel = Channel<Int>(capacity = 3) // 버퍼 3개
launch {
repeat(5) {
println("보내기: $it")
channel.send(it) // 버퍼 공간 있으면 즉시 완료
}
channel.close()
}
delay(2000) // 2초 기다림
for (value in channel) {
println("받기: $value")
}
}
// 출력:
// 보내기: 0 ← 즉시
// 보내기: 1 ← 즉시
// 보내기: 2 ← 즉시
// 보내기: 3 ← 버퍼 가득, 대기
// (2초 후)
// 받기: 0
// 보내기: 4
// 받기: 1
// ...
실전 예제: 생산자-소비자 패턴
data class Task(val id: Int, val name: String)
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) {
send(x++)
delay(100)
}
}
fun CoroutineScope.processTasks(channel: ReceiveChannel<Int>) = launch {
for (task in channel) {
println("[Worker] 작업 $task 처리 중...")
delay(300) // 작업 시간
println("[Worker] 작업 $task 완료")
}
}
fun main() = runBlocking {
val numbers = produceNumbers()
val worker = processTasks(numbers)
delay(2000)
numbers.cancel()
worker.join()
}
// 출력:
// [Worker] 작업 1 처리 중...
// [Worker] 작업 1 완료
// [Worker] 작업 2 처리 중...
// [Worker] 작업 2 완료
// ...
실전 패턴 모음
패턴 1: 실시간 검색
fun searchFlow(query: Flow<String>): Flow<List<String>> {
return query
.debounce(300) // 0.3초 대기
.filter { it.length >= 2 } // 2글자 이상
.distinctUntilChanged() // 같은 검색어 중복 제거
.flatMapLatest { searchQuery -> // 최신 검색만
flow {
println("검색 중: $searchQuery")
delay(500) // API 호출
emit(listOf("${searchQuery}1", "${searchQuery}2"))
}
}
}
fun main() = runBlocking {
val queryFlow = MutableSharedFlow<String>()
launch {
searchFlow(queryFlow).collect { results ->
println("결과: $results")
}
}
delay(100)
queryFlow.emit("k")
delay(100)
queryFlow.emit("ko")
delay(100)
queryFlow.emit("kot")
delay(500) // 입력 멈춤
delay(1000)
}
// 출력:
// 검색 중: kot
// 결과: [kot1, kot2]
패턴 2: 페이징
data class Page<T>(val items: List<T>, val nextPage: Int?)
fun loadPages(): Flow<Page<String>> = flow {
var page = 1
while (page <= 5) {
println("페이지 $page 로딩 중...")
delay(1000)
val items = List(10) { "아이템 ${(page-1)*10 + it + 1}" }
val nextPage = if (page < 5) page + 1 else null
emit(Page(items, nextPage))
page++
}
}
fun main() = runBlocking {
loadPages()
.onEach { page ->
println("받은 아이템: ${page.items.size}개")
page.items.take(3).forEach { println(" - $it") }
}
.collect()
}
// 출력:
// 페이지 1 로딩 중...
// 받은 아이템: 10개
// - 아이템 1
// - 아이템 2
// - 아이템 3
// 페이지 2 로딩 중...
// ...
패턴 3: 여러 소스 결합
data class WeatherData(val temp: Int, val humidity: Int, val windSpeed: Int)
fun temperatureFlow(): Flow<Int> = flow {
while (true) {
emit((15..30).random())
delay(1000)
}
}
fun humidityFlow(): Flow<Int> = flow {
while (true) {
emit((40..80).random())
delay(1500)
}
}
fun windSpeedFlow(): Flow<Int> = flow {
while (true) {
emit((0..20).random())
delay(2000)
}
}
fun main() = runBlocking {
combine(
temperatureFlow(),
humidityFlow(),
windSpeedFlow()
) { temp, humidity, wind ->
WeatherData(temp, humidity, wind)
}
.take(5)
.collect { data ->
println("날씨: 온도 ${data.temp}°, 습도 ${data.humidity}%, 풍속 ${data.windSpeed}m/s")
}
}
패턴 4: 재시도 로직
fun <T> Flow<T>.retryWhen(
predicate: suspend (cause: Throwable, attempt: Long) -> Boolean
): Flow<T> = flow {
var attempt = 0L
do {
val shallRetry = try {
collect { emit(it) }
false
} catch (e: Throwable) {
predicate(e, attempt++)
}
} while (shallRetry)
}
// 사용
fun unstableApi(): Flow<String> = flow {
val shouldFail = (1..3).random() != 3
if (shouldFail) {
throw Exception("네트워크 오류")
}
emit("성공!")
}
fun main() = runBlocking {
unstableApi()
.retryWhen { cause, attempt ->
println("시도 $attempt 실패: ${cause.message}")
delay(1000 * (attempt + 1))
attempt < 3
}
.catch { e -> println("최종 실패: ${e.message}") }
.collect { println(it) }
}
마무리 - 다음 편 예고
오늘 배운 것 ✅
- Flow 기본 - 연속된 데이터 스트림
- Flow 연산자 - map, filter, combine, debounce
- StateFlow - 상태 관리 (항상 현재값 유지)
- SharedFlow - 이벤트 브로드캐스팅
- Channel - 코루틴 간 통신
- 실전 패턴 - 검색, 페이징, 재시도
다음 편에서 배울 것 📚
16편: Spring Boot와 Kotlin | 실전 백엔드 개발
- Spring Boot + Kotlin 설정
- Controller, Service, Repository
- JPA with Kotlin
- 코루틴으로 비동기 API
- 실전 REST API 구현
핵심 정리
Flow vs suspend
suspend Flow
| 반환 | 단일 값 | 연속된 값 |
| 용도 | API 한 번 호출 | 실시간 데이터 |
| 비유 | 택배 | 구독 서비스 |
StateFlow vs SharedFlow
StateFlow SharedFlow
| 초기값 | 필수 | 선택 |
| 현재값 | 있음 | 없음 |
| 용도 | 상태 | 이벤트 |
| 비유 | 온도계 | 라디오 |
💬 댓글로 알려주세요!
- Flow를 어디에 활용하고 계신가요?
- StateFlow vs SharedFlow 중 뭘 더 많이 쓰시나요?
- 이 글이 도움이 되셨나요?
반응형
'개발 > java basic' 카테고리의 다른 글
| Kotlin 코루틴 기초 | launch, async, suspend로 비동기 정복하기 (0) | 2025.12.29 |
|---|---|
| Kotlin 예외 처리 완벽 가이드 | try-catch, runCatching, Result로 안전한 코드 만들기 (0) | 2025.12.28 |
| Kotlin Delegation 완벽 가이드 | by 키워드로 보일러플레이트 제거하기 (0) | 2025.12.28 |
| Kotlin 상속과 인터페이스 완벽 가이드 | open, abstract, sealed (1) | 2025.12.27 |
| Kotlin 제네릭스 완벽 가이드 | in, out, reified 마스터하기 (0) | 2025.12.27 |
Comments
