Манитон Docs

Каталог Kafka топиков

Полный список и описание топиков в системе

Каталог Kafka топиков

Все взаимодействие между сервисами в платформе Манитон происходит через единую шину событий Kafka. Топики разделены по доменным областям.

Правила именования

Формат: maniton.<domain>.events.v1

  • maniton — префикс организации.
  • <domain> — доменная область (identity, ledger, besu и т.д.).
  • events — тип данных.
  • v1 — версия схемы.

Список топиков

ТопикОписаниеОсновные типы событий
maniton.identity.events.v1События KYC и пользователейUserCreated, KycVerified, LimitChanged
maniton.ledger.events.v1Операционный учет и балансыBalanceUpdated, OperationCreated, HoldCreated
maniton.cfa.events.v1Жизненный цикл ЦФАCfaIssued, CfaTransferred, CfaRedeemed
maniton.besu.events.v1Команды и события DLTSubmitTx, BlockRead, TxReceiptReady
maniton.payments.events.v1Платежи и СБПFiatDeposited, WithdrawalRequested
maniton.market.events.v1Торговля и ордераOrderPlaced, TradeExecuted, OrderCancelled
maniton.audit.logs.v1Журнал аудитаSecurityEvent, AdminAction

Формат сообщения (EventEnvelope)

Каждое сообщение в Kafka оборачивается в структуру EventEnvelope, которая соответствует спецификации CloudEvents и обогащена контекстом запроса.

message EventEnvelope {
  // Уникальный ID события (UUID)
  string id = 1;
  // Источник (например, "maniton.service.ledger")
  string source = 2;
  // Тип события (например, "maniton.ledger.v1.BalanceUpdated")
  string type = 3;
  // Время возникновения
  google.protobuf.Timestamp time = 4;
  // Сами данные (сериализованный Protobuf)
  bytes data = 5;
  // Контекст запроса для трассировки и идемпотентности
  RequestContext context = 6;
}

RequestContext

Используется для сквозной трассировки (distributed tracing) и обеспечения идемпотентности во всей системе.

message RequestContext {
  string request_id = 1;        // UUID запроса
  string correlation_id = 2;    // UUID для связи запрос-ответ
  string idempotency_key = 3;   // Ключ идемпотентности
  string trace_id = 5;          // OpenTelemetry trace_id
  string user_id = 6;           // Инициатор (если есть)
  string client_ip = 8;
  Contour contour = 9;          // RF (РФ) или EXTERNAL (Внешний)
}

Гарантии доставки

Платформа использует режим At-Least-Once доставки сообщений. Это означает:

  1. Обработчики (Consumers) обязаны быть идемпотентными.
  2. Использование idempotency_key из контекста обязательно для всех записей в БД.
  3. Повторная обработка события с тем же ID не должна приводить к повторным начислениям или списаниям.

Детальное описание топиков

maniton.identity.events.v1

Назначение: События жизненного цикла пользователей и KYC

Основные события:

СобытиеОписаниеПоля
UserCreatedСоздание нового пользователяuser_id, email, phone
UserUpdatedОбновление профиля пользователяuser_id, changes
KycSubmittedПодача KYC заявкиcase_id, user_id, payload
KycStatusChangedИзменение статуса KYCuser_id, old_status, new_status
UserBlockedБлокировка пользователяuser_id, reason
UserUnblockedРазблокировка пользователяuser_id, reason
LimitChangedИзменение лимитов пользователяuser_id, limit_type, new_limit
SanctionsCheckedРезультат санкционной проверкиuser_id, status, matched_lists

Пример события:

message KycStatusChangedEvent {
  RequestContext context = 1;
  string user_id = 2;
  KycStatus old_status = 3;
  KycStatus new_status = 4;
  google.protobuf.Timestamp changed_at = 5;
}

maniton.ledger.events.v1

Назначение: События операционного учета и балансов

Основные события:

