123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551 |
- package graphdb
- import (
- "database/sql"
- "fmt"
- "path"
- "strings"
- "sync"
- )
- const (
- createEntityTable = `
- CREATE TABLE IF NOT EXISTS entity (
- id text NOT NULL PRIMARY KEY
- );`
- createEdgeTable = `
- CREATE TABLE IF NOT EXISTS edge (
- "entity_id" text NOT NULL,
- "parent_id" text NULL,
- "name" text NOT NULL,
- CONSTRAINT "parent_fk" FOREIGN KEY ("parent_id") REFERENCES "entity" ("id"),
- CONSTRAINT "entity_fk" FOREIGN KEY ("entity_id") REFERENCES "entity" ("id")
- );
- `
- createEdgeIndices = `
- CREATE UNIQUE INDEX IF NOT EXISTS "name_parent_ix" ON "edge" (parent_id, name);
- `
- )
- // Entity with a unique id.
- type Entity struct {
- id string
- }
- // An Edge connects two entities together.
- type Edge struct {
- EntityID string
- Name string
- ParentID string
- }
- // Entities stores the list of entities.
- type Entities map[string]*Entity
- // Edges stores the relationships between entities.
- type Edges []*Edge
- // WalkFunc is a function invoked to process an individual entity.
- type WalkFunc func(fullPath string, entity *Entity) error
- // Database is a graph database for storing entities and their relationships.
- type Database struct {
- conn *sql.DB
- mux sync.RWMutex
- }
- // IsNonUniqueNameError processes the error to check if it's caused by
- // a constraint violation.
- // This is necessary because the error isn't the same across various
- // sqlite versions.
- func IsNonUniqueNameError(err error) bool {
- str := err.Error()
- // sqlite 3.7.17-1ubuntu1 returns:
- // Set failure: Abort due to constraint violation: columns parent_id, name are not unique
- if strings.HasSuffix(str, "name are not unique") {
- return true
- }
- // sqlite-3.8.3-1.fc20 returns:
- // Set failure: Abort due to constraint violation: UNIQUE constraint failed: edge.parent_id, edge.name
- if strings.Contains(str, "UNIQUE constraint failed") && strings.Contains(str, "edge.name") {
- return true
- }
- // sqlite-3.6.20-1.el6 returns:
- // Set failure: Abort due to constraint violation: constraint failed
- if strings.HasSuffix(str, "constraint failed") {
- return true
- }
- return false
- }
- // NewDatabase creates a new graph database initialized with a root entity.
- func NewDatabase(conn *sql.DB) (*Database, error) {
- if conn == nil {
- return nil, fmt.Errorf("Database connection cannot be nil")
- }
- db := &Database{conn: conn}
- // Create root entities
- tx, err := conn.Begin()
- if err != nil {
- return nil, err
- }
- if _, err := tx.Exec(createEntityTable); err != nil {
- return nil, err
- }
- if _, err := tx.Exec(createEdgeTable); err != nil {
- return nil, err
- }
- if _, err := tx.Exec(createEdgeIndices); err != nil {
- return nil, err
- }
- if _, err := tx.Exec("DELETE FROM entity where id = ?", "0"); err != nil {
- tx.Rollback()
- return nil, err
- }
- if _, err := tx.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil {
- tx.Rollback()
- return nil, err
- }
- if _, err := tx.Exec("DELETE FROM edge where entity_id=? and name=?", "0", "/"); err != nil {
- tx.Rollback()
- return nil, err
- }
- if _, err := tx.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil {
- tx.Rollback()
- return nil, err
- }
- if err := tx.Commit(); err != nil {
- return nil, err
- }
- return db, nil
- }
- // Close the underlying connection to the database.
- func (db *Database) Close() error {
- return db.conn.Close()
- }
- // Set the entity id for a given path.
- func (db *Database) Set(fullPath, id string) (*Entity, error) {
- db.mux.Lock()
- defer db.mux.Unlock()
- tx, err := db.conn.Begin()
- if err != nil {
- return nil, err
- }
- var entityID string
- if err := tx.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityID); err != nil {
- if err == sql.ErrNoRows {
- if _, err := tx.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil {
- tx.Rollback()
- return nil, err
- }
- } else {
- tx.Rollback()
- return nil, err
- }
- }
- e := &Entity{id}
- parentPath, name := splitPath(fullPath)
- if err := db.setEdge(parentPath, name, e, tx); err != nil {
- tx.Rollback()
- return nil, err
- }
- if err := tx.Commit(); err != nil {
- return nil, err
- }
- return e, nil
- }
- // Exists returns true if a name already exists in the database.
- func (db *Database) Exists(name string) bool {
- db.mux.RLock()
- defer db.mux.RUnlock()
- e, err := db.get(name)
- if err != nil {
- return false
- }
- return e != nil
- }
- func (db *Database) setEdge(parentPath, name string, e *Entity, tx *sql.Tx) error {
- parent, err := db.get(parentPath)
- if err != nil {
- return err
- }
- if parent.id == e.id {
- return fmt.Errorf("Cannot set self as child")
- }
- if _, err := tx.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil {
- return err
- }
- return nil
- }
- // RootEntity returns the root "/" entity for the database.
- func (db *Database) RootEntity() *Entity {
- return &Entity{
- id: "0",
- }
- }
- // Get returns the entity for a given path.
- func (db *Database) Get(name string) *Entity {
- db.mux.RLock()
- defer db.mux.RUnlock()
- e, err := db.get(name)
- if err != nil {
- return nil
- }
- return e
- }
- func (db *Database) get(name string) (*Entity, error) {
- e := db.RootEntity()
- // We always know the root name so return it if
- // it is requested
- if name == "/" {
- return e, nil
- }
- parts := split(name)
- for i := 1; i < len(parts); i++ {
- p := parts[i]
- if p == "" {
- continue
- }
- next := db.child(e, p)
- if next == nil {
- return nil, fmt.Errorf("Cannot find child for %s", name)
- }
- e = next
- }
- return e, nil
- }
- // List all entities by from the name.
- // The key will be the full path of the entity.
- func (db *Database) List(name string, depth int) Entities {
- db.mux.RLock()
- defer db.mux.RUnlock()
- out := Entities{}
- e, err := db.get(name)
- if err != nil {
- return out
- }
- children, err := db.children(e, name, depth, nil)
- if err != nil {
- return out
- }
- for _, c := range children {
- out[c.FullPath] = c.Entity
- }
- return out
- }
- // Walk through the child graph of an entity, calling walkFunc for each child entity.
- // It is safe for walkFunc to call graph functions.
- func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error {
- children, err := db.Children(name, depth)
- if err != nil {
- return err
- }
- // Note: the database lock must not be held while calling walkFunc
- for _, c := range children {
- if err := walkFunc(c.FullPath, c.Entity); err != nil {
- return err
- }
- }
- return nil
- }
- // Children returns the children of the specified entity.
- func (db *Database) Children(name string, depth int) ([]WalkMeta, error) {
- db.mux.RLock()
- defer db.mux.RUnlock()
- e, err := db.get(name)
- if err != nil {
- return nil, err
- }
- return db.children(e, name, depth, nil)
- }
- // Parents returns the parents of a specified entity.
- func (db *Database) Parents(name string) ([]string, error) {
- db.mux.RLock()
- defer db.mux.RUnlock()
- e, err := db.get(name)
- if err != nil {
- return nil, err
- }
- return db.parents(e)
- }
- // Refs returns the reference count for a specified id.
- func (db *Database) Refs(id string) int {
- db.mux.RLock()
- defer db.mux.RUnlock()
- var count int
- if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil {
- return 0
- }
- return count
- }
- // RefPaths returns all the id's path references.
- func (db *Database) RefPaths(id string) Edges {
- db.mux.RLock()
- defer db.mux.RUnlock()
- refs := Edges{}
- rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id)
- if err != nil {
- return refs
- }
- defer rows.Close()
- for rows.Next() {
- var name string
- var parentID string
- if err := rows.Scan(&name, &parentID); err != nil {
- return refs
- }
- refs = append(refs, &Edge{
- EntityID: id,
- Name: name,
- ParentID: parentID,
- })
- }
- return refs
- }
- // Delete the reference to an entity at a given path.
- func (db *Database) Delete(name string) error {
- db.mux.Lock()
- defer db.mux.Unlock()
- if name == "/" {
- return fmt.Errorf("Cannot delete root entity")
- }
- parentPath, n := splitPath(name)
- parent, err := db.get(parentPath)
- if err != nil {
- return err
- }
- if _, err := db.conn.Exec("DELETE FROM edge WHERE parent_id = ? AND name = ?;", parent.id, n); err != nil {
- return err
- }
- return nil
- }
- // Purge removes the entity with the specified id
- // Walk the graph to make sure all references to the entity
- // are removed and return the number of references removed
- func (db *Database) Purge(id string) (int, error) {
- db.mux.Lock()
- defer db.mux.Unlock()
- tx, err := db.conn.Begin()
- if err != nil {
- return -1, err
- }
- // Delete all edges
- rows, err := tx.Exec("DELETE FROM edge WHERE entity_id = ?;", id)
- if err != nil {
- tx.Rollback()
- return -1, err
- }
- changes, err := rows.RowsAffected()
- if err != nil {
- return -1, err
- }
- // Clear who's using this id as parent
- refs, err := tx.Exec("DELETE FROM edge WHERE parent_id = ?;", id)
- if err != nil {
- tx.Rollback()
- return -1, err
- }
- refsCount, err := refs.RowsAffected()
- if err != nil {
- return -1, err
- }
- // Delete entity
- if _, err := tx.Exec("DELETE FROM entity where id = ?;", id); err != nil {
- tx.Rollback()
- return -1, err
- }
- if err := tx.Commit(); err != nil {
- return -1, err
- }
- return int(changes + refsCount), nil
- }
- // Rename an edge for a given path
- func (db *Database) Rename(currentName, newName string) error {
- db.mux.Lock()
- defer db.mux.Unlock()
- parentPath, name := splitPath(currentName)
- newParentPath, newEdgeName := splitPath(newName)
- if parentPath != newParentPath {
- return fmt.Errorf("Cannot rename when root paths do not match %s != %s", parentPath, newParentPath)
- }
- parent, err := db.get(parentPath)
- if err != nil {
- return err
- }
- rows, err := db.conn.Exec("UPDATE edge SET name = ? WHERE parent_id = ? AND name = ?;", newEdgeName, parent.id, name)
- if err != nil {
- return err
- }
- i, err := rows.RowsAffected()
- if err != nil {
- return err
- }
- if i == 0 {
- return fmt.Errorf("Cannot locate edge for %s %s", parent.id, name)
- }
- return nil
- }
- // WalkMeta stores the walk metadata.
- type WalkMeta struct {
- Parent *Entity
- Entity *Entity
- FullPath string
- Edge *Edge
- }
- func (db *Database) children(e *Entity, name string, depth int, entities []WalkMeta) ([]WalkMeta, error) {
- if e == nil {
- return entities, nil
- }
- rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- for rows.Next() {
- var entityID, entityName string
- if err := rows.Scan(&entityID, &entityName); err != nil {
- return nil, err
- }
- child := &Entity{entityID}
- edge := &Edge{
- ParentID: e.id,
- Name: entityName,
- EntityID: child.id,
- }
- meta := WalkMeta{
- Parent: e,
- Entity: child,
- FullPath: path.Join(name, edge.Name),
- Edge: edge,
- }
- entities = append(entities, meta)
- if depth != 0 {
- nDepth := depth
- if depth != -1 {
- nDepth--
- }
- entities, err = db.children(child, meta.FullPath, nDepth, entities)
- if err != nil {
- return nil, err
- }
- }
- }
- return entities, nil
- }
- func (db *Database) parents(e *Entity) (parents []string, err error) {
- if e == nil {
- return parents, nil
- }
- rows, err := db.conn.Query("SELECT parent_id FROM edge where entity_id = ?;", e.id)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- for rows.Next() {
- var parentID string
- if err := rows.Scan(&parentID); err != nil {
- return nil, err
- }
- parents = append(parents, parentID)
- }
- return parents, nil
- }
- // Return the entity based on the parent path and name.
- func (db *Database) child(parent *Entity, name string) *Entity {
- var id string
- if err := db.conn.QueryRow("SELECT entity_id FROM edge WHERE parent_id = ? AND name = ?;", parent.id, name).Scan(&id); err != nil {
- return nil
- }
- return &Entity{id}
- }
- // ID returns the id used to reference this entity.
- func (e *Entity) ID() string {
- return e.id
- }
- // Paths returns the paths sorted by depth.
- func (e Entities) Paths() []string {
- out := make([]string, len(e))
- var i int
- for k := range e {
- out[i] = k
- i++
- }
- sortByDepth(out)
- return out
- }
|