This commit is contained in:
r8zavetr8v 2024-08-11 04:51:18 +03:00
commit 85499b626f
32 changed files with 9963 additions and 0 deletions

4
.dockerignore Normal file
View File

@ -0,0 +1,4 @@
build
.vscode
.gitignore
Makefile

20
.gitea/test.yml Normal file
View File

@ -0,0 +1,20 @@
name: checks
on:
- push
- pull_request
jobs:
lint:
name: check and test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'
- name: vet checks
run: make vet
- name: build
run: make build
- name: test
run: make test

29
.github/workflows/go.yml vendored Normal file
View File

@ -0,0 +1,29 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
name: Go
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
jobs:
test-and-lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22.3'
- name: test
run: go test -v ./...
- name: lint
run: make get.tools && make lint

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
build
.env
.vscode
tools/golangci-lint
tools

54
.golangci-lint.yaml Normal file
View File

@ -0,0 +1,54 @@
run:
concurrency: 8
timeout: 10m
issues-exit-code: 1
tests: true
modules-download-mode: readonly
allow-parallel-runners: true
allow-serial-runners: false
output:
print-issued-lines: false
print-linter-name: true
uniq-by-line: false
path-prefix: ""
sort-results: true
issues:
fix: false
linters:
disable-all: true
enable:
- errcheck
- dupl
- exhaustive
- unparam
- unused
- usestdlibvars
- wastedassign
- prealloc
- rowserrcheck
- sqlclosecheck
- gocritic
- godox
- gofmt
- lll
- misspell
- wsl
- unconvert
- funlen
fast: false
linters-settings:
errcheck:
check-type-assertions: true
check-blank: true
disable-default-exclusions: true
lll:
line-length: 110
tab-width: 8

24
Dockerfile Normal file
View File

@ -0,0 +1,24 @@
FROM golang:alpine AS builder
LABEL stage=gobuilder
ENV CGO_ENABLED 0
RUN apk update --no-cache && apk add --no-cache tzdata
WORKDIR /build
ADD go.mod .
ADD go.sum .
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /app/blk cmd/app/main.go
FROM alpine:latest
WORKDIR /build
COPY --from=builder /app/blk .
RUN chmod +x ./blk
EXPOSE 8080
CMD ["./blk"]

16
Dockerfile.test Normal file
View File

@ -0,0 +1,16 @@
FROM golang:alpine AS builder
LABEL stage=gobuilder
ENV CGO_ENABLED 0
RUN apk update --no-cache && apk add --no-cache tzdata
WORKDIR /test
ADD go.mod .
ADD go.sum .
RUN go mod download
COPY . .
CMD ["go", "test", "-v", "./..."]

33
Makefile Normal file
View File

@ -0,0 +1,33 @@
PROJECT_DIR = $(CURDIR)
PROJECT_BIN = ${PROJECT_DIR}/bin
TOOLS_BIN = ${PROJECT_DIR}/tools
.PHONY: bin.build
bin.build:
mkdir -p ${PROJECT_DIR}/build
rm -f ${PROJECT_DIR}/build/blk
go build -ldflags="-s -w" -o ${PROJECT_DIR}/build/blk ${PROJECT_DIR}/cmd/app/main.go
.PHONY: up
up:
sudo docker compose -f docker-compose.yaml up --build -d
.PHONY: run.local
run.local: bin.build
${PROJECT_DIR}/build/blk
.PHONY: test
test:
sudo docker compose -f docker-compose.test.yaml up --build --abort-on-container-exit
sudo docker compose -f docker-compose.test.yaml down --volumes
.PHONY: get.tools
get.tools:
mkdir -p ${TOOLS_BIN}
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b ${TOOLS_BIN} v1.59.0
.PHONY: lint
lint:
${TOOLS_BIN}/golangci-lint run --config ./.golangci-lint.yaml ./...

54
README.md Normal file
View File

@ -0,0 +1,54 @@
# BLK
Fetches an address with the largest balance delta over the last N ETH blocks
## Build
### Docker
1. Install docker
2. Create account at [getblock.io](https://www.getblock.io/) and get an access token.
3. In a root of the project, create *.env* file and fill it with the following:
```
BLK_GETBLOCK_ACCESS_TOKEN=my0access0toke0here ## Access token
BLK_LOG_LEVEL=info ## Log level [debug / info]
BLK_HTTP_ADDR=0.0.0.0:8085 ## Listen address
```
4. Build it
```bash
make up
```
### Locally
*min go version go1.22.3*
```bash
make bin.build
BLK_GETBLOCK_ACCESS_TOKEN=TOKEN BLK_LOG_LEVEL=info BLK_HTTP_ADDR=0.0.0.0:8085 $(pwd)/build/blk
```
## API
### GET /most-changed?blocks=$1
Request parameters:
* blocks - type: uint (optional). Limits amount of blocks chat will be checked from head.
Default: 100, Max: 150
Example:
```bash
curl --request GET \
--url 'http://localhost:8085/most-changed'
```
Response:
```json
{
"address": "0x3f0c3faeeeb9dad6ef6eb5fbab61039ff9067a07",
}
```
## Testing
### Run tests (docker)
```bash
make test
```
### Lint
```bash
make get.tools
make lint
```

19
cmd/app/main.go Normal file
View File

@ -0,0 +1,19 @@
package main
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/optclblast/blk/internal/app"
)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if err := app.Init(ctx); err != nil {
panic(err)
}
}

