Манитон Docs
Reference

Market Service - Реализация

Детальная реализация Market Service

Market Service - Реализация

Обзор

Market Service — микросервис на NestJS для управления торговлей, ордербуком и матчингом. Отвечает за размещение ордеров, исполнение сделок и DvP settlement.

Структура проекта

apps/services/market/src/
├── app/
│   ├── app.controller.ts
│   └── app.service.ts
├── app.module.ts
├── config/
│   └── config.module.ts
├── domains/
│   └── trading/
│       ├── application/
│       │   └── use-cases/
│       ├── domain/
│       │   ├── repositories/
│       │   ├── services/
│       │   └── entities/
│       └── presentation/
│           └── market-grpc.gateway.ts
├── infrastructure/
│   ├── database/
│   ├── kafka/
│   ├── redis/
│   └── logging/
├── main.ts
└── shared/

Модули

AppModule

// app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { DrizzleModule } from '@maniton/nestjs-common/database';
import { KafkaModule } from '@maniton/nestjs-common/kafka';
import { RedisModule } from '@maniton/nestjs-common/redis';
import { LoggingModule } from '@maniton/nestjs-common/logging';
import { TradingModule } from './domains/trading/trading.module';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: ['.env.local', '.env'],
    }),
    DrizzleModule.forRoot({
      url: process.env.DATABASE_URL,
    }),
    KafkaModule.forRoot({
      brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
      clientId: 'market-service',
    }),
    RedisModule.forRoot({
      url: process.env.REDIS_URL || 'redis://localhost:6379',
    }),
    LoggingModule.forRoot({
      level: process.env.LOG_LEVEL || 'info',
    }),
    TradingModule,
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Use Cases

PlaceOrderUseCase

Размещение ордера.

// domains/trading/application/use-cases/place-order.use-case.ts
import { Injectable } from '@nestjs/common';
import { OrderRepository } from '../../domain/repositories/order.repository';
import { MatchingEngine } from '../../domain/services/matching-engine';
import { EventPublisher } from '../../domain/services/event-publisher';
import { OrderSide, OrderType, TimeInForce } from '@maniton/contracts/gen/ts/maniton/common/v1/common_pb';

@Injectable()
export class PlaceOrderUseCase {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly matchingEngine: MatchingEngine,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(request: {
    context: any;
    userId: string;
    instrumentId: string;
    side: OrderSide;
    type: OrderType;
    price?: { amount: string; currencyCode: string };
    quantity: number;
    timeInForce: TimeInForce;
  }): Promise<any> {
    const { context, userId, instrumentId, side, type, price, quantity, timeInForce } = request;

    // 1. Создание ордера
    const order = await this.orderRepository.create({
      userId,
      instrumentId,
      side,
      type,
      price,
      quantity,
      timeInForce,
      status: 'OPEN',
      createdAt: new Date(),
    });

    // 2. Добавление в matching engine
    this.matchingEngine.addOrder(order);

    // 3. Попытка матчинга
    const trades = this.matchingEngine.match(instrumentId);

    // 4. Исполнение сделок
    for (const trade of trades) {
      await this.executeTrade(trade);
    }

    // 5. Публикация события
    await this.eventPublisher.publish({
      type: 'OrderPlaced',
      payload: order,
    });

    return { order, trades };
  }

  private async executeTrade(trade: any): Promise<void> {
    // Создание сделки
    await this.tradeRepository.create({
      ...trade,
      createdAt: new Date(),
    });

    // Публикация события
    await this.eventPublisher.publish({
      type: 'TradeExecuted',
      payload: trade,
    });
  }
}

CancelOrderUseCase

Отмена ордера.

// domains/trading/application/use-cases/cancel-order.use-case.ts
import { Injectable } from '@nestjs/common';
import { OrderRepository } from '../../domain/repositories/order.repository';
import { MatchingEngine } from '../../domain/services/matching-engine';
import { EventPublisher } from '../../domain/services/event-publisher';

@Injectable()
export class CancelOrderUseCase {
  constructor(
    private readonly orderRepository: OrderRepository,
    private readonly matchingEngine: MatchingEngine,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(orderId: string): Promise<void> {
    const order = await this.orderRepository.findById(orderId);

    if (!order) {
      throw new Error('Order not found');
    }

    if (order.status !== 'OPEN') {
      throw new Error('Order is not open');
    }

    // Удаление из matching engine
    this.matchingEngine.removeOrder(orderId);

    // Обновление статуса
    await this.orderRepository.update(orderId, {
      status: 'CANCELLED',
      cancelledAt: new Date(),
    });

    // Публикация события
    await this.eventPublisher.publish({
      type: 'OrderCancelled',
      payload: order,
    });
  }
}

Connect RPC Gateway

MarketGrpcGateway

// domains/trading/presentation/market-grpc.gateway.ts
import { ConnectGateway, ConnectRouter } from '@maniton/nestjs-common/connect';
import { MarketService } from '@maniton/contracts/gen/ts/maniton/market/v1/market_pb';
import type { MarketUseCases } from './market-grpc.gateway';

export class MarketGrpcGateway extends ConnectGateway {
  protected readonly options = {
    requestPathPrefix: '/market-service',
  };

