Merge pull request #2565 from danielnorberg/master
lock around read operations in graph
This commit is contained in:
commit
310fbe7bb7
1 changed files with 79 additions and 42 deletions
|
@ -48,7 +48,7 @@ type WalkFunc func(fullPath string, entity *Entity) error
|
||||||
// Graph database for storing entities and their relationships
|
// Graph database for storing entities and their relationships
|
||||||
type Database struct {
|
type Database struct {
|
||||||
conn *sql.DB
|
conn *sql.DB
|
||||||
mux sync.Mutex
|
mux sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new graph database initialized with a root entity
|
// Create a new graph database initialized with a root entity
|
||||||
|
@ -138,7 +138,14 @@ func (db *Database) Set(fullPath, id string) (*Entity, error) {
|
||||||
|
|
||||||
// Return true if a name already exists in the database
|
// Return true if a name already exists in the database
|
||||||
func (db *Database) Exists(name string) bool {
|
func (db *Database) Exists(name string) bool {
|
||||||
return db.Get(name) != nil
|
db.mux.RLock()
|
||||||
|
defer db.mux.RUnlock()
|
||||||
|
|
||||||
|
e, err := db.get(name)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return e != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Database) setEdge(parentPath, name string, e *Entity) error {
|
func (db *Database) setEdge(parentPath, name string, e *Entity) error {
|
||||||
|
@ -165,6 +172,9 @@ func (db *Database) RootEntity() *Entity {
|
||||||
|
|
||||||
// Return the entity for a given path
|
// Return the entity for a given path
|
||||||
func (db *Database) Get(name string) *Entity {
|
func (db *Database) Get(name string) *Entity {
|
||||||
|
db.mux.RLock()
|
||||||
|
defer db.mux.RUnlock()
|
||||||
|
|
||||||
e, err := db.get(name)
|
e, err := db.get(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -200,23 +210,36 @@ func (db *Database) get(name string) (*Entity, error) {
|
||||||
// List all entities by from the name
|
// List all entities by from the name
|
||||||
// The key will be the full path of the entity
|
// The key will be the full path of the entity
|
||||||
func (db *Database) List(name string, depth int) Entities {
|
func (db *Database) List(name string, depth int) Entities {
|
||||||
|
db.mux.RLock()
|
||||||
|
defer db.mux.RUnlock()
|
||||||
|
|
||||||
out := Entities{}
|
out := Entities{}
|
||||||
e, err := db.get(name)
|
e, err := db.get(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
for c := range db.children(e, name, depth) {
|
|
||||||
|
children, err := db.children(e, name, depth, nil)
|
||||||
|
if err != nil {
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range children {
|
||||||
out[c.FullPath] = c.Entity
|
out[c.FullPath] = c.Entity
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Walk through the child graph of an entity, calling walkFunc for each child entity.
|
||||||
|
// It is safe for walkFunc to call graph functions.
|
||||||
func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error {
|
func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error {
|
||||||
e, err := db.get(name)
|
children, err := db.Children(name, depth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for c := range db.children(e, name, depth) {
|
|
||||||
|
// Note: the database lock must not be held while calling walkFunc
|
||||||
|
for _, c := range children {
|
||||||
if err := walkFunc(c.FullPath, c.Entity); err != nil {
|
if err := walkFunc(c.FullPath, c.Entity); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -224,8 +247,24 @@ func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return the children of the specified entity
|
||||||
|
func (db *Database) Children(name string, depth int) ([]WalkMeta, error) {
|
||||||
|
db.mux.RLock()
|
||||||
|
defer db.mux.RUnlock()
|
||||||
|
|
||||||
|
e, err := db.get(name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.children(e, name, depth, nil)
|
||||||
|
}
|
||||||
|
|
||||||
// Return the refrence count for a specified id
|
// Return the refrence count for a specified id
|
||||||
func (db *Database) Refs(id string) int {
|
func (db *Database) Refs(id string) int {
|
||||||
|
db.mux.RLock()
|
||||||
|
defer db.mux.RUnlock()
|
||||||
|
|
||||||
var count int
|
var count int
|
||||||
if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil {
|
if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil {
|
||||||
return 0
|
return 0
|
||||||
|
@ -235,6 +274,9 @@ func (db *Database) Refs(id string) int {
|
||||||
|
|
||||||
// Return all the id's path references
|
// Return all the id's path references
|
||||||
func (db *Database) RefPaths(id string) Edges {
|
func (db *Database) RefPaths(id string) Edges {
|
||||||
|
db.mux.RLock()
|
||||||
|
defer db.mux.RUnlock()
|
||||||
|
|
||||||
refs := Edges{}
|
refs := Edges{}
|
||||||
|
|
||||||
rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id)
|
rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id)
|
||||||
|
@ -356,56 +398,51 @@ type WalkMeta struct {
|
||||||
Edge *Edge
|
Edge *Edge
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Database) children(e *Entity, name string, depth int) <-chan WalkMeta {
|
func (db *Database) children(e *Entity, name string, depth int, entities []WalkMeta) ([]WalkMeta, error) {
|
||||||
out := make(chan WalkMeta)
|
|
||||||
if e == nil {
|
if e == nil {
|
||||||
close(out)
|
return entities, nil
|
||||||
return out
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id)
|
||||||
rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id)
|
if err != nil {
|
||||||
if err != nil {
|
return nil, err
|
||||||
close(out)
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var entityId, entityName string
|
||||||
|
if err := rows.Scan(&entityId, &entityName); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
child := &Entity{entityId}
|
||||||
|
edge := &Edge{
|
||||||
|
ParentID: e.id,
|
||||||
|
Name: entityName,
|
||||||
|
EntityID: child.id,
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
for rows.Next() {
|
meta := WalkMeta{
|
||||||
var entityId, entityName string
|
Parent: e,
|
||||||
if err := rows.Scan(&entityId, &entityName); err != nil {
|
Entity: child,
|
||||||
// Log error
|
FullPath: path.Join(name, edge.Name),
|
||||||
continue
|
Edge: edge,
|
||||||
}
|
}
|
||||||
child := &Entity{entityId}
|
|
||||||
edge := &Edge{
|
|
||||||
ParentID: e.id,
|
|
||||||
Name: entityName,
|
|
||||||
EntityID: child.id,
|
|
||||||
}
|
|
||||||
|
|
||||||
meta := WalkMeta{
|
entities = append(entities, meta)
|
||||||
Parent: e,
|
|
||||||
Entity: child,
|
|
||||||
FullPath: path.Join(name, edge.Name),
|
|
||||||
Edge: edge,
|
|
||||||
}
|
|
||||||
|
|
||||||
out <- meta
|
if depth != 0 {
|
||||||
if depth == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
nDepth := depth
|
nDepth := depth
|
||||||
if depth != -1 {
|
if depth != -1 {
|
||||||
nDepth -= 1
|
nDepth -= 1
|
||||||
}
|
}
|
||||||
sc := db.children(child, meta.FullPath, nDepth)
|
entities, err = db.children(child, meta.FullPath, nDepth, entities)
|
||||||
for c := range sc {
|
if err != nil {
|
||||||
out <- c
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(out)
|
}
|
||||||
}()
|
|
||||||
return out
|
return entities, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the entity based on the parent path and name
|
// Return the entity based on the parent path and name
|
||||||
|
|
Loading…
Add table
Reference in a new issue