
The CQRS Pattern: Separating Commands from Queries

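CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates read operations (queries) from write operations (commands), so that each side can be optimized, scaled, and evolved independently. In systems with complex business logic, this separation can noticeably improve maintainability and scalability. This article walks through implementing CQRS in NestJS with @nestjs/cqrs, covering the command, query, and event buses and a basic approach to event sourcing.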

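1. CQRS Fundamentals

1.1 What Is CQRS?

CQRS builds on the Command Query Separation (CQS) principle and applies it at the level of models: one model handles writes and a separate model handles reads: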

typescript
// Traditional CRUD
// A single model handles both reads and writes
// (User, UserDto, CreateUserDto, UpdateUserDto, and SearchCriteria are assumed to be defined elsewhere)
interface UserService {
  createUser(dto: CreateUserDto): Promise<User>;                 // create a user
  updateUser(id: string, dto: UpdateUserDto): Promise<User>;     // update a user
  getUserById(id: string): Promise<User>;                        // fetch one user
  getAllUsers(): Promise<User[]>;                                // fetch all users
}

// CQRS
// The command model handles writes and returns no data
interface UserCommandService {
  createUser(dto: CreateUserDto): Promise<void>;                 // create a user
  updateUser(id: string, dto: UpdateUserDto): Promise<void>;     // update a user
}

// The query model handles reads and returns DTOs shaped for the caller
interface UserQueryService {
  getUserById(id: string): Promise<UserDto>;                     // fetch one user
  getAllUsers(): Promise<UserDto[]>;                             // fetch all users
  searchUsers(criteria: SearchCriteria): Promise<UserDto[]>;     // search users
}
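
The CQS principle that CQRS builds on can be shown at the level of a single class. The following is a minimal sketch in plain TypeScript; the `Counter` class is hypothetical and exists only for illustration. Commands change state and return nothing, while queries return state and change nothing.

```typescript
// Hypothetical example: a counter that follows command-query separation.
class Counter {
  private count = 0;

  // Command: mutates state and returns nothing.
  increment(by: number = 1): void {
    this.count += by;
  }

  // Query: returns state and has no side effects, so it is safe to call repeatedly.
  value(): number {
    return this.count;
  }
}

const counter = new Counter();
counter.increment();
counter.increment(2);
console.log(counter.value());
```

In practice the rule is often relaxed slightly. The command handler in section 3.1, for example, returns the ID of the newly created user so that the controller can include it in the response.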

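1.2 Advantages of CQRS

The main advantages of CQRS:

  1. Performance: the read and write paths can be optimized independently
  2. Scalability: read and write services can be scaled independently
  3. Managing complexity: complex business logic is concentrated in the write model, where it is easier to organize
  4. Consistency model: the read side can be eventually consistent with the write side, which loosens coupling but means reads may briefly lag behind writes
  5. Team workflow: the read and write sides can be developed in parallel

CQRS is a good fit when:

  - the business logic is complex
  - read and write performance requirements differ significantly
  - high scalability is required
  - complex queries are needed
  - event sourcing is needed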
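
Eventual consistency between the two sides is easiest to see in a small simulation. The sketch below uses no framework; `WriteSide`, `ReadSide`, and the `outbox` array are hypothetical names chosen for illustration. A write is visible on the write side immediately, but the read side only sees it once the corresponding event has been projected.

```typescript
// Hypothetical event emitted by the write side.
interface UserCreated {
  type: 'UserCreated';
  userId: string;
  name: string;
}

// Write side: applies the change and queues an event for later delivery.
class WriteSide {
  readonly users = new Map<string, { name: string }>();
  readonly outbox: UserCreated[] = [];

  createUser(userId: string, name: string): void {
    this.users.set(userId, { name });
    this.outbox.push({ type: 'UserCreated', userId, name });
  }
}

// Read side: keeps a denormalized view that is updated only by events.
class ReadSide {
  private readonly views = new Map<string, { id: string; displayName: string }>();

  project(event: UserCreated): void {
    this.views.set(event.userId, { id: event.userId, displayName: event.name });
  }

  findById(id: string): { id: string; displayName: string } | undefined {
    return this.views.get(id);
  }
}

const writeSide = new WriteSide();
const readSide = new ReadSide();

writeSide.createUser('u1', 'Ada');

// The event has not been delivered yet, so the read side does not know the user.
const beforeProjection = readSide.findById('u1');

// Deliver all pending events; splice(0) empties the outbox and returns its contents.
for (const event of writeSide.outbox.splice(0)) {
  readSide.project(event);
}

// Now the read model has caught up with the write model.
const afterProjection = readSide.findById('u1');
console.log(beforeProjection, afterProjection);
```

The window between the write and the projection is the lag that callers of the query side have to tolerate.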

2. Core Components of @nestjs/cqrs

2.1 Command Bus (CommandBus)

typescript
// Command bus example
import { Body, Controller, Post } from '@nestjs/common';
import { CommandBus, CommandHandler, EventBus, ICommand, ICommandHandler } from '@nestjs/cqrs';
import { v4 as uuidv4 } from 'uuid';
// User, UserRepository, UserCreatedEvent, and CreateUserDto are assumed to be defined elsewhere

// Define the command
export class CreateUserCommand implements ICommand {
  constructor(
    public readonly name: string,
    public readonly email: string,
  ) {}
}

// Command handler
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: CreateUserCommand): Promise<void> {
    // Run the business logic
    const user = new User(uuidv4(), command.name, command.email);

    // Persist the user
    await this.userRepository.save(user);

    // Publish the domain event
    this.eventBus.publish(new UserCreatedEvent(user.getId(), user.getName(), user.getEmail()));
  }
}

// Dispatch the command through the command bus
@Controller('users')
export class UserController {
  constructor(private readonly commandBus: CommandBus) {}

  @Post()
  async createUser(@Body() createUserDto: CreateUserDto) {
    const command = new CreateUserCommand(
      createUserDto.name,
      createUserDto.email,
    );

    await this.commandBus.execute(command);

    return { message: 'User created successfully' };
  }
}

2.2 Query Bus (QueryBus)

typescript
// Query bus example
import { Controller, Get, NotFoundException, Param } from '@nestjs/common';
import { CommandBus, QueryBus, IQuery, IQueryHandler, QueryHandler } from '@nestjs/cqrs';
// UserDto and UserReadModelRepository are defined in section 4.2

// Define the query
export class GetUserQuery implements IQuery {
  constructor(
    public readonly userId: string,
  ) {}
}

// Query handler
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
  constructor(private readonly userReadModelRepository: UserReadModelRepository) {}

  async execute(query: GetUserQuery): Promise<UserDto> {
    const user = await this.userReadModelRepository.findById(query.userId);
    if (!user) {
      throw new NotFoundException('User not found');
    }

    return user;
  }
}

// Dispatch the query through the query bus
@Controller('users')
export class UserController {
  constructor(
    private readonly commandBus: CommandBus,
    private readonly queryBus: QueryBus,
  ) {}

  @Get(':id')
  async getUser(@Param('id') id: string) {
    const query = new GetUserQuery(id);
    return await this.queryBus.execute(query);
  }
}

2.3 Event Bus (EventBus)

typescript
// Event bus example
import { CommandHandler, EventBus, EventsHandler, ICommandHandler, IEventHandler } from '@nestjs/cqrs';
import { v4 as uuidv4 } from 'uuid';
// EmailService, User, UserRepository, and CreateUserCommand are assumed to be defined elsewhere

// Define the event
export class UserCreatedEvent {
  constructor(
    public readonly userId: string,
    public readonly name: string,
    public readonly email: string,
  ) {}
}

// Event handler
@EventsHandler(UserCreatedEvent)
export class UserCreatedHandler implements IEventHandler<UserCreatedEvent> {
  constructor(private readonly emailService: EmailService) {}

  async handle(event: UserCreatedEvent) {
    // Send the welcome email
    await this.emailService.sendWelcomeEmail(event.email, event.name);
  }
}

// Publish the event from the command handler
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: CreateUserCommand): Promise<void> {
    const user = new User(uuidv4(), command.name, command.email);
    await this.userRepository.save(user);

    // Publish the event
    this.eventBus.publish(new UserCreatedEvent(user.getId(), user.getName(), user.getEmail()));
  }
}

3. A Complete CQRS Implementation

3.1 User Management with CQRS

typescript
// user/commands/create-user.command.ts
export class CreateUserCommand {
  constructor(
    public readonly name: string,
    public readonly email: string,
    public readonly password: string,
  ) {}
}

// user/commands/create-user.handler.ts
@CommandHandler(CreateUserCommand)
export class CreateUserHandler implements ICommandHandler<CreateUserCommand> {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly eventBus: EventBus,
    private readonly passwordService: PasswordService,
  ) {}
  
  async execute(command: CreateUserCommand): Promise<string> {
    // Enforce business rules: the email must be unique
    const existingUser = await this.userRepository.findByEmail(command.email);
    if (existingUser) {
      throw new ConflictException('User with this email already exists');
    }

    // Hash the password
    const hashedPassword = await this.passwordService.hash(command.password);

    // Create the user entity
    const user = new User(
      uuidv4(),
      command.name,
      command.email,
      hashedPassword,
    );

    // Persist the user
    await this.userRepository.save(user);

    // Publish the event
    this.eventBus.publish(new UserCreatedEvent(
      user.getId(),
      user.getName(),
      user.getEmail(),
    ));

    return user.getId();
  }
}

// user/queries/get-user.query.ts
export class GetUserQuery {
  constructor(
    public readonly userId: string,
  ) {}
}

// user/queries/get-user.handler.ts
@QueryHandler(GetUserQuery)
export class GetUserHandler implements IQueryHandler<GetUserQuery> {
  constructor(private readonly userReadModelRepository: UserReadModelRepository) {}
  
  async execute(query: GetUserQuery): Promise<UserDto> {
    const user = await this.userReadModelRepository.findById(query.userId);
    if (!user) {
      throw new NotFoundException('User not found');
    }
    
    return user;
  }
}

// user/events/user-created.event.ts
export class UserCreatedEvent {
  constructor(
    public readonly userId: string,
    public readonly name: string,
    public readonly email: string,
    public readonly createdAt: Date = new Date(),
  ) {}
}

// user/events/user-created.handler.ts
@EventsHandler(UserCreatedEvent)
export class UserCreatedHandler implements IEventHandler<UserCreatedEvent> {
  constructor(
    private readonly emailService: EmailService,
    private readonly userReadModelRepository: UserReadModelRepository,
  ) {}
  
  async handle(event: UserCreatedEvent) {
    // Update the read model first, so that a failed email does not leave the read side stale
    await this.userReadModelRepository.create({
      id: event.userId,
      name: event.name,
      email: event.email,
      createdAt: event.createdAt,
    });

    // Send the welcome email
    await this.emailService.sendWelcomeEmail(event.email, event.name);
  }
}

3.2 Controller Implementation

typescript
// user/user.controller.ts
@Controller('users')
export class UserController {
  constructor(
    private readonly commandBus: CommandBus,
    private readonly queryBus: QueryBus,
  ) {}
  
  @Post()
  async createUser(@Body() createUserDto: CreateUserDto) {
    const command = new CreateUserCommand(
      createUserDto.name,
      createUserDto.email,
      createUserDto.password,
    );
    
    const userId = await this.commandBus.execute(command);
    
    return { id: userId, message: 'User created successfully' };
  }
  
  @Get(':id')
  async getUser(@Param('id') id: string) {
    const query = new GetUserQuery(id);
    return await this.queryBus.execute(query);
  }
  
  @Get()
  async getAllUsers(@Query() paginationDto: PaginationDto) {
    const query = new GetAllUsersQuery(
      paginationDto.page,
      paginationDto.limit,
    );
    return await this.queryBus.execute(query);
  }
}

// user/queries/get-all-users.query.ts
export class GetAllUsersQuery {
  constructor(
    public readonly page: number = 1,
    public readonly limit: number = 10,
  ) {}
}

// user/queries/get-all-users.handler.ts
@QueryHandler(GetAllUsersQuery)
export class GetAllUsersHandler implements IQueryHandler<GetAllUsersQuery> {
  constructor(private readonly userReadModelRepository: UserReadModelRepository) {}
  
  async execute(query: GetAllUsersQuery): Promise<PaginatedResult<UserDto>> {
    return await this.userReadModelRepository.findAll(query.page, query.limit);
  }
}

4. Separating the Read and Write Models

4.1 Write Model

typescript
// Write model: focuses on business logic and data consistency
// user/domain/entities/user.entity.ts
export class User {
  private readonly id: string;
  private name: string;
  private email: string;
  private password: string;
  // Named `active` so the field does not clash with the isActive() method
  private active: boolean;
  private createdAt: Date;
  private updatedAt: Date;

  constructor(
    id: string,
    name: string,
    email: string,
    password: string,
  ) {
    this.id = id;
    this.name = name;
    this.email = email;
    this.password = password;
    this.active = true;
    this.createdAt = new Date();
    this.updatedAt = new Date();
  }

  // Business behavior
  // Assumes PasswordService.compare and PasswordService.hash are asynchronous,
  // consistent with CreateUserHandler, which awaits hash()
  async changePassword(oldPassword: string, newPassword: string, passwordService: PasswordService): Promise<void> {
    if (!(await passwordService.compare(oldPassword, this.password))) {
      throw new Error('Invalid old password');
    }

    this.password = await passwordService.hash(newPassword);
    this.updatedAt = new Date();
  }

  deactivate(): void {
    this.active = false;
    this.updatedAt = new Date();
  }

  // Getters
  getId(): string {
    return this.id;
  }

  getName(): string {
    return this.name;
  }

  getEmail(): string {
    return this.email;
  }

  getPassword(): string {
    return this.password;
  }

  isActive(): boolean {
    return this.active;
  }
}

4.2 Read Model

typescript
// Read model: optimized for query performance
// user/infrastructure/read-models/user.read-model.ts
export interface UserDto {
  id: string;
  name: string;
  email: string;
  createdAt: Date;
  isActive: boolean;
}

// user/infrastructure/repositories/user-read-model.repository.ts
@Injectable()
export class UserReadModelRepository {
  constructor(private readonly prisma: PrismaService) {}
  
  async findById(id: string): Promise<UserDto | null> {
    return await this.prisma.userView.findUnique({
      where: { id },
    });
  }
  
  async findAll(page: number, limit: number): Promise<PaginatedResult<UserDto>> {
    const [users, total] = await Promise.all([
      this.prisma.userView.findMany({
        skip: (page - 1) * limit,
        take: limit,
        orderBy: { createdAt: 'desc' },
      }),
      this.prisma.userView.count(),
    ]);
    
    return {
      data: users,
      total,
      page,
      limit,
    };
  }
  
  async searchByName(name: string): Promise<UserDto[]> {
    return await this.prisma.userView.findMany({
      where: {
        name: {
          contains: name,
          mode: 'insensitive',
        },
      },
      orderBy: { name: 'asc' },
    });
  }
  
  async create(userData: Omit<UserDto, 'isActive'>): Promise<void> {
    await this.prisma.userView.create({
      data: {
        ...userData,
        isActive: true,
      },
    });
  }
  
  async update(id: string, userData: Partial<UserDto>): Promise<void> {
    await this.prisma.userView.update({
      where: { id },
      data: userData,
    });
  }
}

5. Implementing Event Sourcing

5.1 Event Store

typescript
// Event store implementation
// shared/infrastructure/event-store/event.entity.ts
export class Event {
  constructor(
    public readonly id: string,
    public readonly aggregateId: string,
    public readonly eventType: string,
    public readonly eventData: any,
    public readonly timestamp: Date,
    public readonly version: number,
  ) {}
}

// shared/infrastructure/event-store/event-store.repository.ts
@Injectable()
export class EventStoreRepository {
  constructor(private readonly prisma: PrismaService) {}
  
  async save(event: Event): Promise<void> {
    await this.prisma.event.create({
      data: {
        id: event.id,
        aggregateId: event.aggregateId,
        eventType: event.eventType,
        eventData: event.eventData,
        timestamp: event.timestamp,
        version: event.version,
      },
    });
  }
  
  async findByAggregateId(aggregateId: string): Promise<Event[]> {
    return await this.prisma.event.findMany({
      where: { aggregateId },
      orderBy: { version: 'asc' },
    });
  }
  
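  async findEventsByType(eventType: string, after?: Date): Promise<Event[]> {
    const where: { eventType: string; timestamp?: { gt: Date } } = { eventType };
    if (after) {
      // Strictly greater than, so an event stamped exactly at `after` is not returned again
      where.timestamp = { gt: after };
    }

    return await this.prisma.event.findMany({
      where,
      orderBy: { timestamp: 'asc' },
    });
  }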
}
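
The `version` field becomes useful once appends are checked against it. The following is a minimal in-memory sketch of optimistic concurrency, not the Prisma-backed repository above: `InMemoryEventStore`, `ConcurrencyError`, and the `expectedVersion` parameter are hypothetical names. A writer states which version it last saw, and the append is rejected if another writer got there first. With a relational store, a unique constraint on (aggregateId, version) is one common way to get a similar guarantee.

```typescript
interface StoredEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

class ConcurrencyError extends Error {}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  // Appends events only if the stream is still at the version the caller last saw.
  // Versions start at 1; an empty stream is at version 0.
  append(
    aggregateId: string,
    expectedVersion: number,
    events: Array<{ eventType: string; eventData: unknown }>,
  ): StoredEvent[] {
    const stream = this.streams.get(aggregateId) ?? [];
    const currentVersion = stream.length;
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion} but stream is at ${currentVersion}`,
      );
    }
    const stored = events.map((event, index) => ({
      aggregateId,
      eventType: event.eventType,
      eventData: event.eventData,
      version: currentVersion + index + 1,
    }));
    this.streams.set(aggregateId, [...stream, ...stored]);
    return stored;
  }

  // Returns a copy of the stream in version order.
  load(aggregateId: string): StoredEvent[] {
    return [...(this.streams.get(aggregateId) ?? [])];
  }
}

const eventStore = new InMemoryEventStore();
eventStore.append('user-1', 0, [{ eventType: 'UserCreated', eventData: { name: 'Ada' } }]);

// A second writer that still believes the stream is empty is rejected.
let conflictDetected = false;
try {
  eventStore.append('user-1', 0, [{ eventType: 'UserNameChanged', eventData: { name: 'Grace' } }]);
} catch (error) {
  conflictDetected = true;
}
console.log(conflictDetected, eventStore.load('user-1').length);
```

A rejected writer would typically reload the aggregate, re-run the command against the fresh state, and try again.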

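5.2 Aggregate Roots and Event Sourcing

typescript
// Aggregate root base class
// Note: @nestjs/cqrs also exports a class named AggregateRoot; this hand-written version
// shows the mechanics. DomainEvent is assumed to be the application's base event type.
export abstract class AggregateRoot {
  protected events: DomainEvent[] = [];

  protected addEvent(event: DomainEvent): void {
    this.events.push(event);
  }

  public getUncommittedEvents(): DomainEvent[] {
    return [...this.events];
  }

  public clearUncommittedEvents(): void {
    this.events = [];
  }

  // Rebuild state by replaying past events without recording them as new
  public loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.apply(event, true);
    }
  }

  protected apply(event: DomainEvent, isReplaying: boolean = false): void {
    // Dispatch to a method named after the event, e.g. UserCreatedEvent -> onUserCreated
    const handler = (this as unknown as Record<string, unknown>)[`on${this.getEventName(event)}`];
    if (typeof handler === 'function') {
      handler.call(this, event);
    }

    if (!isReplaying) {
      this.addEvent(event);
    }
  }

  private getEventName(event: DomainEvent): string {
    // Use the class name and drop the "Event" suffix so it matches the onXxx handler names.
    // This relies on class names surviving the build (no name-mangling minification).
    return event.constructor.name.replace(/Event$/, '');
  }
}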

// User aggregate root
export class UserAggregate extends AggregateRoot {
  // State is assigned by the event handlers below, never directly
  private id!: string;
  private name!: string;
  private email!: string;
  // Named `active` so the field does not clash with the isActive() method
  private active = false;

  constructor() {
    super();
  }

  public static create(id: string, name: string, email: string): UserAggregate {
    const user = new UserAggregate();
    user.apply(new UserCreatedEvent(id, name, email));
    return user;
  }

  public changeName(name: string): void {
    if (this.name !== name) {
      this.apply(new UserNameChangedEvent(this.id, name));
    }
  }

  public deactivate(): void {
    if (this.active) {
      this.apply(new UserDeactivatedEvent(this.id));
    }
  }

  // Event handlers, looked up by name from AggregateRoot.apply
  protected onUserCreated(event: UserCreatedEvent): void {
    this.id = event.userId;
    this.name = event.name;
    this.email = event.email;
    this.active = true;
  }

  protected onUserNameChanged(event: UserNameChangedEvent): void {
    this.name = event.name;
  }

  protected onUserDeactivated(event: UserDeactivatedEvent): void {
    this.active = false;
  }

  // Getters
  getId(): string {
    return this.id;
  }

  getName(): string {
    return this.name;
  }

  getEmail(): string {
    return this.email;
  }

  isActive(): boolean {
    return this.active;
  }
}
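
Rebuilding an aggregate from its history amounts to a left fold over its events. The sketch below shows the same idea in a functional style with plain TypeScript, as an alternative view rather than a replacement for the class-based version; `UserEvent`, `UserState`, `evolve`, and `rehydrate` are hypothetical names.

```typescript
// Events as a discriminated union, so the compiler checks that every case is handled.
type UserEvent =
  | { type: 'UserCreated'; userId: string; name: string; email: string }
  | { type: 'UserNameChanged'; name: string }
  | { type: 'UserDeactivated' };

interface UserState {
  id: string;
  name: string;
  email: string;
  active: boolean;
}

// Pure transition function: the current state plus one event gives the next state.
function evolve(state: UserState | undefined, event: UserEvent): UserState {
  switch (event.type) {
    case 'UserCreated':
      return { id: event.userId, name: event.name, email: event.email, active: true };
    case 'UserNameChanged':
      if (!state) throw new Error('UserNameChanged received before UserCreated');
      return { ...state, name: event.name };
    case 'UserDeactivated':
      if (!state) throw new Error('UserDeactivated received before UserCreated');
      return { ...state, active: false };
  }
}

// Replaying history is a reduce over the ordered event list.
function rehydrate(history: UserEvent[]): UserState | undefined {
  return history.reduce<UserState | undefined>(
    (state, event) => evolve(state, event),
    undefined,
  );
}

const history: UserEvent[] = [
  { type: 'UserCreated', userId: 'u1', name: 'Ada', email: 'ada@example.com' },
  { type: 'UserNameChanged', name: 'Grace' },
  { type: 'UserDeactivated' },
];

const rebuilt = rehydrate(history);
console.log(rebuilt);
```

Because `evolve` is pure, replaying the same history always yields the same state, which is the property that makes audit and replay possible.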

6. Configuring the CQRS Module

6.1 Module Registration

typescript
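// user/user.module.ts
@Module({
  imports: [
    CqrsModule,
    SharedModule,
  ],
  controllers: [
    UserController,
  ],
  providers: [
    // Command handlers
    CreateUserHandler,
    UpdateUserHandler,
    DeactivateUserHandler,

    // Query handlers
    GetUserHandler,
    GetAllUsersHandler,
    SearchUsersHandler,

    // Event handlers
    UserCreatedHandler,
    UserNameChangedHandler,
    UserDeactivatedHandler,

    // Repositories
    // The handlers inject UserRepository by class type, so the class itself is the token.
    // This assumes UserRepository is an abstract class rather than an interface; with a
    // string token such as 'UserRepository', each handler would need @Inject('UserRepository').
    {
      provide: UserRepository,
      useClass: UserDatabaseRepository,
    },
    UserReadModelRepository,
  ],
})
export class UserModule {}

// app.module.ts
@Module({
  imports: [
    UserModule,
    // Other modules...
    // Recent versions of @nestjs/cqrs document CqrsModule.forRoot() for the root module
    CqrsModule,
  ],
})
export class AppModule {}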

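6.2 Event Bus Configuration

typescript
// Event bus configuration
// EventBus takes framework-provided constructor arguments, so a subclass cannot call
// super() with no arguments. This version wraps the injected EventBus instead of extending
// it; handlers that should persist events inject ConfigurableEventBus in place of EventBus.
// DomainEvent is assumed to expose aggregateId and version.
@Injectable()
export class ConfigurableEventBus {
  private readonly logger = new Logger(ConfigurableEventBus.name);

  constructor(
    private readonly eventBus: EventBus,
    private readonly eventStore: EventStoreRepository,
  ) {}

  async publish<T extends DomainEvent>(event: T): Promise<void> {
    // Persist the event
    await this.eventStore.save(new Event(
      uuidv4(),
      event.aggregateId,
      event.constructor.name,
      event,
      new Date(),
      event.version,
    ));

    // Log it
    this.logger.log(`Event published: ${event.constructor.name}`);

    // Hand the event to the underlying bus so subscribers receive it
    await this.eventBus.publish(event);
  }
}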

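7. Summary

Key points for implementing CQRS in NestJS:

  1. Command bus: handles writes and is where business rules are enforced
  2. Query bus: handles reads, served from a model optimized for querying
  3. Event bus: provides loosely coupled communication between components
  4. Read/write model separation: each data model is shaped for its own workload
  5. Event sourcing: storing events provides a full audit trail and the ability to replay

Advantages of CQRS:

  1. Performance: the read and write paths can be optimized independently
  2. Scalability: read and write services can be scaled independently
  3. Managing complexity: complex business logic is easier to organize
  4. Auditability: the event store records every operation
  5. Eventual consistency: events keep the read side in step with the write side

Applied judiciously, CQRS helps build systems with:

  1. High performance: reads and writes are optimized independently
  2. High scalability: services scale on demand
  3. High maintainability: responsibilities are clearly separated
  4. High reliability: event sourcing makes it possible to recover data by replaying events

In the next article, we will look at health checks with Terminus: how to expose a /health endpoint and check whether the database and Redis are available.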