018f19f8c9
Signed-off-by: John Howard <jhoward@microsoft.com>
(cherry picked from commit 3f6127b173
)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
551 lines
12 KiB
Go
551 lines
12 KiB
Go
package graphdb
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
// SQL statements used by NewDatabase to create the schema. Each statement is
// guarded by IF NOT EXISTS, so it can be executed against a database that
// already contains the tables.
const (
	// createEntityTable creates the table holding one row per entity id.
	createEntityTable = `
CREATE TABLE IF NOT EXISTS entity (
id text NOT NULL PRIMARY KEY
);`

	// createEdgeTable creates the table of named links between entities.
	// parent_id is nullable; both id columns reference entity.id.
	createEdgeTable = `
CREATE TABLE IF NOT EXISTS edge (
"entity_id" text NOT NULL,
"parent_id" text NULL,
"name" text NOT NULL,
CONSTRAINT "parent_fk" FOREIGN KEY ("parent_id") REFERENCES "entity" ("id"),
CONSTRAINT "entity_fk" FOREIGN KEY ("entity_id") REFERENCES "entity" ("id")
);
`

	// createEdgeIndices makes (parent_id, name) unique, so a parent cannot
	// have two edges with the same name.
	createEdgeIndices = `
CREATE UNIQUE INDEX IF NOT EXISTS "name_parent_ix" ON "edge" (parent_id, name);
`
)
|
|
|
|
// Entity is a node of the graph, identified by a unique id.
type Entity struct {
	id string // unique identifier, read through the ID method
}
|
|
|
|
// Edge is a named link that attaches an entity underneath a parent entity.
type Edge struct {
	// EntityID is the id of the entity the edge points at, Name is the
	// edge's name under its parent, and ParentID is the id of the parent.
	EntityID, Name, ParentID string
}
|
|
|
|
// Entities stores the list of entities.
|
|
type Entities map[string]*Entity
|
|
|
|
// Edges stores the relationships between entities.
|
|
type Edges []*Edge
|
|
|
|
// WalkFunc is a function invoked to process an individual entity.
|
|
type WalkFunc func(fullPath string, entity *Entity) error
|
|
|
|
// Database is a graph database that stores entities and the named edges
// between them in a SQL database.
type Database struct {
	// conn is the SQL handle every query and statement is issued on.
	conn *sql.DB
	// mux serializes access: lookups take it for reading, while Set, Delete,
	// Purge and Rename take it for writing.
	mux sync.RWMutex
}
|
|
|
|
// IsNonUniqueNameError reports whether err was caused by a constraint
// violation, i.e. by an attempt to create an edge whose (parent, name) pair
// already exists.
//
// The check is done on the error text because the message is not the same
// across sqlite versions. A nil err is never a constraint violation and
// yields false.
func IsNonUniqueNameError(err error) bool {
	// Guard against nil so callers can pass the result of an operation
	// straight through without risking a nil-pointer panic on err.Error().
	if err == nil {
		return false
	}

	str := err.Error()
	switch {
	// sqlite 3.7.17-1ubuntu1 returns:
	// Set failure: Abort due to constraint violation: columns parent_id, name are not unique
	case strings.HasSuffix(str, "name are not unique"):
		return true
	// sqlite-3.8.3-1.fc20 returns:
	// Set failure: Abort due to constraint violation: UNIQUE constraint failed: edge.parent_id, edge.name
	case strings.Contains(str, "UNIQUE constraint failed") && strings.Contains(str, "edge.name"):
		return true
	// sqlite-3.6.20-1.el6 returns:
	// Set failure: Abort due to constraint violation: constraint failed
	case strings.HasSuffix(str, "constraint failed"):
		return true
	}
	return false
}
|
|
|
|
// NewDatabase creates a new graph database initialized with a root entity.
|
|
func NewDatabase(conn *sql.DB) (*Database, error) {
|
|
if conn == nil {
|
|
return nil, fmt.Errorf("Database connection cannot be nil")
|
|
}
|
|
db := &Database{conn: conn}
|
|
|
|
// Create root entities
|
|
tx, err := conn.Begin()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if _, err := tx.Exec(createEntityTable); err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err := tx.Exec(createEdgeTable); err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err := tx.Exec(createEdgeIndices); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if _, err := tx.Exec("DELETE FROM entity where id = ?", "0"); err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
if _, err := tx.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
if _, err := tx.Exec("DELETE FROM edge where entity_id=? and name=?", "0", "/"); err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
if _, err := tx.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// Close the underlying connection to the database.
|
|
func (db *Database) Close() error {
|
|
return db.conn.Close()
|
|
}
|
|
|
|
// Set the entity id for a given path.
|
|
func (db *Database) Set(fullPath, id string) (*Entity, error) {
|
|
db.mux.Lock()
|
|
defer db.mux.Unlock()
|
|
|
|
tx, err := db.conn.Begin()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var entityID string
|
|
if err := tx.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityID); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
if _, err := tx.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
} else {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
}
|
|
e := &Entity{id}
|
|
|
|
parentPath, name := splitPath(fullPath)
|
|
if err := db.setEdge(parentPath, name, e, tx); err != nil {
|
|
tx.Rollback()
|
|
return nil, err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, err
|
|
}
|
|
return e, nil
|
|
}
|
|
|
|
// Exists returns true if a name already exists in the database.
|
|
func (db *Database) Exists(name string) bool {
|
|
db.mux.RLock()
|
|
defer db.mux.RUnlock()
|
|
|
|
e, err := db.get(name)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return e != nil
|
|
}
|
|
|
|
func (db *Database) setEdge(parentPath, name string, e *Entity, tx *sql.Tx) error {
|
|
parent, err := db.get(parentPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if parent.id == e.id {
|
|
return fmt.Errorf("Cannot set self as child")
|
|
}
|
|
|
|
if _, err := tx.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RootEntity returns the root "/" entity for the database.
|
|
func (db *Database) RootEntity() *Entity {
|
|
return &Entity{
|
|
id: "0",
|
|
}
|
|
}
|
|
|
|
// Get returns the entity for a given path.
|
|
func (db *Database) Get(name string) *Entity {
|
|
db.mux.RLock()
|
|
defer db.mux.RUnlock()
|
|
|
|
e, err := db.get(name)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return e
|
|
}
|
|
|
|
func (db *Database) get(name string) (*Entity, error) {
|
|
e := db.RootEntity()
|
|
// We always know the root name so return it if
|
|
// it is requested
|
|
if name == "/" {
|
|
return e, nil
|
|
}
|
|
|
|
parts := split(name)
|
|
for i := 1; i < len(parts); i++ {
|
|
p := parts[i]
|
|
if p == "" {
|
|
continue
|
|
}
|
|
|
|
next := db.child(e, p)
|
|
if next == nil {
|
|
return nil, fmt.Errorf("Cannot find child for %s", name)
|
|
}
|
|
e = next
|
|
}
|
|
return e, nil
|
|
|
|
}
|
|
|
|
// List all entities by from the name.
|
|
// The key will be the full path of the entity.
|
|
func (db *Database) List(name string, depth int) Entities {
|
|
db.mux.RLock()
|
|
defer db.mux.RUnlock()
|
|
|
|
out := Entities{}
|
|
e, err := db.get(name)
|
|
if err != nil {
|
|
return out
|
|
}
|
|
|
|
children, err := db.children(e, name, depth, nil)
|
|
if err != nil {
|
|
return out
|
|
}
|
|
|
|
for _, c := range children {
|
|
out[c.FullPath] = c.Entity
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Walk through the child graph of an entity, calling walkFunc for each child entity.
|
|
// It is safe for walkFunc to call graph functions.
|
|
func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error {
|
|
children, err := db.Children(name, depth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Note: the database lock must not be held while calling walkFunc
|
|
for _, c := range children {
|
|
if err := walkFunc(c.FullPath, c.Entity); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Children returns the children of the specified entity.
|
|
func (db *Database) Children(name string, depth int) ([]WalkMeta, error) {
|
|
db.mux.RLock()
|
|
defer db.mux.RUnlock()
|
|
|
|
e, err := db.get(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return db.children(e, name, depth, nil)
|
|
}
|
|
|
|
// Parents returns the parents of a specified entity.
|
|
func (db *Database) Parents(name string) ([]string, error) {
|
|
db.mux.RLock()
|
|
defer db.mux.RUnlock()
|
|
|
|
e, err := db.get(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return db.parents(e)
|
|
}
|
|
|
|
// Refs returns the reference count for a specified id.
|
|
func (db *Database) Refs(id string) int {
|
|
db.mux.RLock()
|
|
defer db.mux.RUnlock()
|
|
|
|
var count int
|
|
if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil {
|
|
return 0
|
|
}
|
|
return count
|
|
}
|
|
|
|
// RefPaths returns all the id's path references.
|
|
func (db *Database) RefPaths(id string) Edges {
|
|
db.mux.RLock()
|
|
defer db.mux.RUnlock()
|
|
|
|
refs := Edges{}
|
|
|
|
rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id)
|
|
if err != nil {
|
|
return refs
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var name string
|
|
var parentID string
|
|
if err := rows.Scan(&name, &parentID); err != nil {
|
|
return refs
|
|
}
|
|
refs = append(refs, &Edge{
|
|
EntityID: id,
|
|
Name: name,
|
|
ParentID: parentID,
|
|
})
|
|
}
|
|
return refs
|
|
}
|
|
|
|
// Delete the reference to an entity at a given path.
|
|
func (db *Database) Delete(name string) error {
|
|
db.mux.Lock()
|
|
defer db.mux.Unlock()
|
|
|
|
if name == "/" {
|
|
return fmt.Errorf("Cannot delete root entity")
|
|
}
|
|
|
|
parentPath, n := splitPath(name)
|
|
parent, err := db.get(parentPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := db.conn.Exec("DELETE FROM edge WHERE parent_id = ? AND name = ?;", parent.id, n); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Purge removes the entity with the specified id
|
|
// Walk the graph to make sure all references to the entity
|
|
// are removed and return the number of references removed
|
|
func (db *Database) Purge(id string) (int, error) {
|
|
db.mux.Lock()
|
|
defer db.mux.Unlock()
|
|
|
|
tx, err := db.conn.Begin()
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
// Delete all edges
|
|
rows, err := tx.Exec("DELETE FROM edge WHERE entity_id = ?;", id)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return -1, err
|
|
}
|
|
changes, err := rows.RowsAffected()
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
// Clear who's using this id as parent
|
|
refs, err := tx.Exec("DELETE FROM edge WHERE parent_id = ?;", id)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return -1, err
|
|
}
|
|
refsCount, err := refs.RowsAffected()
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
// Delete entity
|
|
if _, err := tx.Exec("DELETE FROM entity where id = ?;", id); err != nil {
|
|
tx.Rollback()
|
|
return -1, err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
return int(changes + refsCount), nil
|
|
}
|
|
|
|
// Rename an edge for a given path
|
|
func (db *Database) Rename(currentName, newName string) error {
|
|
db.mux.Lock()
|
|
defer db.mux.Unlock()
|
|
|
|
parentPath, name := splitPath(currentName)
|
|
newParentPath, newEdgeName := splitPath(newName)
|
|
|
|
if parentPath != newParentPath {
|
|
return fmt.Errorf("Cannot rename when root paths do not match %s != %s", parentPath, newParentPath)
|
|
}
|
|
|
|
parent, err := db.get(parentPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rows, err := db.conn.Exec("UPDATE edge SET name = ? WHERE parent_id = ? AND name = ?;", newEdgeName, parent.id, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i, err := rows.RowsAffected()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if i == 0 {
|
|
return fmt.Errorf("Cannot locate edge for %s %s", parent.id, name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// WalkMeta stores the walk metadata.
|
|
type WalkMeta struct {
|
|
Parent *Entity
|
|
Entity *Entity
|
|
FullPath string
|
|
Edge *Edge
|
|
}
|
|
|
|
func (db *Database) children(e *Entity, name string, depth int, entities []WalkMeta) ([]WalkMeta, error) {
|
|
if e == nil {
|
|
return entities, nil
|
|
}
|
|
|
|
rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var entityID, entityName string
|
|
if err := rows.Scan(&entityID, &entityName); err != nil {
|
|
return nil, err
|
|
}
|
|
child := &Entity{entityID}
|
|
edge := &Edge{
|
|
ParentID: e.id,
|
|
Name: entityName,
|
|
EntityID: child.id,
|
|
}
|
|
|
|
meta := WalkMeta{
|
|
Parent: e,
|
|
Entity: child,
|
|
FullPath: path.Join(name, edge.Name),
|
|
Edge: edge,
|
|
}
|
|
|
|
entities = append(entities, meta)
|
|
|
|
if depth != 0 {
|
|
nDepth := depth
|
|
if depth != -1 {
|
|
nDepth--
|
|
}
|
|
entities, err = db.children(child, meta.FullPath, nDepth, entities)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
return entities, nil
|
|
}
|
|
|
|
func (db *Database) parents(e *Entity) (parents []string, err error) {
|
|
if e == nil {
|
|
return parents, nil
|
|
}
|
|
|
|
rows, err := db.conn.Query("SELECT parent_id FROM edge where entity_id = ?;", e.id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var parentID string
|
|
if err := rows.Scan(&parentID); err != nil {
|
|
return nil, err
|
|
}
|
|
parents = append(parents, parentID)
|
|
}
|
|
|
|
return parents, nil
|
|
}
|
|
|
|
// Return the entity based on the parent path and name.
|
|
func (db *Database) child(parent *Entity, name string) *Entity {
|
|
var id string
|
|
if err := db.conn.QueryRow("SELECT entity_id FROM edge WHERE parent_id = ? AND name = ?;", parent.id, name).Scan(&id); err != nil {
|
|
return nil
|
|
}
|
|
return &Entity{id}
|
|
}
|
|
|
|
// ID returns the id used to reference this entity.
|
|
func (e *Entity) ID() string {
|
|
return e.id
|
|
}
|
|
|
|
// Paths returns the paths sorted by depth.
|
|
func (e Entities) Paths() []string {
|
|
out := make([]string, len(e))
|
|
var i int
|
|
for k := range e {
|
|
out[i] = k
|
|
i++
|
|
}
|
|
sortByDepth(out)
|
|
|
|
return out
|
|
}
|