СобытиеОписаниеПоля
AccountCreatedСоздание счета пользователяaccount_id, user_id, type
SubAccountCreatedСоздание субсчетаsubaccount_id, account_id, instrument_id
OperationCreatedСоздание операцииoperation_id, type, user_id, amount
OperationStatusChangedИзменение статуса операцииoperation_id, old_status, new_status
BalanceUpdatedОбновление балансаsubaccount_id, old_balance, new_balance
PostingCreatedСоздание проводкиposting_id, operation_id, debit, credit, amount
HoldCreatedСоздание блокировки средствhold_id, subaccount_id, amount, reason
HoldReleasedОсвобождение блокировкиhold_id, reason
HoldConsumedСписание блокированных средствhold_id

Пример события:

message BalanceUpdatedEvent {
  RequestContext context = 1;
  string subaccount_id = 2;
  Decimal old_balance = 3;
  Decimal new_balance = 4;
  string operation_id = 5;
  google.protobuf.Timestamp updated_at = 6;
}

maniton.cfa.events.v1

Назначение: События жизненного цикла ЦФА

Основные события:

СобытиеОписаниеПоля
InstrumentRegisteredРегистрация инструментаinstrument_id, symbol, name, type
CfaIssuedВыпуск ЦФАoperation_id, instrument_id, holder_id, amount
CfaRedeemedПогашение ЦФАoperation_id, instrument_id, holder_id, amount
CfaTransferredПеревод ЦФАoperation_id, instrument_id, from_user, to_user, amount
IssuanceStartedНачало процесса выпускаoperation_id, user_id, amount
IssuanceFinalizedФинализация выпускаoperation_id, tx_hash
RedemptionStartedНачало процесса погашенияoperation_id, user_id, amount
RedemptionFinalizedФинализация погашенияoperation_id, tx_hash
IntegrityCheckedПроверка целостностиblock_number, block_hash, valid

Пример события:

message CfaIssuedEvent {
  RequestContext context = 1;
  string operation_id = 2;
  string instrument_id = 3;
  string holder_id = 4;
  MonetaryAmount amount = 5;
  string tx_hash = 6;
  google.protobuf.Timestamp issued_at = 7;
}

maniton.besu.events.v1

Назначение: События DLT-слоя и блокчейна

Основные события:

СобытиеОписаниеПоля
TransactionSubmittedТранзакция отправлена в сетьtx_hash, status
TransactionConfirmedТранзакция подтвержденаtx_hash, block_number
TransactionFailedТранзакция не прошлаtx_hash, reason
TransactionReceiptReadyКвитанция транзакции готоваtx_hash, receipt
BlockReadБлок прочитанblock_number, block_hash, transactions
BlockHashVerifiedХэш блока проверенblock_number, block_hash, valid, computed_hash

Пример события:

message TransactionReceiptReadyEvent {
  RequestContext context = 1;
  TransactionReceipt receipt = 2;
}

message TransactionReceipt {
  string tx_hash = 1;
  BesuTxStatus status = 2;
  uint64 block_number = 3;
  string block_hash = 4;
  uint64 gas_used = 5;
  google.protobuf.Timestamp mined_at = 6;
}

maniton.payments.events.v1

Назначение: События платежей и интеграций с банками

Основные события:

СобытиеОписаниеПоля
FiatDepositedФиатные средства поступилиuser_id, amount, payment_id
WithdrawalRequestedЗапрос на вывод средствuser_id, amount, payout_id
WithdrawalCompletedВывод средств завершенuser_id, amount, payout_id
PaymentIntentCreatedСоздание намерения платежаpayment_id, user_id, amount, method
PaymentStatusChangedИзменение статуса платежаpayment_id, old_status, new_status
ReconciliationCompletedСверка завершенаreport_id, balanced, notes

Пример события:

message FiatDepositedEvent {
  RequestContext context = 1;
  string user_id = 2;
  Money amount = 3;
  string payment_id = 4;
  google.protobuf.Timestamp deposited_at = 5;
}

maniton.market.events.v1

Назначение: События торгов и ордеров

Основные события:

