CQRS & Event Sourcing
분류: Layer 8 - 데이터베이스 심화
CQRS / Event Sourcing
섹션 제목: “CQRS / Event Sourcing”1. 한 줄 정의
섹션 제목: “1. 한 줄 정의”CQRS는 읽기(Query)와 쓰기(Command) 책임을 분리하는 아키텍처 패턴이고, Event Sourcing은 상태 대신 이벤트(변경 이력)를 저장해 현재 상태를 재현하는 패턴이다. 두 패턴은 독립적으로 쓸 수 있지만, 결합하면 Event Sourcing의 이벤트가 CQRS 읽기 모델 동기화 트리거가 되어 시너지가 극대화된다.
2. 왜 중요한가
섹션 제목: “2. 왜 중요한가”CQRS가 필요한 이유
섹션 제목: “CQRS가 필요한 이유”대부분의 시스템에서 읽기(Read)와 쓰기(Write) 요청의 특성은 근본적으로 다르다. 읽기는 빠른 응답과 복잡한 조인(JOIN)이 필요하고, 쓰기는 데이터 정합성과 트랜잭션이 중요하다. 단일 모델로 두 가지를 모두 만족시키려 하면 어느 쪽도 최적화하기 어렵다.
- 트래픽이 많은 서비스에서 읽기와 쓰기를 독립적으로 스케일링할 수 있다
- 읽기 모델과 쓰기 모델을 서로 다른 DB로 분리할 수 있다 (예: 쓰기는 PostgreSQL, 읽기는 Elasticsearch)
- DDD(Domain-Driven Design)와 잘 맞아떨어진다
Event Sourcing이 필요한 이유
섹션 제목: “Event Sourcing이 필요한 이유”전통적인 CRUD는 현재 상태만 저장한다. “언제, 왜, 어떻게 이 상태가 됐는가”를 알 수 없다. 금융, 주문 이력, 감사(Audit) 시스템처럼 변경 이력이 중요한 도메인에서는 이벤트 자체를 저장해 완벽한 히스토리를 보장해야 한다.
프론트엔드 개발자를 위한 브릿지
섹션 제목: “프론트엔드 개발자를 위한 브릿지”React에서 useMutation으로 데이터를 변경(Command)한 후 invalidateQueries로 캐시를 무효화(Query 읽기 모델 재조회)하는 패턴은 사실상 미니 CQRS다. useMutation = Command 실행, invalidateQueries = 읽기 모델 무효화 후 최신 데이터 재조회. 규모가 커지면 이 패턴이 서버 아키텍처 레벨의 CQRS로 발전한다.
3. 핵심 개념
섹션 제목: “3. 핵심 개념”3-1. CQRS (Command Query Responsibility Segregation)
섹션 제목: “3-1. CQRS (Command Query Responsibility Segregation)”비유: 은행 창구
섹션 제목: “비유: 은행 창구”은행에는 두 종류의 창구가 있다.
- 입출금 창구 (Command): 돈을 넣거나 빼는 작업 — 상태를 변경한다. 빠른 처리보다 정확성이 중요하다.
- 잔액 조회 창구 (Query): 현재 잔액을 확인하는 작업 — 상태를 읽기만 한다. 빠른 응답이 중요하다.
만약 하나의 창구에서 모든 업무를 처리하면 입출금 작업 때문에 조회도 느려지고, 조회 때문에 입출금도 영향을 받는다. CQRS는 이 두 창구를 완전히 분리하는 설계다.
클라이언트 요청 │ ├─ 쓰기 요청 (Command) ──▶ Command 모델 ──▶ 쓰기 DB (정규화, 무결성 중심) │ │ │ │ 이벤트/동기화 │ ▼ └─ 읽기 요청 (Query) ──▶ Query 모델 ──▶ 읽기 DB (역정규화, 조회 최적화)- Command 모델: 데이터 정합성이 우선. 정규화된 테이블 구조. 트랜잭션 처리.
- Query 모델: 응답 속도가 우선. 역정규화(Denormalization)된 구조. 복잡한 JOIN 없이 바로 조회.
왜 이렇게 동작하는가 — 읽기와 쓰기의 근본적 차이
섹션 제목: “왜 이렇게 동작하는가 — 읽기와 쓰기의 근본적 차이”전통적인 CRUD 아키텍처에서는 하나의 도메인 모델(Entity)이 읽기와 쓰기를 모두 담당한다. 그런데 실무에서 읽기와 쓰기는 요구사항이 전혀 다르다.
쓰기(Command)의 특성:
- 비즈니스 규칙 검증이 필수다 (재고가 0 이하로 갈 수 없음, 잔액보다 많이 출금 불가)
- 정규화된 스키마로 데이터 무결성을 보장해야 한다
- 동시성 제어(Lock, 낙관적 제어)가 필요하다
- 트래픽 비중이 상대적으로 적다 (보통 전체의 10~20%)
읽기(Query)의 특성:
- 여러 테이블을 JOIN해서 한 화면에 필요한 데이터를 조합해야 한다
- 비즈니스 규칙 검증이 필요 없다 (읽기만 하므로)
- 빠른 응답 속도가 핵심이다 (UX 직결)
- 트래픽 비중이 압도적으로 크다 (보통 전체의 80~90%)
하나의 모델로 양쪽을 만족시키려면 정규화 ↔ 역정규화 사이에서 타협해야 한다. 정규화하면 쓰기는 안전하지만 읽기가 느려지고(JOIN 폭발), 역정규화하면 읽기는 빠르지만 쓰기 시 데이터 동기화 부담이 생긴다. CQRS는 이 딜레마를 “그냥 둘 다 만들자” 로 해결한다.
CQRS를 도입하면 안 되는 경우:
- 단순 CRUD 앱 (읽기/쓰기 복잡도 차이가 거의 없음)
- 팀 규모가 작고 운영 복잡도를 감당하기 어려운 경우
- 읽기 모델 동기화 지연(Eventual Consistency)을 비즈니스가 허용하지 않는 경우
📖 더 보기: Microsoft Azure - CQRS Pattern — CQRS 적용 시나리오/비적용 시나리오를 클라우드 아키텍처 관점에서 상세 설명 (입문)
NestJS @nestjs/cqrs 구현 예시
섹션 제목: “NestJS @nestjs/cqrs 구현 예시”설치
npm install @nestjs/cqrs모듈 설정
import { Module } from "@nestjs/common";import { CqrsModule } from "@nestjs/cqrs";import { TaskController } from "./task.controller";import { TaskRepository } from "./task.repository";import { CreateTaskHandler } from "./commands/create-task.handler";import { GetAllTasksHandler } from "./queries/get-all-tasks.handler";import { TaskCreatedHandler } from "./events/task-created.handler";
@Module({ imports: [CqrsModule], controllers: [TaskController], providers: [ TaskRepository, CreateTaskHandler, // Command Handler 등록 GetAllTasksHandler, // Query Handler 등록 TaskCreatedHandler, // Event Handler 등록 ],})export class TaskModule {}Command 정의 및 Handler
export class CreateTaskCommand { constructor( public readonly title: string, public readonly description: string, public readonly userId: string, ) {}}
// commands/create-task.handler.tsimport { CommandHandler, ICommandHandler, EventBus } from "@nestjs/cqrs";import { CreateTaskCommand } from "./create-task.command";import { TaskCreatedEvent } from "../events/task-created.event";
@CommandHandler(CreateTaskCommand)export class CreateTaskHandler implements ICommandHandler<CreateTaskCommand> { constructor( private readonly taskRepository: TaskRepository, private readonly eventBus: EventBus, ) {}
async execute(command: CreateTaskCommand): Promise<void> { const { title, description, userId } = command;
// 1. 쓰기 DB에 저장 (Command 모델) const task = await this.taskRepository.create({ title, description, userId, });
// 2. 이벤트 발행 → 읽기 DB 동기화 트리거 this.eventBus.publish(new TaskCreatedEvent(task.id, title, userId)); }}Query 정의 및 Handler
export class GetAllTasksQuery { constructor(public readonly userId: string) {}}
// queries/get-all-tasks.handler.tsimport { QueryHandler, IQueryHandler } from "@nestjs/cqrs";import { GetAllTasksQuery } from "./get-all-tasks.query";
@QueryHandler(GetAllTasksQuery)export class GetAllTasksHandler implements IQueryHandler<GetAllTasksQuery> { constructor(private readonly taskReadRepository: TaskReadRepository) {}
async execute(query: GetAllTasksQuery): Promise<TaskDto[]> { // 읽기 전용 DB에서 조회 (역정규화된 뷰) return this.taskReadRepository.findAllByUser(query.userId); }}Controller에서 CommandBus / QueryBus 사용
import { Controller, Get, Post, Body } from "@nestjs/common";import { CommandBus, QueryBus } from "@nestjs/cqrs";import { CreateTaskCommand } from "./commands/create-task.command";import { GetAllTasksQuery } from "./queries/get-all-tasks.query";
@Controller("tasks")export class TaskController { constructor( private readonly commandBus: CommandBus, private readonly queryBus: QueryBus, ) {}
@Post() async createTask(@Body() dto: CreateTaskDto) { // Command 실행 → 쓰기 모델 await this.commandBus.execute( new CreateTaskCommand(dto.title, dto.description, dto.userId), ); return { message: "태스크가 생성되었습니다." }; }
@Get() async getAllTasks(@Query("userId") userId: string) { // Query 실행 → 읽기 모델 return this.queryBus.execute(new GetAllTasksQuery(userId)); }}예상 출력
POST /tasksBody: { "title": "CQRS 학습", "description": "패턴 이해하기", "userId": "user-1" }
→ Response: { "message": "태스크가 생성되었습니다." } (내부적으로 TaskCreatedEvent 발행, 읽기 DB 비동기 업데이트)
GET /tasks?userId=user-1→ Response: [ { "id": "uuid-1", "title": "CQRS 학습", "status": "open", "createdAt": "..." } ]CQRS 고급: Outbox 패턴 — 이벤트 유실 방지
섹션 제목: “CQRS 고급: Outbox 패턴 — 이벤트 유실 방지”Command Handler에서 DB에 저장하고 이벤트를 발행(EventBus.publish)하는 사이에 서버가 다운되면 이벤트가 유실될 수 있다. Outbox 패턴은 이 문제를 해결한다.
비유: 편지를 보내려면 먼저 발신함(Outbox)에 넣어두고, 우체부(별도 프로세스)가 주기적으로 발신함을 확인해서 배달한다. 편지를 직접 던지는 것보다 훨씬 안전하다.
왜 이렇게 동작하는가:
- DB 저장과 이벤트를 같은 트랜잭션에 묶는다 (atomic 보장)
- 이벤트 발행은 DB 커밋이 완료된 후 별도 프로세스가 처리한다
- 서버 다운 후 재시작 시 미발행 이벤트를 다시 처리할 수 있다
// Outbox 테이블 스키마 (PostgreSQL)// CREATE TABLE outbox (// id UUID PRIMARY KEY,// aggregate_type VARCHAR(50),// aggregate_id VARCHAR(100),// event_type VARCHAR(100),// payload JSONB,// created_at TIMESTAMP DEFAULT NOW(),// published_at TIMESTAMP NULL -- NULL이면 미발행// );
// Command Handler with Outbox Pattern@CommandHandler(CreateTaskCommand)export class CreateTaskHandler implements ICommandHandler<CreateTaskCommand> { constructor(private readonly dataSource: DataSource) {}
async execute(command: CreateTaskCommand): Promise<void> { // 단일 트랜잭션 내에서 도메인 저장 + Outbox 저장 await this.dataSource.transaction(async (manager) => { // 1. 도메인 데이터 저장 const task = manager.create(Task, { title: command.title, userId: command.userId, }); await manager.save(task);
// 2. Outbox에 이벤트 저장 (같은 트랜잭션) await manager.insert(Outbox, { id: randomUUID(), aggregateType: "Task", aggregateId: task.id, eventType: "TaskCreated", payload: { taskId: task.id, title: task.title }, }); // 트랜잭션 커밋 → 둘 다 저장 또는 둘 다 롤백 }); // 실제 이벤트 발행은 별도 OutboxProcessor가 담당 }}📖 더 보기: Designing Scalable Systems with CQRS and Event Sourcing — 프로덕션 수준의 CQRS/Event Sourcing 설계 패턴 (Outbox, Snapshot 포함, 중급)
CQRS 고급: 낙관적 동시성 제어 (Optimistic Concurrency)
섹션 제목: “CQRS 고급: 낙관적 동시성 제어 (Optimistic Concurrency)”두 사용자가 동시에 같은 Command를 보내면 경쟁 조건(Race Condition)이 발생할 수 있다. 비관적 락(SELECT FOR UPDATE)은 성능을 희생하는 반면, 낙관적 동시성 제어는 충돌이 드문 경우에 성능을 유지하면서 정합성을 보장한다.
// 낙관적 동시성 제어를 위한 버전 필드@Entity()export class Task { @PrimaryGeneratedColumn("uuid") id: string;
@Column() title: string;
@VersionColumn() // TypeORM이 자동으로 버전 관리 version: number; // UPDATE 시 version을 WHERE 조건에 추가}
// 동시 수정 시 동작:// 트랜잭션 A: task.version=1 로 읽고 → version=2 로 UPDATE 시도 (성공)// 트랜잭션 B: task.version=1 로 읽고 → version=2 로 UPDATE 시도 (실패! version이 이미 2)// → TypeORM이 OptimisticLockVersionMismatchError 발생// → 애플리케이션이 재시도 로직 실행흐름 정리: CQRS는 읽기/쓰기 모델을 분리하는 구조를 만든다. 그런데 쓰기 모델 변경 사항을 읽기 모델에 어떻게 전달할까? 이 질문의 답이 Event Sourcing이다. 아래에서 Event Sourcing의 원리를 이해하고, 이어지는 3-3절에서 두 패턴이 결합되는 방식을 살펴본다.
3-2. Event Sourcing (이벤트 소싱)
섹션 제목: “3-2. Event Sourcing (이벤트 소싱)”비유: 은행 통장 거래 내역
섹션 제목: “비유: 은행 통장 거래 내역”전통적인 CRUD는 통장 잔액(현재 상태)만 저장한다. 잔액이 100만원이라는 사실만 알 뿐, 어떻게 100만원이 됐는지는 모른다.
Event Sourcing은 거래 내역(이벤트)을 모두 저장한다.
입금 +50만원 (2024-01-01)출금 -10만원 (2024-01-05)입금 +30만원 (2024-01-10)출금 -20만원 (2024-01-15)입금 +50만원 (2024-01-20)────────────────────────────현재 잔액: 100만원 (이벤트 재생으로 계산)거래 내역을 순서대로 재생(replay)하면 어느 시점의 잔액이든 계산할 수 있다.
Command (입금 요청) │ ▼이벤트 생성 (MoneyDepositedEvent { amount: 500000, at: ... }) │ ▼이벤트 스토어 (Event Store) — append-only, 절대 수정 불가 │ ├─ 이벤트 재생(Replay) ──▶ 현재 상태(Aggregate) 복원 │ └─ 이벤트 발행(Publish) ──▶ Read Model 업데이트 (CQRS와 결합)왜 이렇게 동작하는가 — Append-Only의 근본 이유
섹션 제목: “왜 이렇게 동작하는가 — Append-Only의 근본 이유”왜 이벤트를 수정하거나 삭제하면 안 되는가? 이벤트 소싱에서 이벤트는 “이미 발생한 사실” 이다. “1월 5일에 10만원을 출금했다”는 사실은 변경할 수 없다. 잘못된 이벤트가 있다면 삭제하는 것이 아니라 보상 이벤트(Compensating Event) 를 새로 추가한다. 예를 들어 잘못된 출금이 있었다면 “1월 6일에 10만원 출금 취소” 이벤트를 append한다.
이 원칙 덕분에 다음이 보장된다:
- 완벽한 감사 추적: 어떤 이벤트도 사라지지 않으므로 모든 변경의 전체 이력이 남는다
- 동시성 안전: 이벤트는 추가만 하므로 UPDATE 충돌이 발생하지 않는다
- 디버깅 용이: 버그 발생 시점의 이벤트 시퀀스를 그대로 재생하면 정확한 버그 원인을 파악할 수 있다
전통 CRUD vs Event Sourcing의 근본적 차이:
전통 CRUD (현재 상태 저장): accounts 테이블: { id: 1, balance: 100만원, updated_at: 1/20 } → "왜 100만원인가?" 알 수 없음 → UPDATE가 이전 값을 덮어씀
Event Sourcing (이벤트 이력 저장): events 테이블: { type: DEPOSIT, amount: 50만, at: 1/01 } { type: WITHDRAW, amount: 10만, at: 1/05 } { type: DEPOSIT, amount: 30만, at: 1/10 } { type: WITHDRAW, amount: 20만, at: 1/15 } { type: DEPOSIT, amount: 50만, at: 1/20 } → 100만원이 된 경위를 완벽하게 알 수 있음 → INSERT만 발생, UPDATE/DELETE 없음핵심 특성
섹션 제목: “핵심 특성”- Append-Only: 이벤트는 추가만 가능하다. 수정이나 삭제는 없다. 과거는 바꿀 수 없다.
- 이벤트 재생(Replay): 처음부터 모든 이벤트를 순서대로 적용하면 어느 시점이든 상태를 복원할 수 있다.
- 시간 여행(Time Travel): 특정 시점의 상태를 재현할 수 있다. “3일 전 오후 2시의 주문 상태는?”에 답할 수 있다.
- 스냅샷(Snapshot): 이벤트가 너무 많아지면 재생 시간이 길어진다. 특정 시점의 상태를 스냅샷으로 저장해 두고, 그 이후 이벤트만 재생하면 성능을 개선할 수 있다.
📖 더 보기: Martin Fowler - Event Sourcing — Event Sourcing 패턴의 원형 정의. “왜 상태가 아니라 이벤트를 저장하는가”에 대한 근본 철학 (중급)
TypeScript 구현 예시
섹션 제목: “TypeScript 구현 예시”// 이벤트 타입 정의interface DomainEvent { eventId: string; occurredAt: Date; aggregateId: string;}
interface MoneyDepositedEvent extends DomainEvent { type: "MONEY_DEPOSITED"; amount: number;}
interface MoneyWithdrawnEvent extends DomainEvent { type: "MONEY_WITHDRAWN"; amount: number;}
type BankEvent = MoneyDepositedEvent | MoneyWithdrawnEvent;
// 이벤트 스토어 (간단한 메모리 구현)class EventStore { private events: Map<string, BankEvent[]> = new Map();
append(accountId: string, event: BankEvent): void { const existing = this.events.get(accountId) ?? []; this.events.set(accountId, [...existing, event]); console.log( `[EventStore] 이벤트 저장: ${event.type} (accountId: ${accountId})`, ); }
getEvents(accountId: string): BankEvent[] { return this.events.get(accountId) ?? []; }}
// Aggregate: 이벤트 재생으로 상태 복원class BankAccount { private balance: number = 0; private accountId: string;
constructor(accountId: string) { this.accountId = accountId; }
// 이벤트를 순서대로 적용해서 상태 복원 static rehydrate(accountId: string, events: BankEvent[]): BankAccount { const account = new BankAccount(accountId); for (const event of events) { account.apply(event); } console.log( `[Rehydrate] ${events.length}개 이벤트 재생 완료. 잔액: ${account.balance}원`, ); return account; }
private apply(event: BankEvent): void { switch (event.type) { case "MONEY_DEPOSITED": this.balance += event.amount; break; case "MONEY_WITHDRAWN": this.balance -= event.amount; break; } }
getBalance(): number { return this.balance; }}
// 사용 예시const store = new EventStore();const accountId = "account-1";
// 이벤트 저장 (append-only)store.append(accountId, { eventId: "e1", type: "MONEY_DEPOSITED", amount: 500000, aggregateId: accountId, occurredAt: new Date("2024-01-01"),});store.append(accountId, { eventId: "e2", type: "MONEY_WITHDRAWN", amount: 100000, aggregateId: accountId, occurredAt: new Date("2024-01-05"),});store.append(accountId, { eventId: "e3", type: "MONEY_DEPOSITED", amount: 300000, aggregateId: accountId, occurredAt: new Date("2024-01-10"),});
// 이벤트 재생으로 현재 상태 복원const account = BankAccount.rehydrate(accountId, store.getEvents(accountId));console.log(`현재 잔액: ${account.getBalance()}원`);예상 출력
[EventStore] 이벤트 저장: MONEY_DEPOSITED (accountId: account-1)[EventStore] 이벤트 저장: MONEY_WITHDRAWN (accountId: account-1)[EventStore] 이벤트 저장: MONEY_DEPOSITED (accountId: account-1)[Rehydrate] 3개 이벤트 재생 완료. 잔액: 700000원현재 잔액: 700000원NestJS에서 Event Sourcing 라이브러리 활용
섹션 제목: “NestJS에서 Event Sourcing 라이브러리 활용”직접 구현하는 대신 커뮤니티 라이브러리를 활용하면 보일러플레이트를 크게 줄일 수 있다.
# @ocoda/event-sourcing — NestJS 전용 Event Sourcing 라이브러리npm install @ocoda/event-sourcing
# 지원하는 이벤트 스토어 백엔드# - In-Memory (개발/테스트용)# - PostgreSQL# - MongoDB# - DynamoDB (AWS 환경에 적합)// @ocoda/event-sourcing 기본 사용 예시import { Aggregate, AggregateRoot, EventHandler } from "@ocoda/event-sourcing";
@Aggregate({ streamName: "bank-account" })export class BankAccount extends AggregateRoot { private balance: number = 0;
deposit(amount: number): void { this.apply(new MoneyDepositedEvent(amount)); }
@EventHandler(MoneyDepositedEvent) onMoneyDeposited(event: MoneyDepositedEvent): void { this.balance += event.amount; }
getBalance(): number { return this.balance; }}
// 예상 출력: 이벤트 스토어에 append-only 저장 + 자동 rehydrate📖 더 보기: GitHub - ocoda/event-sourcing — NestJS 전용 Event Sourcing 라이브러리. PostgreSQL/DynamoDB 백엔드 지원. AWS 환경에 바로 적용 가능 (입문)
Event Sourcing 장단점
섹션 제목: “Event Sourcing 장단점”| 장점 | 단점 |
|---|---|
| 완벽한 감사 로그(Audit Log) | 구현 복잡도가 높음 |
| 시간 여행(Time Travel) 가능 | 이벤트 스키마 변경이 어려움 |
| 버그 재현 및 디버깅 용이 | 이벤트 스토어 크기가 계속 증가 |
| CQRS와 자연스럽게 결합 | 쿼리가 불편함 (재생 필요) |
| 이벤트 기반 마이크로서비스와 궁합 좋음 | 러닝 커브가 가파름 |
CQRS + Event Sourcing + Kafka 결합 심화: Projection 패턴
섹션 제목: “CQRS + Event Sourcing + Kafka 결합 심화: Projection 패턴”CQRS에서 이벤트를 Kafka로 발행하면 여러 읽기 모델(Projection)을 독립적으로 구성할 수 있다. 이것이 “한 번 쓰고, 여러 곳에서 읽는(Write Once, Read Many Views)” 아키텍처다.
비유: 회사 공지사항 게시판(Event Store/Kafka)에 공지가 올라가면, 팀별 전담 담당자(각 Projection Consumer)가 그 공지를 자기 팀 형식에 맞게 해석해 별도 메모(각 읽기 모델)를 만든다.
왜 이렇게 동작하는가:
- Kafka는 이벤트를 append-only 로그(토픽)로 저장 — Event Store와 동일한 철학
- 각 Consumer Group은 독립적으로 오프셋을 관리 → 읽기 모델 재구성 시 오프셋을 0으로 초기화해 전체 재생 가능
- 새 읽기 모델이 필요하면 코드 추가 후 처음부터 이벤트를 재생 → 기존 시스템 무중단
이벤트 (OrderCreated, OrderShipped...) │ ▼Kafka 토픽 (영구 보존, append-only) │ ├─ Consumer 1: Redis Projection (실시간 주문 현황 캐시) ├─ Consumer 2: PostgreSQL Projection (주문 이력 검색 테이블) ├─ Consumer 3: Elasticsearch Projection (전문 검색 인덱스) └─ Consumer 4: 통계 집계 서비스 (배치 집계 테이블)// NestJS에서 Kafka Consumer를 Projection으로 구성import { Controller } from "@nestjs/common";import { EventPattern, Payload } from "@nestjs/microservices";
@Controller()export class OrderProjectionHandler { constructor( private readonly redisClient: Redis, private readonly orderReadRepository: OrderReadRepository, ) {}
// Kafka 'order.created' 토픽 구독 → 읽기 모델 업데이트 @EventPattern("order.created") async handleOrderCreated(@Payload() event: OrderCreatedEvent): Promise<void> { // 1. Redis에 실시간 현황 캐싱 await this.redisClient.setex( `order:${event.orderId}`, 3600, // 1시간 TTL JSON.stringify({ status: "CREATED", ...event }), );
// 2. PostgreSQL 읽기 전용 뷰 테이블에 역정규화 저장 await this.orderReadRepository.upsert({ orderId: event.orderId, customerId: event.customerId, customerName: event.customerName, // 역정규화: JOIN 불필요 totalAmount: event.totalAmount, status: "CREATED", createdAt: event.occurredAt, }); }}
// 예상 동작:// order.created 이벤트 수신 → Redis 캐시 업데이트 (수ms)// → PostgreSQL 읽기 테이블 업데이트 (수십ms)// → GET /orders?customerId=xxx 조회 시 JOIN 없이 즉시 반환Kafka Offset 재설정으로 읽기 모델 재구성하기:
# 읽기 모델을 처음부터 다시 만들어야 할 때 (예: 새 컬럼 추가)# Consumer Group의 오프셋을 처음으로 리셋kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group order-projection-consumer \ --topic order.created \ --reset-offsets --to-earliest --execute
# 예상 출력:# GROUP TOPIC PARTITION NEW-OFFSET# order-projection-consumer order.created 0 0 ← 처음부터 재생# order-projection-consumer order.created 1 0# order-projection-consumer order.created 2 0# → Consumer 재시작하면 전체 이벤트를 처음부터 재처리해 읽기 모델 재구성📖 더 보기: Event Sourcing and CQRS with Apache Kafka — Confluent — Kafka를 Event Store로 활용하는 CQRS 구현 패턴. Projection Consumer 설계 방법 포함 (입문~중급)
3-3. CQRS + Event Sourcing 결합: WAL 기반 CDC 동기화
섹션 제목: “3-3. CQRS + Event Sourcing 결합: WAL 기반 CDC 동기화”CQRS와 Event Sourcing이 결합할 때 핵심 과제는 “쓰기 모델의 변경을 어떻게 읽기 모델에 전파하는가” 다. 애플리케이션 코드에서 직접 이벤트를 발행하는 방식(Outbox 패턴) 외에, DB 레이어에서 CDC(Change Data Capture)로 자동 동기화하는 방식도 널리 쓰인다.
WAL(Write-Ahead Log)의 상세 원리(Redo Log, Undo Log, MVCC)는 transaction-basics.md 를 참고하세요.
여기서는 WAL이 CQRS 아키텍처에서 어떻게 활용되는지, 즉 CDC 기반 읽기 모델 동기화 관점에 집중한다.
WAL 기반 CDC가 CQRS 읽기 모델 동기화에 쓰이는 방식:
PostgreSQL의 WAL은 모든 DB 변경을 순서대로 기록한 append-only 로그다. Debezium 같은 CDC 커넥터가 이 WAL을 읽어 변경 이벤트를 Kafka로 스트리밍하면, CQRS의 읽기 모델(Elasticsearch, Redis 등)이 이 이벤트를 구독해 자신의 뷰를 업데이트한다. 애플리케이션 코드에 동기화 로직을 넣지 않아도 DB 변경이 자동으로 읽기 모델에 전파된다.
PostgreSQL (쓰기 DB, WAL 생성) │ │ WAL logical decoding ▼Debezium (CDC 커넥터) — DB 변경을 이벤트 스트림으로 변환 │ ▼Kafka 토픽 (변경 이벤트) │ ├─ CQRS 읽기 모델 1: Elasticsearch (전문 검색) ├─ CQRS 읽기 모델 2: Redis (실시간 캐시) └─ CQRS 읽기 모델 3: 집계 테이블 (대시보드)이 구조는 쓰기 모델(정규화된 PostgreSQL)과 읽기 모델(역정규화된 각 뷰)의 동기화를 DB 레이어에서 자동으로 처리한다. 애플리케이션 Command Handler는 쓰기 DB에만 집중하면 되고, 읽기 모델 동기화는 CDC 파이프라인이 담당한다.
3-4. AWS + TypeORM 관점에서의 CQRS / Event Sourcing
섹션 제목: “3-4. AWS + TypeORM 관점에서의 CQRS / Event Sourcing”AWS RDS + TypeORM으로 CQRS 읽기/쓰기 DB 분리
섹션 제목: “AWS RDS + TypeORM으로 CQRS 읽기/쓰기 DB 분리”실제 프로덕션 환경에서 CQRS의 물리적 구현은 RDS Primary(쓰기) + Aurora Read Replica 또는 ElastiCache(읽기) 조합이 가장 일반적이다. CQRS 맥락에서 중요한 점은 Command Handler는 항상 Master(Primary)에, Query Handler는 항상 Replica(Reader)에 연결되도록 라우팅을 고정하는 것이다.
- Command 경로(쓰기): RDS Primary → 데이터 정합성 및 트랜잭션 보장
- Query 경로(읽기): Aurora Reader Endpoint → Auto Scaling으로 읽기 부하를 수평 분산
TypeORM의 replication 설정 코드 및 Aurora Reader Endpoint 활용법은 db-replication-sharding.md 를 참조하세요. CQRS 관점에서 이 설정의 핵심은 애플리케이션 코드 변경 없이 읽기 트래픽을 Replica로 자동 분산한다는 점이다.
TypeORM DataSource로 Command/Query Handler에서 트랜잭션 제어
섹션 제목: “TypeORM DataSource로 Command/Query Handler에서 트랜잭션 제어”CQRS Command Handler에서 Outbox 패턴을 TypeORM DataSource.transaction()과 결합하면 데이터 저장과 이벤트 발행을 원자적으로 보장할 수 있다.
import { CommandHandler, ICommandHandler } from "@nestjs/cqrs";import { InjectDataSource } from "@nestjs/typeorm";import { DataSource } from "typeorm";
@CommandHandler(CreateOrderCommand)export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> { constructor(@InjectDataSource() private readonly dataSource: DataSource) {}
async execute(command: CreateOrderCommand): Promise<string> { return this.dataSource.transaction(async (manager) => { // 1. 주문 저장 (Master DB) const order = manager.create(Order, { customerId: command.customerId, items: command.items, status: "PENDING", }); await manager.save(order);
// 2. Outbox 이벤트 저장 (같은 트랜잭션 — Master DB) await manager.insert(OutboxEvent, { aggregateId: order.id, aggregateType: "Order", eventType: "OrderCreated", payload: JSON.stringify({ orderId: order.id, customerId: command.customerId, }), occurredAt: new Date(), });
// 3. 트랜잭션 성공 → 둘 다 커밋 // 트랜잭션 실패 → 둘 다 롤백 (이벤트 유실 없음) return order.id; }); // OutboxProcessor가 별도로 이벤트를 폴링하여 읽기 모델(ElastiCache/Elasticsearch) 업데이트 }}예상 출력
POST /ordersBody: { "customerId": "user-42", "items": [{"productId": "p-1", "qty": 2}] }
→ Response: { "orderId": "uuid-xxx" }
[DB 내부] orders 테이블: id=uuid-xxx, status=PENDING ← Master DB에 저장 outbox_events: aggregateId=uuid-xxx, eventType=OrderCreated ← 같은 트랜잭션
[OutboxProcessor 실행 후] ElastiCache(Redis): order:uuid-xxx 캐시 갱신 ← 읽기 모델 업데이트AWS DynamoDB를 Event Store로 사용 (대규모 이벤트 처리)
섹션 제목: “AWS DynamoDB를 Event Store로 사용 (대규모 이벤트 처리)”대용량 이벤트(수억 건)를 처리해야 하거나 운영 부담을 최소화하고 싶을 때 DynamoDB를 Event Store로 사용할 수 있다. AWS SDK for JavaScript v3 기반 예시:
import { DynamoDBClient, PutItemCommand, QueryCommand,} from "@aws-sdk/client-dynamodb";import { marshall, unmarshall } from "@aws-sdk/util-dynamodb";
// DynamoDB Event Store 테이블 설계// PK: aggregateId (파티션 키)// SK: version (정렬 키) — 이벤트 순서 보장// GSI: eventType-occurredAt-index — 이벤트 타입별 조회
const client = new DynamoDBClient({ region: "ap-northeast-2" });
export class DynamoDBEventStore { private readonly tableName = process.env.EVENT_STORE_TABLE!; // "order-events"
async appendEvent(event: DomainEvent): Promise<void> { await client.send( new PutItemCommand({ TableName: this.tableName, Item: marshall({ aggregateId: event.aggregateId, version: event.version, eventType: event.type, payload: JSON.stringify(event), occurredAt: event.occurredAt.toISOString(), }), // 낙관적 동시성: version이 이미 존재하면 에러 (중복 이벤트 방지) ConditionExpression: "attribute_not_exists(version)", }), ); }
async getEvents( aggregateId: string, fromVersion = 0, ): Promise<DomainEvent[]> { const result = await client.send( new QueryCommand({ TableName: this.tableName, KeyConditionExpression: "aggregateId = :id AND version >= :v", ExpressionAttributeValues: marshall({ ":id": aggregateId, ":v": fromVersion, }), }), ); return (result.Items ?? []).map((item) => JSON.parse(unmarshall(item).payload), ); }}
// 예상 동작:// appendEvent({ aggregateId: "order-1", version: 1, type: "OrderCreated", ... })// → DynamoDB에 저장 성공// appendEvent({ aggregateId: "order-1", version: 1, type: "OrderCreated", ... })// → ConditionalCheckFailedException (중복 버전 방지)// getEvents("order-1") → [OrderCreated, OrderPaid, OrderShipped]DynamoDB Event Store의 장단점:
| 항목 | DynamoDB | PostgreSQL |
|---|---|---|
| 운영 부담 | 없음 (서버리스) | 높음 (백업, 튜닝) |
| 확장성 | 무제한 | 제한적 (수직 확장) |
| 이벤트 수 | 수십억 건 부담 없음 | 수억 건부터 관리 필요 |
| 트랜잭션 | DynamoDB Transactions 지원 (최대 100개 아이템) | 완전한 ACID |
| 비용 | 요청 수 기반 (대용량에서 비쌀 수 있음) | 인스턴스 크기 기반 |
📖 더 보기: GitHub - ocoda/event-sourcing — NestJS 전용 Event Sourcing 라이브러리. PostgreSQL/MongoDB/DynamoDB 백엔드를 선택적으로 사용 가능 (입문~중급)
4. 실무에서 어떻게 쓰이나
섹션 제목: “4. 실무에서 어떻게 쓰이나”CQRS 실무 적용
섹션 제목: “CQRS 실무 적용”- 대형 이커머스: 주문 생성(Command)과 주문 목록 조회(Query)를 분리. 조회 DB는 Elasticsearch로 전문 검색 최적화.
- 실시간 대시보드: Command로 데이터를 쓰고, 읽기 모델은 Redis에 캐싱해서 빠른 응답.
- 결제 시스템: Command로 결제를 처리하고, Query 모델에 영수증 조회용 역정규화 뷰를 별도 관리.
Event Sourcing 실무 적용
섹션 제목: “Event Sourcing 실무 적용”- 금융 거래: 모든 입출금을 이벤트로 저장. 어느 시점의 잔액도 재현 가능.
- 주문 상태 추적:
OrderPlaced → OrderPaid → OrderShipped → OrderDelivered이벤트 스트림. - 감사 로그(Audit Log): 누가 언제 무엇을 변경했는지 완벽하게 추적.
Event Store 선택 가이드
섹션 제목: “Event Store 선택 가이드”Event Sourcing을 도입할 때 이벤트를 어디에 저장할지 결정해야 한다. 주요 선택지와 비교:
| 저장소 | 특성 | 적합한 상황 |
|---|---|---|
| PostgreSQL | ACID, 친숙한 스키마, 트랜잭션 내 Outbox 패턴 가능 | 팀이 PostgreSQL에 익숙, 기존 인프라 활용 |
| EventStoreDB / KurrentDB | 이벤트 소싱에 최적화, Persistent Subscriptions, 스트림 단위 관리 | 이벤트 소싱이 핵심 아키텍처, 높은 이벤트 처리량 |
| DynamoDB | 서버리스, 무한 확장, AWS 네이티브 | AWS 환경, 운영 부담 최소화 |
| Kafka | 높은 처리량, 스트리밍, Consumer Group 기반 여러 Projection | 대용량 이벤트 스트리밍이 동시에 필요 |
EventStoreDB(KurrentDB) vs Kafka의 근본 차이:
두 기술은 경쟁 관계가 아니라 역할이 다르다.
EventStoreDB(KurrentDB): "데이터베이스" — 이벤트를 저장하고 조회하는 것이 핵심Kafka: "메시지 브로커" — 이벤트를 전달하고 스트리밍하는 것이 핵심
→ EventStoreDB에서 이벤트를 저장하고, Kafka로 외부 서비스에 전파하는 "함께 쓰는(hybrid)" 패턴이 실무에서 자주 사용된다.NestJS에서 EventStoreDB 연동:
// @nestjs/cqrs의 IEventPublisher를 커스터마이징해서// EventStoreDB에 이벤트를 영속화import { IEventPublisher, IMessageSource } from "@nestjs/cqrs";import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";
export class EventStorePublisher implements IEventPublisher { constructor(private readonly client: EventStoreDBClient) {}
async publish<T extends IEvent>(event: T): Promise<void> { const streamName = `${event.constructor.name}-${event["aggregateId"]}`; await this.client.appendToStream(streamName, [ jsonEvent({ type: event.constructor.name, data: event }), ]); }}// → EventBus.publisher를 위 클래스로 교체하면// 모든 이벤트가 자동으로 EventStoreDB에 영속화됨WAL 기반 CDC 실무 활용 (CQRS 맥락)
섹션 제목: “WAL 기반 CDC 실무 활용 (CQRS 맥락)”WAL의 내부 동작 원리(Redo/Undo, MVCC)는 transaction-basics.md 참조.
CQRS 맥락에서 WAL의 핵심 역할은 CDC(Change Data Capture) 파이프라인의 데이터 원천이다.
- Debezium + Kafka: Debezium이 PostgreSQL WAL을 읽어 변경 이벤트를 Kafka로 스트리밍 → CQRS 읽기 모델(Elasticsearch, Redis 등)이 자동 동기화
- 애플리케이션 코드 분리: Command Handler는 쓰기 DB에만 집중하고, 읽기 모델 동기화는 CDC 파이프라인이 담당 → 관심사 분리 극대화
5. 내 업무와의 연결고리
섹션 제목: “5. 내 업무와의 연결고리”BackOps 엔지니어 관점 (Nest.js / AWS)
@nestjs/cqrs모듈을 도입하면 기존 Service 중심 코드보다 Command/Query 경계가 명확해진다. 새 기능 추가 시 어디에 코드를 넣을지 고민이 줄어든다.- 운영 중인 서비스에서 특정 API가 DB에 너무 큰 부담을 준다면, CQRS로 읽기 모델을 분리하고 Redis 캐시를 Query 모델로 활용할 수 있다.
- AWS 환경에서는 RDS(쓰기) + ElastiCache or Aurora Read Replica(읽기) 조합이 CQRS의 물리적 구현이다.
- PostgreSQL WAL 기반의 CDC를 Debezium + MSK(Kafka)로 구성하면 마이크로서비스 간 데이터 동기화를 이벤트 기반으로 처리할 수 있다.
- 관리자 시스템에서 “누가 이 데이터를 언제 바꿨는가”를 추적해야 할 때 Event Sourcing의 감사 로그 기능이 직접적으로 필요하다.
6. 비슷한 개념과 비교
섹션 제목: “6. 비슷한 개념과 비교”CQRS vs 전통적인 CRUD
섹션 제목: “CQRS vs 전통적인 CRUD”| 항목 | 전통 CRUD | CQRS |
|---|---|---|
| 모델 | 읽기/쓰기 하나의 모델 | 읽기/쓰기 분리 |
| 복잡도 | 낮음 | 높음 |
| 최적화 | 어느 쪽도 최적화 어려움 | 각자 독립 최적화 |
| 적합한 규모 | 소규모 CRUD 앱 | 읽기/쓰기 불균형이 큰 서비스 |
Event Sourcing vs 전통적인 CRUD
섹션 제목: “Event Sourcing vs 전통적인 CRUD”| 항목 | 전통 CRUD | Event Sourcing |
|---|---|---|
| 저장 대상 | 현재 상태 | 이벤트 이력 전체 |
| 과거 상태 조회 | 불가능 | 가능 (이벤트 재생) |
| 저장 공간 | 적음 | 계속 증가 |
| 쿼리 | 간단 | 복잡 (재생 필요) |
| 적합한 도메인 | 일반적인 서비스 | 금융, 주문, 감사 로그 |
CQRS와 Event Sourcing의 관계
섹션 제목: “CQRS와 Event Sourcing의 관계”CQRS와 Event Sourcing은 별개의 패턴이지만 자주 함께 쓰인다.
- CQRS 없이 Event Sourcing만 쓸 수 있다. (이벤트로 상태 저장, 단일 모델로 조회)
- Event Sourcing 없이 CQRS만 쓸 수 있다. (쓰기 DB와 읽기 DB만 분리)
- 둘을 함께 쓰면 Event Sourcing의 이벤트가 자연스럽게 CQRS의 읽기 모델 동기화 트리거가 된다.
Saga 패턴과 CQRS — 분산 트랜잭션 처리
섹션 제목: “Saga 패턴과 CQRS — 분산 트랜잭션 처리”CQRS와 마이크로서비스를 함께 사용하면 분산 트랜잭션 문제가 발생한다. “주문 생성 → 결제 처리 → 재고 차감” 처럼 여러 서비스에 걸친 작업이 원자적으로 성공하거나 실패해야 할 때, 단일 DB 트랜잭션(2PC)은 마이크로서비스 환경에서 사용하기 어렵다.
Saga 패턴은 이 문제를 해결하는 대표적인 방법이다. 분산 트랜잭션을 일련의 로컬 트랜잭션과 보상 트랜잭션(Compensating Transaction) 으로 분해한다.
주문 Saga 예시:
성공 경로: 1. OrderService: 주문 생성 (OrderCreated 이벤트 발행) 2. PaymentService: 결제 처리 (PaymentCompleted 이벤트 발행) 3. InventoryService: 재고 차감 (InventoryReserved 이벤트 발행) 4. ShippingService: 배송 준비 (ShippingScheduled 이벤트 발행)
실패 경로 (재고 부족으로 3단계 실패): 1. OrderService: 주문 생성 ✅ 2. PaymentService: 결제 완료 ✅ 3. InventoryService: 재고 부족 ❌ → InventoryReservationFailed 이벤트 4. PaymentService: 결제 취소 (보상 트랜잭션) ← 보상 이벤트로 롤백 5. OrderService: 주문 취소 (보상 트랜잭션) ← 보상 이벤트로 롤백Saga에는 두 가지 구현 방식이 있다.
Choreography Saga (이벤트 기반): 각 서비스가 이벤트를 구독하고 직접 다음 단계를 처리한다. 중앙 조율자 없이 분산되어 결합도가 낮지만, 전체 흐름을 파악하기 어렵다.
Orchestration Saga (중앙 오케스트레이터): 하나의 Saga Orchestrator가 각 서비스에 Command를 보내고 결과를 기다린다. NestJS에서는 @nestjs/cqrs의 Saga 기능이 이 방식을 지원한다.
// NestJS Saga (Orchestration 방식)// Saga는 Observable 스트림으로 이벤트를 받아 다음 Command를 발행import { Saga, ICommand, ofType } from "@nestjs/cqrs";import { Observable } from "rxjs";import { map, filter } from "rxjs/operators";
@Injectable()export class OrderSaga { @Saga() orderCreated = (events$: Observable<any>): Observable<ICommand> => { return events$.pipe( ofType(OrderCreatedEvent), // OrderCreated 이벤트 감지 map((event) => new ProcessPaymentCommand(event.orderId, event.amount)), // → PaymentService에 결제 처리 Command 발행 ); };
@Saga() paymentCompleted = (events$: Observable<any>): Observable<ICommand> => { return events$.pipe( ofType(PaymentCompletedEvent), map((event) => new ReserveInventoryCommand(event.orderId, event.items)), ); };
@Saga() paymentFailed = (events$: Observable<any>): Observable<ICommand> => { return events$.pipe( ofType(PaymentFailedEvent), map((event) => new CancelOrderCommand(event.orderId)), // 보상 트랜잭션 ); };}CQRS와 Saga의 관계:
- Command가 로컬 트랜잭션을 수행하고 Domain Event를 발행한다
- Saga가 이벤트를 구독하고 다음 서비스에 Command를 발행한다
- 실패 시 보상 Command를 발행해 이미 성공한 단계를 되돌린다
📖 더 보기: NestJS CQRS 공식 문서 — Sagas — NestJS에서 Saga를 구현하는 공식 가이드. RxJS Observable 기반 Saga 패턴 설명 (입문)
6.5. 트러블슈팅
섹션 제목: “6.5. 트러블슈팅”문제 1: CQRS 읽기 모델 동기화 지연 (Eventual Consistency)
섹션 제목: “문제 1: CQRS 읽기 모델 동기화 지연 (Eventual Consistency)”증상
POST /tasks → 201 Created (태스크 생성 성공)GET /tasks → [] (빈 배열 반환 — 방금 만든 태스크가 안 보임)사용자가 데이터를 저장했는데 바로 조회하면 없다고 나오는 상황.
원인
Command가 쓰기 DB에 저장하고 이벤트를 발행한 뒤, 읽기 모델 업데이트는 비동기로 처리된다. 이벤트 핸들러가 읽기 DB를 업데이트하기 전에 클라이언트가 Query를 보내면 이전 상태가 반환된다.
해결 방법
- 클라이언트 측 낙관적 업데이트(Optimistic Update): 클라이언트가 응답을 받으면 로컬 상태에 즉시 반영하고, 다음 조회 시에는 서버 상태로 동기화.
- 일관성 보장이 필요한 경우 동기 처리: 이벤트 핸들러를 동기로 실행하거나, Command Handler 내에서 읽기 DB도 함께 업데이트.
- 클라이언트에게 지연 안내: “저장되었습니다. 반영까지 수초가 걸릴 수 있습니다.”
- 읽기 모델 폴링 또는 WebSocket 알림: 읽기 모델 업데이트 완료 시 클라이언트에 Push.
// 동기 처리 예시 (성능 vs 일관성 트레이드오프)@CommandHandler(CreateTaskCommand)export class CreateTaskHandler implements ICommandHandler<CreateTaskCommand> { async execute(command: CreateTaskCommand): Promise<void> { const task = await this.writeRepo.create(command); // 읽기 DB도 즉시 업데이트 (동기) await this.readRepo.upsert(task); // 이후 이벤트는 외부 시스템 알림용으로만 사용 this.eventBus.publish(new TaskCreatedEvent(task.id)); }}문제 2: 이벤트 스토어 크기 폭증
섹션 제목: “문제 2: 이벤트 스토어 크기 폭증”증상
ERROR: disk quota exceeded on event-store volume이벤트 스토어 DB 용량: 500GB (서비스 오픈 1년 만에)BankAccount.rehydrate() 응답 시간: 12초 (이벤트 수 과다)원인
Event Sourcing은 append-only 특성상 이벤트가 계속 쌓인다. 활성 사용자가 많고 이벤트 발생 빈도가 높으면 스토어 용량이 빠르게 증가한다. 또한 이벤트가 많아질수록 Aggregate 재수화(rehydrate) 시간이 선형으로 증가한다.
해결 방법
- 스냅샷(Snapshot) 도입: 일정 이벤트 수마다 현재 상태를 스냅샷으로 저장. 재수화 시 가장 최근 스냅샷 + 그 이후 이벤트만 재생.
class EventStore { async getEventsAfterSnapshot(aggregateId: string): Promise<{ snapshot: AggregateSnapshot | null; events: DomainEvent[]; }> { const snapshot = await this.snapshotRepo.findLatest(aggregateId); const events = await this.eventRepo.findAfter( aggregateId, snapshot?.version ?? 0, ); return { snapshot, events }; }}
// 스냅샷 생성 기준: 100개 이벤트마다if (aggregate.version % 100 === 0) { await snapshotRepo.save(aggregate.toSnapshot());}- 오래된 이벤트 아카이빙: 일정 기간(예: 2년) 이상의 이벤트를 S3 등 저비용 스토리지로 이동.
- 이벤트 압축(Compaction): 최종 결과가 동일한 연속 이벤트를 하나로 합치기.
문제 3: 이벤트 스키마 변경 — 하위 호환성 파손
섹션 제목: “문제 3: 이벤트 스키마 변경 — 하위 호환성 파손”증상
TypeError: Cannot read property 'recipientEmail' of undefined at OrderShippedEvent.apply (order.aggregate.ts:45)
// 기존 이벤트에는 'recipientEmail' 필드가 없었는데// 새 코드에서 이 필드를 필수로 읽으려 함원인
Event Sourcing의 이벤트는 영구 저장된다. 새 버전의 코드가 기존 이벤트를 재생할 때, 예전 이벤트에 없는 필드를 읽으려 하면 오류가 발생한다.
해결 방법
- 이벤트 버전 관리: 이벤트에 버전 필드 추가, Handler에서 버전별 분기 처리.
interface OrderShippedEvent { version: number; orderId: string; shippedAt: Date; // v2부터 추가 recipientEmail?: string;}
// Aggregate에서 버전별 처리private applyOrderShipped(event: OrderShippedEvent): void { this.status = 'SHIPPED'; this.shippedAt = event.shippedAt; // 하위 호환: v2 이상에만 존재하는 필드 if (event.version >= 2 && event.recipientEmail) { this.notificationEmail = event.recipientEmail; }}- Upcaster 패턴: 이벤트를 읽을 때 구버전을 최신 버전으로 변환하는 레이어.
- 필드 추가는 Optional로: 새 필드는 반드시 Optional(
?)로 추가. 기존 이벤트에는 해당 필드가 없을 수 있음을 항상 고려.
문제 4: CQRS에서 Command 실패 시 읽기 모델 불일치
섹션 제목: “문제 4: CQRS에서 Command 실패 시 읽기 모델 불일치”증상
POST /orders → 500 Internal Server Error (Command 실패)그런데 읽기 모델(Elasticsearch)에는 이미 주문 데이터가 반영됨→ 사용자가 주문 목록에서 존재하지 않는 주문을 보게 됨원인
Command Handler에서 쓰기 DB 저장과 이벤트 발행을 별개의 작업으로 처리하는 경우, 이벤트가 먼저 발행된 후 DB 저장이 실패하면 읽기 모델만 업데이트되는 불일치가 발생한다. 또는 반대로, DB 저장 후 이벤트 발행 전에 서버가 다운되면 읽기 모델이 업데이트되지 않는다.
해결 방법
// 핵심: Outbox 패턴으로 원자적 보장// DB 저장과 이벤트를 같은 트랜잭션에 묶는다
@CommandHandler(CreateOrderCommand)export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> { async execute(command: CreateOrderCommand): Promise<void> { await this.dataSource.transaction(async (manager) => { // 1. 도메인 데이터 저장 const order = manager.create(Order, { ...command }); await manager.save(order);
// 2. Outbox에 이벤트 저장 (같은 트랜잭션!) await manager.insert(Outbox, { aggregateId: order.id, eventType: "OrderCreated", payload: JSON.stringify(order), }); // → 트랜잭션 실패 시 둘 다 롤백 // → 트랜잭션 성공 시 둘 다 커밋 }); // 실제 이벤트 발행은 별도 OutboxProcessor가 폴링하여 처리 }}이 패턴의 핵심은 “이벤트 발행”을 직접 하지 않고 “이벤트를 DB에 저장”하는 것이다. DB 트랜잭션이 보장하므로 데이터와 이벤트가 항상 함께 존재하거나 함께 없다.
문제 5: TypeORM @VersionColumn 낙관적 락 충돌 — OptimisticLockVersionMismatchError
섹션 제목: “문제 5: TypeORM @VersionColumn 낙관적 락 충돌 — OptimisticLockVersionMismatchError”증상
OptimisticLockVersionMismatchError: The optimistic lock on entity Order failed, version 3 was expected, but is actually 4. at UpdateQueryBuilder.updateEntity (typeorm/query-builder/UpdateQueryBuilder.ts)동시 요청이 몰릴 때 간헐적으로 발생. 특히 마감 시간대(예: 공연 티켓 예매, 한정 수량 판매)에 집중적으로 나타난다.
원인
TypeORM @VersionColumn은 UPDATE 시 WHERE version = :expected_version 조건을 자동으로 추가한다. 두 트랜잭션이 같은 버전(3)을 읽고 동시에 업데이트를 시도하면, 먼저 성공한 트랜잭션이 버전을 4로 올리고, 나중 트랜잭션은 WHERE version = 3이 매칭되지 않아 에러가 발생한다.
확인 방법
@Entity()export class Order { @PrimaryGeneratedColumn("uuid") id: string;
@Column() status: string;
@VersionColumn() version: number; // TypeORM이 UPDATE마다 자동으로 1씩 증가}
// TypeORM이 내부적으로 실행하는 SQL:// UPDATE order SET status='PAID', version=4// WHERE id='xxx' AND version=3 ← 이 조건이 실패하면 에러 발생해결 방법
// 방법 1: 재시도 로직 (낙관적 락의 표준 패턴)@CommandHandler(PayOrderCommand)export class PayOrderHandler implements ICommandHandler<PayOrderCommand> { private readonly MAX_RETRIES = 3;
async execute(command: PayOrderCommand): Promise<void> { let attempt = 0; while (attempt < this.MAX_RETRIES) { try { const order = await this.orderRepository.findOneOrFail({ where: { id: command.orderId }, }); order.status = "PAID"; await this.orderRepository.save(order); // VersionColumn 자동 검증 return; } catch (e) { if ( e instanceof OptimisticLockVersionMismatchError && attempt < this.MAX_RETRIES - 1 ) { attempt++; await new Promise((resolve) => setTimeout(resolve, 50 * attempt)); // 지수 백오프 continue; } throw e; // 최대 재시도 초과 또는 다른 에러 } } }}
// 방법 2: 재고처럼 절대 음수가 안 되는 경우 비관적 락 사용const order = await this.orderRepository.findOne({ where: { id: command.orderId }, lock: { mode: "pessimistic_write" }, // SELECT ... FOR UPDATE});// → 한 트랜잭션이 끝날 때까지 다른 트랜잭션이 대기// → 충돌 자체를 방지하지만 동시성 처리량 감소예상 출력 (재시도 로직 적용 후)
1차 시도: OptimisticLockVersionMismatchError → 50ms 대기 후 재시도2차 시도: 성공 (다른 트랜잭션이 이미 완료됨)→ 사용자에게는 정상 응답 반환 (에러 없음)문제 6: NestJS CQRS + AWS Lambda — Cold Start 시 CommandBus 초기화 지연
섹션 제목: “문제 6: NestJS CQRS + AWS Lambda — Cold Start 시 CommandBus 초기화 지연”증상
Lambda Cold Start 시 첫 번째 API 요청 응답 시간: 8~12초그 이후 요청은 정상 (100~200ms)CloudWatch 로그: "NestFactory.create" 시간이 7초 이상원인
@nestjs/cqrs는 모듈 초기화 시 모든 CommandHandler, QueryHandler, EventHandler를 스캔하고 버스에 등록한다. Handler 수가 많아질수록(50개 이상) 초기화 시간이 증가한다. Lambda는 컨테이너를 재사용하지 않으면 매번 이 초기화를 반복한다.
해결 방법
// 방법 1: Lambda 워밍업 — CloudWatch Events로 주기적 호출// EventBridge Rule: 매 5분마다 /health 엔드포인트 호출// → 컨테이너 재사용 비율 증가 → Cold Start 빈도 감소
// 방법 2: Provisioned Concurrency 설정 (AWS Lambda)// 항상 N개의 실행 환경을 미리 초기화 상태로 유지// 비용은 증가하지만 Cold Start 완전 제거 가능
// 방법 3: Handler 지연 등록 — 초기화 비용 분산// 대신 NestJS의 LazyModuleLoader 사용import { LazyModuleLoader } from "@nestjs/core";
@Injectable()export class OrderCommandService { constructor(private lazyModuleLoader: LazyModuleLoader) {}
async processLargeOrder(command: ProcessLargeOrderCommand) { // 무거운 Handler 모듈은 실제 필요 시 로드 const { LargeOrderModule } = await import("./large-order.module"); const moduleRef = await this.lazyModuleLoader.load(() => LargeOrderModule); const handler = moduleRef.get(ProcessLargeOrderHandler); return handler.execute(command); }}
// 방법 4: ECS Fargate나 EC2로 마이그레이션 검토// CQRS 패턴의 무거운 Handler 초기화는 서버리스보다 상시 실행 환경에 더 적합CloudWatch 지표로 Cold Start 측정
CloudWatch Metrics → Lambda → FUNCTION_NAME → Init Duration→ Init Duration이 지속적으로 높으면 Handler 수 또는 의존성 최적화 필요
권장 임계값: Init Duration < 3초 → 정상 Init Duration 3~10초 → 최적화 권장 Init Duration > 10초 → 아키텍처 재검토 (ECS/EC2 고려)7. 체크리스트
섹션 제목: “7. 체크리스트”CQRS 이해 체크리스트
섹션 제목: “CQRS 이해 체크리스트”- Command와 Query의 차이를 한 문장으로 설명할 수 있다
-
@nestjs/cqrs의 CommandBus, QueryBus, EventBus의 역할을 구분할 수 있다 -
@CommandHandler,@QueryHandler데코레이터를 사용해 핸들러를 구현할 수 있다 - CQRS에서 읽기 모델과 쓰기 모델이 왜 분리되는지 설명할 수 있다
- Eventual Consistency가 무엇인지, CQRS에서 왜 발생하는지 설명할 수 있다
Event Sourcing 이해 체크리스트
섹션 제목: “Event Sourcing 이해 체크리스트”- Event Store가 왜 append-only인지 설명할 수 있다
- 이벤트 재생(Replay)으로 현재 상태를 복원하는 코드를 직접 작성할 수 있다
- 스냅샷이 왜 필요한지, 언제 사용하는지 설명할 수 있다
- CQRS와 Event Sourcing이 어떻게 결합되는지 설명할 수 있다
- 이벤트 스키마 변경 시 하위 호환성 문제와 해결 방법을 알고 있다
8. 핵심 키워드
섹션 제목: “8. 핵심 키워드”| 키워드 | 설명 |
|---|---|
| CQRS | Command Query Responsibility Segregation — 읽기/쓰기 책임 분리 |
| CommandBus | Command를 적절한 Handler로 라우팅하는 버스 |
| QueryBus | Query를 적절한 Handler로 라우팅하는 버스 |
| EventBus | 도메인 이벤트를 발행하고 Handler들에게 전달 |
| Event Sourcing | 상태 대신 이벤트 이력을 저장하는 패턴 |
| Event Store | 이벤트를 append-only로 저장하는 저장소 |
| Aggregate | 관련 도메인 객체의 묶음, 이벤트를 적용해 상태 관리 |
| Replay | 이벤트를 순서대로 재적용해 현재 상태를 복원 |
| Snapshot | 특정 시점의 Aggregate 상태를 저장해 재생 시간 단축 |
| Time Travel | 이벤트 재생으로 과거 임의 시점의 상태를 재현하는 기능 |
| Eventual Consistency | 즉각적 일관성 대신 결과적으로 일관성을 보장 |
| CDC | Change Data Capture — WAL 기반으로 DB 변경을 캡처해 읽기 모델에 전파 |
| WAL | Write-Ahead Log — CDC 파이프라인의 데이터 원천. 상세 원리는 transaction-basics.md 참조 |
8.5. 추천 리소스
섹션 제목: “8.5. 추천 리소스”📚 추천 리소스
섹션 제목: “📚 추천 리소스”- 📖 NestJS CQRS 공식 문서 —
@nestjs/cqrsCommandHandler, QueryHandler, EventHandler, Saga 기본 사용법 공식 가이드 (입문) - 📖 Microsoft Azure - CQRS Pattern — CQRS 적용 시나리오, 장단점, Event Sourcing과의 결합 방식을 클라우드 아키텍처 관점에서 설명 (입문)
- 📖 Martin Fowler - Event Sourcing — Event Sourcing 패턴의 원형 정의. “왜 상태가 아니라 이벤트를 저장하는가”에 대한 근본 철학 (중급)
- 📖 Designing Scalable Systems with CQRS and Event Sourcing — Outbox 패턴, 낙관적 동시성 제어, Snapshot 전략 등 프로덕션 수준 설계 패턴 종합 (중급)
- 📖 Event Sourcing and CQRS with Apache Kafka — Confluent — Kafka를 Event Store로 활용하는 CQRS 구현 패턴. Projection Consumer 설계 방법 포함 (입문~중급)
9. 직접 확인해보기
섹션 제목: “9. 직접 확인해보기”NestJS CQRS 모듈 설치 및 동작 확인
섹션 제목: “NestJS CQRS 모듈 설치 및 동작 확인”# 새 NestJS 프로젝트 생성npm i -g @nestjs/clinest new cqrs-demo && cd cqrs-demo
# CQRS 모듈 설치npm install @nestjs/cqrs
# 프로젝트 실행npm run start:dev예상 출력
[Nest] LOG Starting Nest application...[Nest] LOG TaskModule dependencies initialized +1ms[Nest] LOG Application is running on: http://[::1]:3000Event Sourcing 이벤트 재생 테스트
섹션 제목: “Event Sourcing 이벤트 재생 테스트”# TypeScript 코드 직접 실행 (ts-node 사용)npx ts-node --esm << 'EOF'// 간단한 Event Sourcing 테스트interface Event { type: string; amount?: number }
function rehydrate(events: Event[]): number { return events.reduce((balance, event) => { if (event.type === 'DEPOSIT') return balance + (event.amount ?? 0); if (event.type === 'WITHDRAW') return balance - (event.amount ?? 0); return balance; }, 0);}
const events: Event[] = [ { type: 'DEPOSIT', amount: 500000 }, { type: 'WITHDRAW', amount: 100000 }, { type: 'DEPOSIT', amount: 300000 },];
console.log('현재 잔액:', rehydrate(events), '원');// 시간 여행: 처음 2개 이벤트만 재생console.log('1월 5일 잔액:', rehydrate(events.slice(0, 2)), '원');EOF예상 출력
현재 잔액: 700000 원1월 5일 잔액: 400000 원CQRS API 테스트 (curl)
섹션 제목: “CQRS API 테스트 (curl)”# Command: 태스크 생성curl -X POST http://localhost:3000/tasks \ -H "Content-Type: application/json" \ -d '{"title": "CQRS 학습", "description": "패턴 이해", "userId": "user-1"}'
# Query: 태스크 목록 조회curl http://localhost:3000/tasks?userId=user-1예상 출력
// POST 응답{ "message": "태스크가 생성되었습니다." }
// GET 응답[ { "id": "550e8400-e29b-41d4-a716-446655440000", "title": "CQRS 학습", "status": "open", "createdAt": "2024-01-15T10:23:45.000Z" }]Outbox 테이블 확인 (PostgreSQL)
섹션 제목: “Outbox 테이블 확인 (PostgreSQL)”-- Outbox 테이블 생성 후 미발행 이벤트 확인 쿼리-- (CQRS + Outbox 패턴 운영 시 주기적으로 확인)
SELECT id, aggregate_type, event_type, created_at, published_at, CASE WHEN published_at IS NULL THEN '미발행' ELSE '발행 완료' END AS statusFROM outboxWHERE published_at IS NULLORDER BY created_at ASCLIMIT 10;
-- 예상 출력 (미발행 이벤트가 있는 경우):-- id | aggregate_type | event_type | created_at | published_at | status-- uuid-101 | Task | TaskCreated | 2026-04-07 10:23:45 | NULL | 미발행-- uuid-102 | Order | OrderPlaced | 2026-04-07 10:24:01 | NULL | 미발행-- → published_at이 NULL인 행이 계속 쌓이면 OutboxProcessor가 멈춘 것
-- 미발행 이벤트 수 추이 모니터링SELECT date_trunc('hour', created_at) AS hour, COUNT(*) FILTER (WHERE published_at IS NULL) AS pending, COUNT(*) FILTER (WHERE published_at IS NOT NULL) AS publishedFROM outboxWHERE created_at > NOW() - INTERVAL '24 hours'GROUP BY hourORDER BY hour DESC;10. 한 줄 요약
섹션 제목: “10. 한 줄 요약”CQRS는 읽기와 쓰기 창구를 분리해 각자 최적화하고, Event Sourcing은 현재 상태 대신 변경 이력(이벤트)을 쌓아 언제든 과거로 돌아갈 수 있게 하며, WAL은 데이터를 바꾸기 전에 로그를 먼저 써두어 서버가 죽어도 데이터를 잃지 않게 보장하는 세 가지 패턴은 모두 “신뢰할 수 있는 시스템”을 만들기 위한 핵심 설계 원칙이다.