Манитон Docs
Reference

Besu Connector - Реализация

Детальная реализация Besu Connector на Go

Besu Connector - Реализация

Обзор

Besu Connector — Go микросервис для работы с Hyperledger Besu через JSON-RPC. Принимает команды через Kafka, исполняет их в Besu, публикует события обратно в Kafka.

Структура проекта

apps/services/besu-connector/
├── cmd/
│   └── besu-connector/
│       └── main.go
├── internal/
│   ├── besu/               # Besu RPC клиент
│   │   ├── client.go
│   │   └── types.go
│   ├── config/             # Конфигурация
│   │   └── config.go
│   ├── encoding/           # Кодирование/декодирование
│   │   ├── envelope.go
│   │   └── protobuf.go
│   ├── handlers/           # Обработчики команд
│   │   ├── submit_transaction.go
│   │   ├── get_transaction_receipt.go
│   │   ├── get_block.go
│   │   └── verify_block_hash.go
│   ├── kafka/              # Kafka клиент
│   │   ├── consumer.go
│   │   └── producer.go
│   └── logging/            # Логирование
├── go.mod
├── go.sum
└── README.md

Точка входа

Main

// cmd/besu-connector/main.go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/maniton/besu-connector/internal/config"
	"github.com/maniton/besu-connector/internal/kafka"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Загрузка конфигурации
	cfg, err := config.Load()
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// Создание Kafka consumer и producer
	consumer, err := kafka.NewConsumer(cfg)
	if err != nil {
		log.Fatalf("Failed to create consumer: %v", err)
	}

	producer, err := kafka.NewProducer(cfg)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}

	// Запуск обработки сообщений
	go func() {
		if err := consumer.Start(ctx); err != nil {
			log.Printf("Consumer error: %v", err)
		}
	}()

	// Graceful shutdown
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	<-sigCh
	log.Println("Shutting down...")

	cancel()

	if err := consumer.Close(); err != nil {
		log.Printf("Failed to close consumer: %v", err)
	}

	if err := producer.Close(); err != nil {
		log.Printf("Failed to close producer: %v", err)
	}

	log.Println("Shutdown complete")
}

Конфигурация

Config

// internal/config/config.go
package config

import (
	"os"
	"time"
)

type Config struct {
	Kafka struct {
		Brokers        []string
		GroupID        string
		CommandTopics  []string
		EventTopic     string
		EventTopics    map[string]string
		ClientID       string
		ReadTimeout    time.Duration
		WriteTimeout   time.Duration
	}
	Besu struct {
		RPCURL         string
		RequestTimeout time.Duration
	}
	ShutdownTimeout time.Duration
}

func Load() (*Config, error) {
	cfg := &Config{
		Kafka: KafkaConfig{
			Brokers:        getEnvSlice("KAFKA_BROKERS", []string{"localhost:9092"}),
			GroupID:        getEnv("KAFKA_GROUP_ID", "besu-connector"),
			CommandTopics:  getEnvSlice("KAFKA_COMMAND_TOPICS", []string{}),
			EventTopic:     getEnv("KAFKA_EVENT_TOPIC", "besu.events"),
			EventTopics:    parseEventTopics(getEnv("KAFKA_EVENT_TOPICS", "")),
			ClientID:       getEnv("KAFKA_CLIENT_ID", "besu-connector"),
			ReadTimeout:    getDuration("KAFKA_READ_TIMEOUT", 10*time.Second),
			WriteTimeout:   getDuration("KAFKA_WRITE_TIMEOUT", 10*time.Second),
		},
		Besu: BesuConfig{
			RPCURL:         getEnv("BESU_RPC_URL", "http://localhost:8545"),
			RequestTimeout: getDuration("REQUEST_TIMEOUT", 10*time.Second),
		},
		ShutdownTimeout: getDuration("SHUTDOWN_TIMEOUT", 10*time.Second),
	}

	return cfg, nil
}

func getEnv(key, defaultValue string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return defaultValue
}

func getEnvSlice(key string, defaultValue []string) []string {
	if value := os.Getenv(key); value != "" {
		return strings.Split(value, ",")
	}
	return defaultValue
}

