name: crud-transport-grpc
description: PowerX gRPC 传输层规则(全局 server、拦截器链、DI)。
PowerX CRUD Transport gRPC
步骤
- 打开并阅读本文件内嵌的规则(见下文“规则(内嵌)”)。
- 按规则执行实现/校对。
- 完成后按核对清单验收。
核对点
- 与 PowerX 当前代码结构、路径与命名一致。
- 仅在传输层/契约层做职责内改动,不跨层越界。
规则(内嵌)
transport_grpc.yaml
# Ruleset header: identifies this ruleset and records its version, owner, and status.
kind: ruleset
name: transport_grpc
version: 1.0.0
owner: powerx
status: stable
# Descriptive metadata: the ruleset's intent and the sibling rulesets it references.
meta:
intent: >
规范 gRPC 服务器层的实现:单实例 Server、模块级 Handler;统一拦截器(鉴权/租户/日志/恢复/限流/校验);
错误映射到 gRPC status;尊重 deadline;透传/回写 metadata;按需支持流式事件与健康检查/反射。
references:
- crud_grpc.yaml
- crud_service.yaml
- crud_di.yaml
- proto_gen.yaml
# File globs this ruleset applies to.
scope:
applies_to:
- "internal/transport/grpc/**/**.go"
- "internal/server/grpc/**/**.go"
# Guiding principles, stated as free-form text.
principles:
- 传输解耦:gRPC 层仅依赖 *shared.Deps 和 Service,不直连 Repo/DB/外部客户端。
- 可观测:记录 request_id/trace_id/tenant/actor 与方法名、耗时、错误码等关键指标。
- 一致性:错误语义与 REST/Service 对齐;分页与 DTO 吻合。
- 健壮性:统一恢复拦截;尊重客户端 deadline;合理默认超时;支持优雅停机。
- 扩展性:拦截器链条可插拔;流式事件名与 REST SSE 保持一致。
- 单实例:仅允许一个全局 gRPC Server(internal/server/grpc/server.go)。
checks:
# ===== Global server (the only one): owns New + interceptors + centralized registration =====
# Applies to internal/server/grpc/server.go: the file must create the grpc server,
# register at least one generated *ServiceServer, and must not import DB or gin packages.
- id: grpc.global.server.shape
level: error
when: { file: "internal/server/grpc/server.go" }
assert:
- must_contain_regex: "grpc\\.NewServer\\("
- must_contain_regex: "Register[A-Za-z0-9]+ServiceServer\\(.*\\)"
- must_not_import: ["gorm.io/gorm","database/sql","github.com/gin-gonic/gin"]
# ===== Module implementation files: *_handler.go or service.go are allowed; creating or registering a server is forbidden =====
# The struct regex escapes the literal braces (\{ and \}) so that regex engines which
# treat a bare "{" as the start of a repetition quantifier do not reject the pattern.
# gorm.io/gorm is listed in must_not_import to match the other import bans in this ruleset.
- id: grpc.module.impl.shape
  level: error
  when: { glob: "internal/transport/grpc/**/{*_handler.go,service.go}" }
  assert:
    - must_contain_regex: "type\\s+Server\\s+struct\\s*\\{\\s*\\*shared\\.Deps\\s*\\}"
    - must_contain_regex: "func\\s+New\\(deps\\s*\\*shared\\.Deps\\)\\s*\\*Server"
    - must_not_contain_regex: "grpc\\.NewServer\\("
    - must_not_contain_regex: "Register[A-Za-z0-9]+ServiceServer\\("
    - must_not_import: ["gorm.io/gorm","database/sql","github.com/gin-gonic/gin"]
