基于PostgreSQL和NetworkX的人物关系图系统设计

发布于2025-07-08

基于PostgreSQL和NetworkX的人物关系图系统设计

下面我将为你设计一个完整的架构,使用PostgreSQL持久化存储人物关系数据,并在内存中使用NetworkX进行分析处理的方案。

一、数据库表结构设计

1. 人物表 (persons)

sql

CREATE TABLE persons ( person_id SERIAL PRIMARY KEY, name VARCHAR(100) NOT NULL, gender CHAR(1), birth_date DATE, description TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_persons_name ON persons(name);
SQL

2. 关系类型表 (relationship_types)

sql

CREATE TABLE relationship_types ( type_id SERIAL PRIMARY KEY, type_name VARCHAR(50) NOT NULL, is_directed BOOLEAN NOT NULL DEFAULT TRUE, description TEXT, weight FLOAT DEFAULT 1.0 );
SQL

3. 关系表 (relationships)

sql

CREATE TABLE relationships ( relationship_id SERIAL PRIMARY KEY, source_id INTEGER REFERENCES persons(person_id) ON DELETE CASCADE, target_id INTEGER REFERENCES persons(person_id) ON DELETE CASCADE, type_id INTEGER REFERENCES relationship_types(type_id), start_date DATE, end_date DATE, weight FLOAT DEFAULT 1.0, description TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, CONSTRAINT no_self_relationship CHECK (source_id != target_id) ); -- 复合索引提高查询性能 CREATE INDEX idx_relationships_source ON relationships(source_id); CREATE INDEX idx_relationships_target ON relationships(target_id); CREATE INDEX idx_relationships_type ON relationships(type_id);
SQL

4. 图快照表 (graph_snapshots, 可选)

sql

CREATE TABLE graph_snapshots ( snapshot_id SERIAL PRIMARY KEY, snapshot_name VARCHAR(100) NOT NULL, description TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
SQL

5. 快照节点表 (snapshot_persons, 可选)

sql

CREATE TABLE snapshot_persons ( snapshot_id INTEGER REFERENCES graph_snapshots(snapshot_id) ON DELETE CASCADE, person_id INTEGER REFERENCES persons(person_id) ON DELETE CASCADE, PRIMARY KEY (snapshot_id, person_id) );
SQL

二、系统架构设计

plain

┌───────────────────────────────────────────────────┐ │ Application │ ├───────────────────────────────────────────────────┤ │ ┌─────────────┐ ┌─────────────┐ ┌──────────┐ │ │ │ Data │ │ Graph │ │ API │ │ │ │ Access │ │ Analysis │ │ Layer │ │ │ │ Layer │ │ (NetworkX) │ │ │ │ │ └─────────────┘ └─────────────┘ └──────────┘ │ └───────────────────────────────────────────────────┘ │ │ ▼ ▼ ┌───────────────────────────────────────────────────┐ │ PostgreSQL Database │ │ ┌─────────────┐ ┌─────────────────┐ ┌──────┐ │ │ │ persons │ │ relationships │ │ types│ │ │ └─────────────┘ └─────────────────┘ └──────┘ │ └───────────────────────────────────────────────────┘
Plain text

三、Python实现核心组件

1. 数据库访问层

python

# db_access.py import psycopg2 from psycopg2 import sql from typing import List, Dict, Any import json class GraphDatabase: def __init__(self, dbname, user, password, host="localhost"): self.conn = psycopg2.connect( dbname=dbname, user=user, password=password, host=host) def get_persons(self, limit=1000) -> List[Dict[str, Any]]: with self.conn.cursor() as cur: cur.execute("SELECT * FROM persons LIMIT %s", (limit,)) columns = [desc[0] for desc in cur.description] return [dict(zip(columns, row)) for row in cur.fetchall()] def get_relationships(self, limit=1000) -> List[Dict[str, Any]]: with self.conn.cursor() as cur: cur.execute(""" SELECT r.*, rt.type_name, rt.is_directed FROM relationships r JOIN relationship_types rt ON r.type_id = rt.type_id LIMIT %s """, (limit,)) columns = [desc[0] for desc in cur.description] return [dict(zip(columns, row)) for row in cur.fetchall()] def get_subgraph(self, center_person_id, degrees=2) -> Dict[str, Any]: """获取以某个人为中心的n度关系子图""" with self.conn.cursor() as cur: # 使用递归CTE查询n度关系 cur.execute(""" WITH RECURSIVE graph_search AS ( SELECT source_id, target_id, 1 AS degree FROM relationships WHERE source_id = %s OR target_id = %s UNION SELECT r.source_id, r.target_id, gs.degree + 1 FROM relationships r JOIN graph_search gs ON r.source_id = gs.target_id OR r.target_id = gs.source_id OR r.source_id = gs.source_id OR r.target_id = gs.target_id WHERE gs.degree < %s ) SELECT DISTINCT source_id, target_id FROM graph_search """, (center_person_id, center_person_id, degrees)) edges = cur.fetchall() # 获取相关节点信息 person_ids = {p for edge in edges for p in edge} cur.execute( "SELECT * FROM persons WHERE person_id IN %s", (tuple(person_ids),) columns = [desc[0] for desc in cur.description] nodes = [dict(zip(columns, row)) for row in cur.fetchall()] return {"nodes": nodes, "edges": edges} def close(self): self.conn.close()
Python

2. 图分析服务层

python

# graph_service.py import networkx as nx from typing import Dict, List, Any from db_access import GraphDatabase class GraphAnalysisService: def __init__(self, db_config): self.db = GraphDatabase(**db_config) self.graph = nx.DiGraph() def load_full_graph(self): """加载完整图到内存""" persons = self.db.get_persons() relationships = self.db.get_relationships() # 添加节点 for person in persons: self.graph.add_node( person['person_id'], **{k:v for k,v in person.items() if k != 'person_id'} ) # 添加边 for rel in relationships: self.graph.add_edge( rel['source_id'], rel['target_id'], **{k:v for k,v in rel.items() if k not in ('source_id', 'target_id')} ) def load_subgraph(self, center_person_id, degrees=2): """加载子图到内存""" subgraph_data = self.db.get_subgraph(center_person_id, degrees) self.graph = nx.DiGraph() for node in subgraph_data['nodes']: self.graph.add_node( node['person_id'], **{k:v for k,v in node.items() if k != 'person_id'} ) for source, target in subgraph_data['edges']: if self.graph.has_edge(source, target): continue # 这里可以添加更详细的关系属性 self.graph.add_edge(source, target) def find_shortest_path(self, person1_id, person2_id): """查找两人之间的最短路径""" try: path = nx.shortest_path(self.graph, person1_id, person2_id) return { "path": path, "length": len(path)-1, "nodes": [self.graph.nodes[node_id] for node_id in path] } except nx.NetworkXNoPath: return None def analyze_network(self, person_id): """分析个人网络特征""" if person_id not in self.graph: return None return { "degree_centrality": nx.degree_centrality(self.graph).get(person_id), "betweenness": nx.betweenness_centrality(self.graph).get(person_id), "closeness": nx.closeness_centrality(self.graph).get(person_id), "pagerank": nx.pagerank(self.graph).get(person_id) } def find_common_connections(self, person1_id, person2_id): """查找共同联系人""" neighbors1 = set(self.graph.successors(person1_id)) neighbors2 = set(self.graph.successors(person2_id)) common = neighbors1 & neighbors2 return { "common_connections": list(common), "count": len(common) } def close(self): self.db.close()
Python

3. API服务层 (FastAPI示例)

python

# api_server.py from fastapi import FastAPI from pydantic import BaseModel from graph_service import GraphAnalysisService import os app = FastAPI() # 配置数据库连接 db_config = { "dbname": os.getenv("DB_NAME", "relationship_graph"), "user": os.getenv("DB_USER", "postgres"), "password": os.getenv("DB_PASSWORD", "password"), "host": os.getenv("DB_HOST", "localhost") } graph_service = GraphAnalysisService(db_config) class PersonRequest(BaseModel): person_id: int degrees: int = 2 @app.post("/load_subgraph") async def load_subgraph(request: PersonRequest): graph_service.load_subgraph(request.person_id, request.degrees) return {"message": f"Subgraph loaded for person {request.person_id}"} @app.get("/shortest_path/{person1_id}/{person2_id}") async def get_shortest_path(person1_id: int, person2_id: int): return graph_service.find_shortest_path(person1_id, person2_id) @app.get("/analyze/{person_id}") async def analyze_person(person_id: int): return graph_service.analyze_network(person_id) @app.on_event("shutdown") def shutdown_event(): graph_service.close()
Python

四、部署架构建议

plain

┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │ │ │ │ │ │ Web/App │───▶│ API Server │───▶│ PostgreSQL │ │ Client │ │ (FastAPI) │ │ Database │ │ │ │ │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ ▼ ┌─────────────────┐ │ │ │ NetworkX │ │ In-Memory │ │ Graph │ │ │ └─────────────────┘
Plain text

五、性能优化建议

  1. 数据库层面:
      • 为常用查询创建适当的索引
      • 考虑使用物化视图预计算常用关系
      • 对大表进行分区
  1. 内存管理:
      • 对大型图使用分块加载策略
      • 实现缓存机制,避免重复加载相同子图
      • 考虑使用nx.Graph代替nx.DiGraph(如果关系是无向的)
  1. 查询优化:
      • 为常用分析预计算指标并存储
      • 对深度遍历设置合理的深度限制
      • 考虑使用图数据库扩展如Apache AGE(PostgreSQL图扩展)
  1. 扩展性设计:
      • 对API服务实现负载均衡
      • 考虑使用Redis缓存常用查询结果
      • 对超大规模图考虑分布式计算框架如Dask
这个架构提供了灵活性和性能的良好平衡,允许你利用PostgreSQL的可靠性和NetworkX的强大分析能力。根据你的具体需求,可以进一步调整和扩展这个基础架构。
 
配置system服务为什么Cursor即将放弃向量搜索
Loading...
©2021-2025 Arterning.
All rights reserved.