func getDuration(key string, defaultValue time.Duration) time.Duration {
	if value := os.Getenv(key); value != "" {
		if duration, err := time.ParseDuration(value); err == nil {
			return duration
		}
	}
	return defaultValue
}

func parseEventTopics(s string) map[string]string {
	topics := make(map[string]string)
	if s == "" {
		return topics
	}

	pairs := strings.Split(s, ",")
	for _, pair := range pairs {
		parts := strings.Split(pair, "=")
		if len(parts) == 2 {
			topics[parts[0]] = parts[1]
		}
	}

	return topics
}

Kafka Consumer

Consumer

// internal/kafka/consumer.go
package kafka

import (
	"context"
	"log"
	"time"

	"github.com/IBM/sarama"
	"github.com/maniton/besu-connector/internal/config"
	"github.com/maniton/besu-connector/internal/encoding"
	"github.com/maniton/besu-connector/internal/handlers"
)

type Consumer struct {
	consumer sarama.ConsumerGroup
	cfg      *config.Config
}

func NewConsumer(cfg *config.Config) (*Consumer, error) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	consumerGroup, err := sarama.NewConsumerGroup(
		cfg.Kafka.Brokers,
		cfg.Kafka.GroupID,
		sarama.NewBalancer(),
		config,
	)
	if err != nil {
		return nil, err
	}

	return &Consumer{
		consumer: consumerGroup,
		cfg:      cfg,
	}, nil
}

func (c *Consumer) Start(ctx context.Context) error {
	topics := c.cfg.Kafka.CommandTopics

	handler := &MessageHandler{
		cfg: c.cfg,
	}

	consumeCtx, cancel := context.WithCancel(context.Background())

	go func() {
		for {
			if err := c.consumer.Consume(consumeCtx, topics, handler); err != nil {
				log.Printf("Consumer error: %v", err)
				time.Sleep(5 * time.Second)
			}
		}
	}()

	<-ctx.Done()
	cancel()

	return c.consumer.Close()
}

func (c *Consumer) Close() error {
	return c.consumer.Close()
}

type MessageHandler struct {
	cfg *config.Config
}

