2018-07-21 22:49:36 +03:00
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Handler is the API for Mutable Resources
// It enables creating, updating, syncing and retrieving resources and their update data
package mru
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"unsafe"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage"
)
const chunkSize = 4096 // temporary until we implement FileStore in the resourcehandler
type Handler struct {
chunkStore * storage . NetStore
HashSize int
resources map [ uint64 ] * resource
resourceLock sync . RWMutex
storeTimeout time . Duration
queryMaxPeriods uint32
}
// HandlerParams pass parameters to the Handler constructor NewHandler
// Signer and TimestampProvider are mandatory parameters
type HandlerParams struct {
QueryMaxPeriods uint32
}
// hashPool contains a pool of ready hashers
var hashPool sync . Pool
var minimumChunkLength int
// init initializes the package and hashPool
func init ( ) {
hashPool = sync . Pool {
New : func ( ) interface { } {
return storage . MakeHashFunc ( resourceHashAlgorithm ) ( )
} ,
}
if minimumMetadataLength < minimumUpdateDataLength {
minimumChunkLength = minimumMetadataLength
} else {
minimumChunkLength = minimumUpdateDataLength
}
}
// NewHandler creates a new Mutable Resource API
func NewHandler ( params * HandlerParams ) ( * Handler , error ) {
rh := & Handler {
resources : make ( map [ uint64 ] * resource ) ,
storeTimeout : defaultStoreTimeout ,
queryMaxPeriods : params . QueryMaxPeriods ,
}
for i := 0 ; i < hasherCount ; i ++ {
hashfunc := storage . MakeHashFunc ( resourceHashAlgorithm ) ( )
if rh . HashSize == 0 {
rh . HashSize = hashfunc . Size ( )
}
hashPool . Put ( hashfunc )
}
return rh , nil
}
// SetStore sets the store backend for the Mutable Resource API
func ( h * Handler ) SetStore ( store * storage . NetStore ) {
h . chunkStore = store
}
// Validate is a chunk validation method
// If it looks like a resource update, the chunk address is checked against the ownerAddr of the update's signature
// It implements the storage.ChunkValidator interface
func ( h * Handler ) Validate ( chunkAddr storage . Address , data [ ] byte ) bool {
dataLength := len ( data )
if dataLength < minimumChunkLength {
return false
}
//metadata chunks have the first two bytes set to zero
if data [ 0 ] == 0 && data [ 1 ] == 0 && dataLength >= minimumMetadataLength {
//metadata chunk
rootAddr , _ := metadataHash ( data )
valid := bytes . Equal ( chunkAddr , rootAddr )
if ! valid {
log . Debug ( fmt . Sprintf ( "Invalid root metadata chunk with address: %s" , chunkAddr . Hex ( ) ) )
}
return valid
}
// if it is not a metadata chunk, check if it is a properly formatted update chunk with
// valid signature and proof of ownership of the resource it is trying
// to update
// First, deserialize the chunk
var r SignedResourceUpdate
if err := r . fromChunk ( chunkAddr , data ) ; err != nil {
log . Debug ( "Invalid resource chunk with address %s: %s " , chunkAddr . Hex ( ) , err . Error ( ) )
return false
}
// check that the lookup information contained in the chunk matches the updateAddr (chunk search key)
// that was used to retrieve this chunk
// if this validation fails, someone forged a chunk.
if ! bytes . Equal ( chunkAddr , r . updateHeader . UpdateAddr ( ) ) {
log . Debug ( "period,version,rootAddr contained in update chunk do not match updateAddr %s" , chunkAddr . Hex ( ) )
return false
}
// Verify signatures and that the signer actually owns the resource
// If it fails, it means either the signature is not valid, data is corrupted
// or someone is trying to update someone else's resource.
if err := r . Verify ( ) ; err != nil {
log . Debug ( "Invalid signature: %v" , err )
return false
}
return true
}
// GetContent retrieves the data payload of the last synced update of the Mutable Resource
func ( h * Handler ) GetContent ( rootAddr storage . Address ) ( storage . Address , [ ] byte , error ) {
rsrc := h . get ( rootAddr )
if rsrc == nil || ! rsrc . isSynced ( ) {
return nil , nil , NewError ( ErrNotFound , " does not exist or is not synced" )
}
return rsrc . lastKey , rsrc . data , nil
}
// GetLastPeriod retrieves the period of the last synced update of the Mutable Resource
func ( h * Handler ) GetLastPeriod ( rootAddr storage . Address ) ( uint32 , error ) {
rsrc := h . get ( rootAddr )
if rsrc == nil {
return 0 , NewError ( ErrNotFound , " does not exist" )
} else if ! rsrc . isSynced ( ) {
return 0 , NewError ( ErrNotSynced , " is not synced" )
}
return rsrc . period , nil
}
// GetVersion retrieves the period of the last synced update of the Mutable Resource
func ( h * Handler ) GetVersion ( rootAddr storage . Address ) ( uint32 , error ) {
rsrc := h . get ( rootAddr )
if rsrc == nil {
return 0 , NewError ( ErrNotFound , " does not exist" )
} else if ! rsrc . isSynced ( ) {
return 0 , NewError ( ErrNotSynced , " is not synced" )
}
return rsrc . version , nil
}
// \TODO should be hashsize * branches from the chosen chunker, implement with FileStore
func ( h * Handler ) chunkSize ( ) int64 {
return chunkSize
}
// New creates a new metadata chunk out of the request passed in.
func ( h * Handler ) New ( ctx context . Context , request * Request ) error {
// frequency 0 is invalid
if request . metadata . Frequency == 0 {
return NewError ( ErrInvalidValue , "frequency cannot be 0 when creating a resource" )
}
// make sure owner is set to something
if request . metadata . Owner == zeroAddr {
return NewError ( ErrInvalidValue , "ownerAddr must be set to create a new metadata chunk" )
}
// create the meta chunk and store it in swarm
chunk , metaHash , err := request . metadata . newChunk ( )
if err != nil {
return err
}
if request . metaHash != nil && ! bytes . Equal ( request . metaHash , metaHash ) ||
request . rootAddr != nil && ! bytes . Equal ( request . rootAddr , chunk . Addr ) {
return NewError ( ErrInvalidValue , "metaHash in UpdateRequest does not match actual metadata" )
}
request . metaHash = metaHash
request . rootAddr = chunk . Addr
h . chunkStore . Put ( ctx , chunk )
log . Debug ( "new resource" , "name" , request . metadata . Name , "startTime" , request . metadata . StartTime , "frequency" , request . metadata . Frequency , "owner" , request . metadata . Owner )
// create the internal index for the resource and populate it with its metadata
rsrc := & resource {
resourceUpdate : resourceUpdate {
updateHeader : updateHeader {
UpdateLookup : UpdateLookup {
rootAddr : chunk . Addr ,
} ,
} ,
} ,
ResourceMetadata : request . metadata ,
updated : time . Now ( ) ,
}
h . set ( chunk . Addr , rsrc )
return nil
}
// NewUpdateRequest prepares an UpdateRequest structure with all the necessary information to
// just add the desired data and sign it.
// The resulting structure can then be signed and passed to Handler.Update to be verified and sent
func ( h * Handler ) NewUpdateRequest ( ctx context . Context , rootAddr storage . Address ) ( updateRequest * Request , err error ) {
if rootAddr == nil {
return nil , NewError ( ErrInvalidValue , "rootAddr cannot be nil" )
}
// Make sure we have a cache of the metadata chunk
rsrc , err := h . Load ( ctx , rootAddr )
if err != nil {
return nil , err
}
now := TimestampProvider . Now ( )
updateRequest = new ( Request )
updateRequest . period , err = getNextPeriod ( rsrc . StartTime . Time , now . Time , rsrc . Frequency )
if err != nil {
return nil , err
}
if _ , err = h . lookup ( rsrc , LookupLatestVersionInPeriod ( rsrc . rootAddr , updateRequest . period ) ) ; err != nil {
if err . ( * Error ) . code != ErrNotFound {
return nil , err
}
// not finding updates means that there is a network error
// or that the resource really does not have updates in this period.
}
updateRequest . multihash = rsrc . multihash
updateRequest . rootAddr = rsrc . rootAddr
updateRequest . metaHash = rsrc . metaHash
updateRequest . metadata = rsrc . ResourceMetadata
// if we already have an update for this period then increment version
// resource object MUST be in sync for version to be correct, but we checked this earlier in the method already
if h . hasUpdate ( rootAddr , updateRequest . period ) {
updateRequest . version = rsrc . version + 1
} else {
updateRequest . version = 1
}
return updateRequest , nil
}
// Lookup retrieves a specific or latest version of the resource update with metadata chunk at params.Root
// Lookup works differently depending on the configuration of `LookupParams`
// See the `LookupParams` documentation and helper functions:
// `LookupLatest`, `LookupLatestVersionInPeriod` and `LookupVersion`
// When looking for the latest update, it starts at the next period after the current time.
// upon failure tries the corresponding keys of each previous period until one is found
// (or startTime is reached, in which case there are no updates).
func ( h * Handler ) Lookup ( ctx context . Context , params * LookupParams ) ( * resource , error ) {
rsrc := h . get ( params . rootAddr )
if rsrc == nil {
return nil , NewError ( ErrNothingToReturn , "resource not loaded" )
}
return h . lookup ( rsrc , params )
}
// LookupPrevious returns the resource before the one currently loaded in the resource cache
// This is useful where resource updates are used incrementally in contrast to
// merely replacing content.
// Requires a cached resource object to determine the current state of the resource.
func ( h * Handler ) LookupPrevious ( ctx context . Context , params * LookupParams ) ( * resource , error ) {
rsrc := h . get ( params . rootAddr )
if rsrc == nil {
return nil , NewError ( ErrNothingToReturn , "resource not loaded" )
}
if ! rsrc . isSynced ( ) {
return nil , NewError ( ErrNotSynced , "LookupPrevious requires synced resource." )
} else if rsrc . period == 0 {
return nil , NewError ( ErrNothingToReturn , " not found" )
}
var version , period uint32
if rsrc . version > 1 {
version = rsrc . version - 1
period = rsrc . period
} else if rsrc . period == 1 {
return nil , NewError ( ErrNothingToReturn , "Current update is the oldest" )
} else {
version = 0
period = rsrc . period - 1
}
return h . lookup ( rsrc , NewLookupParams ( rsrc . rootAddr , period , version , params . Limit ) )
}
// base code for public lookup methods
func ( h * Handler ) lookup ( rsrc * resource , params * LookupParams ) ( * resource , error ) {
lp := * params
// we can't look for anything without a store
if h . chunkStore == nil {
return nil , NewError ( ErrInit , "Call Handler.SetStore() before performing lookups" )
}
var specificperiod bool
if lp . period > 0 {
specificperiod = true
} else {
// get the current time and the next period
now := TimestampProvider . Now ( )
var period uint32
period , err := getNextPeriod ( rsrc . StartTime . Time , now . Time , rsrc . Frequency )
if err != nil {
return nil , err
}
lp . period = period
}
// start from the last possible period, and iterate previous ones
// (unless we want a specific period only) until we find a match.
// If we hit startTime we're out of options
var specificversion bool
if lp . version > 0 {
specificversion = true
} else {
lp . version = 1
}
var hops uint32
if lp . Limit == 0 {
lp . Limit = h . queryMaxPeriods
}
log . Trace ( "resource lookup" , "period" , lp . period , "version" , lp . version , "limit" , lp . Limit )
for lp . period > 0 {
if lp . Limit != 0 && hops > lp . Limit {
return nil , NewErrorf ( ErrPeriodDepth , "Lookup exceeded max period hops (%d)" , lp . Limit )
}
updateAddr := lp . UpdateAddr ( )
chunk , err := h . chunkStore . GetWithTimeout ( context . TODO ( ) , updateAddr , defaultRetrieveTimeout )
if err == nil {
if specificversion {
return h . updateIndex ( rsrc , chunk )
}
// check if we have versions > 1. If a version fails, the previous version is used and returned.
log . Trace ( "rsrc update version 1 found, checking for version updates" , "period" , lp . period , "updateAddr" , updateAddr )
for {
newversion := lp . version + 1
updateAddr := lp . UpdateAddr ( )
newchunk , err := h . chunkStore . GetWithTimeout ( context . TODO ( ) , updateAddr , defaultRetrieveTimeout )
if err != nil {
return h . updateIndex ( rsrc , chunk )
}
chunk = newchunk
lp . version = newversion
log . Trace ( "version update found, checking next" , "version" , lp . version , "period" , lp . period , "updateAddr" , updateAddr )
}
}
if specificperiod {
break
}
log . Trace ( "rsrc update not found, checking previous period" , "period" , lp . period , "updateAddr" , updateAddr )
lp . period --
hops ++
}
return nil , NewError ( ErrNotFound , "no updates found" )
}
// Load retrieves the Mutable Resource metadata chunk stored at rootAddr
// Upon retrieval it creates/updates the index entry for it with metadata corresponding to the chunk contents
func ( h * Handler ) Load ( ctx context . Context , rootAddr storage . Address ) ( * resource , error ) {
chunk , err := h . chunkStore . GetWithTimeout ( ctx , rootAddr , defaultRetrieveTimeout )
if err != nil {
return nil , NewError ( ErrNotFound , err . Error ( ) )
}
// create the index entry
rsrc := & resource { }
if err := rsrc . ResourceMetadata . binaryGet ( chunk . SData ) ; err != nil { // Will fail if this is not really a metadata chunk
return nil , err
}
rsrc . rootAddr , rsrc . metaHash = metadataHash ( chunk . SData )
if ! bytes . Equal ( rsrc . rootAddr , rootAddr ) {
return nil , NewError ( ErrCorruptData , "Corrupt metadata chunk" )
}
h . set ( rootAddr , rsrc )
log . Trace ( "resource index load" , "rootkey" , rootAddr , "name" , rsrc . ResourceMetadata . Name , "starttime" , rsrc . ResourceMetadata . StartTime , "frequency" , rsrc . ResourceMetadata . Frequency )
return rsrc , nil
}
// update mutable resource index map with specified content
func ( h * Handler ) updateIndex ( rsrc * resource , chunk * storage . Chunk ) ( * resource , error ) {
// retrieve metadata from chunk data and check that it matches this mutable resource
var r SignedResourceUpdate
if err := r . fromChunk ( chunk . Addr , chunk . SData ) ; err != nil {
return nil , err
}
log . Trace ( "resource index update" , "name" , rsrc . ResourceMetadata . Name , "updatekey" , chunk . Addr , "period" , r . period , "version" , r . version )
// update our rsrcs entry map
rsrc . lastKey = chunk . Addr
rsrc . period = r . period
rsrc . version = r . version
rsrc . updated = time . Now ( )
rsrc . data = make ( [ ] byte , len ( r . data ) )
rsrc . multihash = r . multihash
copy ( rsrc . data , r . data )
rsrc . Reader = bytes . NewReader ( rsrc . data )
log . Debug ( "resource synced" , "name" , rsrc . ResourceMetadata . Name , "updateAddr" , chunk . Addr , "period" , rsrc . period , "version" , rsrc . version )
h . set ( chunk . Addr , rsrc )
return rsrc , nil
}
// Update adds an actual data update
// Uses the Mutable Resource metadata currently loaded in the resources map entry.
// It is the caller's responsibility to make sure that this data is not stale.
// Note that a Mutable Resource update cannot span chunks, and thus has a MAX NET LENGTH 4096, INCLUDING update header data and signature. An error will be returned if the total length of the chunk payload will exceed this limit.
// Update can only check if the caller is trying to overwrite the very last known version, otherwise it just puts the update
// on the network.
func ( h * Handler ) Update ( ctx context . Context , r * SignedResourceUpdate ) ( storage . Address , error ) {
return h . update ( ctx , r )
}
// create and commit an update
func ( h * Handler ) update ( ctx context . Context , r * SignedResourceUpdate ) ( updateAddr storage . Address , err error ) {
// we can't update anything without a store
if h . chunkStore == nil {
return nil , NewError ( ErrInit , "Call Handler.SetStore() before updating" )
}
rsrc := h . get ( r . rootAddr )
if rsrc != nil && rsrc . period != 0 && rsrc . version != 0 && // This is the only cheap check we can do for sure
rsrc . period == r . period && rsrc . version >= r . version { // without having to lookup update chunks
return nil , NewError ( ErrInvalidValue , "A former update in this period is already known to exist" )
}
chunk , err := r . toChunk ( ) // Serialize the update into a chunk. Fails if data is too big
if err != nil {
return nil , err
}
// send the chunk
h . chunkStore . Put ( ctx , chunk )
log . Trace ( "resource update" , "updateAddr" , r . updateAddr , "lastperiod" , r . period , "version" , r . version , "data" , chunk . SData , "multihash" , r . multihash )
// update our resources map entry if the new update is older than the one we have, if we have it.
2018-08-07 12:53:56 +03:00
if rsrc != nil && ( r . period > rsrc . period || ( rsrc . period == r . period && r . version > rsrc . version ) ) {
2018-07-21 22:49:36 +03:00
rsrc . period = r . period
rsrc . version = r . version
rsrc . data = make ( [ ] byte , len ( r . data ) )
rsrc . updated = time . Now ( )
rsrc . lastKey = r . updateAddr
rsrc . multihash = r . multihash
copy ( rsrc . data , r . data )
rsrc . Reader = bytes . NewReader ( rsrc . data )
}
return r . updateAddr , nil
}
// Retrieves the resource index value for the given nameHash
func ( h * Handler ) get ( rootAddr storage . Address ) * resource {
if len ( rootAddr ) < storage . KeyLength {
log . Warn ( "Handler.get with invalid rootAddr" )
return nil
}
hashKey := * ( * uint64 ) ( unsafe . Pointer ( & rootAddr [ 0 ] ) )
h . resourceLock . RLock ( )
defer h . resourceLock . RUnlock ( )
rsrc := h . resources [ hashKey ]
return rsrc
}
// Sets the resource index value for the given nameHash
func ( h * Handler ) set ( rootAddr storage . Address , rsrc * resource ) {
if len ( rootAddr ) < storage . KeyLength {
log . Warn ( "Handler.set with invalid rootAddr" )
return
}
hashKey := * ( * uint64 ) ( unsafe . Pointer ( & rootAddr [ 0 ] ) )
h . resourceLock . Lock ( )
defer h . resourceLock . Unlock ( )
h . resources [ hashKey ] = rsrc
}
// Checks if we already have an update on this resource, according to the value in the current state of the resource index
func ( h * Handler ) hasUpdate ( rootAddr storage . Address , period uint32 ) bool {
rsrc := h . get ( rootAddr )
return rsrc != nil && rsrc . period == period
}