CQRS 模式:Command 与 Query 的分离实现
CQRS(Command Query Responsibility Segregation)是一种架构模式,它将读操作(Query)和写操作(Command)分离,使系统能够独立优化读写性能、扩展性和复杂性处理。在复杂的业务场景中,CQRS 能够显著提高系统的可维护性和可扩展性。本文将深入探讨如何在 NestJS 中实现 CQRS 模式,使用 @nestjs/cqrs 实现事件溯源与命令总线。
1. CQRS 基础概念
1.1 什么是 CQRS?
CQRS 基于命令查询分离原则(Command Query Separation,CQS),将系统的读写操作完全分离:
typescript
// 传统 CRUD 模式
// 单一模型处理读写操作
class UserService {
createUser(dto: CreateUserDto): Promise<User> { /* 创建用户 */ }
updateUser(id: string, dto: UpdateUserDto): Promise<User> { /* 更新用户 */ }
getUserById(id: string): Promise<User> { /* 获取用户 */ }
getAllUsers(): Promise<User[]> { /* 获取所有用户 */ }
}
// CQRS 模式
// 命令模型处理写操作
class UserCommandService {
createUser(dto: CreateUserDto): Promise<void> { /* 创建用户 */ }
updateUser(id: string, dto: UpdateUserDto): Promise<void> { /* 更新用户 */ }
}
// 查询模型处理读操作
class UserQueryService {
getUserById(id: string): Promise<UserDto> { /* 获取用户 */ }
getAllUsers(): Promise<UserDto[]> { /* 获取所有用户 */ }
searchUsers(criteria: SearchCriteria): Promise<UserDto[]> { /* 搜索用户 */ }
}1.2 CQRS 的优势
typescript
// CQRS 的主要优势
// 1. 性能优化:读写操作可以独立优化
// 2. 可扩展性:读写服务可以独立扩展
// 3. 复杂性管理:复杂的业务逻辑可以更好地组织
// 4. 数据一致性:可以采用最终一致性模型
// 5. 团队协作:读写团队可以并行开发
// 使用场景
// - 复杂的业务逻辑
// - 读写性能要求差异很大
// - 需要高可扩展性
// - 需要复杂查询功能
// - 需要事件溯源2. @nestjs/cqrs 核心组件
2.1 命令总线(CommandBus)
typescript
// 命令总线示例
import { CommandBus, CommandHandler, ICommand, ICommandHandler } from '@nestjs/cqrs';
// 定义命令
export class CreateUserCommand implements ICommand {
constructor(
public readonly name: string,
public readonly email: string,
) {}
}
// 命令处理器
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
constructor(
private readonly userRepository: UserRepository,
private readonly eventBus: EventBus,
) {}
async execute(command: CreateUserCommand): Promise<void> {
// 执行业务逻辑
const user = new User(uuidv4(), command.name, command.email);
// 保存用户
await this.userRepository.save(user);
// 发布领域事件
this.eventBus.publish(new UserCreatedEvent(user.getId(), user.getName(), user.getEmail()));
}
}
// 使用命令总线
@Controller('users')
export class UserController {
constructor(private readonly commandBus: CommandBus) {}
@Post()
async createUser(@Body() createUserDto: CreateUserDto) {
const command = new CreateUserCommand(
createUserDto.name,
createUserDto.email,
);
await this.commandBus.execute(command);
return { message: 'User created successfully' };
}
}2.2 查询总线(QueryBus)
typescript
// 查询总线示例
import { QueryBus, IQuery, IQueryHandler, QueryHandler } from '@nestjs/cqrs';
// 定义查询
export class GetUserQuery implements IQuery {
constructor(
public readonly userId: string,
) {}
}
// 查询处理器
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
constructor(private readonly userReadModelRepository: UserReadModelRepository) {}
async execute(query: GetUserQuery): Promise<UserDto> {
const user = await this.userReadModelRepository.findById(query.userId);
if (!user) {
throw new NotFoundException('User not found');
}
return user;
}
}
// 使用查询总线
@Controller('users')
export class UserController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Get(':id')
async getUser(@Param('id') id: string) {
const query = new GetUserQuery(id);
return await this.queryBus.execute(query);
}
}2.3 事件总线(EventBus)
typescript
// 事件总线示例
import { EventBus, EventsHandler, IEventHandler } from '@nestjs/cqrs';
// 定义事件
export class UserCreatedEvent {
constructor(
public readonly userId: string,
public readonly name: string,
public readonly email: string,
) {}
}
// 事件处理器
@EventsHandler(UserCreatedEvent)
export class UserCreatedHandler implements IEventHandler<UserCreatedEvent> {
constructor(private readonly emailService: EmailService) {}
async handle(event: UserCreatedEvent) {
// 发送欢迎邮件
await this.emailService.sendWelcomeEmail(event.email, event.name);
}
}
// 在命令处理器中发布事件
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
constructor(
private readonly userRepository: UserRepository,
private readonly eventBus: EventBus,
) {}
async execute(command: CreateUserCommand): Promise<void> {
const user = new User(uuidv4(), command.name, command.email);
await this.userRepository.save(user);
// 发布事件
this.eventBus.publish(new UserCreatedEvent(user.getId(), user.getName(), user.getEmail()));
}
}3. 完整的 CQRS 实现
3.1 用户管理 CQRS 实现
typescript
// user/commands/create-user.command.ts
export class CreateUserCommand {
constructor(
public readonly name: string,
public readonly email: string,
public readonly password: string,
) {}
}
// user/commands/create-user.handler.ts
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
constructor(
private readonly userRepository: UserRepository,
private readonly eventBus: EventBus,
private readonly passwordService: PasswordService,
) {}
async execute(command: CreateUserCommand): Promise<string> {
// 验证业务规则
const existingUser = await this.userRepository.findByEmail(command.email);
if (existingUser) {
throw new ConflictException('User with this email already exists');
}
// 处理密码
const hashedPassword = await this.passwordService.hash(command.password);
// 创建用户实体
const user = new User(
uuidv4(),
command.name,
command.email,
hashedPassword,
);
// 保存用户
await this.userRepository.save(user);
// 发布事件
this.eventBus.publish(new UserCreatedEvent(
user.getId(),
user.getName(),
user.getEmail(),
));
return user.getId();
}
}
// user/queries/get-user.query.ts
export class GetUserQuery {
constructor(
public readonly userId: string,
) {}
}
// user/queries/get-user.handler.ts
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
constructor(private readonly userReadModelRepository: UserReadModelRepository) {}
async execute(query: GetUserQuery): Promise<UserDto> {
const user = await this.userReadModelRepository.findById(query.userId);
if (!user) {
throw new NotFoundException('User not found');
}
return user;
}
}
// user/events/user-created.event.ts
export class UserCreatedEvent {
constructor(
public readonly userId: string,
public readonly name: string,
public readonly email: string,
public readonly createdAt: Date = new Date(),
) {}
}
// user/events/user-created.handler.ts
@EventsHandler(UserCreatedEvent)
export class UserCreatedHandler implements IEventHandler<UserCreatedEvent> {
constructor(
private readonly emailService: EmailService,
private readonly userReadModelRepository: UserReadModelRepository,
) {}
async handle(event: UserCreatedEvent) {
// 发送欢迎邮件
await this.emailService.sendWelcomeEmail(event.email, event.name);
// 更新读模型
await this.userReadModelRepository.create({
id: event.userId,
name: event.name,
email: event.email,
createdAt: event.createdAt,
});
}
}3.2 控制器实现
typescript
// user/user.controller.ts
@Controller('users')
export class UserController {
constructor(
private readonly commandBus: CommandBus,
private readonly queryBus: QueryBus,
) {}
@Post()
async createUser(@Body() createUserDto: CreateUserDto) {
const command = new CreateUserCommand(
createUserDto.name,
createUserDto.email,
createUserDto.password,
);
const userId = await this.commandBus.execute(command);
return { id: userId, message: 'User created successfully' };
}
@Get(':id')
async getUser(@Param('id') id: string) {
const query = new GetUserQuery(id);
return await this.queryBus.execute(query);
}
@Get()
async getAllUsers(@Query() paginationDto: PaginationDto) {
const query = new GetAllUsersQuery(
paginationDto.page,
paginationDto.limit,
);
return await this.queryBus.execute(query);
}
}
// user/queries/get-all-users.query.ts
export class GetAllUsersQuery {
constructor(
public readonly page: number = 1,
public readonly limit: number = 10,
) {}
}
// user/queries/get-all-users.handler.ts
@QueryHandler(GetAllUsersQuery)
export class GetAllUsersHandler implements IQueryHandler<GetAllUsersQuery> {
constructor(private readonly userReadModelRepository: UserReadModelRepository) {}
async execute(query: GetAllUsersQuery): Promise<PaginatedResult<UserDto>> {
return await this.userReadModelRepository.findAll(query.page, query.limit);
}
}4. 读写模型分离
4.1 写模型(Write Model)
typescript
// 写模型:关注业务逻辑和数据一致性
// user/domain/entities/user.entity.ts
export class User {
private readonly id: string;
private name: string;
private email: string;
private password: string;
private isActive: boolean;
private createdAt: Date;
private updatedAt: Date;
constructor(
id: string,
name: string,
email: string,
password: string,
) {
this.id = id;
this.name = name;
this.email = email;
this.password = password;
this.isActive = true;
this.createdAt = new Date();
this.updatedAt = new Date();
}
// 业务行为
changePassword(oldPassword: string, newPassword: string, passwordService: PasswordService): void {
if (!passwordService.compare(oldPassword, this.password)) {
throw new Error('Invalid old password');
}
this.password = passwordService.hash(newPassword);
this.updatedAt = new Date();
}
deactivate(): void {
this.isActive = false;
this.updatedAt = new Date();
}
// Getter 方法
getId(): string {
return this.id;
}
getName(): string {
return this.name;
}
getEmail(): string {
return this.email;
}
getPassword(): string {
return this.password;
}
isActive(): boolean {
return this.isActive;
}
}4.2 读模型(Read Model)
typescript
// 读模型:优化查询性能
// user/infrastructure/read-models/user.read-model.ts
export interface UserDto {
id: string;
name: string;
email: string;
createdAt: Date;
isActive: boolean;
}
// user/infrastructure/repositories/user-read-model.repository.ts
@Injectable()
export class UserReadModelRepository {
constructor(private readonly prisma: PrismaService) {}
async findById(id: string): Promise<UserDto | null> {
return await this.prisma.userView.findUnique({
where: { id },
});
}
async findAll(page: number, limit: number): Promise<PaginatedResult<UserDto>> {
const [users, total] = await Promise.all([
this.prisma.userView.findMany({
skip: (page - 1) * limit,
take: limit,
orderBy: { createdAt: 'desc' },
}),
this.prisma.userView.count(),
]);
return {
data: users,
total,
page,
limit,
};
}
async searchByName(name: string): Promise<UserDto[]> {
return await this.prisma.userView.findMany({
where: {
name: {
contains: name,
mode: 'insensitive',
},
},
orderBy: { name: 'asc' },
});
}
async create(userData: Omit<UserDto, 'isActive'>): Promise<void> {
await this.prisma.userView.create({
data: {
...userData,
isActive: true,
},
});
}
async update(id: string, userData: Partial<UserDto>): Promise<void> {
await this.prisma.userView.update({
where: { id },
data: userData,
});
}
}5. 事件溯源实现
5.1 事件存储
typescript
// 事件存储实现
// shared/infrastructure/event-store/event.entity.ts
export class Event {
constructor(
public readonly id: string,
public readonly aggregateId: string,
public readonly eventType: string,
public readonly eventData: any,
public readonly timestamp: Date,
public readonly version: number,
) {}
}
// shared/infrastructure/event-store/event-store.repository.ts
@Injectable()
export class EventStoreRepository {
constructor(private readonly prisma: PrismaService) {}
async save(event: Event): Promise<void> {
await this.prisma.event.create({
data: {
id: event.id,
aggregateId: event.aggregateId,
eventType: event.eventType,
eventData: event.eventData,
timestamp: event.timestamp,
version: event.version,
},
});
}
async findByAggregateId(aggregateId: string): Promise<Event[]> {
return await this.prisma.event.findMany({
where: { aggregateId },
orderBy: { version: 'asc' },
});
}
async findEventsByType(eventType: string, after?: Date): Promise<Event[]> {
const where: any = { eventType };
if (after) {
where.timestamp = { gte: after };
}
return await this.prisma.event.findMany({
where,
orderBy: { timestamp: 'asc' },
});
}
}5.2 聚合根与事件溯源
typescript
// 聚合根基类
export abstract class AggregateRoot {
protected events: DomainEvent[] = [];
protected addEvent(event: DomainEvent): void {
this.events.push(event);
}
public getUncommittedEvents(): DomainEvent[] {
return [...this.events];
}
public clearUncommittedEvents(): void {
this.events = [];
}
public loadFromHistory(events: DomainEvent[]): void {
for (const event of events) {
this.apply(event, true);
}
}
protected apply(event: DomainEvent, isReplaying: boolean = false): void {
const handler = `on${this.getEventName(event)}`;
if (typeof this[handler] === 'function') {
this[handler](event);
}
if (!isReplaying) {
this.addEvent(event);
}
}
private getEventName(event: DomainEvent): string {
const constructorString = event.constructor.toString();
const className = constructorString.substr(9, constructorString.indexOf('(') - 9);
return className;
}
}
// 用户聚合根
export class UserAggregate extends AggregateRoot {
private id: string;
private name: string;
private email: string;
private isActive: boolean;
constructor() {
super();
}
public static create(id: string, name: string, email: string): UserAggregate {
const user = new UserAggregate();
user.apply(new UserCreatedEvent(id, name, email));
return user;
}
public changeName(name: string): void {
if (this.name !== name) {
this.apply(new UserNameChangedEvent(this.id, name));
}
}
public deactivate(): void {
if (this.isActive) {
this.apply(new UserDeactivatedEvent(this.id));
}
}
// 事件处理器
private onUserCreated(event: UserCreatedEvent): void {
this.id = event.userId;
this.name = event.name;
this.email = event.email;
this.isActive = true;
}
private onUserNameChanged(event: UserNameChangedEvent): void {
this.name = event.name;
}
private onUserDeactivated(event: UserDeactivatedEvent): void {
this.isActive = false;
}
// Getter 方法
getId(): string {
return this.id;
}
getName(): string {
return this.name;
}
getEmail(): string {
return this.email;
}
isActive(): boolean {
return this.isActive;
}
}6. CQRS 模块配置
6.1 模块注册
typescript
// user/user.module.ts
@Module({
imports: [
CqrsModule,
SharedModule,
],
controllers: [
UserController,
],
providers: [
// 命令处理器
CreateUserHandler,
UpdateUserHandler,
DeactivateUserHandler,
// 查询处理器
GetUserHandler,
GetAllUsersHandler,
SearchUsersHandler,
// 事件处理器
UserCreatedHandler,
UserNameChangedHandler,
UserDeactivatedHandler,
// 仓储
{
provide: 'UserRepository',
useClass: UserDatabaseRepository,
},
UserReadModelRepository,
],
})
export class UserModule {}
// app.module.ts
@Module({
imports: [
UserModule,
// 其他模块...
CqrsModule,
],
})
export class AppModule {}6.2 事件总线配置
typescript
// 事件总线配置
@Injectable()
export class ConfigurableEventBus extends EventBus {
constructor(
private readonly eventStore: EventStoreRepository,
private readonly logger: Logger,
) {
super();
}
async publish<T extends DomainEvent>(event: T): Promise<void> {
// 持久化事件
await this.eventStore.save(new Event(
uuidv4(),
event.aggregateId,
event.constructor.name,
event,
new Date(),
event.version,
));
// 记录日志
this.logger.log(`Event published: ${event.constructor.name}`);
// 发布事件
await super.publish(event);
}
}7. 总结
CQRS 模式在 NestJS 中的实现要点:
- 命令总线:处理写操作,确保业务逻辑的正确执行
- 查询总线:处理读操作,优化查询性能
- 事件总线:实现组件间松耦合通信
- 读写模型分离:针对不同需求优化数据模型
- 事件溯源:通过事件存储实现完整的审计和回放能力
CQRS 的优势:
- 性能优化:读写操作可以独立优化
- 可扩展性:读写服务可以独立扩展
- 复杂性管理:复杂的业务逻辑可以更好地组织
- 审计能力:通过事件存储实现完整的操作审计
- 最终一致性:通过事件机制实现最终一致性
通过合理应用 CQRS 模式,我们可以构建出:
- 高性能:读写操作独立优化
- 高可扩展性:服务可以按需扩展
- 高可维护性:清晰的职责分离
- 高可靠性:通过事件溯源实现数据恢复能力
在下一篇文章中,我们将探讨健康检查与 Terminus,了解如何暴露 /health 端点,检查数据库、Redis 是否可用。