СобытиеОписаниеПоля
OrderPlacedОрдер размещенorder_id, user_id, instrument_id, side, type, price, quantity
OrderCancelledОрдер отмененorder_id, reason
OrderFilledОрдер исполненorder_id, filled_quantity, remaining_quantity
TradeExecutedСделка исполненаtrade_id, buy_order_id, sell_order_id, instrument_id, price, quantity, fee
OrderBookUpdatedСтакан обновленinstrument_id, bids, asks

Пример события:

message TradeExecutedEvent {
  RequestContext context = 1;
  string trade_id = 2;
  string buyer_id = 3;
  string seller_id = 4;
  string instrument_id = 5;
  Decimal quantity = 6;
  Decimal price = 7;
  Fee fee = 8;
  google.protobuf.Timestamp executed_at = 9;
}

maniton.audit.logs.v1

Назначение: Аудит логи и события безопасности

Основные события:

СобытиеОписаниеПоля
SecurityEventСобытие безопасностиevent_type, user_id, ip, details
AdminActionДействие администратораadmin_id, action, target_user_id, details
ComplianceViolationНарушение комплаенсviolation_type, user_id, details
SystemEventСистемное событиеevent_type, service, details

Пример события:

message SecurityEvent {
  RequestContext context = 1;
  string event_type = 2;
  string user_id = 3;
  string ip = 4;
  map<string, string> details = 5;
  google.protobuf.Timestamp occurred_at = 6;
}

Конфигурация топиков

Ретеншн и компакция

ТопикРетеншнКомпакцияПричина
maniton.identity.events.v17 днейНетKYC данные не критичны для долгого хранения
maniton.ledger.events.v130 днейДаОперации важны для аудита
maniton.cfa.events.v130 днейДаЖизненный цикл ЦФА важен для аудита
maniton.besu.events.v17 днейНетБлокчейн данные доступны в узлах
maniton.payments.events.v130 днейДаПлатежи требуют долгого хранения
maniton.market.events.v17 днейНетТорговые данные быстро устаревают
maniton.audit.logs.v1365 днейДаАудит требует долгого хранения

Партиционирование

# Создание топика с партиционированием
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic maniton.ledger.events.v1 \
  --partitions 10 \
  --replication-factor 3 \
  --config retention.ms=2592000000 \
  --config cleanup.policy=compact \
  --config segment.ms=86400000

Конфигурация продюсеров

import { Producer } from 'kafkajs';

const producer = kafka.producer({
  maxInFlightRequests: 1,
  idempotent: true,
  transactionTimeout: 30000,
  retries: {
    maxAttempts: 5,
    initialRetryTime: 100,
    maxRetryTime: 1000,
  },
});

await producer.connect();

// Отправка события
await producer.send({
  topic: 'maniton.ledger.events.v1',
  messages: [
    {
      key: operationId,
      value: Buffer.from(eventEnvelope.toBinary()),
      headers: {
        'event-type': 'OperationCreated',
        'content-type': 'application/protobuf',
      },
    },
  ],
});

Конфигурация консьюмеров

import { Consumer } from 'kafkajs';

const consumer = kafka.consumer({
  groupId: 'ledger-service',
  maxWaitTimeInMs: 1000,
  maxBytesPerPartition: 1048576, // 1MB
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
});

await consumer.subscribe({
  topic: 'maniton.cfa.events.v1',
  fromBeginning: false,
});

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const eventEnvelope = EventEnvelope.fromBinary(message.value);

      // Идемпотентность
      const existing = await this.operationRepository.findByIdempotencyKey(
        eventEnvelope.context.idempotencyKey
      );

      if (existing) {
        return; // Уже обработано
      }

      // Обработка события
      await this.handleEvent(eventEnvelope);
    },
  },
});

Best Practices

1. Идемпотентность

async handleEvent(event: EventEnvelope) {
  // Проверка идемпотентности
  const processed = await this.processedEventsRepository.findById(
    event.id
  );

  if (processed) {
    this.logger.warn('Event already processed', { eventId: event.id });
    return;
  }

  // Обработка события
  await this.processEvent(event);

  // Сохранение метки обработки
  await this.processedEventsRepository.save({
    eventId: event.id,
    processedAt: new Date(),
  });
}

