Reference
Market Service - Реализация
Детальная реализация Market Service
Market Service - Реализация
Обзор
Market Service — микросервис на NestJS для управления торговлей, ордербуком и матчингом. Отвечает за размещение ордеров, исполнение сделок и DvP settlement.
Структура проекта
apps/services/market/src/
├── app/
│ ├── app.controller.ts
│ └── app.service.ts
├── app.module.ts
├── config/
│ └── config.module.ts
├── domains/
│ └── trading/
│ ├── application/
│ │ └── use-cases/
│ ├── domain/
│ │ ├── repositories/
│ │ ├── services/
│ │ └── entities/
│ └── presentation/
│ └── market-grpc.gateway.ts
├── infrastructure/
│ ├── database/
│ ├── kafka/
│ ├── redis/
│ └── logging/
├── main.ts
└── shared/Модули
AppModule
// app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { DrizzleModule } from '@maniton/nestjs-common/database';
import { KafkaModule } from '@maniton/nestjs-common/kafka';
import { RedisModule } from '@maniton/nestjs-common/redis';
import { LoggingModule } from '@maniton/nestjs-common/logging';
import { TradingModule } from './domains/trading/trading.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
DrizzleModule.forRoot({
url: process.env.DATABASE_URL,
}),
KafkaModule.forRoot({
brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
clientId: 'market-service',
}),
RedisModule.forRoot({
url: process.env.REDIS_URL || 'redis://localhost:6379',
}),
LoggingModule.forRoot({
level: process.env.LOG_LEVEL || 'info',
}),
TradingModule,
],
controllers: [],
providers: [],
})
export class AppModule {}Use Cases
PlaceOrderUseCase
Размещение ордера.
// domains/trading/application/use-cases/place-order.use-case.ts
import { Injectable } from '@nestjs/common';
import { OrderRepository } from '../../domain/repositories/order.repository';
import { MatchingEngine } from '../../domain/services/matching-engine';
import { EventPublisher } from '../../domain/services/event-publisher';
import { OrderSide, OrderType, TimeInForce } from '@maniton/contracts/gen/ts/maniton/common/v1/common_pb';
@Injectable()
export class PlaceOrderUseCase {
constructor(
private readonly orderRepository: OrderRepository,
private readonly matchingEngine: MatchingEngine,
private readonly eventPublisher: EventPublisher,
) {}
async execute(request: {
context: any;
userId: string;
instrumentId: string;
side: OrderSide;
type: OrderType;
price?: { amount: string; currencyCode: string };
quantity: number;
timeInForce: TimeInForce;
}): Promise<any> {
const { context, userId, instrumentId, side, type, price, quantity, timeInForce } = request;
// 1. Создание ордера
const order = await this.orderRepository.create({
userId,
instrumentId,
side,
type,
price,
quantity,
timeInForce,
status: 'OPEN',
createdAt: new Date(),
});
// 2. Добавление в matching engine
this.matchingEngine.addOrder(order);
// 3. Попытка матчинга
const trades = this.matchingEngine.match(instrumentId);
// 4. Исполнение сделок
for (const trade of trades) {
await this.executeTrade(trade);
}
// 5. Публикация события
await this.eventPublisher.publish({
type: 'OrderPlaced',
payload: order,
});
return { order, trades };
}
private async executeTrade(trade: any): Promise<void> {
// Создание сделки
await this.tradeRepository.create({
...trade,
createdAt: new Date(),
});
// Публикация события
await this.eventPublisher.publish({
type: 'TradeExecuted',
payload: trade,
});
}
}CancelOrderUseCase
Отмена ордера.
// domains/trading/application/use-cases/cancel-order.use-case.ts
import { Injectable } from '@nestjs/common';
import { OrderRepository } from '../../domain/repositories/order.repository';
import { MatchingEngine } from '../../domain/services/matching-engine';
import { EventPublisher } from '../../domain/services/event-publisher';
@Injectable()
export class CancelOrderUseCase {
constructor(
private readonly orderRepository: OrderRepository,
private readonly matchingEngine: MatchingEngine,
private readonly eventPublisher: EventPublisher,
) {}
async execute(orderId: string): Promise<void> {
const order = await this.orderRepository.findById(orderId);
if (!order) {
throw new Error('Order not found');
}
if (order.status !== 'OPEN') {
throw new Error('Order is not open');
}
// Удаление из matching engine
this.matchingEngine.removeOrder(orderId);
// Обновление статуса
await this.orderRepository.update(orderId, {
status: 'CANCELLED',
cancelledAt: new Date(),
});
// Публикация события
await this.eventPublisher.publish({
type: 'OrderCancelled',
payload: order,
});
}
}Connect RPC Gateway
MarketGrpcGateway
// domains/trading/presentation/market-grpc.gateway.ts
import { ConnectGateway, ConnectRouter } from '@maniton/nestjs-common/connect';
import { MarketService } from '@maniton/contracts/gen/ts/maniton/market/v1/market_pb';
import type { MarketUseCases } from './market-grpc.gateway';
export class MarketGrpcGateway extends ConnectGateway {
protected readonly options = {
requestPathPrefix: '/market-service',
};
protected registerRoutes(router: ConnectRouter): void {
router.service(MarketService, {
placeOrder: (req) => this.useCases.placeOrder.execute(req),
cancelOrder: (req) => this.useCases.cancelOrder.execute(req),
getOrder: (req) => this.useCases.getOrder.execute(req),
listOrders: (req) => this.useCases.listOrders.execute(req),
listTrades: (req) => this.useCases.listTrades.execute(req),
getOrderBook: (req) => this.useCases.getOrderBook.execute(req),
});
}
}Сервисы
MatchingEngine
// domains/trading/domain/services/matching-engine.ts
import { Injectable } from '@nestjs/common';
import { PriorityQueue } from 'priorityqueue';
interface Order {
id: string;
userId: string;
instrumentId: string;
side: 'BUY' | 'SELL';
price: number;
quantity: number;
status: 'OPEN' | 'FILLED' | 'CANCELLED';
}
@Injectable()
export class MatchingEngine {
private orderBooks: Map<string, {
bids: PriorityQueue<Order>;
asks: PriorityQueue<Order>;
}> = new Map();
getOrderBook(instrumentId: string) {
if (!this.orderBooks.has(instrumentId)) {
this.orderBooks.set(instrumentId, {
bids: new PriorityQueue((a, b) => b.price - a.price),
asks: new PriorityQueue((a, b) => a.price - b.price),
});
}
return this.orderBooks.get(instrumentId);
}
addOrder(order: Order): void {
const book = this.getOrderBook(order.instrumentId);
if (order.side === 'BUY') {
book.bids.push(order);
} else {
book.asks.push(order);
}
}
removeOrder(orderId: string): void {
for (const [instrumentId, book] of this.orderBooks.entries()) {
// Удаление из bids
const bids = book.bids.toArray();
for (const bid of bids) {
if (bid.id === orderId) {
book.bids.remove(bid);
break;
}
}
// Удаление из asks
const asks = book.asks.toArray();
for (const ask of asks) {
if (ask.id === orderId) {
book.asks.remove(ask);
break;
}
}
}
}
match(instrumentId: string): any[] {
const book = this.getOrderBook(instrumentId);
const trades: any[] = [];
while (book.bids.peek() && book.asks.peek()) {
const bestBid = book.bids.peek();
const bestAsk = book.asks.peek();
if (bestBid.price >= bestAsk.price) {
// Исполнение сделки
const quantity = Math.min(bestBid.quantity, bestAsk.quantity);
const price = bestAsk.price;
trades.push({
buyerId: bestBid.userId,
sellerId: bestAsk.userId,
instrumentId,
quantity,
price,
timestamp: new Date(),
});
// Обновление количества
bestBid.quantity -= quantity;
bestAsk.quantity -= quantity;
// Удаление полностью исполненных ордеров
if (bestBid.quantity === 0) {
book.bids.pop();
}
if (bestAsk.quantity === 0) {
book.asks.pop();
}
} else {
break;
}
}
return trades;
}
}OrderBookStorage
// domains/trading/domain/services/order-book-storage.ts
import { Injectable } from '@nestjs/common';
import { Redis } from 'ioredis';
@Injectable()
export class OrderBookStorage {
constructor(private readonly redis: Redis) {}
async saveSnapshot(instrumentId: string, snapshot: any): Promise<void> {
const key = `orderbook:${instrumentId}`;
await this.redis.setex(key, 3600, JSON.stringify(snapshot));
}
async getSnapshot(instrumentId: string): Promise<any | null> {
const key = `orderbook:${instrumentId}`;
const data = await this.redis.get(key);
return data ? JSON.parse(data) : null;
}
async publishUpdate(instrumentId: string, snapshot: any): Promise<void> {
await this.redis.publish(`orderbook:${instrumentId}`, JSON.stringify(snapshot));
}
}Репозитории
OrderRepository
// domains/trading/domain/repositories/order.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { orders } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';
@Injectable()
export class OrderRepository {
constructor(private readonly drizzle: DrizzleService) {}
async findById(id: string) {
const [order] = await this.drizzle.db
.select()
.from(orders)
.where(eq(orders.id, id))
.limit(1);
return order;
}
async create(data: typeof orders.$inferInsert) {
const [order] = await this.drizzle.db
.insert(orders)
.values(data)
.returning();
return order;
}
async update(id: string, data: Partial<typeof orders.$inferInsert>) {
const [order] = await this.drizzle.db
.update(orders)
.set(data)
.where(eq(orders.id, id))
.returning();
return order;
}
async list(pagination?: { limit?: number; offset?: number }) {
return this.drizzle.db
.select()
.from(orders)
.limit(pagination?.limit)
.offset(pagination?.offset);
}
}Мониторинг
Метрики
// infrastructure/metrics/metrics.service.ts
import { Injectable } from '@nestjs/common';
import { Counter, Histogram, Gauge } from 'prom-client';
@Injectable()
export class MetricsService {
private readonly ordersPlacedTotal = new Counter({
name: 'market_orders_placed_total',
help: 'Total number of orders placed',
labelNames: ['instrument_id', 'side'],
});
private readonly tradesExecutedTotal = new Counter({
name: 'market_trades_executed_total',
help: 'Total number of trades executed',
labelNames: ['instrument_id'],
});
private readonly orderBookDepth = new Gauge({
name: 'market_order_book_depth',
help: 'Order book depth by instrument',
labelNames: ['instrument_id', 'side'],
});
incrementOrderPlaced(instrumentId: string, side: string) {
this.ordersPlacedTotal.inc({ instrument_id: instrumentId, side });
}
incrementTradeExecuted(instrumentId: string) {
this.tradesExecutedTotal.inc({ instrument_id: instrumentId });
}
setOrderBookDepth(instrumentId: string, side: string, depth: number) {
this.orderBookDepth.set({ instrument_id: instrumentId, side }, depth);
}
}Логи
// infrastructure/logging/logger.service.ts
import { Injectable } from '@nestjs/common';
@Injectable()
export class LoggerService {
private readonly context = 'MarketService';
info(message: string, meta?: Record<string, any>) {
console.log(`[${this.context}] [INFO] ${message}`, meta);
}
error(message: string, error?: Error, meta?: Record<string, any>) {
if (error) {
console.error(`[${this.context}] [ERROR] ${message}`, error, meta);
} else {
console.error(`[${this.context}] [ERROR] ${message}`, meta);
}
}
warn(message: string, meta?: Record<string, any>) {
console.warn(`[${this.context}] [WARN] ${message}`, meta);
}
debug(message: string, meta?: Record<string, any>) {
if (process.env.LOG_LEVEL === 'debug') {
console.debug(`[${this.context}] [DEBUG] ${message}`, meta);
}
}
}Тестирование
Unit Tests
// domains/trading/application/use-cases/place-order.use-case.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { PlaceOrderUseCase } from './place-order.use-case';
import { OrderRepository } from '../../domain/repositories/order.repository';
import { MatchingEngine } from '../../domain/services/matching-engine';
describe('PlaceOrderUseCase', () => {
let useCase: PlaceOrderUseCase;
let orderRepository: jest.Mocked<OrderRepository>;
let matchingEngine: jest.Mocked<MatchingEngine>;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
PlaceOrderUseCase,
{
provide: OrderRepository,
useValue: {
create: jest.fn(),
},
},
{
provide: MatchingEngine,
useValue: {
addOrder: jest.fn(),
match: jest.fn().mockReturnValue([]),
},
},
],
}).compile();
useCase = module.get(PlaceOrderUseCase);
orderRepository = module.get(OrderRepository);
matchingEngine = module.get(MatchingEngine);
});
it('should place order', async () => {
const request = {
context: { requestId: '1', correlationId: '1', idempotencyKey: '1' },
userId: 'user-123',
instrumentId: 'instrument-456',
side: 'BUY',
type: 'LIMIT',
price: { amount: '100', currencyCode: 'RUB' },
quantity: 10,
timeInForce: 'GTC',
};
const result = await useCase.execute(request);
expect(orderRepository.create).toHaveBeenCalled();
expect(matchingEngine.addOrder).toHaveBeenCalled();
});
});Troubleshooting
Проблема: Ордер не исполняется
Решение:
- Проверьте, что цена соответствует стакану
- Проверьте, что холды созданы
- Проверьте логи MatchingEngine
Проблема: Сделка не проходит
Решение:
- Проверьте, что транзакция прошла в блокчейне
- Проверьте, что холды потреблены
- Проверьте логи TradeRepository