A ready-to-deploy, code-level implementation of a "KG + Vector + TSPR recommendation system" (Python + Neo4j + Milvus/FAISS). The goal: runs locally → usable as an API → extensible to production.
1. Overall project structure (build it exactly like this)
├── app.py                 # API entry point (FastAPI)
├── config.py              # Configuration
├── kg/
│   └── neo4j_client.py    # KG queries
├── vector/
│   ├── embedder.py        # Embedding
│   └── vector_db.py       # FAISS/Milvus
├── retrieval/
│   ├── kg_retriever.py
│   ├── vector_retriever.py
│   └── fusion.py          # Fusion core
├── tspr/
│   ├── intent_parser.py   # Query parsing
│   └── ranker.py          # S3 ranking
└── llm/
    └── generator.py       # S5 generation
2. Install dependencies
pip install fastapi uvicorn neo4j sentence-transformers faiss-cpu numpy
3. Core module implementation
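3.1 Neo4j connection (KG layer, kg/neo4j_client.py)
from neo4j import GraphDatabase


class Neo4jClient:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def query_products(self, user, problem):
        # Path: Product -SUITABLE_FOR-> User type, Product -HAS_FEATURE-> Feature -SOLVES-> Problem.
        # max() keeps one row per product when several features solve the problem,
        # so fuse() does not add the same product's KG score more than once.
        query = """
        MATCH (p:Product)-[r1:SUITABLE_FOR]->(u:User)
        MATCH (p)-[r2:HAS_FEATURE]->(f:Feature)
        MATCH (f)-[:SOLVES]->(pr:Problem)
        WHERE u.type = $user AND pr.name = $problem
        RETURN p.name AS product,
               max(r1.confidence * r2.confidence) AS score
        ORDER BY score DESC LIMIT 20
        """
        with self.driver.session() as session:
            result = session.run(query, user=user, problem=problem)
            return [record.data() for record in result]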
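To run the rest of the pipeline before Neo4j is set up, a stand-in with the same query_products(user, problem) interface can help. The sketch below is hypothetical: the InMemoryKGClient class, its edge-list format and the toy product names are not part of the original design. It follows the same path as the Cypher query (Product → User type, Product → Feature → Problem), scores each path as the product of the two edge confidences, and keeps the highest-scoring path per product.

```python
from collections import defaultdict


class InMemoryKGClient:
    """Hypothetical stand-in for Neo4jClient, for local tests without a database."""

    def __init__(self, suitable_for, has_feature, solves):
        # suitable_for: list of (product, user_type, confidence)
        # has_feature:  list of (product, feature, confidence)
        # solves:       list of (feature, problem)
        self.suitable_for = suitable_for
        self.has_feature = has_feature
        self.solves = set(solves)

    def query_products(self, user, problem, limit=20):
        best = defaultdict(float)
        for product, user_type, c1 in self.suitable_for:
            if user_type != user:
                continue
            for p2, feature, c2 in self.has_feature:
                if p2 == product and (feature, problem) in self.solves:
                    # Path score = product of the two edge confidences;
                    # keep only the best path for each product.
                    best[product] = max(best[product], c1 * c2)
        ranked = sorted(best.items(), key=lambda kv: kv[1], reverse=True)
        return [{"product": p, "score": s} for p, s in ranked[:limit]]


# Toy data (hypothetical products and features)
kg = InMemoryKGClient(
    suitable_for=[("BrushA", "college_student", 0.9), ("BrushB", "college_student", 0.8)],
    has_feature=[
        ("BrushA", "soft_bristles", 0.5),
        ("BrushA", "orthodontic_head", 0.9),
        ("BrushB", "orthodontic_head", 0.7),
    ],
    solves=[("orthodontic_head", "braces"), ("soft_bristles", "braces")],
)
print(kg.query_products("college_student", "braces"))
```

Because the return shape matches Neo4jClient's, retrieve_kg() can take either object unchanged.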
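3.2 Vector embedding (vector/embedder.py)
from sentence_transformers import SentenceTransformer

# all-MiniLM-L6-v2 outputs 384-dimensional vectors (matches VectorDB's default dim)
model = SentenceTransformer("all-MiniLM-L6-v2")


def embed(text: str):
    # normalize_embeddings=True returns unit-length vectors, so inner product == cosine similarity
    return model.encode(text, normalize_embeddings=True)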
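For unit tests, or machines that cannot download the model, a deterministic stand-in for embed() can be useful. The helper below is hypothetical (hash_embed is not part of sentence-transformers or the original design) and uses feature hashing: each lowercase token is hashed into one of dim buckets and the vector is L2-normalized. It captures word overlap only, not meaning, so treat it as a test double rather than a substitute for a real model.

```python
import hashlib
import math


def hash_embed(text: str, dim: int = 384) -> list[float]:
    """Hypothetical test double for embed(): feature-hashed bag of words, L2-normalized."""
    vec = [0.0] * dim
    for token in text.lower().split():
        # Stable hash; Python's built-in hash() for str is salted per process
        h = int(hashlib.md5(token.encode("utf-8")).hexdigest(), 16)
        vec[h % dim] += 1.0
    norm = math.sqrt(sum(v * v for v in vec))
    # Empty input stays the zero vector instead of dividing by zero
    return [v / norm for v in vec] if norm > 0 else vec


v = hash_embed("best toothbrush for students with braces")
print(len(v))
```

The default dim of 384 is chosen only so the output fits the same index as all-MiniLM-L6-v2 vectors.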
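3.3 Vector database (FAISS version, vector/vector_db.py)
import faiss
import numpy as np


class VectorDB:
    def __init__(self, dim=384):
        # Inner product on L2-normalized vectors == cosine similarity, so scores are directly usable
        self.index = faiss.IndexFlatIP(dim)
        self.products = []

    def add(self, embeddings, products):
        vecs = np.array(embeddings, dtype="float32")
        faiss.normalize_L2(vecs)  # in-place, row by row
        self.index.add(vecs)
        self.products.extend(products)

    def search(self, query_embedding, top_k=10):
        q = np.array([query_embedding], dtype="float32")
        faiss.normalize_L2(q)
        D, I = self.index.search(q, top_k)
        # FAISS pads results with index -1 when the index holds fewer than top_k vectors
        return [
            {"product": self.products[i], "score": float(d)}
            for d, i in zip(D[0], I[0])
            if i != -1
        ]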
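A note on turning FAISS distances into scores: IndexFlatL2 returns squared L2 distances, and for unit-length vectors ||a − b||² = 2 − 2·cos(a, b). A score of 1 − D therefore ranges over [−3, 1] instead of [0, 1]; 1 − D/2 recovers the cosine, and an inner-product index (IndexFlatIP) on normalized vectors returns the cosine directly. The plain-Python check below needs no FAISS; the helper names are illustrative only.

```python
import math


def normalize(v):
    """Scale a vector to unit length (what faiss.normalize_L2 does per row)."""
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]


def cosine(a, b):
    # For unit vectors the dot product equals the cosine similarity
    return sum(x * y for x, y in zip(a, b))


def squared_l2(a, b):
    # The quantity faiss.IndexFlatL2 reports as distance D
    return sum((x - y) ** 2 for x, y in zip(a, b))


def l2_to_cosine(d):
    # Valid only for unit vectors: ||a - b||^2 = 2 - 2 cos(a, b)
    return 1.0 - d / 2.0


a = normalize([1.0, 2.0, 2.0])
b = normalize([2.0, 1.0, 0.0])
d = squared_l2(a, b)
print(round(cosine(a, b), 4), round(l2_to_cosine(d), 4), round(1.0 - d, 4))
```

The first two printed numbers agree; the third (1 − D) does not, which is why the raw L2 distance should not be plugged into the fusion weights as-is.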
3.4 KG retriever (retrieval/kg_retriever.py)
def retrieve_kg(neo4j_client, user, problem):
    # Thin wrapper that delegates to the graph query
    return neo4j_client.query_products(user, problem)
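3.5 Vector retriever (retrieval/vector_retriever.py)
from vector.embedder import embed


def retrieve_vector(vector_db, query, top_k=10):
    # Embed the raw query text and look up the nearest products
    q_emb = embed(query)
    return vector_db.search(q_emb, top_k=top_k)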
3.6 Fusion (the core, retrieval/fusion.py)
def fuse(kg_results, vector_results, kg_weight=0.5, vector_weight=0.4):
    # Weighted sum per product: a product found by both retrievers collects both contributions
    scores = {}
    for r in kg_results:
        scores[r["product"]] = scores.get(r["product"], 0.0) + kg_weight * r["score"]
    for r in vector_results:
        scores[r["product"]] = scores.get(r["product"], 0.0) + vector_weight * r["score"]
    return sorted(
        [{"product": k, "score": v} for k, v in scores.items()],
        key=lambda x: x["score"],
        reverse=True,
    )
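The weights 0.5 and 0.4 only mean what they say if both score types live on comparable scales. KG scores are products of edge confidences, while vector scores depend on the index metric, so one retriever can dominate by accident. A common remedy is to min-max normalize each result list to [0, 1] before the weighted sum. The variant below is a sketch under that assumption; minmax and fuse_normalized are hypothetical names, not the original fuse().

```python
def minmax(results):
    """Rescale the 'score' field of one result list to [0, 1]."""
    if not results:
        return []
    lo = min(r["score"] for r in results)
    hi = max(r["score"] for r in results)
    span = hi - lo
    # If every score is equal, treat each item as a full match
    return [
        {"product": r["product"], "score": (r["score"] - lo) / span if span else 1.0}
        for r in results
    ]


def fuse_normalized(kg_results, vector_results, kg_weight=0.5, vector_weight=0.4):
    """Hypothetical variant of fuse(): normalize each list, then take the weighted sum."""
    scores = {}
    for weight, results in ((kg_weight, kg_results), (vector_weight, vector_results)):
        for r in minmax(results):
            scores[r["product"]] = scores.get(r["product"], 0.0) + weight * r["score"]
    return sorted(
        ({"product": p, "score": s} for p, s in scores.items()),
        key=lambda x: x["score"],
        reverse=True,
    )


kg = [{"product": "A", "score": 0.8}, {"product": "B", "score": 0.2}]
vec = [{"product": "B", "score": 100.0}, {"product": "C", "score": 50.0}]
print(fuse_normalized(kg, vec))
```

Without normalization, B's raw vector score of 100 would swamp everything; after it, A (best KG match) and B (best vector match) compete on their weights.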
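3.7 Intent parsing (TSPR S1, tspr/intent_parser.py)
def parse_query(query: str):
    # Rule-based keyword matching that maps the query onto the KG's user type and problem name
    q = query.lower()
    user = "college_student" if "student" in q else "general"
    problem = "braces" if "braces" in q else "general"
    return {
        "user": user,
        "problem": problem,
    }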
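One if/else per keyword stops scaling after a handful of intents. One way to generalize it, assuming the keyword lists are maintained by hand, is an ordered lookup table with the same "general" fallback. USER_KEYWORDS, PROBLEM_KEYWORDS, the extra labels ("senior", "sensitive_teeth") and parse_query_table are hypothetical; the labels must match the User.type and Problem.name values actually stored in your KG.

```python
import re

# Hypothetical keyword tables: entries are checked in order, first match wins.
USER_KEYWORDS = [
    ("college_student", ("student", "college", "dorm")),
    ("senior", ("elderly", "senior")),
]
PROBLEM_KEYWORDS = [
    ("braces", ("braces", "orthodontic")),
    ("sensitive_teeth", ("sensitive",)),
]


def match_first(tokens, table, default="general"):
    for label, keywords in table:
        if any(k in tokens for k in keywords):
            return label
    return default


def parse_query_table(query: str):
    # Tokenize on letters/digits so punctuation does not glue onto words
    tokens = set(re.findall(r"[a-z0-9]+", query.lower()))
    # Crude plural handling: "students" also yields "student"
    tokens |= {t[:-1] for t in tokens if t.endswith("s")}
    return {
        "user": match_first(tokens, USER_KEYWORDS),
        "problem": match_first(tokens, PROBLEM_KEYWORDS),
    }


print(parse_query_table("best toothbrush for students with braces"))
```

Matching whole tokens instead of substrings avoids accidental hits inside longer words, at the cost of needing the plural rule.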
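3.8 Ranking (TSPR S3, tspr/ranker.py)
def rank(results):
    for r in results:
        # 0.6 * fused score plus two fixed placeholder terms (0.2 * 0.9 and 0.2 * 0.8).
        # As constants they add the same 0.34 to every item and do not change the order;
        # they only start to matter once replaced by per-product signals.
        r["final_score"] = 0.6 * r["score"] + 0.2 * 0.9 + 0.2 * 0.8
    return sorted(results, key=lambda x: x["final_score"], reverse=True)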
3.9 LLM generation (S5, llm/generator.py)
def generate_answer(query, products):
    # Template placeholder for the LLM step: no model is called yet
    top = products[0]["product"] if products else "Unknown"
    return f"""
Top recommendation: {top}
Reason:
- Matches your needs
- Suitable for your scenario
"""
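generate_answer() only fills a fixed template. To actually involve an LLM in S5, a usual step is to build a prompt grounded in the ranked products, so the model explains the ranking instead of inventing its own recommendation. The sketch below only constructs that prompt; build_prompt and its wording are hypothetical, and no model API is called. The resulting string would be sent to whichever LLM client you use.

```python
def build_prompt(query, products, top_n=3):
    """Hypothetical S5 helper: turn ranked results into a grounded LLM prompt."""
    lines = [
        "You are a shopping assistant. Recommend only from the candidates below,",
        "in the given order, and briefly explain why each fits the user's request.",
        "",
        f"User request: {query}",
        "Candidates (best first):",
    ]
    for i, p in enumerate(products[:top_n], start=1):
        # final_score exists after rank(); fall back to the fused score otherwise
        score = p.get("final_score", p.get("score", 0.0))
        lines.append(f"{i}. {p['product']} (score={score:.2f})")
    if not products:
        lines.append("(no candidates found; say so instead of guessing)")
    return "\n".join(lines)


print(build_prompt(
    "toothbrush for braces",
    [{"product": "BrushA", "final_score": 0.82}, {"product": "BrushB", "score": 0.5}],
))
```

Restricting the model to the listed candidates keeps S3, not the LLM, in charge of what gets recommended.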
4. API entry point (app.py, runs as-is)
from fastapi import FastAPI
from pydantic import BaseModel

from kg.neo4j_client import Neo4jClient
from vector.vector_db import VectorDB
from retrieval.kg_retriever import retrieve_kg
from retrieval.vector_retriever import retrieve_vector
from retrieval.fusion import fuse
from tspr.intent_parser import parse_query
from tspr.ranker import rank
from llm.generator import generate_answer

app = FastAPI()
# Local-dev credentials; move them into config.py (or environment variables) for real deployments
neo4j_client = Neo4jClient("bolt://localhost:7687", "neo4j", "password")
# Starts empty: populate it with vector_db.add(embeddings, product_names) before vector retrieval is useful
vector_db = VectorDB()


class RecommendRequest(BaseModel):
    query: str


@app.post("/recommend")
def recommend(req: RecommendRequest):
    query = req.query
    intent = parse_query(query)                                                # S1
    kg_results = retrieve_kg(neo4j_client, intent["user"], intent["problem"])
    vector_results = retrieve_vector(vector_db, query)
    fused = fuse(kg_results, vector_results)
    ranked = rank(fused)                                                       # S3
    answer = generate_answer(query, ranked)                                    # S5
    return {
        "results": ranked[:5],
        "answer": answer,
    }
5. How to start
uvicorn app:app --reload
Test:
curl -X POST http://localhost:8000/recommend \
  -H "Content-Type: application/json" \
  -d '{"query": "best toothbrush for students with braces"}'
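6. Next upgrades (key)
6.1 Plug in a real embedding model (OpenAI / bge)
Replace the all-MiniLM-L6-v2 model in vector/embedder.py, and set VectorDB's dim to the new model's output size (384 is specific to all-MiniLM-L6-v2).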
6.2 FAISS → Milvus (production)
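Swapping FAISS for Milvus is easiest when the retrievers depend only on the two methods VectorDB already exposes, add(embeddings, products) and search(query_embedding, top_k). The sketch below makes that seam explicit with typing.Protocol. VectorStore, InMemoryStore and retrieve are hypothetical names; a Milvus-backed class would implement the same two methods using the pymilvus client, which is not shown because its calls depend on your collection schema.

```python
import math
from collections.abc import Sequence
from typing import Protocol


class VectorStore(Protocol):
    """The interface the vector retriever relies on; FAISS and Milvus classes can both satisfy it."""

    def add(self, embeddings: Sequence[Sequence[float]], products: Sequence[str]) -> None: ...

    def search(self, query_embedding: Sequence[float], top_k: int = 10) -> list[dict]: ...


class InMemoryStore:
    """Hypothetical exact-search store using cosine similarity, handy for tests."""

    def __init__(self) -> None:
        self.vectors: list[list[float]] = []
        self.products: list[str] = []

    @staticmethod
    def _unit(v: Sequence[float]) -> list[float]:
        n = math.sqrt(sum(x * x for x in v)) or 1.0
        return [x / n for x in v]

    def add(self, embeddings, products) -> None:
        if len(embeddings) != len(products):
            raise ValueError("embeddings and products must have the same length")
        self.vectors.extend(self._unit(e) for e in embeddings)
        self.products.extend(products)

    def search(self, query_embedding, top_k=10):
        q = self._unit(query_embedding)
        scored = [
            {"product": p, "score": sum(a * b for a, b in zip(q, v))}
            for p, v in zip(self.products, self.vectors)
        ]
        scored.sort(key=lambda r: r["score"], reverse=True)
        return scored[:top_k]


def retrieve(store: VectorStore, query_embedding, top_k=10):
    # Accepts any object with add() and search(); no inheritance required
    return store.search(query_embedding, top_k=top_k)
```

Protocols are checked structurally, so the existing VectorDB class already fits this interface without modification.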
6.3 KG enhancement
- Automatic relation extraction (LLM)
- Update edge confidence from user-behavior feedback
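For the feedback item, one simple rule (an assumption here; the original does not specify one) is an exponential moving average: each click or purchase nudges an edge's confidence toward 1, each skipped impression nudges it toward 0, and a learning rate lr controls the speed. update_confidence and replay are hypothetical; writing the new value back to the SUITABLE_FOR or HAS_FEATURE edge in Neo4j would be a separate SET query.

```python
def update_confidence(confidence: float, positive: bool, lr: float = 0.1) -> float:
    """Hypothetical EMA update: move toward 1 on positive feedback, toward 0 otherwise."""
    if not 0.0 < lr <= 1.0:
        raise ValueError("lr must be in (0, 1]")
    target = 1.0 if positive else 0.0
    new = (1 - lr) * confidence + lr * target
    # Clamp in case the stored value was slightly outside [0, 1]
    return min(1.0, max(0.0, new))


def replay(confidence: float, events, lr: float = 0.1) -> float:
    """Apply feedback events in order (True = click/purchase, False = skip)."""
    for positive in events:
        confidence = update_confidence(confidence, positive, lr)
    return confidence


print(replay(0.5, [True, True, False]))
```

The EMA keeps confidences inside [0, 1], so the products of confidences used as KG scores stay on the same scale.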
6.4 Fusion upgrade (the core competitive edge)
final_score = (
    α * KG +
    β * Vector +
    γ * CTR +
    δ * Profit
)
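This formula extends the ranking in 3.8 with business signals. The sketch below assumes the fusion step keeps the KG and vector scores separately (kg_score, vector_score) instead of summing them, and that CTR and profit are already scaled to [0, 1]. The Weights defaults, the lookup dictionaries and the function names are all hypothetical placeholders to be tuned on your own data.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Weights:
    # Hypothetical defaults; alpha..delta correspond to α, β, γ, δ
    alpha: float = 0.4  # KG score
    beta: float = 0.3   # vector similarity
    gamma: float = 0.2  # click-through rate
    delta: float = 0.1  # profit


def final_score(kg, vector, ctr, profit, w=Weights()):
    """α·KG + β·Vector + γ·CTR + δ·Profit, inputs assumed pre-scaled to [0, 1]."""
    return w.alpha * kg + w.beta * vector + w.gamma * ctr + w.delta * profit


def rank_with_business(candidates, ctr_by_product, profit_by_product, w=Weights()):
    """candidates: dicts with product, kg_score, vector_score; missing signals count as 0."""
    ranked = []
    for c in candidates:
        p = c["product"]
        s = final_score(
            c.get("kg_score", 0.0),
            c.get("vector_score", 0.0),
            ctr_by_product.get(p, 0.0),
            profit_by_product.get(p, 0.0),
            w,
        )
        ranked.append({**c, "final_score": s})
    return sorted(ranked, key=lambda r: r["final_score"], reverse=True)


cands = [
    {"product": "A", "kg_score": 0.9, "vector_score": 0.2},
    {"product": "B", "kg_score": 0.5, "vector_score": 0.9},
]
print(rank_with_business(cands, {"A": 0.1, "B": 0.8}, {"A": 0.5, "B": 0.2}))
```

With these toy numbers, B overtakes A (0.65 vs 0.49) mainly through its higher CTR, which is exactly the kind of lever the formula adds on top of pure relevance.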
The key point in one sentence
In essence, this system = "Neo4j as the brain + vectors as perception + TSPR as the decision-maker"
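If you want to go further (recommended)
I can help you upgrade directly to:
👉 An enterprise version (with the ability to control AI output)
- Query-path control system (your earlier core)
- HIC human-intervention back office
- Controllable recommendation results (control over who ranks first)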