8
docker-compose.test.yaml Normal file
View File

@ -0,0 +1,8 @@
services:
blk:
container_name: blk-test
build:
context: .
dockerfile: ./Dockerfile.test
env_file:
- .env

10
docker-compose.yaml Normal file
View File

@ -0,0 +1,10 @@
services:
blk:
container_name: blk
build:
context: .
dockerfile: ./Dockerfile
env_file:
- .env
ports:
- 8085:8085

10
go.mod Normal file
View File

@ -0,0 +1,10 @@
module github.com/optclblast/blk
go 1.22.3
require (
github.com/alitto/pond v1.8.3
github.com/go-chi/chi/v5 v5.0.12
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/ybbus/jsonrpc/v3 v3.1.5
)

16
go.sum Normal file
View File

@ -0,0 +1,16 @@
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.0.12 h1:9euLV5sTrTNTRUU9POmDUvfxyj6LAABLUcEWO+JJb4s=
github.com/go-chi/chi/v5 v5.0.12/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/ybbus/jsonrpc/v3 v3.1.5 h1:0cC/QzS8OCuXYqqDbYnKKhsEe+IZLrNlDx8KPCieeW0=
github.com/ybbus/jsonrpc/v3 v3.1.5/go.mod h1:U1QbyNfL5Pvi2roT0OpRbJeyvGxfWYSgKJHjxWdAEeE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

82
internal/app/app.go Normal file
View File

@ -0,0 +1,82 @@
package app
import (
"context"
"log/slog"
"os"
"github.com/optclblast/blk/internal/controller/http"
"github.com/optclblast/blk/internal/infrastructure/getblock"
"github.com/optclblast/blk/internal/logger"
"github.com/optclblast/blk/internal/server"
"github.com/optclblast/blk/internal/usecase"
)
const (
// API access token
getblockAccessTokenEnv = "BLK_GETBLOCK_ACCESS_TOKEN"
// Log level
logLevelEnv = "BLK_LOG_LEVEL"
// Listen address
httpAddrEnv = "BLK_HTTP_ADDR"
)
// Init is a main function in our application lifecycle.
// Init is responsible for bringing all the system's components together.
func Init(ctx context.Context) error {
// Fetch env vars
getblockAccessToken := os.Getenv(getblockAccessTokenEnv)
logLevel := os.Getenv(logLevelEnv)
httpAddr := os.Getenv(httpAddrEnv)
// Build logger
log := logger.NewBuilder().
WithLevel(logger.MapLevel(logLevel)).
Build()
log.Info(
"starting blk server 0w0",
slog.String("address", httpAddr),
slog.String("log level", logLevel),
)
// Initialize node provider client
getblockClient := getblock.NewClient(
log.WithGroup("getblock-client"),
getblockAccessToken,
)
// Initialize application layer
ethInteractor := usecase.NewEthInteractor(
log.WithGroup("eth-interactor"),
getblockClient,
)
// Initialize controller layer
walletsController := http.NewWalletsController(
log.WithGroup("wallets-controller"),
ethInteractor,
)
// Build a router
router := http.NewRouter(
log.WithGroup("router"),
walletsController,
)
// And run server with it
server := server.New(router, httpAddr)
select {
case <-ctx.Done():
log.Info("shutting down blk server. bye bye! =w=")
case err := <-server.Notify():
log.Error("error listen to net ;_;", logger.Err(err))
}
if err := server.Shutdown(); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,6 @@
package http
// MostChangedWalletAddress response DTO object
type MostChangedWalletAddressResponse struct {
Address string `json:"address"`
}

View File

@ -0,0 +1,45 @@
package http
import (
"errors"
"net/http"
"github.com/optclblast/blk/internal/infrastructure/getblock"
)
var (
// ErrorBadQueryParams is thrown wheh query parameters are invalid
ErrorBadQueryParams = errors.New("bad query params")
)
// api error dto object
type apiError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// buildApiError returns a new apiError instance built from error code and error message
func buildApiError(
code int,
message string,
) apiError {
return apiError{
Code: code,
Message: message,
}
}
// mapError maps internal errors to its API representation
func mapError(err error) apiError {
switch {
case errors.Is(err, ErrorBadQueryParams):
return buildApiError(http.StatusBadRequest, "Invalid Query Params")
case errors.Is(err, getblock.ErrorRateLimitExceeded):
return buildApiError(
http.StatusTooManyRequests,
"GetBlock API rate limit exceeded! Type again later",
)
default:
return buildApiError(http.StatusInternalServerError, "Internal Server Error")
}
}

View File

@ -0,0 +1,115 @@
package http
import (
"encoding/json"
"log/slog"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/optclblast/blk/internal/logger"
)
// router object
type router struct {
*chi.Mux
log *slog.Logger
walletsController WalletsController
}
// NewRouter returns a new http.Handler object that can power your server
func NewRouter(
log *slog.Logger,
walletsController WalletsController,
) http.Handler {
r := &router{
Mux: chi.NewRouter(),
log: log,
walletsController: walletsController,
}
r.Use(middleware.Recoverer)
r.Use(handleMw)
r.Get("/most-changed", r.handle(
r.walletsController.MostChangedWalletAddress,
"most-changed",
))
return r
}
// handleMw adds content type headers
func handleMw(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
next.ServeHTTP(w, r)
})
}
type handleFunc func(w http.ResponseWriter, req *http.Request) (any, error)
// handle is a helper functions that makes it easier to work with http handlers
func (s *router) handle(
h handleFunc,
method_name string,
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
resp, err := h(w, r)
if err != nil {
s.log.Error(
"http error",
slog.String("method_name", method_name),
logger.Err(err),
)
s.responseError(w, err)
return
}
out, err := json.Marshal(resp)
if err != nil {
s.log.Error(
"error marshal response",
slog.String("method_name", method_name),
logger.Err(err),
slog.Any("object", resp),
)
s.responseError(w, err)
return
}
w.WriteHeader(http.StatusOK)
if _, err = w.Write(out); err != nil {
s.log.Error(
"error write http response",
slog.String("method_name", method_name),
logger.Err(err),
)
}
}
}
func (s *router) responseError(
w http.ResponseWriter,
e error,
) {
apiErr := mapError(e)
out, err := json.Marshal(apiErr)
if err != nil {
return
}
w.WriteHeader(apiErr.Code)
if _, err := w.Write(out); err != nil {
s.log.Error("error write error to connection", logger.Err(err))
}
}

