Манитон Docs

Ledger-DB Service

Система операционного учета и двойной записи

Ledger-DB Service

Статус: Реализован. Система операционного учета с двойной записью и полной интеграцией с DLT.

Ledger-DB — это критический компонент платформы, отвечающий за операционный учет денежных средств и ЦФА. Он реализует модель двойной записи (Double-Entry Bookkeeping).

Архитектура учета

Система учета построена на следующих сущностях:

  1. Account: Глобальный счет пользователя или системы.
  2. SubAccount: Субсчет для конкретного актива (RUB, CFA-EQ, и т.д.).
  3. Operation: Группировка проводок, представляющая собой одну бизнес-операцию (например, "Торговля").
  4. Posting: Атомарная запись о движении средств между двумя субсчетами.
  5. Hold: Временная блокировка средств (например, под открытый ордер).

Технологический стек

  • Фреймворк: NestJS
  • База данных: PostgreSQL
  • ORM: Drizzle ORM
  • Связь: gRPC (для синхронных проверок) + Kafka (для асинхронного обновления).

Принципы работы

  • Атомарность: Все проводки внутри одной Operation выполняются в одной транзакции БД.
  • Неизменяемость: Существующие проводки нельзя удалять или изменять. Для исправления ошибок используются сторнирующие (reversing) записи.
  • Double Contour: Ledger-DB является проекцией данных из DLT (Besu), но также хранит промежуточные состояния (Hold), которых нет в блокчейне.

Модель данных (Entity-Relationship)

Loading diagram...

Модель данных (Protobuf)

message Account {
  string account_id = 1;
  string user_id = 2;
  AccountType type = 3;  // MAIN / INVEST / BLOCKED
}

message SubAccount {
  string subaccount_id = 1;
  string account_id = 2;
  string instrument_id = 3;  // CFA-RUB / CFA-EQ-001 / ...
  Decimal balance = 4;
  SubAccountStatus status = 5;
}

message Operation {
  string operation_id = 1;
  OperationType type = 2;
  OperationStatus status = 3;
  string user_id = 4;
  string idempotency_key = 5;
  string external_ref = 6;
  string onchain_tx_hash = 7;
}

message Posting {
  string posting_id = 1;
  string operation_id = 2;
  string debit_subaccount_id = 3;
  string credit_subaccount_id = 4;
  MonetaryAmount amount = 5;
}

message Hold {
  string hold_id = 1;
  string subaccount_id = 2;
  MonetaryAmount amount = 3;
  HoldStatus status = 4;  // ACTIVE / RELEASED / CONSUMED / EXPIRED
  string reason = 5;
}

Пример проводок

Пополнение (Deposit):

Debit:  System.Reserve.CFA-RUB      +1000
Credit: User123.Main.CFA-RUB        +1000

Покупка инвест-ЦФА:

Debit:  User123.Main.CFA-RUB        -5000
Credit: System.Reserve.CFA-RUB      +5000

Debit:  System.Reserve.CFA-EQ-001   -10 shares
Credit: User123.Invest.CFA-EQ-001   +10 shares

Kafka RPC API

Ledger использует Kafka RPC с reply-топиками *.reply.

TopicProtoОписание
ledger.getAccountGetAccountRequest/ResponseПолучить счет
ledger.listAccountsListAccountsRequest/ResponseСписок счетов пользователя
ledger.getSubAccountGetSubAccountRequest/ResponseПолучить субсчет
ledger.createOperationCreateOperationRequest/ResponseСоздать операцию с проводками
ledger.getOperationGetOperationRequest/ResponseПолучить операцию
ledger.listOperationsListOperationsRequest/ResponseСписок операций
ledger.createHoldCreateHoldRequest/ResponseСоздать блокировку
ledger.releaseHoldReleaseHoldRequest/ResponseОсвободить блокировку
ledger.consumeHoldConsumeHoldRequest/ResponseСписать блокировку

События

Исходящие (maniton.ledger.events.v1)

  • OperationCreated
  • OperationStatusChanged
  • BalanceUpdated
  • HoldCreated
  • HoldReleased
  • HoldConsumed
  • UserOnchainRegistrationRequested
  • UserOnchainRegistrationConfirmed

