diff --git a/cmd/p2p/sensor/api.go b/cmd/p2p/sensor/api.go index c9802742..696c83de 100644 --- a/cmd/p2p/sensor/api.go +++ b/cmd/p2p/sensor/api.go @@ -18,6 +18,8 @@ import ( type peerData struct { Name string `json:"name"` ProtocolVersion uint `json:"protocol_version"` + BlockHash string `json:"block_hash,omitempty"` + BlockNumber uint64 `json:"block_number,omitempty"` Received p2p.MessageCount `json:"received"` Sent p2p.MessageCount `json:"sent"` PacketsReceived p2p.MessageCount `json:"packets_received"` @@ -85,9 +87,18 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) { continue } + // Get latest block info for this peer + blockHash, blockNumber := conns.GetPeerLatestBlock(peerID) + var blockHashStr string + if blockNumber > 0 { + blockHashStr = blockHash.Hex() + } + peers[url] = peerData{ Name: conns.GetPeerName(peerID), ProtocolVersion: conns.GetPeerVersion(peerID), + BlockHash: blockHashStr, + BlockNumber: blockNumber, Received: messages.Received, Sent: messages.Sent, PacketsReceived: messages.PacketsReceived, diff --git a/cmd/p2p/sensor/rpc.go b/cmd/p2p/sensor/rpc.go index 1263dea1..e336de8b 100644 --- a/cmd/p2p/sensor/rpc.go +++ b/cmd/p2p/sensor/rpc.go @@ -1,16 +1,18 @@ package sensor import ( + "bytes" "encoding/json" "fmt" "io" "math/big" "net/http" - "strings" "github.com/0xPolygon/polygon-cli/p2p" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/rs/zerolog/log" ) @@ -37,13 +39,35 @@ type rpcError struct { Data any `json:"data,omitempty"` } +// rpcProxyConfig holds configuration for proxying RPC requests to an upstream server. +type rpcProxyConfig struct { + rpcURL string + httpClient *http.Client +} + // handleRPC sets up the JSON-RPC server for receiving and broadcasting transactions. // It handles eth_sendRawTransaction requests, validates transaction signatures, // and broadcasts valid transactions to all connected peers. // Supports both single requests and batch requests per JSON-RPC 2.0 specification. +// If proxyRPC is enabled, unsupported methods are forwarded to the upstream rpcURL. func handleRPC(conns *p2p.Conns, networkID uint64) { // Use network ID as chain ID for signature validation chainID := new(big.Int).SetUint64(networkID) + gpo := p2p.NewGasPriceOracle(conns) + + var proxyConfig *rpcProxyConfig + if inputSensorParams.ProxyRPC { + proxyConfig = &rpcProxyConfig{ + rpcURL: inputSensorParams.RPC, + httpClient: &http.Client{ + Timeout: inputSensorParams.ProxyRPCTimeout, + }, + } + log.Info(). + Str("rpc", inputSensorParams.RPC). + Dur("timeout", inputSensorParams.ProxyRPCTimeout). + Msg("RPC proxy enabled") + } mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -57,17 +81,12 @@ func handleRPC(conns *p2p.Conns, networkID uint64) { writeError(w, -32700, "Parse error", nil) return } - defer func() { - if err := r.Body.Close(); err != nil { - log.Debug().Err(err).Msg("Failed to close request body") - } - }() // Check if this is a batch request (starts with '[') or single request - trimmed := strings.TrimSpace(string(body)) + trimmed := bytes.TrimSpace(body) if len(trimmed) > 0 && trimmed[0] == '[' { // Handle batch request - handleBatchRequest(w, body, conns, chainID) + handleBatchRequest(w, r, body, conns, chainID, gpo, proxyConfig) return } @@ -78,14 +97,28 @@ func handleRPC(conns *p2p.Conns, networkID uint64) { return } - // Handle eth_sendRawTransaction - if req.Method == "eth_sendRawTransaction" { - handleSendRawTransaction(w, req, conns, chainID) + // Process request + var txs types.Transactions + resp := processRequest(req, conns, chainID, gpo, &txs) + + // If method not found and proxy is enabled, forward to upstream + if isMethodNotFound(resp) && proxyConfig != nil { + proxyRPCRequest(w, r, body, proxyConfig) return } - // Method not found - writeError(w, -32601, "Method not found", req.ID) + // Broadcast any transactions + if len(txs) > 0 { + log.Info().Str("hash", txs[0].Hash().Hex()).Msg("Broadcasting transaction") + count := conns.BroadcastTxs(txs) + log.Info().Str("hash", txs[0].Hash().Hex()).Int("peers", count).Msg("Transaction broadcast complete") + } + + // Write response + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(&resp); err != nil { + log.Error().Err(err).Msg("Failed to encode response") + } }) addr := fmt.Sprintf(":%d", inputSensorParams.RPCPort) @@ -124,10 +157,49 @@ func writeResult(w http.ResponseWriter, result any, id any) { } } -// handleBatchRequest processes JSON-RPC 2.0 batch requests, validates all transactions, -// and broadcasts valid transactions to connected peers. Returns a batch response with -// results or errors for each request in the batch. -func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, chainID *big.Int) { +// proxyRPCRequest forwards a JSON-RPC request to the upstream RPC server and streams +// the response back to the client. Used for methods not handled locally. +func proxyRPCRequest(w http.ResponseWriter, r *http.Request, body []byte, config *rpcProxyConfig) { + req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, config.rpcURL, bytes.NewReader(body)) + if err != nil { + log.Error().Err(err).Msg("Failed to create proxy request") + writeError(w, -32603, "Internal error: failed to create proxy request", nil) + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := config.httpClient.Do(req) + if err != nil { + log.Error().Err(err).Str("rpc", config.rpcURL).Msg("Proxy request failed") + if r.Context().Err() != nil { + writeError(w, -32603, "Request cancelled or timed out", nil) + return + } + writeError(w, -32603, fmt.Sprintf("Upstream RPC error: %v", err), nil) + return + } + defer resp.Body.Close() + + // Copy response headers and status + for key, values := range resp.Header { + for _, value := range values { + w.Header().Add(key, value) + } + } + w.WriteHeader(resp.StatusCode) + + // Stream response body + if _, err := io.Copy(w, resp.Body); err != nil { + log.Error().Err(err).Msg("Failed to copy proxy response") + } +} + +// handleBatchRequest processes JSON-RPC 2.0 batch requests. +// For eth_sendRawTransaction requests, it collects valid transactions for batch broadcasting. +// Returns a batch response with results or errors for each request. +// If proxyConfig is provided and the batch contains unsupported methods, +// the entire batch is forwarded to the upstream RPC server for simplicity. +func handleBatchRequest(w http.ResponseWriter, r *http.Request, body []byte, conns *p2p.Conns, chainID *big.Int, gpo *p2p.GasPriceOracle, proxyConfig *rpcProxyConfig) { // Parse batch of requests var requests []rpcRequest if err := json.Unmarshal(body, &requests); err != nil { @@ -141,35 +213,20 @@ func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, ch return } - // Process all requests and collect valid transactions for batch broadcasting + // Process all requests responses := make([]rpcResponse, 0, len(requests)) txs := make(types.Transactions, 0) for _, req := range requests { - if req.Method != "eth_sendRawTransaction" { - responses = append(responses, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32601, - Message: "Method not found", - }, - ID: req.ID, - }) - continue - } + resp := processRequest(req, conns, chainID, gpo, &txs) - tx, response := validateTx(req, chainID) - if tx == nil { - responses = append(responses, response) - continue + // If any method not found and proxy is enabled, forward entire batch + if isMethodNotFound(resp) && proxyConfig != nil { + proxyRPCRequest(w, r, body, proxyConfig) + return } - txs = append(txs, tx) - responses = append(responses, rpcResponse{ - JSONRPC: "2.0", - Result: tx.Hash().Hex(), - ID: req.ID, - }) + responses = append(responses, resp) } // Broadcast all valid transactions in a single batch if there are any @@ -194,76 +251,137 @@ func handleBatchRequest(w http.ResponseWriter, body []byte, conns *p2p.Conns, ch } } +// newResultResponse creates a success response. +func newResultResponse(result, id any) rpcResponse { + return rpcResponse{JSONRPC: "2.0", Result: result, ID: id} +} + +// newErrorResponse creates an error response. +func newErrorResponse(err *rpcError, id any) rpcResponse { + return rpcResponse{JSONRPC: "2.0", Error: err, ID: id} +} + +const rpcMethodNotFoundCode = -32601 + +// newMethodNotFoundResponse creates a method not found error response. +func newMethodNotFoundResponse(id any) rpcResponse { + return rpcResponse{ + JSONRPC: "2.0", + Error: &rpcError{Code: rpcMethodNotFoundCode, Message: "Method not found"}, + ID: id, + } +} + +// isMethodNotFound returns true if the response is a method not found error. +func isMethodNotFound(resp rpcResponse) bool { + return resp.Error != nil && resp.Error.Code == rpcMethodNotFoundCode +} + +// processRequest handles a single RPC request and returns a response. +// For eth_sendRawTransaction, valid transactions are appended to txs for batch broadcasting. +// Returns a method not found response if the method is not handled locally. +func processRequest(req rpcRequest, conns *p2p.Conns, chainID *big.Int, gpo *p2p.GasPriceOracle, txs *types.Transactions) rpcResponse { + switch req.Method { + case "eth_sendRawTransaction": + tx, errResp := validateTx(req, chainID) + if tx == nil { + return errResp + } + if txs != nil { + *txs = append(*txs, tx) + } + return newResultResponse(tx.Hash().Hex(), req.ID) + + case "eth_chainId": + return newResultResponse(hexutil.EncodeBig(chainID), req.ID) + + case "eth_blockNumber": + head := conns.HeadBlock() + if head.Block == nil { + return newResultResponse(nil, req.ID) + } + return newResultResponse(hexutil.EncodeUint64(head.Block.NumberU64()), req.ID) + + case "eth_gasPrice": + return newResultResponse(hexutil.EncodeBig(gpo.SuggestGasPrice()), req.ID) + + case "eth_maxPriorityFeePerGas": + tip := gpo.SuggestGasTipCap() + if tip == nil { + tip = big.NewInt(1e9) // Default to 1 gwei + } + return newResultResponse(hexutil.EncodeBig(tip), req.ID) + + case "eth_getBlockByHash": + result, err := getBlockByHash(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getBlockByNumber": + result, err := getBlockByNumber(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getTransactionByHash": + result, err := getTransactionByHash(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getTransactionByBlockHashAndIndex": + result, err := getTransactionByBlockHashAndIndex(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getBlockTransactionCountByHash": + result, err := getBlockTransactionCountByHash(req, conns) + return handleMethodResult(result, err, req.ID) + + case "eth_getUncleCountByBlockHash": + result, err := getUncleCountByBlockHash(req, conns) + return handleMethodResult(result, err, req.ID) + + default: + return newMethodNotFoundResponse(req.ID) + } +} + +// handleMethodResult converts a method's result and error into an rpcResponse. +func handleMethodResult(result any, err *rpcError, id any) rpcResponse { + if err != nil { + return newErrorResponse(err, id) + } + return newResultResponse(result, id) +} + // validateTx validates a transaction from a JSON-RPC request by decoding the raw // transaction hex, unmarshaling it, and verifying the signature. Returns the transaction if valid // (with an empty response), or nil transaction with an error response if validation fails. func validateTx(req rpcRequest, chainID *big.Int) (*types.Transaction, rpcResponse) { - // Check params + invalidParams := func(msg string) rpcResponse { + return newErrorResponse(&rpcError{Code: -32602, Message: msg}, req.ID) + } + if len(req.Params) == 0 { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: "Invalid params: missing raw transaction", - }, - ID: req.ID, - } + return nil, invalidParams("Invalid params: missing raw transaction") } - // Extract raw transaction hex string hex, ok := req.Params[0].(string) if !ok { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: "Invalid params: raw transaction must be a hex string", - }, - ID: req.ID, - } + return nil, invalidParams("Invalid params: raw transaction must be a hex string") } - // Decode hex string to bytes bytes, err := hexutil.Decode(hex) if err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction hex: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction hex: %v", err)) } - // Unmarshal transaction tx := new(types.Transaction) if err = tx.UnmarshalBinary(bytes); err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction encoding: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction encoding: %v", err)) } - // Validate transaction signature signer := types.LatestSignerForChainID(chainID) sender, err := types.Sender(signer, tx) if err != nil { - return nil, rpcResponse{ - JSONRPC: "2.0", - Error: &rpcError{ - Code: -32602, - Message: fmt.Sprintf("Invalid transaction signature: %v", err), - }, - ID: req.ID, - } + return nil, invalidParams(fmt.Sprintf("Invalid transaction signature: %v", err)) } - // Log the transaction to := "nil" if tx.To() != nil { to = tx.To().Hex() @@ -280,26 +398,359 @@ func validateTx(req rpcRequest, chainID *big.Int) (*types.Transaction, rpcRespon return tx, rpcResponse{} } -// handleSendRawTransaction processes eth_sendRawTransaction requests, validates the -// transaction, broadcasts it to all connected peers, and writes the transaction hash -// as a JSON-RPC response. -func handleSendRawTransaction(w http.ResponseWriter, req rpcRequest, conns *p2p.Conns, chainID *big.Int) { - tx, response := validateTx(req, chainID) - if tx == nil { - writeError(w, response.Error.Code, response.Error.Message, response.ID) - return +// parseFullTxParam extracts the fullTx boolean from params[1], defaulting to false. +func parseFullTxParam(params []any) bool { + if len(params) >= 2 { + if fullTx, ok := params[1].(bool); ok { + return fullTx + } } + return false +} - log.Info(). - Str("hash", tx.Hash().Hex()). - Msg("Broadcasting transaction") +// getBlockByHash retrieves a block by its hash from the cache. +func getBlockByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing block hash parameter"} + } - count := conns.BroadcastTx(tx) + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } - log.Info(). - Str("hash", tx.Hash().Hex()). - Int("peers", count). - Msg("Transaction broadcast complete") + hash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(hash) + if !ok { + return nil, nil // Return null for not found (per spec) + } + + return formatBlockResponse(hash, cache, parseFullTxParam(req.Params)), nil +} + +// getBlockByNumber retrieves a block by its number from the cache. +func getBlockByNumber(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing block number parameter"} + } + + blockNumParam, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block number parameter"} + } + + var hash common.Hash + var cache p2p.BlockCache + var found bool + + switch blockNumParam { + case "latest", "pending": + head := conns.HeadBlock() + if head.Block == nil { + return nil, nil + } + hash = head.Block.Hash() + cache, found = conns.Blocks().Get(hash) + if !found { + // Construct cache from head block + cache = p2p.BlockCache{ + Header: head.Block.Header(), + Body: ð.BlockBody{ + Transactions: head.Block.Transactions(), + Uncles: head.Block.Uncles(), + }, + TD: head.TD, + } + found = true + } + case "earliest": + hash, cache, found = conns.GetBlockByNumber(0) + default: + num, err := hexutil.DecodeUint64(blockNumParam) + if err != nil { + return nil, &rpcError{Code: -32602, Message: "invalid block number: " + err.Error()} + } + hash, cache, found = conns.GetBlockByNumber(num) + } + + if !found { + return nil, nil + } + + return formatBlockResponse(hash, cache, parseFullTxParam(req.Params)), nil +} + +// getTransactionByHash retrieves a transaction by its hash from the cache. +func getTransactionByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 1 { + return nil, &rpcError{Code: -32602, Message: "missing transaction hash parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid transaction hash parameter"} + } + + hash := common.HexToHash(hashStr) + + // First check the transactions cache + tx, ok := conns.GetTx(hash) + if ok { + return formatTransactionResponse(tx, common.Hash{}, nil, 0), nil + } + + // Search in blocks for the transaction + for _, blockHash := range conns.Blocks().Keys() { + cache, ok := conns.Blocks().Peek(blockHash) + if !ok || cache.Body == nil { + continue + } + for i, tx := range cache.Body.Transactions { + if tx.Hash() == hash { + return formatTransactionResponse(tx, blockHash, cache.Header, uint64(i)), nil + } + } + } + + return nil, nil +} + +// getTransactionByBlockHashAndIndex retrieves a transaction by block hash and index. +func getTransactionByBlockHashAndIndex(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + if len(req.Params) < 2 { + return nil, &rpcError{Code: -32602, Message: "missing block hash or index parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } + + indexStr, ok := req.Params[1].(string) + if !ok { + return nil, &rpcError{Code: -32602, Message: "invalid index parameter"} + } + + index, err := hexutil.DecodeUint64(indexStr) + if err != nil { + return nil, &rpcError{Code: -32602, Message: "invalid index: " + err.Error()} + } + + blockHash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(blockHash) + if !ok || cache.Body == nil { + return nil, nil + } + + if int(index) >= len(cache.Body.Transactions) { + return nil, nil + } + + tx := cache.Body.Transactions[index] + return formatTransactionResponse(tx, blockHash, cache.Header, index), nil +} + +// getBlockCacheByHashParam parses a block hash from params[0] and returns the block cache. +// Returns the cache and nil error on success, or nil cache and error on parse failure. +// If the block is not found, returns nil cache with nil error (per JSON-RPC spec). +func getBlockCacheByHashParam(req rpcRequest, conns *p2p.Conns) (p2p.BlockCache, *rpcError) { + if len(req.Params) < 1 { + return p2p.BlockCache{}, &rpcError{Code: -32602, Message: "missing block hash parameter"} + } + + hashStr, ok := req.Params[0].(string) + if !ok { + return p2p.BlockCache{}, &rpcError{Code: -32602, Message: "invalid block hash parameter"} + } + + hash := common.HexToHash(hashStr) + cache, ok := conns.Blocks().Get(hash) + if !ok || cache.Body == nil { + return p2p.BlockCache{}, nil + } + + return cache, nil +} + +// getBlockTransactionCountByHash returns the transaction count in a block. +func getBlockTransactionCountByHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + cache, err := getBlockCacheByHashParam(req, conns) + if err != nil || cache.Body == nil { + return nil, err + } + return hexutil.EncodeUint64(uint64(len(cache.Body.Transactions))), nil +} + +// getUncleCountByBlockHash returns the uncle count in a block. +func getUncleCountByBlockHash(req rpcRequest, conns *p2p.Conns) (any, *rpcError) { + cache, err := getBlockCacheByHashParam(req, conns) + if err != nil || cache.Body == nil { + return nil, err + } + return hexutil.EncodeUint64(uint64(len(cache.Body.Uncles))), nil +} + +// formatBlockResponse formats a block cache into the Ethereum JSON-RPC block format. +func formatBlockResponse(hash common.Hash, cache p2p.BlockCache, fullTx bool) map[string]any { + header := cache.Header + if header == nil { + return nil + } + + result := map[string]any{ + "hash": hash.Hex(), + "number": hexutil.EncodeUint64(header.Number.Uint64()), + "parentHash": header.ParentHash.Hex(), + "nonce": hexutil.EncodeUint64(header.Nonce.Uint64()), + "sha3Uncles": header.UncleHash.Hex(), + "logsBloom": hexutil.Encode(header.Bloom.Bytes()), + "transactionsRoot": header.TxHash.Hex(), + "stateRoot": header.Root.Hex(), + "receiptsRoot": header.ReceiptHash.Hex(), + "miner": header.Coinbase.Hex(), + "difficulty": hexutil.EncodeBig(header.Difficulty), + "extraData": hexutil.Encode(header.Extra), + "gasLimit": hexutil.EncodeUint64(header.GasLimit), + "gasUsed": hexutil.EncodeUint64(header.GasUsed), + "timestamp": hexutil.EncodeUint64(header.Time), + "mixHash": header.MixDigest.Hex(), + } + + if header.BaseFee != nil { + result["baseFeePerGas"] = hexutil.EncodeBig(header.BaseFee) + } + + if header.WithdrawalsHash != nil { + result["withdrawalsRoot"] = header.WithdrawalsHash.Hex() + } + + if header.BlobGasUsed != nil { + result["blobGasUsed"] = hexutil.EncodeUint64(*header.BlobGasUsed) + } + + if header.ExcessBlobGas != nil { + result["excessBlobGas"] = hexutil.EncodeUint64(*header.ExcessBlobGas) + } + + if header.ParentBeaconRoot != nil { + result["parentBeaconBlockRoot"] = header.ParentBeaconRoot.Hex() + } + + // Add total difficulty if available + if cache.TD != nil { + result["totalDifficulty"] = hexutil.EncodeBig(cache.TD) + } + + // Add transactions + if cache.Body != nil && cache.Body.Transactions != nil { + if fullTx { + txs := make([]map[string]any, len(cache.Body.Transactions)) + for i, tx := range cache.Body.Transactions { + txs[i] = formatTransactionResponse(tx, hash, header, uint64(i)) + } + result["transactions"] = txs + } else { + txHashes := make([]string, len(cache.Body.Transactions)) + for i, tx := range cache.Body.Transactions { + txHashes[i] = tx.Hash().Hex() + } + result["transactions"] = txHashes + } + } else { + result["transactions"] = []string{} + } + + // Add uncles + if cache.Body != nil && cache.Body.Uncles != nil { + uncleHashes := make([]string, len(cache.Body.Uncles)) + for i, uncle := range cache.Body.Uncles { + uncleHashes[i] = uncle.Hash().Hex() + } + result["uncles"] = uncleHashes + } else { + result["uncles"] = []string{} + } + + // Add size (approximate based on header + body) + result["size"] = hexutil.EncodeUint64(0) // We don't have exact size; use 0 + + return result +} + +// formatTransactionResponse formats a transaction into the Ethereum JSON-RPC format. +// If blockHash is empty, the transaction is considered pending. +func formatTransactionResponse(tx *types.Transaction, blockHash common.Hash, header *types.Header, index uint64) map[string]any { + v, r, s := tx.RawSignatureValues() + + result := map[string]any{ + "hash": tx.Hash().Hex(), + "nonce": hexutil.EncodeUint64(tx.Nonce()), + "gas": hexutil.EncodeUint64(tx.Gas()), + "value": hexutil.EncodeBig(tx.Value()), + "input": hexutil.Encode(tx.Data()), + "v": hexutil.EncodeBig(v), + "r": hexutil.EncodeBig(r), + "s": hexutil.EncodeBig(s), + "type": hexutil.EncodeUint64(uint64(tx.Type())), + } + + if tx.To() != nil { + result["to"] = tx.To().Hex() + } else { + result["to"] = nil + } + + // Add from address if we can derive it + signer := types.LatestSignerForChainID(tx.ChainId()) + if from, err := types.Sender(signer, tx); err == nil { + result["from"] = from.Hex() + } + + // Set gas price fields based on transaction type + switch tx.Type() { + case types.LegacyTxType, types.AccessListTxType: + result["gasPrice"] = hexutil.EncodeBig(tx.GasPrice()) + case types.DynamicFeeTxType, types.BlobTxType: + result["maxFeePerGas"] = hexutil.EncodeBig(tx.GasFeeCap()) + result["maxPriorityFeePerGas"] = hexutil.EncodeBig(tx.GasTipCap()) + // For EIP-1559 txs, also set gasPrice to effective gas price if in a block + if header != nil && header.BaseFee != nil { + effectiveGasPrice := new(big.Int).Add(header.BaseFee, tx.GasTipCap()) + if effectiveGasPrice.Cmp(tx.GasFeeCap()) > 0 { + effectiveGasPrice = tx.GasFeeCap() + } + result["gasPrice"] = hexutil.EncodeBig(effectiveGasPrice) + } else { + result["gasPrice"] = hexutil.EncodeBig(tx.GasFeeCap()) + } + } + + // Add chain ID if present + if tx.ChainId() != nil { + result["chainId"] = hexutil.EncodeBig(tx.ChainId()) + } + + // Add access list if present + if tx.AccessList() != nil { + result["accessList"] = tx.AccessList() + } + + // Add blob-specific fields + if tx.Type() == types.BlobTxType { + result["maxFeePerBlobGas"] = hexutil.EncodeBig(tx.BlobGasFeeCap()) + result["blobVersionedHashes"] = tx.BlobHashes() + } + + // Add block info if transaction is in a block + if blockHash != (common.Hash{}) && header != nil { + result["blockHash"] = blockHash.Hex() + result["blockNumber"] = hexutil.EncodeUint64(header.Number.Uint64()) + result["transactionIndex"] = hexutil.EncodeUint64(index) + } else { + result["blockHash"] = nil + result["blockNumber"] = nil + result["transactionIndex"] = nil + } - writeResult(w, tx.Hash().Hex(), req.ID) + return result } diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index f329e7b3..3c9fc32d 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -77,6 +77,8 @@ type ( DiscoveryDNS string Database string NoDiscovery bool + ProxyRPC bool + ProxyRPCTimeout time.Duration RequestsCache ds.LRUOptions ParentsCache ds.LRUOptions BlocksCache ds.LRUOptions @@ -501,6 +503,8 @@ will result in less chance of missing data but can significantly increase memory f.IntVar(&inputSensorParams.Port, "port", 30303, "TCP network listening port") f.IntVar(&inputSensorParams.DiscoveryPort, "discovery-port", 30303, "UDP P2P discovery port") f.StringVar(&inputSensorParams.RPC, "rpc", "https://polygon-rpc.com", "RPC endpoint used to fetch latest block") + f.BoolVar(&inputSensorParams.ProxyRPC, "proxy-rpc", false, "proxy unsupported RPC methods to the --rpc endpoint") + f.DurationVar(&inputSensorParams.ProxyRPCTimeout, "proxy-rpc-timeout", 30*time.Second, "timeout for proxied RPC requests") f.StringVar(&inputSensorParams.GenesisHash, "genesis-hash", "0xa9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b", "genesis block hash") f.BytesHexVar(&inputSensorParams.ForkID, "fork-id", []byte{34, 213, 35, 178}, "hex encoded fork ID (omit 0x)") f.IntVar(&inputSensorParams.DialRatio, "dial-ratio", 0, diff --git a/cmd/p2p/sensor/usage.md b/cmd/p2p/sensor/usage.md index c45a77c5..37109552 100644 --- a/cmd/p2p/sensor/usage.md +++ b/cmd/p2p/sensor/usage.md @@ -9,6 +9,34 @@ created automatically. The bootnodes may change, so refer to the [Polygon Knowledge Layer][bootnodes] if the sensor is not discovering peers. +## JSON-RPC Server + +The sensor runs a JSON-RPC server on port 8545 (configurable via `--rpc-port`) +that supports a subset of Ethereum JSON-RPC methods using cached data. + +### Supported Methods + +| Method | Description | +|--------|-------------| +| `eth_chainId` | Returns the chain ID | +| `eth_blockNumber` | Returns the current head block number | +| `eth_gasPrice` | Returns suggested gas price based on recent blocks | +| `eth_getBlockByHash` | Returns block by hash | +| `eth_getBlockByNumber` | Returns block by number (if cached) | +| `eth_getTransactionByHash` | Returns transaction by hash | +| `eth_getTransactionByBlockHashAndIndex` | Returns transaction at index in block | +| `eth_getBlockTransactionCountByHash` | Returns transaction count in block | +| `eth_getUncleCountByBlockHash` | Returns uncle count in block | +| `eth_sendRawTransaction` | Broadcasts signed transaction to peers | + +### Limitations + +Methods requiring state or receipts are not supported: +- `eth_getBalance`, `eth_getCode`, `eth_call`, `eth_estimateGas` +- `eth_getTransactionReceipt`, `eth_getLogs` + +Data is served from an LRU cache, so older blocks/transactions may not be available. + ## Metrics The sensor exposes Prometheus metrics at `http://localhost:2112/metrics` diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 57b64fbf..8caede4e 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -30,6 +30,34 @@ created automatically. The bootnodes may change, so refer to the [Polygon Knowledge Layer][bootnodes] if the sensor is not discovering peers. +## JSON-RPC Server + +The sensor runs a JSON-RPC server on port 8545 (configurable via `--rpc-port`) +that supports a subset of Ethereum JSON-RPC methods using cached data. + +### Supported Methods + +| Method | Description | +|--------|-------------| +| `eth_chainId` | Returns the chain ID | +| `eth_blockNumber` | Returns the current head block number | +| `eth_gasPrice` | Returns suggested gas price based on recent blocks | +| `eth_getBlockByHash` | Returns block by hash | +| `eth_getBlockByNumber` | Returns block by number (if cached) | +| `eth_getTransactionByHash` | Returns transaction by hash | +| `eth_getTransactionByBlockHashAndIndex` | Returns transaction at index in block | +| `eth_getBlockTransactionCountByHash` | Returns transaction count in block | +| `eth_getUncleCountByBlockHash` | Returns uncle count in block | +| `eth_sendRawTransaction` | Broadcasts signed transaction to peers | + +### Limitations + +Methods requiring state or receipts are not supported: +- `eth_getBalance`, `eth_getCode`, `eth_call`, `eth_estimateGas` +- `eth_getTransactionReceipt`, `eth_getLogs` + +Data is served from an LRU cache, so older blocks/transactions may not be available. + ## Metrics The sensor exposes Prometheus metrics at `http://localhost:2112/metrics` diff --git a/p2p/conns.go b/p2p/conns.go index 7690cd62..e92e21ea 100644 --- a/p2p/conns.go +++ b/p2p/conns.go @@ -334,6 +334,11 @@ func (c *Conns) AddTxs(txs []*types.Transaction) []common.Hash { return hashes } +// GetTx retrieves a transaction from the shared cache and updates LRU ordering. +func (c *Conns) GetTx(hash common.Hash) (*types.Transaction, bool) { + return c.txs.Get(hash) +} + // PeekTxs retrieves multiple transactions from the shared cache without updating LRU ordering. // Uses a single read lock for better concurrency when LRU ordering is not needed. func (c *Conns) PeekTxs(hashes []common.Hash) []*types.Transaction { @@ -367,7 +372,7 @@ func (c *Conns) HeadBlock() eth.NewBlockPacket { // Returns true if the head block was updated, false otherwise. func (c *Conns) UpdateHeadBlock(packet eth.NewBlockPacket) bool { return c.head.Update(func(current eth.NewBlockPacket) (eth.NewBlockPacket, bool) { - if current.Block == nil || (packet.Block.NumberU64() > current.Block.NumberU64() && packet.TD.Cmp(current.TD) == 1) { + if current.Block == nil || packet.Block.NumberU64() > current.Block.NumberU64() { return packet, true } return current, false @@ -431,6 +436,19 @@ func (c *Conns) GetPeerName(peerID string) string { return "" } +// GetBlockByNumber iterates through the cache to find a block by its number. +// Returns the hash, block cache, and true if found; empty values and false otherwise. +func (c *Conns) GetBlockByNumber(number uint64) (common.Hash, BlockCache, bool) { + for _, hash := range c.blocks.Keys() { + if cache, ok := c.blocks.Peek(hash); ok && cache.Header != nil { + if cache.Header.Number.Uint64() == number { + return hash, cache, true + } + } + } + return common.Hash{}, BlockCache{}, false +} + // GetPeerVersion returns the negotiated eth protocol version for a specific peer. // Returns 0 if the peer is not found. func (c *Conns) GetPeerVersion(peerID string) uint { @@ -443,3 +461,16 @@ func (c *Conns) GetPeerVersion(peerID string) uint { return 0 } + +// GetPeerLatestBlock returns the latest block hash and number for a peer. +// Returns zero hash and 0 if the peer is not found or no block has been received. +func (c *Conns) GetPeerLatestBlock(peerID string) (common.Hash, uint64) { + c.mu.RLock() + defer c.mu.RUnlock() + + if cn, ok := c.conns[peerID]; ok { + return cn.latestBlockHash.Get(), cn.latestBlockNumber.Get() + } + + return common.Hash{}, 0 +} diff --git a/p2p/datastructures/lru.go b/p2p/datastructures/lru.go index 2fd2e0ee..c38ce031 100644 --- a/p2p/datastructures/lru.go +++ b/p2p/datastructures/lru.go @@ -260,6 +260,19 @@ func (c *LRU[K, V]) Remove(key K) (V, bool) { return zero, false } +// Keys returns all keys in the cache in LRU order (most recent first). +func (c *LRU[K, V]) Keys() []K { + c.mu.RLock() + defer c.mu.RUnlock() + + keys := make([]K, 0, c.list.Len()) + for elem := c.list.Front(); elem != nil; elem = elem.Next() { + e := elem.Value.(*entry[K, V]) + keys = append(keys, e.key) + } + return keys +} + // AddBatch adds multiple key-value pairs to the cache. // Uses a single write lock for all additions, reducing lock contention // compared to calling Add in a loop. Keys and values must have the same length. diff --git a/p2p/gasprice.go b/p2p/gasprice.go new file mode 100644 index 00000000..b7c34906 --- /dev/null +++ b/p2p/gasprice.go @@ -0,0 +1,256 @@ +package p2p + +import ( + "math/big" + "sort" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// Gas price oracle constants (matching Bor/geth defaults) +const ( + // gpoSampleNumber is the number of transactions to sample per block + gpoSampleNumber = 3 + // gpoCheckBlocks is the number of blocks to check for gas price estimation + gpoCheckBlocks = 20 + // gpoPercentile is the percentile to use for gas price estimation + gpoPercentile = 60 +) + +var ( + // gpoMaxPrice is the maximum gas price to suggest (500 gwei) + gpoMaxPrice = big.NewInt(500_000_000_000) + // gpoIgnorePrice is the minimum tip to consider (2 gwei, lower than Bor's 25 gwei for broader network compatibility) + gpoIgnorePrice = big.NewInt(2_000_000_000) + // gpoDefaultPrice is the default gas price when no data is available (1 gwei) + gpoDefaultPrice = big.NewInt(1_000_000_000) +) + +// GasPriceOracle estimates gas prices based on recent block data. +// It follows Bor/geth's gas price oracle approach. +type GasPriceOracle struct { + conns *Conns + + mu sync.RWMutex + lastHead common.Hash + lastTip *big.Int +} + +// NewGasPriceOracle creates a new gas price oracle that uses the given Conns for block data. +func NewGasPriceOracle(conns *Conns) *GasPriceOracle { + return &GasPriceOracle{ + conns: conns, + } +} + +// SuggestGasPrice estimates the gas price based on recent blocks. +// For EIP-1559 networks, this returns baseFee + suggestedTip. +// For legacy networks, this returns the 60th percentile of gas prices. +func (o *GasPriceOracle) SuggestGasPrice() *big.Int { + head := o.conns.HeadBlock() + if head.Block == nil { + return gpoDefaultPrice + } + + // For EIP-1559: return baseFee + suggested tip + if baseFee := head.Block.BaseFee(); baseFee != nil { + tip := o.SuggestGasTipCap() + if tip == nil { + tip = gpoDefaultPrice + } + return new(big.Int).Add(baseFee, tip) + } + + // Legacy: return percentile of gas prices + return o.suggestLegacyGasPrice() +} + +// suggestLegacyGasPrice estimates gas price for pre-EIP-1559 networks. +func (o *GasPriceOracle) suggestLegacyGasPrice() *big.Int { + keys := o.conns.blocks.Keys() + if len(keys) == 0 { + return gpoDefaultPrice + } + + if len(keys) > gpoCheckBlocks { + keys = keys[:gpoCheckBlocks] + } + + var prices []*big.Int + for _, hash := range keys { + cache, ok := o.conns.blocks.Peek(hash) + if !ok || cache.Body == nil { + continue + } + + for _, tx := range cache.Body.Transactions { + if price := tx.GasPrice(); price != nil && price.Sign() > 0 { + prices = append(prices, new(big.Int).Set(price)) + } + } + } + + if len(prices) == 0 { + return gpoDefaultPrice + } + + sort.Slice(prices, func(i, j int) bool { + return prices[i].Cmp(prices[j]) < 0 + }) + + price := prices[(len(prices)-1)*gpoPercentile/100] + if price.Cmp(gpoMaxPrice) > 0 { + return new(big.Int).Set(gpoMaxPrice) + } + return price +} + +// SuggestGasTipCap estimates a gas tip cap (priority fee) based on recent blocks. +// This implementation follows Bor/geth's gas price oracle approach: +// - Samples the lowest N tips from each of the last M blocks +// - Ignores tips below a threshold +// - Returns the configured percentile of collected tips +// - Caches results until head changes +func (o *GasPriceOracle) SuggestGasTipCap() *big.Int { + head := o.conns.HeadBlock() + if head.Block == nil { + return nil + } + headHash := head.Block.Hash() + + // Check cache first + o.mu.RLock() + if headHash == o.lastHead && o.lastTip != nil { + tip := new(big.Int).Set(o.lastTip) + o.mu.RUnlock() + return tip + } + lastTip := o.lastTip + o.mu.RUnlock() + + // Collect tips from recent blocks + keys := o.conns.blocks.Keys() + if len(keys) == 0 { + return lastTip + } + + // Limit to checkBlocks most recent + if len(keys) > gpoCheckBlocks { + keys = keys[:gpoCheckBlocks] + } + + var results []*big.Int + for _, hash := range keys { + tips := o.getBlockTips(hash, gpoSampleNumber, gpoIgnorePrice) + if len(tips) == 0 && lastTip != nil { + // Empty block or all tips below threshold, use last tip + tips = []*big.Int{lastTip} + } + results = append(results, tips...) + } + + if len(results) == 0 { + return lastTip + } + + // Sort and get percentile + sort.Slice(results, func(i, j int) bool { + return results[i].Cmp(results[j]) < 0 + }) + tip := results[(len(results)-1)*gpoPercentile/100] + + // Apply max price cap + if tip.Cmp(gpoMaxPrice) > 0 { + tip = new(big.Int).Set(gpoMaxPrice) + } + + // Cache result + o.mu.Lock() + o.lastHead = headHash + o.lastTip = tip + o.mu.Unlock() + + return new(big.Int).Set(tip) +} + +// getBlockTips returns the lowest N tips from a block that are above the ignore threshold. +// Transactions are sorted by effective tip ascending, and the first N valid tips are returned. +func (o *GasPriceOracle) getBlockTips(hash common.Hash, limit int, ignoreUnder *big.Int) []*big.Int { + cache, ok := o.conns.blocks.Peek(hash) + if !ok || cache.Body == nil || cache.Header == nil { + return nil + } + + baseFee := cache.Header.BaseFee + if baseFee == nil { + return nil // Pre-EIP-1559 block + } + + // Calculate tips for all transactions + var allTips []*big.Int + for _, tx := range cache.Body.Transactions { + tip := effectiveGasTip(tx, baseFee) + if tip != nil && tip.Sign() > 0 { + allTips = append(allTips, tip) + } + } + + if len(allTips) == 0 { + return nil + } + + // Sort by tip ascending (lowest first, like Bor) + sort.Slice(allTips, func(i, j int) bool { + return allTips[i].Cmp(allTips[j]) < 0 + }) + + // Collect tips above threshold, up to limit + var tips []*big.Int + for _, tip := range allTips { + if ignoreUnder != nil && tip.Cmp(ignoreUnder) < 0 { + continue + } + tips = append(tips, tip) + if len(tips) >= limit { + break + } + } + + return tips +} + +// effectiveGasTip returns the effective tip (priority fee) for a transaction. +// For EIP-1559 transactions: min(maxPriorityFeePerGas, maxFeePerGas - baseFee) +// For legacy transactions: gasPrice - baseFee (the implicit tip) +// Returns nil if the tip cannot be determined or is negative. +func effectiveGasTip(tx *types.Transaction, baseFee *big.Int) *big.Int { + switch tx.Type() { + case types.DynamicFeeTxType, types.BlobTxType: + tip := tx.GasTipCap() + if tip == nil { + return nil + } + // Effective tip is min(maxPriorityFeePerGas, maxFeePerGas - baseFee) + if tx.GasFeeCap() != nil { + effectiveTip := new(big.Int).Sub(tx.GasFeeCap(), baseFee) + if effectiveTip.Cmp(tip) < 0 { + tip = effectiveTip + } + } + if tip.Sign() <= 0 { + return nil + } + return new(big.Int).Set(tip) + default: + // Legacy/AccessList transactions: tip is gasPrice - baseFee + if price := tx.GasPrice(); price != nil { + tip := new(big.Int).Sub(price, baseFee) + if tip.Sign() > 0 { + return tip + } + } + return nil + } +} diff --git a/p2p/log.go b/p2p/log.go index bfb26b8e..4ba2caba 100644 --- a/p2p/log.go +++ b/p2p/log.go @@ -89,6 +89,7 @@ func (c *MessageCount) IsEmpty() bool { return sum( c.BlockHeaders, c.BlockBodies, + c.Blocks, c.BlockHashes, c.BlockHeaderRequests, c.BlockBodiesRequests, @@ -98,16 +99,19 @@ func (c *MessageCount) IsEmpty() bool { c.Pings, c.Errors, c.Disconnects, + c.NewWitness, + c.NewWitnessHashes, + c.GetWitnessRequest, + c.Witness, ) == 0 } func sum(ints ...int64) int64 { - var sum int64 = 0 + var total int64 for _, i := range ints { - sum += i + total += i } - - return sum + return total } // IncrementByName increments the appropriate field based on message name. diff --git a/p2p/nodeset.go b/p2p/nodeset.go index 9572f853..aa25870f 100644 --- a/p2p/nodeset.go +++ b/p2p/nodeset.go @@ -97,7 +97,7 @@ func WriteURLs(file string, ns NodeSet) error { } } - urls := []string{} + var urls []string for url := range m { urls = append(urls, url) } @@ -130,7 +130,7 @@ func WritePeers(file string, urls []string) error { } func WriteDNSTreeNodes(file string, tree *dnsdisc.Tree) error { - urls := []string{} + var urls []string for _, node := range tree.Nodes() { urls = append(urls, node.URLv4()) } diff --git a/p2p/protocol.go b/p2p/protocol.go index d8fab248..c4dae523 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -99,6 +99,12 @@ type conn struct { // version stores the negotiated eth protocol version (e.g., 68 or 69). version uint + + // latestBlockHash and latestBlockNumber track the most recent block + // information received from this peer via NewBlock, NewBlockHashes, + // or BlockRangeUpdate messages. Uses ds.Locked for thread-safe API access. + latestBlockHash *ds.Locked[common.Hash] + latestBlockNumber *ds.Locked[uint64] } // EthProtocolOptions is the options used when creating a new eth protocol. @@ -156,6 +162,8 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { blockAnnounce: make(chan eth.NewBlockHashesPacket, maxQueuedBlockAnns), closeCh: make(chan struct{}), version: version, + latestBlockHash: &ds.Locked[common.Hash]{}, + latestBlockNumber: &ds.Locked[uint64]{}, } // Ensure cleanup happens on any exit path (including statusExchange failure) @@ -432,6 +440,12 @@ func (c *conn) handleBlockRangeUpdate(msg ethp2p.Msg) error { Hex("hash", packet.LatestBlockHash[:]). Msg("Received BlockRangeUpdate") + // Update latest block info from the range update (thread-safe for API access) + if packet.LatestBlock > c.latestBlockNumber.Get() { + c.latestBlockHash.Set(packet.LatestBlockHash) + c.latestBlockNumber.Set(packet.LatestBlock) + } + return nil } @@ -529,6 +543,12 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { for _, entry := range packet { hash := entry.Hash + // Update latest block info if this block is newer (thread-safe for API access) + if entry.Number > c.latestBlockNumber.Get() { + c.latestBlockHash.Set(hash) + c.latestBlockNumber.Set(entry.Number) + } + // Mark as known from this peer c.addKnownBlock(hash) @@ -891,12 +911,23 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error { c.db.WriteBlockHeaders(ctx, headers, tfs, isParent) // Update cache to store headers + var head *types.Header for _, header := range headers { - hash := header.Hash() - c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { + c.conns.Blocks().Update(header.Hash(), func(cache BlockCache) BlockCache { cache.Header = header return cache }) + + if head == nil || header.Number.Cmp(head.Number) > 0 { + head = header + } + } + + if head != nil { + c.conns.UpdateHeadBlock(eth.NewBlockPacket{ + Block: types.NewBlockWithHeader(head), + TD: c.conns.HeadBlock().TD, + }) } return nil @@ -966,12 +997,26 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { c.db.WriteBlockBody(ctx, body, hash, tfs) - // Update cache to store body + // Update cache and try to update head block if we now have complete block data. + var header *types.Header c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { cache.Body = body + header = cache.Header return cache }) + if header != nil { + block := types.NewBlockWithHeader(header).WithBody(types.Body{ + Transactions: body.Transactions, + Uncles: body.Uncles, + Withdrawals: body.Withdrawals, + }) + c.conns.UpdateHeadBlock(eth.NewBlockPacket{ + Block: block, + TD: c.conns.HeadBlock().TD, + }) + } + return nil } @@ -999,6 +1044,10 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), 1) + // Update latest block info for this peer (thread-safe for API access) + c.latestBlockHash.Set(hash) + c.latestBlockNumber.Set(packet.Block.Number().Uint64()) + // Mark block as known from this peer c.addKnownBlock(hash)