func (h *MessageHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *MessageHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (h *MessageHandler) ConsumeClaim(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error {
	return nil
}

func (h *MessageHandler) Consume(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
	// Декодирование сообщения
	envelope, err := encoding.DecodeEnvelope(msg.Value)
	if err != nil {
		log.Printf("Failed to decode envelope: %v", err)
		return nil
	}

	// Обработка команды
	if err := h.handleCommand(envelope); err != nil {
		log.Printf("Failed to handle command: %v", err)
		return nil
	}

	return nil
}

func (h *MessageHandler) handleCommand(envelope *encoding.EventEnvelope) error {
	// Определение типа команды
	switch envelope.Event.Type {
	case "maniton.besu.v1.SubmitTransactionCommand":
		return handlers.SubmitTransaction(h.cfg, envelope)
	case "maniton.besu.v1.GetTransactionReceiptCommand":
		return handlers.GetTransactionReceipt(h.cfg, envelope)
	case "maniton.besu.v1.GetBlockCommand":
		return handlers.GetBlock(h.cfg, envelope)
	case "maniton.besu.v1.VerifyBlockHashCommand":
		return handlers.VerifyBlockHash(h.cfg, envelope)
	default:
		log.Printf("Unknown command type: %s", envelope.Event.Type)
		return nil
	}
}

Kafka Producer

Producer

// internal/kafka/producer.go
package kafka

import (
	"log"

	"github.com/IBM/sarama"
	"github.com/maniton/besu-connector/internal/config"
	"github.com/maniton/besu-connector/internal/encoding"
)

type Producer struct {
	producer sarama.SyncProducer
	cfg      *config.Config
}

func NewProducer(cfg *config.Config) (*Producer, error) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	producer, err := sarama.NewSyncProducer(cfg.Kafka.Brokers, config)
	if err != nil {
		return nil, err
	}

	return &Producer{
		producer: producer,
		cfg:      cfg,
	}, nil
}

func (p *Producer) Publish(topic string, envelope *encoding.EventEnvelope) error {
	// Кодирование сообщения
	bytes, err := encoding.EncodeEnvelope(envelope)
	if err != nil {
		return err
	}

	// Определение topic
	eventTopic := p.getEventTopic(envelope.Event.Type)

	// Отправка сообщения
	msg := &sarama.ProducerMessage{
		Topic: eventTopic,
		Value: sarama.ByteEncoder(bytes),
	}

	partition, offset, err := p.producer.SendMessage(msg)
	if err != nil {
		log.Printf("Failed to send message: %v", err)
		return err
	}

	log.Printf("Message sent to topic %s, partition %d, offset %d", topic, partition, offset)
	return nil
}

func (p *Producer) getEventTopic(eventType string) string {
	// Проверка карты типов событий в топики
	if topic, ok := p.cfg.Kafka.EventTopics[eventType]; ok {
		return topic
	}
	// Использование дефолтного топика
	return p.cfg.Kafka.EventTopic
}

func (p *Producer) Close() error {
	return p.producer.Close()
}

Обработчики команд

SubmitTransaction

// internal/handlers/submit_transaction.go
package handlers

import (
	"context"
	"log"
	"time"

	"github.com/maniton/besu-connector/internal/besu"
	"github.com/maniton/besu-connector/internal/config"
	"github.com/maniton/besu-connector/internal/encoding"
	"github.com/maniton/besu-connector/internal/kafka"
)

func SubmitTransaction(cfg *config.Config, envelope *encoding.EventEnvelope) error {
	// Декодирование команды
	cmd, err := encoding.DecodeSubmitTransactionCommand(envelope)
	if err != nil {
		log.Printf("Failed to decode command: %v", err)
		return err
	}

	// Создание клиента Besu
	client, err := besu.NewClient(cfg.Besu.RPCURL, cfg.Besu.RequestTimeout)
	if err != nil {
		log.Printf("Failed to create Besu client: %v", err)
		return err
	}

	// Отправка транзакции
	ctx, cancel := context.WithTimeout(context.Background(), cfg.Besu.RequestTimeout)
	defer cancel()

	txHash, err := client.SendRawTransaction(ctx, cmd.Payload.Data)
	if err != nil {
		log.Printf("Failed to send transaction: %v", err)

		// Публикация события об ошибке
		event := encoding.NewTransactionFailedEvent(
			envelope.Context,
			cmd.Payload.Data,
			err.Error(),
		)

		publishEvent(cfg, event)
		return err
	}

	// Публикация события об успехе
	event := encoding.NewTransactionSubmittedEvent(
		envelope.Context,
		txHash,
	)

	return publishEvent(cfg, event)
}

func publishEvent(cfg *config.Config, event *encoding.EventEnvelope) error {
	producer, err := kafka.NewProducer(cfg)
	if err != nil {
		return err
	}
	defer producer.Close()

	return producer.Publish(event.Event.Type, event)
}

GetTransactionReceipt

// internal/handlers/get_transaction_receipt.go
package handlers

import (
	"context"
	"log"

	"github.com/maniton/besu-connector/internal/besu"
	"github.com/maniton/besu-connector/internal/config"
	"github.com/maniton/besu-connector/internal/encoding"
)

func GetTransactionReceipt(cfg *config.Config, envelope *encoding.EventEnvelope) error {
	// Декодирование команды
	cmd, err := encoding.DecodeGetTransactionReceiptCommand(envelope)
	if err != nil {
		log.Printf("Failed to decode command: %v", err)
		return err
	}

	// Создание клиента Besu
	client, err := besu.NewClient(cfg.Besu.RPCURL, cfg.Besu.RequestTimeout)
	if err != nil {
		log.Printf("Failed to create Besu client: %v", err)
		return err
	}

	// Получение receipt
	ctx, cancel := context.WithTimeout(context.Background(), cfg.Besu.RequestTimeout)
	defer cancel()

	receipt, err := client.GetTransactionReceipt(ctx, cmd.TxHash)
	if err != nil {
		log.Printf("Failed to get transaction receipt: %v", err)
		return err
	}

	// Публикация события
	event := encoding.NewTransactionReceiptReadyEvent(
		envelope.Context,
		cmd.TxHash,
		receipt,
	)

	return publishEvent(cfg, event)
}

GetBlock

// internal/handlers/get_block.go
package handlers

import (
	"context"
	"log"

	"github.com/maniton/besu-connector/internal/besu"
	"github.com/maniton/besu-connector/internal/config"
	"github.com/maniton/besu-connector/internal/encoding"
)

func GetBlock(cfg *config.Config, envelope *encoding.EventEnvelope) error {
	// Декодирование команды
	cmd, err := encoding.DecodeGetBlockCommand(envelope)
	if err != nil {
		log.Printf("Failed to decode command: %v", err)
		return err
	}

	// Создание клиента Besu
	client, err := besu.NewClient(cfg.Besu.RPCURL, cfg.Besu.RequestTimeout)
	if err != nil {
		log.Printf("Failed to create Besu client: %v", err)
		return err
	}

	// Получение блока
	ctx, cancel := context.WithTimeout(context.Background(), cfg.Besu.RequestTimeout)
	defer cancel()

	block, err := client.GetBlock(ctx, cmd.BlockNumber)
	if err != nil {
		log.Printf("Failed to get block: %v", err)
		return err
	}

	// Публикация события
	event := encoding.NewBlockReadEvent(
		envelope.Context,
		block,
	)

	return publishEvent(cfg, event)
}

Besu Client

Client

// internal/besu/client.go
package besu

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

type Client struct {
	rpcURL string
	client *http.Client
}

type RPCRequest struct {
	Jsonrpc string `json:"jsonrpc"`
	Method  string `json:"method"`
	Params  []any  `json:"params"`
	ID      int    `json:"id"`
}

type RPCResponse struct {
	Jsonrpc string `json:"jsonrpc"`
	ID      int    `json:"id"`
	Result  any    `json:"result,omitempty"`
	Error   *struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	} `json:"error,omitempty"`
}