Входящие

  • maniton.identity.events.v1: UserCreated, UserBlocked, UserUnblocked
  • maniton.cfa.events.v1: CfaIssued, CfaTransferred, CfaRedeemed
  • maniton.besu.events.v1: TransactionReceiptReadyEvent, BlockReadEvent

Сверка с DLT (Reconciliation)

Ledger использует события Besu для подтверждения финальности операций:

  1. При получении TransactionReceiptReadyEvent операция переводится в FINALIZED.
  2. На BlockReadEvent может запускаться reconciliation-процесс для восстановления пропущенных операций.

API (gRPC Connect)

LedgerService

GetAccount

Получение счета пользователя.

message GetAccountRequest {
  RequestContext context = 1;
  string account_id = 2;
}

message GetAccountResponse {
  Account account = 1;
}

ListAccounts

Получение списка счетов пользователя.

message ListAccountsRequest {
  RequestContext context = 1;
  string user_id = 2;
  PaginationRequest pagination = 3;
}

message ListAccountsResponse {
  repeated Account accounts = 1;
  PaginationResponse pagination = 2;
}

GetSubAccount

Получение субсчета.

message GetSubAccountRequest {
  RequestContext context = 1;
  string subaccount_id = 2;
}

message GetSubAccountResponse {
  SubAccount subaccount = 1;
}

CreateOperation

Создание операции с проводками.

message CreateOperationRequest {
  RequestContext context = 1;
  Operation operation = 2;
  repeated Posting postings = 3;
}

message CreateOperationResponse {
  Operation operation = 1;
  repeated Posting postings = 2;
}

GetOperation

Получение операции.

message GetOperationRequest {
  RequestContext context = 1;
  string operation_id = 2;
}

message GetOperationResponse {
  Operation operation = 1;
  repeated Posting postings = 2;
}

ListOperations

Получение списка операций пользователя.

message ListOperationsRequest {
  RequestContext context = 1;
  string user_id = 2;
  OperationType type = 3;
  OperationStatus status = 4;
  PaginationRequest pagination = 5;
}

message ListOperationsResponse {
  repeated Operation operations = 1;
  PaginationResponse pagination = 2;
}

CreateHold

Создание блокировки средств.

message CreateHoldRequest {
  RequestContext context = 1;
  Hold hold = 2;
}

message CreateHoldResponse {
  Hold hold = 1;
}

ReleaseHold

Освобождение блокировки.

message ReleaseHoldRequest {
  RequestContext context = 1;
  string hold_id = 2;
  string reason = 3;
}

message ReleaseHoldResponse {
  Hold hold = 1;
}

ConsumeHold

Потребление блокированных средств.

message ConsumeHoldRequest {
  RequestContext context = 1;
  string hold_id = 2;
}

message ConsumeHoldResponse {
  Hold hold = 1;
}

События Kafka

Исходящие (maniton.ledger.events.v1)

OperationCreatedEvent

message OperationCreatedEvent {
  RequestContext context = 1;
  string operation_id = 2;
  OperationType type = 3;
  string user_id = 4;
  MonetaryAmount amount = 5;
  google.protobuf.Timestamp created_at = 6;
}

BalanceUpdatedEvent

message BalanceUpdatedEvent {
  RequestContext context = 1;
  string subaccount_id = 2;
  Decimal old_balance = 3;
  Decimal new_balance = 4;
  string operation_id = 5;
  google.protobuf.Timestamp updated_at = 6;
}

HoldCreatedEvent

message HoldCreatedEvent {
  RequestContext context = 1;
  string hold_id = 2;
  string subaccount_id = 3;
  MonetaryAmount amount = 4;
  string reason = 5;
  google.protobuf.Timestamp created_at = 6;
}

HoldReleasedEvent

message HoldReleasedEvent {
  RequestContext context = 1;
  string hold_id = 2;
  string reason = 3;
  google.protobuf.Timestamp released_at = 4;
}

HoldConsumedEvent

message HoldConsumedEvent {
  RequestContext context = 1;
  string hold_id = 2;
  google.protobuf.Timestamp consumed_at = 3;
}