2. Обработка ошибок

async processEvent(event: EventEnvelope) {
  try {
    await this.handleEvent(event);
  } catch (error) {
    // Логирование ошибки
    this.logger.error('Event processing failed', {
      eventId: event.id,
      error: error.message,
      stack: error.stack,
    });

    // Отправка в Dead Letter Queue
    await this.dlqProducer.send({
      topic: 'maniton.dlq.v1',
      messages: [
        {
          key: event.id,
          value: Buffer.from(event.toBinary()),
          headers: {
            'original-topic': event.source,
            'error': error.message,
          },
        },
      ],
    });

    // Повторная попытка или ручной разбор
    throw error;
  }
}

3. Валидация событий

function validateEventEnvelope(envelope: EventEnvelope): void {
  if (!envelope.id) {
    throw new Error('Event ID is required');
  }

  if (!envelope.type) {
    throw new Error('Event type is required');
  }

  if (!envelope.context) {
    throw new Error('Request context is required');
  }

  if (!envelope.context.requestId) {
    throw new Error('Request ID is required');
  }

  if (!envelope.context.idempotencyKey) {
    throw new Error('Idempotency key is required');
  }
}

4. Логирование событий

this.logger.log('Event received', {
  eventId: event.id,
  type: event.type,
  source: event.source,
  userId: event.context?.userId,
  traceId: event.context?.traceId,
});

this.logger.log('Event processed', {
  eventId: event.id,
  type: event.type,
  duration: `${Date.now() - startTime}ms`,
});

Мониторинг

Метрики Kafka

import { Counter, Histogram, Gauge } from 'prom-client';

export const kafkaMessagesProducedTotal = new Counter({
  name: 'kafka_messages_produced_total',
  help: 'Total number of messages produced',
  labelNames: ['topic'],
});

export const kafkaMessagesConsumedTotal = new Counter({
  name: 'kafka_messages_consumed_total',
  help: 'Total number of messages consumed',
  labelNames: ['topic', 'consumer_group'],
});

export const kafkaConsumerLag = new Gauge({
  name: 'kafka_consumer_lag',
  help: 'Consumer lag by topic and partition',
  labelNames: ['topic', 'partition', 'consumer_group'],
});

export const kafkaMessageProcessingDuration = new Histogram({
  name: 'kafka_message_processing_duration_seconds',
  help: 'Time spent processing Kafka messages',
  labelNames: ['topic'],
  buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
});

Алерты

groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumer_lag > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: 'Kafka consumer lag is high'
          description: 'Consumer lag is {{ $value }} for {{ $labels.topic }}'

      - alert: KafkaMessageProcessingSlow
        expr: histogram_quantile(0.95, kafka_message_processing_duration_seconds) > 1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: 'Kafka message processing is slow'
          description: 'P95 processing time is {{ $value }}s'

      - alert: KafkaBrokerDown
        expr: up{job="kafka"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: 'Kafka broker is down'

Troubleshooting

Проблема: Consumer lag

Диагностика:

# Проверка consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group ledger-service --describe

# Проверка позиций консьюмеров
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group ledger-service --members

Решение:

  1. Увеличьте количество консьюмеров
  2. Оптимизируйте обработку событий
  3. Увеличьте количество партиций

Проблема: Дубликаты сообщений

Причина:

  • Producer retry
  • Consumer restart

Решение:

  • Используйте идемпотентность
  • Проверяйте idempotency_key
  • Храните обработанные события

Проблема: Сообщения не доставляются

Диагностика:

# Проверка топиков
kafka-topics.sh --list --bootstrap-server localhost:9092

# Проверка сообщений
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic maniton.ledger.events.v1 --from-beginning

# Проверка консьюмеров
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --list

Решение:

  1. Проверьте конфигурацию producer
  2. Проверьте сетевое подключение
  3. Проверьте логи брокера

Дополнительные ресурсы

On this page