func NewClient(rpcURL string, timeout time.Duration) (*Client, error) {
	return &Client{
		rpcURL: rpcURL,
		client: &http.Client{
			Timeout: timeout,
		},
	}, nil
}

func (c *Client) SendRawTransaction(ctx context.Context, data string) (string, error) {
	req := RPCRequest{
		Jsonrpc: "2.0",
		Method:  "eth_sendRawTransaction",
		Params:  []any{data},
		ID:      1,
	}

	resp, err := c.doRequest(ctx, req)
	if err != nil {
		return "", err
	}

	if resp.Error != nil {
		return "", fmt.Errorf("RPC error: %s", resp.Error.Message)
	}

	txHash, ok := resp.Result.(string)
	if !ok {
		return "", fmt.Errorf("invalid response type")
	}

	return txHash, nil
}

func (c *Client) GetTransactionReceipt(ctx context.Context, txHash string) (any, error) {
	req := RPCRequest{
		Jsonrpc: "2.0",
		Method:  "eth_getTransactionReceipt",
		Params:  []any{txHash},
		ID:      1,
	}

	resp, err := c.doRequest(ctx, req)
	if err != nil {
		return nil, err
	}

	if resp.Error != nil {
		return nil, fmt.Errorf("RPC error: %s", resp.Error.Message)
	}

	return resp.Result, nil
}

func (c *Client) GetBlock(ctx context.Context, blockNumber int) (any, error) {
	req := RPCRequest{
		Jsonrpc: "2.0",
		Method:  "eth_getBlockByNumber",
		Params:  []any{fmt.Sprintf("0x%x", blockNumber)},
		ID:      1,
	}

	resp, err := c.doRequest(ctx, req)
	if err != nil {
		return nil, err
	}

	if resp.Error != nil {
		return nil, fmt.Errorf("RPC error: %s", resp.Error.Message)
	}

	return resp.Result, nil
}

func (c *Client) doRequest(ctx context.Context, req RPCRequest) (*RPCResponse, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}

	httpReq, err := http.NewRequestWithContext(ctx, "POST", c.rpcURL, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}

	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var rpcResp RPCResponse
	if err := json.NewDecoder(resp.Body).Decode(&rpcResp); err != nil {
		return nil, err
	}

	return &rpcResp, nil
}

Кодирование

Envelope

// internal/encoding/envelope.go
package encoding