Входящие

  • maniton.identity.events.v1: UserCreated, UserBlocked, UserUnblocked
  • maniton.cfa.events.v1: CfaIssued, CfaTransferred, CfaRedeemed
  • maniton.besu.events.v1: TransactionReceiptReadyEvent, BlockReadEvent

Use Cases

CreateOperationUseCase

Создание операции с проводками в рамках транзакции БД.

async execute(request: CreateOperationRequest): Promise<CreateOperationResponse> {
  const { context, operation, postings } = request;

  // 1. Валидация проводок
  this.validatePostings(postings);

  // 2. Создание операции и проводок
  return await this.unitOfWork.executeWithEvents(async (tx: DbTransaction) => {
    // Создание операции
    const created = await this.operationRepository.save(operation, tx);

    // Создание проводок
    const createdPostings = await Promise.all(
      postings.map(p => this.postingRepository.save(p, tx))
    );

    // Публикация событий
    await this.eventPublisher.publish({
      type: 'OperationCreated',
      payload: created,
    });

    for (const posting of createdPostings) {
      await this.eventPublisher.publish({
        type: 'PostingCreated',
        payload: posting,
      });
    }

    return {
      operation: created,
      postings: createdPostings,
    };
  });
}

CreateHoldUseCase

Создание блокировки средств.

async execute(request: CreateHoldRequest): Promise<CreateHoldResponse> {
  const { context, hold } = request;

  // 1. Проверка баланса
  const balance = await this.subAccountRepository.getBalance(hold.subaccountId);

  if (balance < hold.amount.amount.value) {
    throw new Error('Insufficient balance');
  }

  // 2. Создание холда
  const created = await this.holdRepository.save(hold);

  // 3. Публикация события
  await this.eventPublisher.publish({
    type: 'HoldCreated',
    payload: created,
  });

  return { hold: created };
}

ConsumeHoldUseCase

Потребление блокированных средств.

async execute(request: ConsumeHoldRequest): Promise<ConsumeHoldResponse> {
  const { context, holdId } = request;

  // 1. Получение холда
  const hold = await this.holdRepository.findById(holdId);

  if (!hold) {
    throw new Error('Hold not found');
  }

  if (hold.status !== HoldStatus.ACTIVE) {
    throw new Error('Hold is not active');
  }

  // 2. Проверка срока действия
  if (hold.expiresAt && new Date() > hold.expiresAt) {
    hold.status = HoldStatus.EXPIRED;
    await this.holdRepository.save(hold);

    await this.eventPublisher.publish({
      type: 'HoldReleased',
      payload: {
        holdId: hold.holdId,
        reason: 'Expired',
      },
    });

    throw new Error('Hold has expired');
  }

  // 3. Потребление холда
  hold.status = HoldStatus.CONSUMED;
  await this.holdRepository.save(hold);

  // 4. Публикация события
  await this.eventPublisher.publish({
    type: 'HoldConsumed',
    payload: hold,
  });

  return { hold };
}

Модель данных

Account

Счет пользователя для хранения активов.

class Account {
  accountId: string;
  userId: string;
  type: AccountType; // MAIN, INVEST, BLOCKED
  createdAt: Date;
  updatedAt: Date;
}

enum AccountType {
  MAIN = 'MAIN',
  INVEST = 'INVEST',
  BLOCKED = 'BLOCKED',
}

SubAccount

Субсчет для конкретного актива.

class SubAccount {
  subaccountId: string;
  accountId: string;
  instrumentId: string;
  balance: string; // Decimal
  status: SubAccountStatus;
  createdAt: Date;
  updatedAt: Date;
}

enum SubAccountStatus {
  ACTIVE = 'ACTIVE',
  FROZEN = 'FROZEN',
  CLOSED = 'CLOSED',
}

Operation

Операция над активами.

class Operation {
  operationId: string;
  type: OperationType;
  status: OperationStatus;
  userId: string;
  idempotencyKey: string;
  externalRef?: string;
  onchainTxHash?: string;
  rejectionReason?: RejectionReasonCode;
  rejectionMessage?: string;
  createdAt: Date;
  finalizedAt?: Date;
}