View File

@ -0,0 +1,88 @@
package http
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"strconv"
"time"
"github.com/optclblast/blk/internal/usecase"
)
type WalletsController interface {
// MostChangedWalletAddress returns the address of the wallet whose balance
// delta was the highest among other wallets participating in transactions
// from numBlocks blocks to the HEAD block.
MostChangedWalletAddress(w http.ResponseWriter, r *http.Request) (any, error)
}
const (
defaultNumBlocks = 100
maxNumBlocks = 150
)
// MostChangedWalletAddress returns the address of the wallet whose balance
// delta was the highest among other wallets participating in transactions
// from numBlocks blocks to the HEAD block.
func (c *walletsController) MostChangedWalletAddress(
w http.ResponseWriter,
r *http.Request,
) (any, error) {
defer r.Body.Close()
var (
query = r.URL.Query()
numBlocks = defaultNumBlocks
err error
)
if v, ok := query["blocks"]; ok && len(v) > 0 {
numBlocks, err = strconv.Atoi(v[0])
if err != nil {
return nil, fmt.Errorf(
"error invalid block param value. %w",
errors.Join(err, ErrorBadQueryParams),
)
}
if numBlocks > maxNumBlocks {
numBlocks = maxNumBlocks
}
if numBlocks <= 0 {
numBlocks = defaultNumBlocks
}
}
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()
walletAddress, err := c.usecase.MostChangedAddress(ctx, numBlocks)
if err != nil {
return nil, fmt.Errorf("error fetch the most changed wallet. %w", err)
}
return MostChangedWalletAddressResponse{
Address: walletAddress,
}, nil
}
// walletsController interface implementation
type walletsController struct {
log *slog.Logger
usecase usecase.EthInteractor
}
// NewWalletsController return a new WalletsController instance
func NewWalletsController(
log *slog.Logger,
usecase usecase.EthInteractor,
) WalletsController {
return &walletsController{
log: log,
usecase: usecase,
}
}

View File

@ -0,0 +1,62 @@
package entities
import (
"encoding/json"
"io"
"os"
"testing"
)
func TestBlockUnmarshal(t *testing.T) {
for _, tc := range tests {
t.Run(tc.Title, func(t *testing.T) {
f, err := os.Open(tc.Path)
if err != nil {
t.Fatal(err)
}
data, err := io.ReadAll(f)
if err != nil {
t.Fatal(err)
}
block := new(Block)
err = json.Unmarshal(data, block)
if tc.MustFail && err == nil {
t.Fatal("unmarshall must fail")
}
if !tc.MustFail && err != nil {
t.Fatalf("unmarshall must pass but it is failed. %s\n", err.Error())
}
})
}
}
type TestCase struct {
Title string
Path string
MustFail bool
}
var tests = []TestCase{
{
Title: "Valid block json object",
Path: "./test_data/test.block.valid.json",
},
{
Title: "Valid block json object. No TXs",
Path: "./test_data/test.block.valid.notx.json",
},
{
Title: "Valid block json object empty",
Path: "./test_data/test.block.valid.empty.json",
},
{
Title: "Valid block json object",
Path: "./test_data/test.block.invalid.corrupted.json",
MustFail: true,
},
}

View File