  protected registerRoutes(router: ConnectRouter): void {
    router.service(MarketService, {
      placeOrder: (req) => this.useCases.placeOrder.execute(req),
      cancelOrder: (req) => this.useCases.cancelOrder.execute(req),
      getOrder: (req) => this.useCases.getOrder.execute(req),
      listOrders: (req) => this.useCases.listOrders.execute(req),
      listTrades: (req) => this.useCases.listTrades.execute(req),
      getOrderBook: (req) => this.useCases.getOrderBook.execute(req),
    });
  }
}

Сервисы

MatchingEngine

// domains/trading/domain/services/matching-engine.ts
import { Injectable } from '@nestjs/common';
import { PriorityQueue } from 'priorityqueue';

interface Order {
  id: string;
  userId: string;
  instrumentId: string;
  side: 'BUY' | 'SELL';
  price: number;
  quantity: number;
  status: 'OPEN' | 'FILLED' | 'CANCELLED';
}

@Injectable()
export class MatchingEngine {
  private orderBooks: Map<string, {
    bids: PriorityQueue<Order>;
    asks: PriorityQueue<Order>;
  }> = new Map();

  getOrderBook(instrumentId: string) {
    if (!this.orderBooks.has(instrumentId)) {
      this.orderBooks.set(instrumentId, {
        bids: new PriorityQueue((a, b) => b.price - a.price),
        asks: new PriorityQueue((a, b) => a.price - b.price),
      });
    }

    return this.orderBooks.get(instrumentId);
  }

  addOrder(order: Order): void {
    const book = this.getOrderBook(order.instrumentId);

    if (order.side === 'BUY') {
      book.bids.push(order);
    } else {
      book.asks.push(order);
    }
  }

  removeOrder(orderId: string): void {
    for (const [instrumentId, book] of this.orderBooks.entries()) {
      // Удаление из bids
      const bids = book.bids.toArray();
      for (const bid of bids) {
        if (bid.id === orderId) {
          book.bids.remove(bid);
          break;
        }
      }

      // Удаление из asks
      const asks = book.asks.toArray();
      for (const ask of asks) {
        if (ask.id === orderId) {
          book.asks.remove(ask);
          break;
        }
      }
    }
  }

  match(instrumentId: string): any[] {
    const book = this.getOrderBook(instrumentId);
    const trades: any[] = [];

    while (book.bids.peek() && book.asks.peek()) {
      const bestBid = book.bids.peek();
      const bestAsk = book.asks.peek();

      if (bestBid.price >= bestAsk.price) {
        // Исполнение сделки
        const quantity = Math.min(bestBid.quantity, bestAsk.quantity);
        const price = bestAsk.price;

        trades.push({
          buyerId: bestBid.userId,
          sellerId: bestAsk.userId,
          instrumentId,
          quantity,
          price,
          timestamp: new Date(),
        });

        // Обновление количества
        bestBid.quantity -= quantity;
        bestAsk.quantity -= quantity;

        // Удаление полностью исполненных ордеров
        if (bestBid.quantity === 0) {
          book.bids.pop();
        }

        if (bestAsk.quantity === 0) {
          book.asks.pop();
        }
      } else {
        break;
      }
    }

    return trades;
  }
}

OrderBookStorage

// domains/trading/domain/services/order-book-storage.ts
import { Injectable } from '@nestjs/common';
import { Redis } from 'ioredis';

@Injectable()
export class OrderBookStorage {
  constructor(private readonly redis: Redis) {}

  async saveSnapshot(instrumentId: string, snapshot: any): Promise<void> {
    const key = `orderbook:${instrumentId}`;
    await this.redis.setex(key, 3600, JSON.stringify(snapshot));
  }

  async getSnapshot(instrumentId: string): Promise<any | null> {
    const key = `orderbook:${instrumentId}`;
    const data = await this.redis.get(key);
    return data ? JSON.parse(data) : null;
  }

  async publishUpdate(instrumentId: string, snapshot: any): Promise<void> {
    await this.redis.publish(`orderbook:${instrumentId}`, JSON.stringify(snapshot));
  }
}

Репозитории

OrderRepository

// domains/trading/domain/repositories/order.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { orders } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';

@Injectable()
export class OrderRepository {
  constructor(private readonly drizzle: DrizzleService) {}

  async findById(id: string) {
    const [order] = await this.drizzle.db
      .select()
      .from(orders)
      .where(eq(orders.id, id))
      .limit(1);

    return order;
  }