enum OperationType {
  DEPOSIT = 'DEPOSIT',
  WITHDRAWAL = 'WITHDRAWAL',
  P2P_TRANSFER = 'P2P_TRANSFER',
  PAYMENT = 'PAYMENT',
  ISSUANCE = 'ISSUANCE',
  REDEMPTION = 'REDEMPTION',
  TRADE_BUY = 'TRADE_BUY',
  TRADE_SELL = 'TRADE_SELL',
  DIVIDEND = 'DIVIDEND',
  COUPON = 'COUPON',
  EXTERNAL_WITHDRAWAL = 'EXTERNAL_WITHDRAWAL',
  EXTERNAL_TRANSFER = 'EXTERNAL_TRANSFER',
  TRADING = 'TRADING',
  FEE = 'FEE',
}

enum OperationStatus {
  CREATED = 'CREATED',
  KYC_REQUIRED = 'KYC_REQUIRED',
  COMPLIANCE_CHECK = 'COMPLIANCE_CHECK',
  AWAITING_PAYMENT = 'AWAITING_PAYMENT',
  PROCESSING = 'PROCESSING',
  ONCHAIN_SUBMITTED = 'ONCHAIN_SUBMITTED',
  FINALIZED = 'FINALIZED',
  SETTLED = 'SETTLED',
  REJECTED = 'REJECTED',
  BLOCKED = 'BLOCKED',
  FAILED = 'FAILED',
  REFUNDED = 'REFUNDED',
}

Posting

Бухгалтерская проводка.

class Posting {
  postingId: string;
  operationId: string;
  debitSubaccountId: string;
  creditSubaccountId: string;
  amount: MonetaryAmount;
  createdAt: Date;
}

Hold

Блокировка средств.

class Hold {
  holdId: string;
  subaccountId: string;
  amount: MonetaryAmount;
  status: HoldStatus;
  reason: string;
  createdAt: Date;
  expiresAt?: Date;
}

enum HoldStatus {
  ACTIVE = 'ACTIVE',
  RELEASED = 'RELEASED',
  CONSUMED = 'CONSUMED',
  EXPIRED = 'EXPIRED',
}

Мониторинг

Метрики

import { Counter, Histogram, Gauge } from 'prom-client';

export const operationsTotal = new Counter({
  name: 'ledger_operations_total',
  help: 'Total number of ledger operations',
  labelNames: ['type', 'status'],
});

export const postingsTotal = new Counter({
  name: 'ledger_postings_total',
  help: 'Total number of postings',
});

export const holdsTotal = new Gauge({
  name: 'ledger_holds_total',
  help: 'Total number of active holds',
});

export const operationDuration = new Histogram({
  name: 'ledger_operation_duration_seconds',
  help: 'Time spent on ledger operations',
  buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
});

Логи

this.logger.log('Operation created', {
  operationId: operation.operationId,
  type: operation.type,
  userId: operation.userId,
  amount: operation.amount,
  postingsCount: postings.length,
});

this.logger.log('Hold created', {
  holdId: hold.holdId,
  subaccountId: hold.subaccountId,
  amount: hold.amount,
  reason: hold.reason,
});

this.logger.log('Hold consumed', {
  holdId: hold.holdId,
  subaccountId: hold.subaccountId,
  amount: hold.amount,
});

Troubleshooting

Проблема: Балансы не совпадают

Диагностика:

# Проверка баланса пользователя
curl http://ledger-service:3003/accounts/$ACCOUNT_ID

# Проверка операций
curl http://ledger-service:3003/operations?user_id=$USER_ID

# Проверка проводок
curl http://ledger-service:3003/postings?operation_id=$OPERATION_ID

Решение:

  1. Проверьте, что все проводки созданы
  2. Проверьте, что операции финализированы
  3. Запустите reconciliation процесс

Проблема: Холды не освобождаются

Диагностика:

# Проверка статуса холда
curl http://ledger-service:3003/holds/$HOLD_ID

# Проверка срока действия
curl http://ledger-service:3003/holds?status=EXPIRED

Решение:

  1. Проверьте, что срок действия не истек
  2. Проверьте, что холд не был уже потреблен
  3. Ручное освобождение если нужно

Дополнительные ресурсы

On this page