import (
	"encoding/json"
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/golang/protobuf/ptypes/any"
	"github.com/maniton/contracts/gen/go/maniton/events/v1"
	"github.com/maniton/contracts/gen/go/maniton/besu/v1"
	"github.com/maniton/contracts/gen/go/maniton/common/v1"
)

type EventEnvelope struct {
	Event   *events.EventEnvelope
	Context *common.RequestContext
}

func DecodeEnvelope(data []byte) (*EventEnvelope, error) {
	var envelope events.EventEnvelope
	if err := proto.Unmarshal(data, &envelope); err != nil {
		return nil, fmt.Errorf("failed to unmarshal envelope: %w", err)
	}

	return &EventEnvelope{
		Event:   &envelope,
		Context: envelope.Context,
	}, nil
}

func EncodeEnvelope(envelope *EventEnvelope) ([]byte, error) {
	env := &events.EventEnvelope{
		Event:   envelope.Event,
		Context: envelope.Context,
	}

	return proto.Marshal(env)
}

func DecodeSubmitTransactionCommand(envelope *EventEnvelope) (*besu.SubmitTransactionCommand, error) {
	any := envelope.Event.Data
	if any == nil {
		return nil, fmt.Errorf("no data in envelope")
	}

	cmd := &besu.SubmitTransactionCommand{}
	if err := any.UnmarshalTo(cmd); err != nil {
		return nil, fmt.Errorf("failed to unmarshal command: %w", err)
	}

	return cmd, nil
}

func NewTransactionSubmittedEvent(context *common.RequestContext, txHash string) *EventEnvelope {
	cmd := &besu.TransactionSubmittedEvent{
		Context: context,
		TxHash:  txHash,
	}

	any, err := any.New(cmd, proto.MarshalOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create Any: %w", err)
	}

	event := &events.CloudEvent{
		Id:             uuid.New().String(),
		Source:         "besu-connector",
		SpecVersion:    "1.0",
		Type:           "maniton.besu.v1.TransactionSubmittedEvent",
		DataContentType: "application/protobuf",
		Data:           any,
	}

	return &EventEnvelope{
		Event:   event,
		Context: context,
	}
}

func NewTransactionFailedEvent(context *common.RequestContext, data string, reason string) *EventEnvelope {
	cmd := &besu.TransactionFailedEvent{
		Context: context,
		Data:    data,
		Reason:  reason,
	}

	any, err := any.New(cmd, proto.MarshalOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create Any: %w", err)
	}

	event := &events.CloudEvent{
		Id:             uuid.New().String(),
		Source:         "besu-connector",
		SpecVersion:    "1.0",
		Type:           "maniton.besu.v1.TransactionFailedEvent",
		DataContentType: "application/protobuf",
		Data:           any,
	}

	return &EventEnvelope{
		Event:   event,
		Context: context,
	}
}

func NewTransactionReceiptReadyEvent(context *common.RequestContext, txHash string, receipt any) *EventEnvelope {
	cmd := &besu.TransactionReceiptReadyEvent{
		Context: context,
		TxHash:  txHash,
		Receipt: receipt,
	}

	any, err := any.New(cmd, proto.MarshalOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create Any: %w", err)
	}

	event := &events.CloudEvent{
		Id:             uuid.New().String(),
		Source:         "besu-connector",
		SpecVersion:    "1.0",
		Type:           "maniton.besu.v1.TransactionReceiptReadyEvent",
		DataContentType: "application/protobuf",
		Data:           any,
	}

	return &EventEnvelope{
		Event:   event,
		Context: context,
	}
}

func NewBlockReadEvent(context *common.RequestContext, block any) *EventEnvelope {
	cmd := &besu.BlockReadEvent{
		Context: context,
		Block:   block,
	}

	any, err := any.New(cmd, proto.MarshalOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create Any: %w", err)
	}

	event := &events.CloudEvent{
		Id:             uuid.New().String(),
		Source:         "besu-connector",
		SpecVersion:    "1.0",
		Type:           "maniton.besu.v1.BlockReadEvent",
		DataContentType: "application/protobuf",
		Data:           any,
	}

	return &EventEnvelope{
		Event:   event,
		Context: context,
	}
}

Тестирование

Unit Tests

// internal/besu/client_test.go
package besu

import (
	"context"
	"testing"
	"time"
)