@ -0,0 +1,195 @@
package entities
import (
"encoding/json"
"fmt"
"math/big"
"sort"
"time"
)
// Wallet represents an on-chain wallet with address in hex format and
// delta of its balance
type Wallet struct {
Address string
Delta *big.Int
}
// []*Wallet type alias
type Wallets []*Wallet
// Sort sorts wallets by delta
func (w Wallets) Sort() {
sort.Slice(w, func(r, l int) bool {
if r := w[r].Delta.Cmp(w[l].Delta); r < 0 {
return true
}
return false
})
}
// BlockNumber is an alias for hex block number
type BlockNumber string
// ToInt converts string hex block number into its big.Int representation
func (n BlockNumber) ToInt() (*big.Int, error) {
return hexToInt((string)(n))
}
// Block object
type Block struct {
Difficulty *big.Int `json:"difficulty"`
BaseFeePerGas *big.Int `json:"baseFeePerGas"`
ExtraData string `json:"extraData"`
GasLimit *big.Int `json:"gasLimit"`
GasUsed *big.Int `json:"gasUsed"`
Hash string `json:"hash"`
LogsBloom string `json:"logsBloom"`
Miner string `json:"miner"`
MixHash string `json:"mixHash"`
Nonce string `json:"nonce"`
Number *big.Int `json:"number"`
ParentHash string `json:"parentHash"`
ReceiptsRoot string `json:"receiptsRoot"`
Sha3Uncles string `json:"sha3Uncles"`
Size *big.Int `json:"size"`
StateRoot string `json:"stateRoot"`
Timestamp time.Time `json:"timestamp"`
TotalDifficulty *big.Int `json:"totalDifficulty"`
Transactions []*Transaction `json:"transactions"`
TransactionsRoot string `json:"transactionsRoot"`
}
// helper alias for proper unmarshal of a block object
type blockAlias Block
// blockRaw is an intermediate object needed for unmarshalling
type blockRaw struct {
*blockAlias
BaseFeePerGas string `json:"baseFeePerGas"`
Difficulty string `json:"difficulty"`
GasLimit string `json:"gasLimit"`
GasUsed string `json:"gasUsed"`
Number string `json:"number"`
Size string `json:"size"`
Timestamp string `json:"timestamp"`
TotalDifficulty string `json:"totalDifficulty"`
}
func (b *Block) UnmarshalJSON(data []byte) error {
raw := &blockRaw{
blockAlias: (*blockAlias)(b),
}
if err := json.Unmarshal(data, raw); err != nil {
return fmt.Errorf("error unmarshal base block data. %w", err)
}
b.Difficulty = hexToIntMust(raw.Difficulty)
b.TotalDifficulty = hexToIntMust(raw.TotalDifficulty)
b.BaseFeePerGas = hexToIntMust(raw.BaseFeePerGas)
b.GasLimit = hexToIntMust(raw.GasLimit)
b.GasUsed = hexToIntMust(raw.GasUsed)
b.Number = hexToIntMust(raw.Number)
b.Size = hexToIntMust(raw.Size)
b.Timestamp = time.Unix(hexToIntMust(raw.Timestamp).Int64(), 0)
return nil
}
// Transaction object
type Transaction struct {
BlockHash string `json:"blockHash"`
BlockNumber *big.Int `json:"blockNumber"`
From string `json:"from"`
Gas *big.Int `json:"gas"`
GasPrice *big.Int `json:"gasPrice"`
Hash string `json:"hash"`
Input string `json:"input"`
Nonce *big.Int `json:"nonce"`
To string `json:"to"`
TransactionIndex *big.Int `json:"transactionIndex"`
Value *big.Int `json:"value"`
Type *big.Int `json:"type"`
V *big.Int `json:"v"`
R string `json:"r"`
S string `json:"s"`
MaxFeePerGas *big.Int `json:"maxFeePerGas"`
MaxPriorityFeePerGas *big.Int `json:"maxPriorityFeePerGas"`
AccessList []interface{} `json:"accessList"`
ChainID *big.Int `json:"chainId"`
}
// helper alias for proper unmarshal of a transaction object
type transactionAlias Transaction
// transactionRaw is an intermediate object needed for unmarshalling
type transactionRaw struct {
*transactionAlias
BlockNumber string `json:"blockNumber"`
Gas string `json:"gas"`
GasPrice string `json:"gasPrice"`
Nonce string `json:"nonce"`
TransactionIndex string `json:"transactionIndex"`
Value string `json:"value"`
Type string `json:"type"`
V string `json:"v"`
MaxFeePerGas string `json:"maxFeePerGas"`
MaxPriorityFeePerGas string `json:"maxPriorityFeePerGas"`
ChainID string `json:"chainId"`
}
func (t *Transaction) UnmarshalJSON(data []byte) error {
txRaw := &transactionRaw{
transactionAlias: (*transactionAlias)(t),
}
if err := json.Unmarshal(data, txRaw); err != nil {
return fmt.Errorf("error unmarshal base tx data. %w", err)
}
t.BlockNumber = hexToIntMust(txRaw.BlockNumber)
t.Gas = hexToIntMust(txRaw.Gas)
t.GasPrice = hexToIntMust(txRaw.GasPrice)
t.Nonce = hexToIntMust(txRaw.Nonce)
t.TransactionIndex = hexToIntMust(txRaw.TransactionIndex)
t.Value = hexToIntMust(txRaw.Value)
t.Type = hexToIntMust(txRaw.Type)
t.V = hexToIntMust(txRaw.V)
t.MaxFeePerGas = hexToIntMust(txRaw.MaxFeePerGas)
t.MaxPriorityFeePerGas = hexToIntMust(txRaw.MaxPriorityFeePerGas)
t.ChainID = hexToIntMust(txRaw.ChainID)
return nil
}
// hexToIntMust is a shortcur for
//
// i, err := hexToInt(s)
// if err != nil {
// panic(err)
// }
func hexToIntMust(s string) *big.Int {
i, err := hexToInt(s)
if err != nil {
panic(err)
}
return i
}
// hexToInt converts string hex value into a big.Int
func hexToInt(s string) (*big.Int, error) {
bi := new(big.Int)
if len(s) < 2 {
return bi, nil
}
if _, ok := bi.SetString(s[2:], 16); !ok {
return nil, fmt.Errorf("error invalid hex number")
}
return bi, nil
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,3 @@
{
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,128 @@
{
"jsonrpc": "2.0",
"id": "getblock.io",
"result": {
"baseFeePerGas": "0x28d33f2a7",
"blobGasUsed": "0x0",
"difficulty": "0x0",
"excessBlobGas": "0x0",
"extraData": "0x546974616e2028746974616e6275696c6465722e78797a29",
"gasLimit": "0x1c9c380",
"gasUsed": "0x13551e9",
"hash": "0xc1334f706ba8da002e89c7d20ca101b5d6352583ed0c9e1013dad0e608c33d1e",
"logsBloom": "0x1ff372736963619618138677a211518d01da91e08dfc5c54e34b110ab68a0556ca7b65e95bc83fbb52b7fa86d3ee0b996e35ea15ec0d78c78445da4a21baf0a16a388db9738c9e38fc8e677b06c5aeef758f68a5fd555cb087a3c77ba9e2c2a17b03860e42f3a4fb842efc51e00469c3d64a17e70a482622db4bcb3d88ab2c338be7bbf008ff792ee44a9b5285668a225d95e163f90d1aef652b465bf5f558aa46fe9560d0a364c36ef053d28ce58fce8e4ec18101e31c2965f3de9f149c2d658bd3a2ba92fab1aca068d9277ceff2e42ae9a18d5529a83e5c82672e42f92dba69fc386d81509433eddc1e88a132d2848ea6d9806b1398c91f8f1b7ff1e07715",
"miner": "0x4838b106fce9647bdf1e7877bf73ce8b0bad5f97",
"mixHash": "0x41cb66c1708cabe4a48f143a8ecd070958ff4867f44cd885bfb38a3b85fa639b",
"nonce": "0x0000000000000000",
"number": "0x1310285",
"parentBeaconBlockRoot": "0x91fbb3bc39d20f81a26d961ea671539eef9e9f18451259b653b9bd75696dfd0a",
"parentHash": "0x454efc53478fb34e769513dfbbf08efa28d0abd6a38ccf8ac72347cef2333063",
"receiptsRoot": "0xa32e503a807d2fdb0db2c07bd3678a6c37c7216f614924242d49a4a53ad5e8a7",
"sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"size": "0x2a291",
"stateRoot": "0x6377fd23f7d4a95b5a16a50174166346e08d290f569c57bf5c9b2eb49629e65b",
"timestamp": "0x6659a1d3",
"totalDifficulty": "0xc70d815d562d3cfa955",
"transactionsRoot": "0x08af60cab8e2b79c64a64fe87cfc7733cb8558496e53540d3d9641678b3ff4e4",
"uncles": [],
"withdrawals": [
{
"index": "0x2d13b3b",
"validatorIndex": "0xba04",
"address": "0x680e6cebc672f310123696b93be888e4dd2745c5",
"amount": "0x1117858"
},
{
"index": "0x2d13b3c",
"validatorIndex": "0xba3b",
"address": "0xd7f713ae54c7929ead7fb152d54cdedd0843d4ea",
"amount": "0x10fd349"
},
{
"index": "0x2d13b3d",
"validatorIndex": "0xba3c",
"address": "0xd7f713ae54c7929ead7fb152d54cdedd0843d4ea",
"amount": "0x110eaa3"
},
{
"index": "0x2d13b3e",
"validatorIndex": "0xba3d",
"address": "0x4a15242fb84a5d6ab0114c932d8bb4e16e1b5eec",
"amount": "0x10fa80e"
},
{
"index": "0x2d13b3f",
"validatorIndex": "0xba40",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x11d4bd5"
},
{
"index": "0x2d13b40",
"validatorIndex": "0xba41",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x11e1ea0"
},
{
"index": "0x2d13b41",
"validatorIndex": "0xba42",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x11da792"
},
{
"index": "0x2d13b42",
"validatorIndex": "0xba43",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x11db356"
},
{
"index": "0x2d13b43",
"validatorIndex": "0xba5a",
"address": "0xdec1000f750e2b4af55dc519448dab87c0ecb503",
"amount": "0x11ca8d9"
},
{
"index": "0x2d13b44",
"validatorIndex": "0xba60",
"address": "0xebb47f1a29e8e330b6a1be26024525e6cf0764e2",
"amount": "0xf4aaa3"
},
{
"index": "0x2d13b45",
"validatorIndex": "0xba73",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x3c70d85"
},
{
"index": "0x2d13b46",
"validatorIndex": "0xba74",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x11db970"
},
{
"index": "0x2d13b47",
"validatorIndex": "0xba75",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x11d3131"
},
{
"index": "0x2d13b48",
"validatorIndex": "0xba76",
"address": "0xdd216d8ddabb2e4719410821cf32ce8556bc9eb1",
"amount": "0x11dfac8"
},
{
"index": "0x2d13b49",
"validatorIndex": "0xba78",
"address": "0xcbcdca647cfda9283992193604f8718a910b42fc",
"amount": "0xfb0f5e"
},
{
"index": "0x2d13b4a",
"validatorIndex": "0xba83",
"address": "0x9f4b4ceca7ace96834bd2fcc961c772de7cb481a",
"amount": "0x11d0bef"
}
],
"withdrawalsRoot": "0xce2b85a9b87375c244b42548a9f5b6679b41e68d90162df27fa4a3269c92b86c"
}
}

View File

@ -0,0 +1,83 @@
package getblock
import (
"context"
"fmt"
"log/slog"
"github.com/optclblast/blk/internal/entities"
"github.com/ybbus/jsonrpc/v3"
)
// B ase getblock API url
const baseURL = "https://go.getblock.io/"
// JSON rpc client
type Client struct {
log *slog.Logger
cc jsonrpc.RPCClient
}
// NewClient returns a new GetBlock JSON rpc client
func NewClient(
log *slog.Logger,
accessToken string,
) *Client {
return &Client{
log: log,
cc: jsonrpc.NewClient(baseURL + accessToken),
}
}
// LastBlockNumber returns a last block number
func (c *Client) LastBlockNumber(ctx context.Context) (entities.BlockNumber, error) {
const method = "eth_blockNumber"
res, err := c.cc.Call(ctx, method)
if err != nil {
if _, ok := err.(*jsonrpc.HTTPError); ok {
return "", ErrorRateLimitExceeded
}
return "", fmt.Errorf("error fetch last block number. %w", err)
} else if res.Error != nil {
return "", fmt.Errorf("error fetch last block number. %w", res.Error)
}
response, err := res.GetString()
if err != nil {
return "", fmt.Errorf("error parse response. %w", err)
}
c.log.Debug(
"last block number",
slog.String("method", method),
slog.String("resp", response),
)
return entities.BlockNumber(response), nil
}
// BlockInfoByNumber returns an info about block by its number
func (c *Client) BlockInfoByNumber(ctx context.Context, num entities.BlockNumber) (*entities.Block, error) {
const method = "eth_getBlockByNumber"
res, err := c.cc.Call(ctx, method, num, true)
if err != nil {
if _, ok := err.(*jsonrpc.HTTPError); ok {
return nil, ErrorRateLimitExceeded
}
return nil, fmt.Errorf("error fetch block info. %w", err)
} else if res.Error != nil {
return nil, fmt.Errorf("error fetch block info. %w", res.Error)
}
out := new(entities.Block)
if err := res.GetObject(out); err != nil {
return nil, fmt.Errorf("error marshal response body into block object. %w", err)
}
return out, nil
}

View File

@ -0,0 +1,9 @@
package getblock
import "errors"
var (
// ErrorRateLimitExceeded is thrown when
// the number of requests has exceeded the allowed limit
ErrorRateLimitExceeded = errors.New("api rate limit exceeded")
)

71
internal/logger/logger.go Normal file
View File

@ -0,0 +1,71 @@
package logger
import (
"io"
"log/slog"
"os"
)
// Builder object
type LoggerBuilder struct {
lvl slog.Level
writers []io.Writer
}
// NewBuilder return a new logger builder object
func NewBuilder() *LoggerBuilder {
return new(LoggerBuilder)
}
// WithWriter sets a specific writer
func (b *LoggerBuilder) WithWriter(w io.Writer) *LoggerBuilder {
b.writers = append(b.writers, w)
return b
}
// WithLevel sets log level
func (b *LoggerBuilder) WithLevel(l slog.Level) *LoggerBuilder {
b.lvl = l
return b
}
// Build returns the logger
func (b *LoggerBuilder) Build() *slog.Logger {
if len(b.writers) == 0 {
b.writers = append(b.writers, os.Stdout)
}
w := io.MultiWriter(b.writers...)
return newLogger(b.lvl, w)
}
func newLogger(lvl slog.Level, w io.Writer) *slog.Logger {
return slog.New(
slog.NewJSONHandler(w, &slog.HandlerOptions{Level: lvl}),
)
}
// Error logging attribute
func Err(err error) slog.Attr {
return slog.Attr{
Key: "error",
Value: slog.StringValue(err.Error()),
}
}
// Maps level from a string. By default returns slog.LevelInfo
func MapLevel(lvl string) slog.Level {
switch lvl {
case "dev", "local", "debug":
return slog.LevelDebug
case "warn":
return slog.LevelWarn
case "error":
return slog.LevelError
default:
return slog.LevelInfo
}
}

102
internal/server/http.go Normal file
View File

@ -0,0 +1,102 @@
// server package contains core network layer components
package server
import (
"context"
"net"
"net/http"
"time"
)
const (
defaultReadTimeout = 10 * time.Second
defaultWriteTimeout = 10 * time.Second
defaultShutdownTimeout = 5 * time.Second
)
// Http Server object
type Server struct {
server *http.Server
notify chan error
shutdownTimeout time.Duration
}
func (s *Server) start() {
go func() {
s.notify <- s.server.ListenAndServe()
close(s.notify)
}()
}
// Notify return an error channel, that will contain error if server died
func (s *Server) Notify() <-chan error {
return s.notify
}
// Graceful shutdown
func (s *Server) Shutdown() error {
ctx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
defer cancel()
return s.server.Shutdown(ctx)
}
// New returns a new Server object
func New(
handler http.Handler,
addr string,
opts ...Option,
) *Server {
httpServer := &http.Server{
Handler: handler,
ReadTimeout: defaultReadTimeout,
WriteTimeout: defaultWriteTimeout,
Addr: addr,
}
s := &Server{
server: httpServer,
notify: make(chan error, 1),
shutdownTimeout: defaultShutdownTimeout,
}
// Apply options
for _, opt := range opts {
opt(s)
}
s.start()
return s
}
// Options configures Server
type Option func(s *Server)
// Port sets a specific port for Server to listen to
func Port(port string) Option {
return func(s *Server) {
s.server.Addr = net.JoinHostPort("", port)
}
}
// ReadTimeout sets a specific connection read timeout
func ReadTimeout(timeout time.Duration) Option {
return func(s *Server) {
s.server.ReadTimeout = timeout
}
}
// WriteTimeout sets a specific connection write timeout
func WriteTimeout(timeout time.Duration) Option {
return func(s *Server) {
s.server.WriteTimeout = timeout
}
}
// ShutdownTimeout sets a specific connection shutdown timeout
func ShutdownTimeout(timeout time.Duration) Option {
return func(s *Server) {
s.shutdownTimeout = timeout
}
}

View File

@ -0,0 +1,176 @@
package usecase
import (
"context"
"log/slog"
"math/big"
"math/rand"
"slices"
"testing"
"github.com/optclblast/blk/internal/entities"
)
func TestAddressWithBiggestDelta(t *testing.T) {
ethInteractor := &ethInteractor{log: slog.Default()}
for _, tc := range tests {
t.Run(tc.Title, func(t *testing.T) {
txCh := make(chan *entities.Transaction, len(tc.Block.Transactions))
for _, txs := range tc.Block.Transactions {
txCh <- txs
}
close(txCh)
walletAddr, err := ethInteractor.addressWithBiggestDelta(context.TODO(), txCh)
if err != nil {
t.Fatalf("error: %s\n", err.Error())
}
if !slices.Contains(tc.ExpectedResult, walletAddr) {
t.Fatalf("invalid result: %s | Expected: %v", walletAddr, tc.ExpectedResult)
}
})
}
}
func BenchmarkAddressWithBiggestDelta(b *testing.B) {
ethInteractor := &ethInteractor{log: slog.Default()}
txs := make([]*entities.Transaction, 20000)
for i := 0; i < 20000; i++ {
txs[i] = &entities.Transaction{
Value: big.NewInt(rand.Int63()),
From: RandStringRunes(rand.Intn(1000)),
To: RandStringRunes(rand.Intn(1000)),
}
}
txCh := make(chan *entities.Transaction, len(txs))
for _, txs := range txs {
txCh <- txs
}
close(txCh)
_, err := ethInteractor.addressWithBiggestDelta(context.TODO(), txCh)
if err != nil {
b.Fatalf("error: %s\n", err.Error())
}
}
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func RandStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
type TestCase struct {
Title string
Block *entities.Block
ExpectedResult []string
}
var tests []TestCase = []TestCase{
{
Title: "1 block, 3 wallets, 4 txs. Positive is the highest",
Block: &entities.Block{
Transactions: []*entities.Transaction{
{
From: "A",
To: "B", // + 1000
Value: big.NewInt(1000),
},
{
From: "F", // -90
To: "A", // -10
Value: big.NewInt(1000),
},
{
From: "A",
To: "B", // +10
Value: big.NewInt(10),
},
{
From: "A",
To: "B", // +15
Value: big.NewInt(15),
},
},
},
ExpectedResult: []string{"B"},
},
{
Title: "1 block, 0 wallets, 0 txs",
Block: &entities.Block{
Transactions: []*entities.Transaction{},
},
ExpectedResult: []string{""},
},
{
Title: "1 block, 3 wallets, 3 txs. Txs are negative",
Block: &entities.Block{
Transactions: []*entities.Transaction{
{
From: "A",
To: "B",
Value: big.NewInt(-10),
},
{
From: "A",
To: "B",
Value: big.NewInt(-1000),
},
{
From: "F",
To: "A",
Value: big.NewInt(-1000),
},
{
From: "A",
To: "B",
Value: big.NewInt(-15),
},
},
},
ExpectedResult: []string{"B"},
},
{
Title: "1 block, 2 wallets, 3 txs. Both are equal",
Block: &entities.Block{
Transactions: []*entities.Transaction{
{
From: "A",
To: "B",
Value: big.NewInt(-10),
},
{
From: "A",
To: "B",
Value: big.NewInt(50),
},
{
From: "B",
To: "A",
Value: big.NewInt(10),
},
{
From: "A",
To: "B",
Value: big.NewInt(-15),
},
},
},
ExpectedResult: []string{"B", "A"},
},
}

262
internal/usecase/eth.go Normal file
View File

@ -0,0 +1,262 @@
package usecase
import (
"context"
"fmt"
"log/slog"
"math/big"
"runtime"
"sync"
"github.com/alitto/pond"
"github.com/optclblast/blk/internal/entities"
"github.com/optclblast/blk/internal/logger"
cmap "github.com/orcaman/concurrent-map/v2"
)
// EthInteractor is core component of the system.
// Here all the data processing magic happens
type EthInteractor interface {
// MostChangedWalletAddress returns the address of the wallet whose balance
// delta was the highest among other wallets participating in transactions
// from numBlocks blocks to the HEAD block.
MostChangedAddress(ctx context.Context, numBlocks int) (string, error)
}
// ethInteractor is an EthInteractor implementation
type ethInteractor struct {
log *slog.Logger
client NodeClient
}
// NewEthInteractor return new NewEthInteractor instance
func NewEthInteractor(
log *slog.Logger,
client NodeClient,
) EthInteractor {
return &ethInteractor{
log: log,
client: client,
}
}
// Standard number of workers in all kind of pools
var defaultWorkersNum = runtime.GOMAXPROCS(0) * 2
func (t *ethInteractor) MostChangedAddress(
ctx context.Context,
numBlocks int,
) (string, error) {
// We need to fetch current head block
head, err := t.client.LastBlockNumber(ctx)
if err != nil {
return "", fmt.Errorf("error fetch last block number. %w", err)
}
t.log.Debug(
"most_changed_address",
slog.String("head block number", (string)(head)),
slog.Int("num blocks parameter", numBlocks),
)
headBlockNumber, err := head.ToInt()
if err != nil {
return "", fmt.Errorf("error map last block number to numeric. %w", err)
}
txChan := make(chan *entities.Transaction, defaultWorkersNum)
// Begin a transactions data stream
t.streamTransactions(ctx, headBlockNumber, numBlocks, txChan)
// Handle transactions stream and calculate the result
walletAddress, err := t.addressWithBiggestDelta(ctx, txChan)
if err != nil {
return "", fmt.Errorf("error fetch wallets. %w", err)
}
return walletAddress, nil
}
func (t *ethInteractor) addressWithBiggestDelta(
ctx context.Context,
txChan chan *entities.Transaction,
) (string, error) {
outChan := make(chan string, 1)
go func() {
defer func() {
if panic := recover(); panic != nil {
t.log.Error("addressWithBiggestDelta", slog.Any("panic", panic))
return
}
}()
// map [Wallet address => Delta]
addresses := cmap.New[*big.Int]()
var wg sync.WaitGroup
// Fill the map with address / delta pairs
for i := 0; i < defaultWorkersNum; i++ {
// Run a writer worker
wg.Add(1)
go func() {
defer wg.Done()
t.appendAddressDeltaWorker(&addresses, txChan)
}()
}
wg.Wait()
outChan <- biggestDeltaAddres(addresses.Items())
}()
select {
case out := <-outChan:
return out, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func (t *ethInteractor) appendAddressDeltaWorker(
cmp *cmap.ConcurrentMap[string, *big.Int],
txsChan <-chan *entities.Transaction,
) {
defer func() {
if panic := recover(); panic != nil {
t.log.Error("appendAddressDeltaWorker", slog.Any("panic", panic))
return
}
}()
for t := range txsChan {
deltaFrom, ok := cmp.Get(t.From)
if !ok {
deltaFrom = new(big.Int)
}
cmp.Set(t.From, deltaFrom.Sub(deltaFrom, t.Value))
deltaTo, ok := cmp.Get(t.To)
if !ok {
deltaTo = new(big.Int)
}
cmp.Set(t.To, deltaTo.Add(deltaTo, t.Value))
}
}
// Returns an address of a wallet with balances mod|delta| is the highest
func biggestDeltaAddres(
set map[string]*big.Int,
) string {
if len(set) == 0 {
return ""
}
i := 0
// Build wallets array
wallets := make(entities.Wallets, len(set))
for addr, dlt := range set {
wallets[i] = &entities.Wallet{
Address: addr,
Delta: dlt.Abs(dlt),
}
i++
}
// Sort
wallets.Sort()
// Return an address with highest delta
return wallets[len(wallets)-1].Address
}
const fetchWorkersPoolSize = 4
// streamTransactions fetches blocks from getblock node API and
// dispatches related transaction into a dedicated channel for
// other workers to process.
// The channels used by streamTransactions will be closed
// internally.
func (t *ethInteractor) streamTransactions(
ctx context.Context,
headBlock *big.Int,
numBlocks int,
txChan chan<- *entities.Transaction,
) {
blockToFetch := new(big.Int).Set(headBlock)
blocksChan := make(chan *entities.Block, numBlocks)
fetchPool := pond.New(fetchWorkersPoolSize, numBlocks)
var fetchWg sync.WaitGroup
for i := 0; i < numBlocks; i++ {
blockNumber := entities.BlockNumber("0x" + blockToFetch.Text(16))
fetchWg.Add(1)
fetchPool.Submit(func() {
defer fetchWg.Done()
block, err := t.client.BlockInfoByNumber(
ctx,
blockNumber,
)
if err != nil {
t.log.Error(
"error fetch block info",
logger.Err(err),
slog.Any("block number", blockNumber),
)
return
}
blocksChan <- block
})
blockToFetch.Sub(blockToFetch, big.NewInt(1))
}
go func() {
fetchWg.Wait()
close(blocksChan)
}()
processPool := pond.New(defaultWorkersNum, numBlocks)
var processWg sync.WaitGroup
go func() {
dispatchBlockTransactions(&processWg, processPool, blocksChan, txChan)
processWg.Wait()
close(txChan)
}()
}
// Dispatches blocks from blocksChan into txsChan
func dispatchBlockTransactions(
wg *sync.WaitGroup,
pool *pond.WorkerPool,
blocksChan <-chan *entities.Block,
txsChan chan<- *entities.Transaction,
) {
for b := range blocksChan {
wg.Add(1)
pool.Submit(func() {
defer wg.Done()
for _, tx := range b.Transactions {
txsChan <- tx
}
})
}
}

View File

@ -0,0 +1,19 @@
package usecase
import (
"context"
"github.com/optclblast/blk/internal/entities"
)
// NodeClient is an node provider client presentation interface
type NodeClient interface {
// LastBlockNumber return last block number.
LastBlockNumber(ctx context.Context) (entities.BlockNumber, error)
// BlockInfoByNumber accepts block number and returns all information, including
// transactions, related to that block.
// BlockInfoByNumber may return ErrorRateLimitExceeded and you may want to wrap it into
// backoff
BlockInfoByNumber(ctx context.Context, num entities.BlockNumber) (*entities.Block, error)
}