  async create(data: typeof orders.$inferInsert) {
    const [order] = await this.drizzle.db
      .insert(orders)
      .values(data)
      .returning();

    return order;
  }

  async update(id: string, data: Partial<typeof orders.$inferInsert>) {
    const [order] = await this.drizzle.db
      .update(orders)
      .set(data)
      .where(eq(orders.id, id))
      .returning();

    return order;
  }

  async list(pagination?: { limit?: number; offset?: number }) {
    return this.drizzle.db
      .select()
      .from(orders)
      .limit(pagination?.limit)
      .offset(pagination?.offset);
  }
}

Мониторинг

Метрики

// infrastructure/metrics/metrics.service.ts
import { Injectable } from '@nestjs/common';
import { Counter, Histogram, Gauge } from 'prom-client';

@Injectable()
export class MetricsService {
  private readonly ordersPlacedTotal = new Counter({
    name: 'market_orders_placed_total',
    help: 'Total number of orders placed',
    labelNames: ['instrument_id', 'side'],
  });

  private readonly tradesExecutedTotal = new Counter({
    name: 'market_trades_executed_total',
    help: 'Total number of trades executed',
    labelNames: ['instrument_id'],
  });

  private readonly orderBookDepth = new Gauge({
    name: 'market_order_book_depth',
    help: 'Order book depth by instrument',
    labelNames: ['instrument_id', 'side'],
  });

  incrementOrderPlaced(instrumentId: string, side: string) {
    this.ordersPlacedTotal.inc({ instrument_id: instrumentId, side });
  }

  incrementTradeExecuted(instrumentId: string) {
    this.tradesExecutedTotal.inc({ instrument_id: instrumentId });
  }

  setOrderBookDepth(instrumentId: string, side: string, depth: number) {
    this.orderBookDepth.set({ instrument_id: instrumentId, side }, depth);
  }
}

Логи

// infrastructure/logging/logger.service.ts
import { Injectable } from '@nestjs/common';

@Injectable()
export class LoggerService {
  private readonly context = 'MarketService';

  info(message: string, meta?: Record<string, any>) {
    console.log(`[${this.context}] [INFO] ${message}`, meta);
  }

  error(message: string, error?: Error, meta?: Record<string, any>) {
    if (error) {
      console.error(`[${this.context}] [ERROR] ${message}`, error, meta);
    } else {
      console.error(`[${this.context}] [ERROR] ${message}`, meta);
    }
  }

  warn(message: string, meta?: Record<string, any>) {
    console.warn(`[${this.context}] [WARN] ${message}`, meta);
  }

  debug(message: string, meta?: Record<string, any>) {
    if (process.env.LOG_LEVEL === 'debug') {
      console.debug(`[${this.context}] [DEBUG] ${message}`, meta);
    }
  }
}

Тестирование

Unit Tests

// domains/trading/application/use-cases/place-order.use-case.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { PlaceOrderUseCase } from './place-order.use-case';
import { OrderRepository } from '../../domain/repositories/order.repository';
import { MatchingEngine } from '../../domain/services/matching-engine';

describe('PlaceOrderUseCase', () => {
  let useCase: PlaceOrderUseCase;
  let orderRepository: jest.Mocked<OrderRepository>;
  let matchingEngine: jest.Mocked<MatchingEngine>;

  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      providers: [
        PlaceOrderUseCase,
        {
          provide: OrderRepository,
          useValue: {
            create: jest.fn(),
          },
        },
        {
          provide: MatchingEngine,
          useValue: {
            addOrder: jest.fn(),
            match: jest.fn().mockReturnValue([]),
          },
        },
      ],
    }).compile();

    useCase = module.get(PlaceOrderUseCase);
    orderRepository = module.get(OrderRepository);
    matchingEngine = module.get(MatchingEngine);
  });

  it('should place order', async () => {
    const request = {
      context: { requestId: '1', correlationId: '1', idempotencyKey: '1' },
      userId: 'user-123',
      instrumentId: 'instrument-456',
      side: 'BUY',
      type: 'LIMIT',
      price: { amount: '100', currencyCode: 'RUB' },
      quantity: 10,
      timeInForce: 'GTC',
    };

    const result = await useCase.execute(request);

    expect(orderRepository.create).toHaveBeenCalled();
    expect(matchingEngine.addOrder).toHaveBeenCalled();
  });
});

Troubleshooting

Проблема: Ордер не исполняется

Решение:

  1. Проверьте, что цена соответствует стакану
  2. Проверьте, что холды созданы
  3. Проверьте логи MatchingEngine

Проблема: Сделка не проходит

Решение:

  1. Проверьте, что транзакция прошла в блокчейне
  2. Проверьте, что холды потреблены
  3. Проверьте логи TradeRepository

Дополнительные ресурсы

On this page