func TestClient_SendRawTransaction(t *testing.T) {
	client, err := NewClient("http://localhost:8545", 10*time.Second)
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	txHash, err := client.SendRawTransaction(ctx, "0x02f8...")
	if err != nil {
		t.Fatalf("Failed to send transaction: %v", err)
	}

	if txHash == "" {
		t.Fatal("Empty transaction hash")
	}
}

Integration Tests

// internal/integration_test.go
package integration

import (
	"testing"
	"time"

	"github.com/maniton/besu-connector/internal/config"
	"github.com/maniton/besu-connector/internal/kafka"
)

func TestIntegration(t *testing.T) {
	cfg := &config.Config{
		Kafka: config.KafkaConfig{
			Brokers:       []string{"localhost:9092"},
			GroupID:       "test-group",
			CommandTopics: []string{"besu.command.getBlock"},
			EventTopic:    "besu.events",
		},
		Besu: config.BesuConfig{
			RPCURL:         "http://localhost:8545",
			RequestTimeout: 10 * time.Second,
		},
	}

	consumer, err := kafka.NewConsumer(cfg)
	if err != nil {
		t.Fatalf("Failed to create consumer: %v", err)
	}

	producer, err := kafka.NewProducer(cfg)
	if err != nil {
		t.Fatalf("Failed to create producer: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		if err := consumer.Start(ctx); err != nil {
			t.Logf("Consumer error: %v", err)
		}
	}()

	// Ожидание обработки сообщений
	time.Sleep(5 * time.Second)

	cancel()
}

Мониторинг

Метрики

// internal/metrics/metrics.go
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	commandsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "besu_commands_total",
		Help: "Total number of Besu commands",
	}, []string{"command", "status"})

	commandDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "besu_command_duration_seconds",
		Help: "Time spent on Besu commands",
	}, []string{"command"})

	eventsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "besu_events_total",
		Help: "Total number of Besu events",
	}, []string{"event"})
)

	kafkaErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "kafka_errors_total",
		Help: "Total number of Kafka errors",
	})
)

func RecordCommand(command string, status string) {
	commandsTotal.WithLabelValues(command, status).Inc()
}

func RecordCommandDuration(command string, duration float64) {
	commandDuration.WithLabelValues(command).Observe(duration)
}

func RecordEvent(event string) {
	eventsTotal.WithLabelValues(event).Inc()
}

func RecordKafkaError() {
	kafkaErrorsTotal.Inc()
}

Логи

// internal/logging/logger.go
package logging

import (
	"log"
	"os"
)

type Logger struct {
	prefix string
}

func NewLogger(prefix string) *Logger {
	return &Logger{
		prefix: prefix,
	}
}

func (l *Logger) Info(message string, args ...interface{}) {
	log.Printf("[INFO] [%s] %s", l.prefix, fmt.Sprintf(message, args...))
}

func (l *Logger) Error(message string, err error, args ...interface{}) {
	if err != nil {
		log.Printf("[ERROR] [%s] %s: %v", l.prefix, fmt.Sprintf(message, args...), err)
	} else {
		log.Printf("[ERROR] [%s] %s", l.prefix, fmt.Sprintf(message, args...))
	}
}

func (l *Logger) Warn(message string, args ...interface{}) {
	log.Printf("[WARN] [%s] %s", l.prefix, fmt.Sprintf(message, args...))
}

func (l *Logger) Debug(message string, args ...interface{}) {
	if os.Getenv("LOG_LEVEL") == "debug" {
		log.Printf("[DEBUG] [%s] %s", l.prefix, fmt.Sprintf(message, args...))
	}
}

Troubleshooting

Проблема: Kafka consumer не подключается

Решение:

  1. Проверьте, что Kafka брокеры доступны
  2. Проверьте, что group ID уникален
  3. Проверьте логи consumer

Проблема: Besu RPC не отвечает

Решение:

  1. Проверьте, что RPC URL правильный
  2. Проверьте, что Besu запущен
  3. Проверьте таймаут запроса

Проблема: События не публикуются

Решение:

  1. Проверьте, что producer создан
  2. Проверьте, что topic существует
  3. Проверьте логи producer

Дополнительные ресурсы

On this page