A ready-to-implement, code-level blueprint for a "KG + Vector + TSPR recommendation system" (Python + Neo4j + Milvus/FAISS). The goal: runs locally → usable API → extensible to production.


1. Overall project structure (create it exactly as shown)

ai-recommend-system/
├── app.py                    # API entry point (FastAPI)
├── config.py                 # Configuration
├── kg/
│   └── neo4j_client.py       # KG queries
├── vector/
│   ├── embedder.py           # Embedding
│   └── vector_db.py          # FAISS/Milvus
├── retrieval/
│   ├── kg_retriever.py
│   ├── vector_retriever.py
│   └── fusion.py             # Fusion core
├── tspr/
│   ├── intent_parser.py      # Query parsing
│   └── ranker.py             # S3 ranking
└── llm/
    └── generator.py          # S5 generation

2. Installing dependencies

pip install fastapi uvicorn neo4j sentence-transformers faiss-cpu

3. Core module implementation


3.1 Neo4j connection (KG layer)

# kg/neo4j_client.py

from neo4j import GraphDatabase


class Neo4jClient:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def query_products(self, user, problem):
        # Find products suitable for the user type that have a feature
        # solving the problem. The score is the product of the two edge
        # confidences.
        query = """
        MATCH (p:Product)-[r1:SUITABLE_FOR]->(u:User)
        MATCH (p)-[r2:HAS_FEATURE]->(f:Feature)
        MATCH (f)-[:SOLVES]->(pr:Problem)
        WHERE u.type = $user AND pr.name = $problem
        RETURN p.name AS product,
               (r1.confidence * r2.confidence) AS score
        ORDER BY score DESC LIMIT 20
        """

        with self.driver.session() as session:
            result = session.run(query, user=user, problem=problem)
            return [dict(r) for r in result]
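
Because the Cypher query joins through Feature, a product with several features that solve the same problem comes back as several rows, and each row would later be counted again during fusion. If that is not intended, one option is to collapse the rows to the best score per product first. The helper name `dedupe_by_max` is hypothetical and not part of the original code; this is a minimal sketch in plain Python:

```python
def dedupe_by_max(rows):
    """Keep one row per product, retaining its highest score."""
    best = {}
    for row in rows:
        name, score = row["product"], row["score"]
        if name not in best or score > best[name]:
            best[name] = score
    # Return rows in descending score order, matching the query's ORDER BY.
    return sorted(
        ({"product": k, "score": v} for k, v in best.items()),
        key=lambda r: r["score"],
        reverse=True,
    )


# Made-up rows: product "A" matched through two different features.
rows = [
    {"product": "A", "score": 0.72},
    {"product": "B", "score": 0.56},
    {"product": "A", "score": 0.40},
]
deduped = dedupe_by_max(rows)
```

The same effect could probably be achieved inside the query with an aggregation, but doing it in Python keeps the Cypher unchanged.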


3.2 Vector embedding

# vector/embedder.py

from sentence_transformers import SentenceTransformer

# Loaded once at import time; all-MiniLM-L6-v2 produces 384-dimensional vectors.
model = SentenceTransformer('all-MiniLM-L6-v2')


def embed(text: str):
    return model.encode(text)


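3.3 Vector database (FAISS version)

# vector/vector_db.py

import faiss
import numpy as np


class VectorDB:
    def __init__(self, dim=384):
        # dim must match the embedding model's output size.
        self.index = faiss.IndexFlatL2(dim)
        self.products = []

    def add(self, embeddings, products):
        self.index.add(np.array(embeddings).astype("float32"))
        self.products.extend(products)

    def search(self, query_embedding, top_k=10):
        D, I = self.index.search(
            np.array([query_embedding]).astype("float32"),
            top_k
        )
        return [
            # The operator in the original score expression was lost;
            # 1 / (1 + distance) is an assumed distance-to-similarity mapping.
            {"product": self.products[i], "score": float(1.0 / (1.0 + D[0][idx]))}
            for idx, i in enumerate(I[0])
            if i >= 0  # FAISS pads with -1 when fewer than top_k vectors exist
        ]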
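
`faiss.IndexFlatL2` reports squared Euclidean distances, where smaller means closer, while the fusion step expects larger-is-better scores. The sketch below uses plain Python so it runs without FAISS. It shows one possible conversion, 1 / (1 + d), and the identity d = 2 - 2·cos that holds for unit-length vectors. The function names are illustrative assumptions, not part of the original code.

```python
import math


def squared_l2(a, b):
    """Squared Euclidean distance, the quantity IndexFlatL2 returns."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def distance_to_score(d):
    """Map a distance in [0, inf) to a similarity in (0, 1]."""
    return 1.0 / (1.0 + d)


a = normalize([1.0, 2.0, 3.0])
b = normalize([2.0, 1.0, 0.5])
cosine = sum(x * y for x, y in zip(a, b))
d = squared_l2(a, b)

# For unit vectors, squared L2 distance and cosine similarity are tied together.
assert abs(d - (2.0 - 2.0 * cosine)) < 1e-9
```

If the embeddings are normalized before indexing, ranking by L2 distance and ranking by cosine similarity give the same order, which makes the scores easier to reason about when they are mixed with KG confidences.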


3.4 KG retriever

# retrieval/kg_retriever.py

def retrieve_kg(neo4j_client, user, problem):
    return neo4j_client.query_products(user, problem)


3.5 Vector retriever

# retrieval/vector_retriever.py

from vector.embedder import embed


def retrieve_vector(vector_db, query):
    q_emb = embed(query)
    return vector_db.search(q_emb, top_k=10)


3.6 Fusion (the core)

# retrieval/fusion.py

def fuse(kg_results, vector_results):
    # Accumulate a weighted score per product: 0.5 for KG hits, 0.4 for vector hits.
    scores = {}

    for r in kg_results:
        scores[r["product"]] = scores.get(r["product"], 0) + 0.5 * r["score"]

    for r in vector_results:
        scores[r["product"]] = scores.get(r["product"], 0) + 0.4 * r["score"]

    return sorted(
        [{"product": k, "score": v} for k, v in scores.items()],
        key=lambda x: x["score"],
        reverse=True
    )
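
To see how the two weights interact, here is a small worked example. The function is repeated so the snippet runs on its own, with the weights exposed as parameters (an assumption; the original hard-codes them). The product names and scores are made up.

```python
def fuse(kg_results, vector_results, w_kg=0.5, w_vec=0.4):
    """Weighted sum of KG and vector scores, keyed by product name."""
    scores = {}
    for r in kg_results:
        scores[r["product"]] = scores.get(r["product"], 0) + w_kg * r["score"]
    for r in vector_results:
        scores[r["product"]] = scores.get(r["product"], 0) + w_vec * r["score"]
    return sorted(
        [{"product": k, "score": v} for k, v in scores.items()],
        key=lambda x: x["score"],
        reverse=True,
    )


kg = [{"product": "A", "score": 0.8}, {"product": "C", "score": 0.6}]
vec = [{"product": "A", "score": 0.5}, {"product": "B", "score": 0.9}]

fused = fuse(kg, vec)
# A: 0.5 * 0.8 + 0.4 * 0.5, about 0.60 (found by both retrievers)
# B: 0.4 * 0.9, about 0.36 (vector only)
# C: 0.5 * 0.6, about 0.30 (KG only)
order = [r["product"] for r in fused]
```

A product found by both retrievers accumulates both contributions, so agreement between the KG and the vector index is rewarded. The two score types live on different scales (a product of confidences versus a distance-derived similarity), so normalizing them before fusion may be worth considering.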


3.7 Intent parsing (TSPR S1)

# tspr/intent_parser.py

def parse_query(query: str):
    # Keyword matching only; anything unrecognized falls back to "general".
    q = query.lower()

    user = "college_student" if "student" in q else "general"
    problem = "braces" if "braces" in q else "general"

    return {
        "user": user,
        "problem": problem
    }
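
The parser recognizes exactly one user type and one problem. A table-driven variant makes it easier to add more without touching the logic. The keyword tables below are hypothetical examples; the labels would need to match the `u.type` and `pr.name` values actually stored in the KG.

```python
# Hypothetical keyword tables; extend them to match the labels stored in the KG.
USER_KEYWORDS = {
    "college_student": ["student", "college", "campus"],
    "child": ["kid", "child", "toddler"],
}
PROBLEM_KEYWORDS = {
    "braces": ["braces", "orthodontic"],
    "sensitive_teeth": ["sensitive", "sensitivity"],
}


def match_label(text, table, default="general"):
    """Return the first label whose keyword list has a hit in text."""
    for label, keywords in table.items():
        if any(kw in text for kw in keywords):
            return label
    return default


def parse_query(query: str):
    q = query.lower()
    return {
        "user": match_label(q, USER_KEYWORDS),
        "problem": match_label(q, PROBLEM_KEYWORDS),
    }


intent = parse_query("best toothbrush for students with braces")
```

Substring matching is crude (it ignores word boundaries), so this remains a placeholder for a real intent model.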


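3.8 Ranking (TSPR S3)

# tspr/ranker.py

def rank(results):
    for r in results:
        # 0.9 and 0.8 are fixed placeholder values standing in for two
        # additional ranking signals that are not computed yet.
        r["final_score"] = (
            0.6 * r["score"] + 0.2 * 0.9 + 0.2 * 0.8
        )
    return sorted(results, key=lambda x: x["final_score"], reverse=True)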
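
Because the two extra terms are constants, they shift every score by the same 0.34 and cannot change the order: sorting by final_score is equivalent to sorting by score. For those terms to matter, they have to vary per item. The sketch below assumes two hypothetical per-item signals, `quality` and `popularity`, which do not appear in the original; when a signal is missing it falls back to the original constant.

```python
def rank(results, w_score=0.6, w_quality=0.2, w_popularity=0.2):
    """Rank by a weighted sum of the fused score and two per-item signals."""
    for r in results:
        r["final_score"] = (
            w_score * r["score"]
            + w_quality * r.get("quality", 0.9)        # assumed signal
            + w_popularity * r.get("popularity", 0.8)  # assumed signal
        )
    return sorted(results, key=lambda x: x["final_score"], reverse=True)


items = [
    {"product": "A", "score": 0.60, "quality": 0.5, "popularity": 0.2},
    {"product": "B", "score": 0.50, "quality": 0.9, "popularity": 0.9},
]
ranked = rank(items)
# A: 0.36 + 0.10 + 0.04, about 0.50
# B: 0.30 + 0.18 + 0.18, about 0.66, so B overtakes A despite a lower fused score
```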


3.9 LLM generation (S5)

# llm/generator.py

def generate_answer(query, products):
    # Placeholder for a real LLM call: fills a fixed template with the top product.
    top = products[0]["product"] if products else "Unknown"

    return f"""
Top recommendation: {top}

Reason:
- Matches your needs
- Suitable for your scenario
"""


4. API entry point (ready to run)

# app.py

from fastapi import FastAPI
from kg.neo4j_client import Neo4jClient
from vector.vector_db import VectorDB
from retrieval.kg_retriever import retrieve_kg
from retrieval.vector_retriever import retrieve_vector
from retrieval.fusion import fuse
from tspr.intent_parser import parse_query
from tspr.ranker import rank
from llm.generator import generate_answer

app = FastAPI()

neo4j_client = Neo4jClient("bolt://localhost:7687", "neo4j", "password")
# Starts empty; products must be added before vector retrieval can return anything.
vector_db = VectorDB()


@app.post("/recommend")
def recommend(data: dict):
    query = data["query"]

    # S1: parse the query into a user type and a problem.
    intent = parse_query(query)

    kg_results = retrieve_kg(
        neo4j_client,
        intent["user"],
        intent["problem"]
    )

    vector_results = retrieve_vector(vector_db, query)

    fused = fuse(kg_results, vector_results)

    # S3: rank the fused candidates.
    ranked = rank(fused)

    # S5: generate the answer text.
    answer = generate_answer(query, ranked)

    return {
        "results": ranked[:5],
        "answer": answer
    }
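
As written, `vector_db` is created empty and nothing ever adds products to it, so the vector retriever has nothing to return until the index is populated. One way to fill it at startup is sketched below. The function name `index_products` and the catalog shape (a list of dicts with `name` and `description`) are assumptions for illustration; the store and the embedding function are passed in so the helper works with any object that has an `add(embeddings, products)` method.

```python
def index_products(vector_db, embed_fn, catalog):
    """Embed each product description and add it to the vector store.

    catalog: list of {"name": ..., "description": ...} dicts (assumed shape).
    Returns the number of products indexed.
    """
    if not catalog:
        return 0
    names = [item["name"] for item in catalog]
    embeddings = [embed_fn(item["description"]) for item in catalog]
    vector_db.add(embeddings, names)
    return len(names)


# In app.py this could run once at startup, for example:
#   from vector.embedder import embed
#   index_products(vector_db, embed, catalog)
```

The names stored in the index should match `p.name` in Neo4j, since fusion joins KG and vector results on the product name.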


5. How to start

uvicorn app:app --reload

Test:

curl -X POST http://127.0.0.1:8000/recommend \
  -H "Content-Type: application/json" \
  -d '{"query":"best toothbrush for students with braces"}'
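
The same request can be sent from Python using only the standard library. This is a small sketch; the helper names `build_request` and `recommend` are made up here, and the URL assumes the default uvicorn host and port shown in the curl command.

```python
import json
import urllib.request


def build_request(query, base_url="http://127.0.0.1:8000"):
    """Build the POST /recommend request without sending it."""
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/recommend",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def recommend(query):
    """Send the request and decode the JSON response (server must be running)."""
    with urllib.request.urlopen(build_request(query)) as resp:
        return json.loads(resp.read().decode("utf-8"))


req = build_request("best toothbrush for students with braces")
# With the server up: print(recommend("best toothbrush for students with braces"))
```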

6. Next upgrades (key)


6.1 Plug in a production embedding model (OpenAI / bge)

Replace:

SentenceTransformer → OpenAI Embedding
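
Swapping the model usually changes the vector dimension, so the `dim=384` default in `VectorDB` has to change with it and existing vectors have to be re-embedded. Hiding the model behind a small interface keeps the rest of the pipeline unchanged. The sketch below is an assumption about how that could look: `Embedder`, `HashingEmbedder`, and `check_dim` are illustrative names, and the hashing class is only a toy stand-in so the example runs offline.

```python
from typing import List, Protocol


class Embedder(Protocol):
    """Anything with a dimension and an embed method can back the pipeline."""

    dim: int

    def embed(self, text: str) -> List[float]: ...


class HashingEmbedder:
    """Toy stand-in so the example runs offline; not a real embedding model."""

    def __init__(self, dim: int = 8):
        self.dim = dim

    def embed(self, text: str) -> List[float]:
        vec = [0.0] * self.dim
        for token in text.lower().split():
            # Sum of code points gives a bucket that is stable across runs.
            vec[sum(map(ord, token)) % self.dim] += 1.0
        return vec


def check_dim(embedder: Embedder, index_dim: int) -> None:
    """Fail fast if the embedder and the index disagree on dimension."""
    if embedder.dim != index_dim:
        raise ValueError(
            f"embedder produces {embedder.dim}-d vectors, index expects {index_dim}"
        )


embedder = HashingEmbedder(dim=8)
vector = embedder.embed("soft brush brush")
check_dim(embedder, 8)  # passes silently
```

A wrapper around a hosted or local model would implement the same two members, and `check_dim` would run once at startup before any vectors are added.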

6.2 FAISS → Milvus (production)


6.3 KG enhancement

  • Extract relations automatically (with an LLM)
  • Update confidence values from user behavior feedback
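
The second bullet can be made concrete with a simple update rule. The sketch below uses an exponential moving average that nudges an edge's confidence toward 1.0 after a click and toward 0.0 after an ignored impression. The rule, the function name, and the learning rate are all assumptions for illustration; the original does not specify how feedback should be applied.

```python
def update_confidence(old, clicked, learning_rate=0.1):
    """Move an edge confidence toward 1.0 on a click and toward 0.0 otherwise.

    This is an exponential moving average, so the value stays within [0, 1]
    as long as old is in [0, 1] and 0 <= learning_rate <= 1.
    """
    target = 1.0 if clicked else 0.0
    return (1.0 - learning_rate) * old + learning_rate * target


conf = 0.50
for clicked in [True, True, False]:
    conf = update_confidence(conf, clicked)
# 0.50 -> 0.55 -> 0.595 -> about 0.5355
```

The updated value would then be written back to the `confidence` property on the corresponding relationship in Neo4j.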

6.4 Fusion upgrade (the core competitive advantage)

final_score = (
α * KG +
β * Vector +
γ * CTR +
δ * Profit
)
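
The formula can be turned into a small scoring function. In this sketch the weight values, the signal keys (`kg`, `vector`, `ctr`, `profit`), and the candidate data are all made up; real weights would need tuning, and the four signals should be on comparable scales before they are mixed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Weights:
    alpha: float = 0.4  # KG score
    beta: float = 0.3   # vector similarity
    gamma: float = 0.2  # click-through rate
    delta: float = 0.1  # profit


def final_score(signals, w=Weights()):
    """Weighted sum of the four signals; missing signals count as 0."""
    return (
        w.alpha * signals.get("kg", 0.0)
        + w.beta * signals.get("vector", 0.0)
        + w.gamma * signals.get("ctr", 0.0)
        + w.delta * signals.get("profit", 0.0)
    )


candidates = {
    "A": {"kg": 0.9, "vector": 0.5, "ctr": 0.10, "profit": 0.2},
    "B": {"kg": 0.3, "vector": 0.9, "ctr": 0.30, "profit": 0.6},
}
ranking = sorted(candidates, key=lambda p: final_score(candidates[p]), reverse=True)
# A: 0.36 + 0.15 + 0.02 + 0.02, about 0.55
# B: 0.12 + 0.27 + 0.06 + 0.06, about 0.51
```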

The key takeaway in one sentence

In essence, this system = "Neo4j as the brain + vectors as perception + TSPR as the decision-maker"


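If you want to take this further (recommended)

I can help you upgrade directly to:

👉 An enterprise-grade version (with the ability to control AI output)

  • A query path control system (the core idea you raised earlier)
  • An HIC human-intervention admin console
  • Controllable recommendation results (you decide who ranks first)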