# ===== Location check for generated code =====
# The buf generation config must mention the api/grpc/gen output path.
- id: grpc.codegen.layout
level: error
when: { file: "api/grpc/contract/buf.gen.yaml" }
assert:
- contains: "api/grpc/gen"
# ========= Dependency injection: Services must be injected through Deps =========
# Transport files should reference "deps" and must not import DB packages or
# construct redis/kafka/sql clients themselves.
di_usage:
- id: grpc.depends_on_service_only
level: error
when: { glob: "internal/transport/grpc/**/**.go" }
assert:
- should_contain: "deps"
- must_not_import: ["gorm.io/gorm","database/sql"]
- must_not_call: ["redis.NewClient(","kafka.NewReader(","sql.Open("]
# ========= Interceptors: Unary & Stream chains are mandatory (checked on the global server) =========
interceptors:
# The global server must configure both a unary chain and a stream chain,
# using either grpc's built-in chaining or grpc_middleware.
- id: grpc.chain.interceptors
level: error
when: { file: "internal/server/grpc/server.go" }
assert:
- must_contain_any:
- "grpc.ChainUnaryInterceptor("
- "grpc_middleware.ChainUnaryServer("
- must_contain_any:
- "grpc.ChainStreamInterceptor("
- "grpc_middleware.ChainStreamServer("
# Warn-level check for the named unary interceptor kinds.
- id: grpc.interceptors.kinds
level: warn
when: { file: "internal/server/grpc/server.go" }
assert:
- should_contain_any:
- "AuthUnaryInterceptor"
- "TenantUnaryInterceptor"
- "LoggingUnaryInterceptor"
- "RecoveryUnaryInterceptor"
- "RateLimitUnaryInterceptor"
- "ValidateUnaryInterceptor"
# ========= Metadata and Deadline =========
metadata_deadline:
# Transport files must read incoming metadata or set response headers/trailers.
- id: grpc.metadata.read
level: error
when: { glob: "internal/transport/grpc/**/**.go" }
assert:
- must_contain_any: ["metadata.FromIncomingContext","grpc.SetHeader","grpc.SetTrailer"]
# Warn-level check that code either inspects the caller's deadline or applies a timeout.
- id: grpc.deadline.respect
level: warn
when: { glob: "internal/transport/grpc/**/**.go" }
assert:
- should_contain_any:
- "ctx, cancel := context.WithTimeout("
- "if _, ok := ctx.Deadline(); ok {"
- "deadline, ok := ctx.Deadline()"
# ========= Authentication and multi-tenancy (usually injected by interceptors) =========
auth_tenant:
# Interceptor files should mention at least one auth/tenant related keyword.
- id: grpc.auth.from_metadata
level: warn
when: { glob: "internal/transport/grpc/**/interceptors*.go" }
assert:
- should_contain_any: ["authorization","bearer","jwt","tenant","actor"]
# Transport files should obtain the tenant through TenantIDFromCtx.
- id: grpc.tenant.required_in_ctx
level: warn
when: { glob: "internal/transport/grpc/**/**.go" }
assert:
- should_contain: "TenantIDFromCtx("
# ========= Error mapping =========
# Transport files must build errors with status.Error/status.Errorf and import
# the grpc status and codes packages.
error_mapping:
- id: grpc.error.to_status
level: error
when: { glob: "internal/transport/grpc/**/**.go" }
assert:
- must_contain_any: ["status.Error(","status.Errorf("]
- must_import: ["google.golang.org/grpc/status","google.golang.org/grpc/codes"]
# ========= Service implementations call only the Service layer =========
# NOTE(review): the regex below expects the exported form "s.Deps.<X>Service.", while the
# server_go template in this file calls "s.deps.MediaAssetService." - confirm which spelling is intended.
handler_calls_service:
- id: grpc.handler.calls.service
level: error
when: { glob: "internal/transport/grpc/**/**.go" }
assert:
- should_contain_regex: "s\\.Deps\\.[A-Za-z0-9]+Service\\."
- must_not_import: ["internal/repository"]
# ========= Streaming (when present): event names and cancellation =========
# Triggered for content containing "stream "; warns unless an event-name keyword
# and a cancellation/context check are both present.
streaming:
- id: grpc.streaming.events
level: warn
when: { contains: "stream " }
assert:
- should_contain_any: ["start","token","data","final","end","error","heartbeat"]
- should_contain_any: ["ctx.Err()","stream.Context()"]
# ========= Optional: health check and reflection (checked on the global server) =========
# Info-level only: suggests enabling the health service or server reflection.
health_reflection:
- id: grpc.health.reflection
level: info
when: { file: "internal/server/grpc/server.go" }
assert:
- should_contain_any:
- "health.NewServer("
- "reflection.Register("
# Acceptance checklist to tick off once the implementation is complete.
acceptance:
checklist:
- "[ ] 仅存在一个全局 gRPC Server:internal/server/grpc/server.go"
- "[ ] 模块实现通过 New(*shared.Deps) 构造,禁止 new/register server"
- "[ ] 已配置 Unary/Stream 拦截器链(鉴权/租户/日志/恢复/限流/校验 可按需)"
- "[ ] 通过 metadata 读写 request_id/trace_id,尊重 client deadline"
- "[ ] 错误映射使用 status/codes,语义与 REST/Service 对齐"
- "[ ] 如有流式:事件名与 REST SSE 对齐、处理取消与心跳"
- "[ ] (可选)健康检查与反射启用"
templates:
server_go: |
// internal/transport/grpc/server.go
package grpcserver
import (
"context"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
"github.com/ArtisanCloud/PowerX/internal/app/shared"
gen "{{module_path}}/api/grpc/gen/media"
)
// Server pairs the shared dependency container with the underlying gRPC server.
type Server struct {
deps *shared.Deps // shared dependencies; handlers reach services through this field
gs *grpc.Server // the grpc.Server instance created in New
}
// New builds a Server around deps: it creates the grpc.Server with the unary and
// stream interceptor chains installed and registers the sample MediaAsset service on it.
//
// NOTE(review): the ruleset names internal/server/grpc/server.go as the single home for
// grpc.NewServer, while this template's header comment says internal/transport/grpc/server.go;
// confirm the intended path. The *StreamInterceptor constructors used below are presumably
// defined elsewhere; verify that they exist.
func New(deps *shared.Deps) *Server {
    srv := &Server{
        deps: deps,
        gs: grpc.NewServer(
            // Unary chain: auth, then tenant, then logging, then panic recovery.
            grpc.ChainUnaryInterceptor(
                AuthUnaryInterceptor(deps),
                TenantUnaryInterceptor(deps),
                LoggingUnaryInterceptor(deps),
                RecoveryUnaryInterceptor(),
                // RateLimitUnaryInterceptor(...),
                // ValidateUnaryInterceptor(...),
            ),
            // Stream chain mirrors the unary one.
            grpc.ChainStreamInterceptor(
                AuthStreamInterceptor(deps),
                TenantStreamInterceptor(deps),
                LoggingStreamInterceptor(deps),
                RecoveryStreamInterceptor(),
            ),
        ),
    }
    gen.RegisterMediaAssetServiceServer(srv.gs, srv) // sample registration
    return srv
}
// Serve hands the listener to the underlying grpc.Server and returns its result.
func (s *Server) Serve(l net.Listener) error {
    return s.gs.Serve(l)
}
// Stop shuts the underlying grpc.Server down via GracefulStop.
func (s *Server) Stop() {
    s.gs.GracefulStop()
}
// ---- Example: a handler that only calls the Service layer and handles metadata / deadline ----

// GetMediaAsset loads one media asset for the caller's tenant through
// deps.MediaAssetService and maps the result to its proto form.
//
// It applies a 10s default timeout when the caller set no deadline, echoes the
// incoming x-request-id back as a response header, and rejects calls that carry
// no tenant in the context. Service errors are converted with toStatus.
func (s *Server) GetMediaAsset(ctx context.Context, in *gen.GetMediaAssetRequest) (*gen.MediaAssetResponse, error) {
    // Deadline: only apply a default when the upstream caller did not set one.
    if _, ok := ctx.Deadline(); !ok {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
        defer cancel()
    }

    // Request metadata: echo the request id back so clients can correlate responses.
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        if ids := md.Get("x-request-id"); len(ids) > 0 {
            // Best-effort: SetHeader fails outside a live RPC context, which must not fail the call.
            _ = grpc.SetHeader(ctx, metadata.Pairs("x-request-id", ids[0]))
        }
    }

    // The tenant is injected by the interceptor chain; guard against a chain
    // that was not installed rather than querying the service with tenant 0.
    tenantID := TenantIDFromCtx(ctx)
    if tenantID == 0 {
        return nil, status.Error(codes.InvalidArgument, "tenant missing")
    }

    out, err := s.deps.MediaAssetService.Get(ctx, tenantID, in.GetId())
    if err != nil {
        return nil, toStatus(err)
    }
    return &gen.MediaAssetResponse{Data: MapToProto(out)}, nil
}
interceptors_go: |
// internal/transport/grpc/interceptors.go
package grpcserver
import (
"context"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
"github.com/ArtisanCloud/PowerX/internal/app/shared"
)
// ctxKey is a dedicated string type for the context keys declared below.
type ctxKey string
// Context keys under which the auth interceptor stores the tenant and actor ids.
const (
tenantKey ctxKey = "tenant_id"
actorKey ctxKey = "actor_id"
)
// TenantIDFromCtx returns the tenant id stored in ctx under tenantKey.
// It yields 0 when the value is absent or is not a uint64.
func TenantIDFromCtx(ctx context.Context) uint64 {
    // A failed (or nil) type assertion leaves id at its zero value.
    id, _ := ctx.Value(tenantKey).(uint64)
    return id
}
// AuthUnaryInterceptor returns a unary interceptor that requires a bearer token in the
// "authorization" metadata entry. Calls without one fail with codes.Unauthenticated.
// On success it stores tenant and actor ids in the context before calling the handler.
func AuthUnaryInterceptor(deps *shared.Deps) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        incoming, _ := metadata.FromIncomingContext(ctx)
        if getBearer(incoming.Get("authorization")) == "" {
            return nil, status.Error(codes.Unauthenticated, "missing token")
        }

        // TODO: verify the JWT/STS token and derive tenant/actor from it.
        // Until then both ids are hard-coded placeholders.
        withTenant := context.WithValue(ctx, tenantKey, uint64(1)) // placeholder tenantID
        withActor := context.WithValue(withTenant, actorKey, uint64(1)) // placeholder actorID
        return handler(withActor, req)
    }
}
// TenantUnaryInterceptor returns a unary interceptor that lets a call through only when
// TenantIDFromCtx reports a non-zero tenant; otherwise it fails with codes.InvalidArgument.
func TenantUnaryInterceptor(deps *shared.Deps) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        if TenantIDFromCtx(ctx) != 0 {
            return handler(ctx, req)
        }
        return nil, status.Error(codes.InvalidArgument, "tenant missing")
    }
}
// LoggingUnaryInterceptor returns a unary interceptor that captures the start time,
// runs the handler, and returns its result unchanged. The logging itself is still a TODO.
func LoggingUnaryInterceptor(deps *shared.Deps) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        began := time.Now()
        reply, callErr := handler(ctx, req)
        // TODO: log method, elapsed time, tenant, actor, status, etc.
        _ = began // keeps the variable referenced until logging is implemented
        return reply, callErr
    }
}
// RecoveryUnaryInterceptor returns a unary interceptor that recovers from a panic raised
// by the handler and reports it to the caller as a codes.Internal status error.
func RecoveryUnaryInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        defer func() {
            if recover() == nil {
                return
            }
            // Overwrite the named return so the caller sees an error instead of a crash.
            err = status.Error(codes.Internal, "panic recovered")
        }()
        return handler(ctx, req)
    }
}
// getBearer scans vals for the first entry that, after trimming whitespace, starts with
// "bearer " in any letter case, and returns the trimmed remainder of that entry.
// It returns "" when no entry matches.
func getBearer(vals []string) string {
    const scheme = "bearer "
    for _, raw := range vals {
        candidate := strings.TrimSpace(raw)
        if !strings.HasPrefix(strings.ToLower(candidate), scheme) {
            continue
        }
        // Slice the original (not lower-cased) value so the token keeps its case.
        return strings.TrimSpace(candidate[len(scheme):])
    }
    return ""
}
errors_map_go: |
// internal/transport/grpc/errors_map.go
package grpcserver
import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// toStatus converts a service-layer error into a gRPC status error.
//
// A nil error yields nil. Errors recognised by the Is* classifiers map to the matching
// gRPC code with a fixed message. An unclassified error that already carries a non-OK
// gRPC status (possibly wrapped) is returned with that status instead of being
// flattened to Internal. Anything else becomes codes.Internal with the error's own text.
func toStatus(err error) error {
    if err == nil {
        return nil
    }
    switch {
    case IsNotFound(err):
        return status.Error(codes.NotFound, "not found")
    case IsConflict(err):
        return status.Error(codes.AlreadyExists, "conflict")
    case IsUnauthorized(err):
        return status.Error(codes.Unauthenticated, "unauthenticated")
    case IsForbidden(err):
        return status.Error(codes.PermissionDenied, "forbidden")
    case IsRateLimited(err):
        return status.Error(codes.ResourceExhausted, "rate limited")
    case IsPrecondition(err):
        return status.Error(codes.FailedPrecondition, "precondition failed")
    default:
        // Preserve a status that a lower layer already attached to the error chain.
        var carrier interface{ GRPCStatus() *status.Status }
        if errors.As(err, &carrier) {
            if st := carrier.GRPCStatus(); st != nil && st.Code() != codes.OK {
                return st.Err()
            }
        }
        // NOTE(review): this exposes the raw error text to clients; confirm that is intended.
        return status.Error(codes.Internal, err.Error())
    }
}
mapping_helper_go: |
// internal/transport/grpc/mapping_helper.go
package grpcserver
import (
gen "{{module_path}}/api/grpc/gen/media"
m "{{module_path}}/pkg/corex/db/persistence/model/media"
)
// MapToProto copies the fields of a MediaAsset model into a new proto message.
// A nil model maps to a nil message.
func MapToProto(in *m.MediaAsset) *gen.MediaAsset {
    if in == nil {
        return nil
    }
    pb := new(gen.MediaAsset)
    pb.Id = in.ID
    pb.TenantId = in.TenantID
    pb.Name = in.Name
    pb.Code = in.Code
    pb.MetaJson = string(in.Meta)
    pb.Status = int32(in.Status)
    // TODO: convert the time fields
    return pb
}
// MapListToProto maps every model in items through MapToProto, preserving order.
// The result is always a non-nil slice with the same length as items.
func MapListToProto(items []*m.MediaAsset) []*gen.MediaAsset {
    mapped := make([]*gen.MediaAsset, len(items))
    for i := range items {
        mapped[i] = MapToProto(items[i])
    }
    return mapped
}