Teamlinker/code/server/common/rpc/rpc.ts
sx1989827 13013e59c8 fix
2021-08-29 21:46:53 +08:00

178 lines
6.1 KiB
TypeScript

import { IServer_Common_Http_Proxy } from './../types/http';
import { IServer_Common_Nacos_Instance } from './../types/nacos';
import { ECommon_Services } from '../../../common/types';
import { Redis } from '../cache/redis';
import * as sio from "socket.io"
import * as ioClient from "socket.io-client"
import { REDIS_GATEWAY } from '../cache/keys/gateway';
import * as http from 'http';
import { getHttpRoutes, handleHttpCall } from '../http/http';
/**
 * Payload of an "rpc-call" socket event.
 */
interface ISocketStruct {
    /** Name under which the remote handler was registered; the server uses it as the registry key. */
    name: string;
    /** Positional arguments the server applies to the registered handler. */
    args: Array<any>;
}
/**
 * Acknowledgement sent back for an "rpc-call" socket event.
 */
interface ISocketRPCResponseStruct {
    /** 0 on success; any other value makes the calling side reject. */
    code: number;
    /** Error message, filled in when the handler threw. */
    msg?: string;
    /** Value returned by the handler on success. */
    data?: any;
}
// RPC handler functions keyed by method name; populated by the DRPCRecieve decorator.
const g_RevieveFunc = {};

// Object each handler is invoked on (used as `this`), keyed by the same method name.
const g_RevieveInstance = {};

// Outgoing client sockets, one per target service name, reused between calls.
const g_Socket: { [name: string]: ioClient.Socket } = {};
/**
 * Server side of the socket-based service-to-service layer.
 *
 * `start` opens a socket.io server that answers two events:
 *  - "rpc-call":  invokes a handler registered through `DRPCRecieve`
 *  - "http-call": dispatches a proxied HTTP request to a registered route
 *
 * `validInstance` looks up a reachable instance of a named service.
 */
export class Rpc{
    private static io:InstanceType<typeof sio.Server>

    /**
     * Create the socket.io server and install the "rpc-call" / "http-call" handlers.
     *
     * Every request that carries an acknowledgement callback is answered exactly once,
     * including the failure paths, so that the remote caller never waits forever.
     *
     * @param port TCP port to listen on, or an existing HTTP server to attach to.
     */
    static start(port:number | http.Server){
        // Code reported when a call fails without a usable numeric error code of its own.
        const GENERIC_ERROR_CODE=-1
        const hasOwn=(obj:object,key:string)=>Object.prototype.hasOwnProperty.call(obj,key)

        Rpc.io=new sio.Server(port,{
            transports: ['websocket', 'polling']
        })
        Rpc.io.on("connection",(socket)=>{
            socket.on("rpc-call",async (arg:ISocketStruct,callback)=>{
                // A client may emit without an acknowledgement; only call a real function.
                const reply=(res:ISocketRPCResponseStruct)=>{
                    if(typeof callback==="function")
                    {
                        callback(res)
                    }
                }
                let funcName=arg?arg.name:undefined
                // Own-property check: the name comes from the network and must not resolve
                // to something inherited from Object.prototype ("toString", "constructor", ...).
                let registered=typeof funcName==="string"
                    && hasOwn(g_RevieveFunc,funcName)
                    && hasOwn(g_RevieveInstance,funcName)
                    && g_RevieveFunc[funcName]
                    && g_RevieveInstance[funcName]
                if(!registered)
                {
                    // Answer instead of staying silent, otherwise the caller's promise never settles.
                    reply({
                        code:GENERIC_ERROR_CODE,
                        msg:`rpc method not found: ${funcName}`
                    })
                    return
                }
                let objRes=<ISocketRPCResponseStruct>{}
                try {
                    objRes.data=await g_RevieveFunc[funcName].apply(g_RevieveInstance[funcName],arg.args)
                    objRes.code=0
                } catch (err) {
                    // Keep the thrown object's own code/msg when present; otherwise fall back
                    // to a non-zero code so the caller cannot mistake the failure for success.
                    let hasCode=err && typeof err.code==="number" && err.code!==0
                    objRes.code=hasCode?err.code:GENERIC_ERROR_CODE
                    if(typeof err==="string")
                    {
                        objRes.msg=err
                    }
                    else if(err)
                    {
                        objRes.msg=err.msg || err.message
                    }
                }
                reply(objRes)
            })
            socket.on("http-call",async (arg:IServer_Common_Http_Proxy,callback)=>{
                let notFound=<IServer_Common_Http_Proxy>{}
                notFound.status=404
                // Defaults to 404 and is replaced when a route matches or the handler fails.
                let response:unknown=notFound
                try {
                    let key=arg.method+ " " +arg.path
                    let objRoute=getHttpRoutes()
                    for(let keyRoute in objRoute) {
                        if(key==keyRoute){
                            response=await handleHttpCall(objRoute[keyRoute],arg)
                            break
                        }
                    }
                } catch (err) {
                    // A failing route handler must still produce an answer for the proxying side.
                    let failed=<IServer_Common_Http_Proxy>{}
                    failed.status=500
                    response=failed
                }
                if(typeof callback==="function")
                {
                    callback(response)
                }
            })
        })
    }

    /**
     * Look up an instance of the named service through the gateway Redis cache.
     *
     * @param name Service name used to build the Redis key.
     * @returns The first cached instance, or `null` when nothing is cached for the name.
     */
    static async validInstance(name:string):Promise<IServer_Common_Nacos_Instance>
    {
        let objRedis=REDIS_GATEWAY.instance(name)
        let ret=await objRedis.get()
        return ret?ret[0]:null
    }
}
/**
 * Forward an HTTP request description to another service over its RPC socket.
 *
 * A connected socket cached for `targetServer` is reused. Otherwise the stale socket
 * (if any) is closed, a service instance is looked up through `Rpc.validInstance`,
 * and a new socket is opened and cached before the request is emitted.
 *
 * @param objProxy     Request description sent as the "http-call" payload.
 * @param targetServer Service that should handle the request.
 * @returns The response acknowledged by the target service, or `null` when no instance
 *          is registered for the service or the connection to it cannot be established.
 *          A failure of the instance lookup itself rejects the returned promise.
 */
export async function proxyRequest(objProxy:IServer_Common_Http_Proxy,targetServer:ECommon_Services):Promise<IServer_Common_Http_Proxy> {
    let socket=g_Socket[targetServer]
    if(!socket || !socket.connected)
    {
        if(socket)
        {
            socket.close()
        }
        let instance=await Rpc.validInstance(targetServer)
        if(!instance)
        {
            return null
        }
        const fresh=ioClient.io(`ws://${instance.ip}:${instance.port}`)
        g_Socket[targetServer]=fresh
        // Wait for the first outcome of the connection attempt. One-shot listeners are
        // used on purpose: a permanent "connect" listener would fire again on every
        // automatic reconnect and re-send this request.
        const connected=await new Promise<boolean>(function(resolve){
            function onConnect(){
                fresh.off("connect_error",onConnectError)
                resolve(true)
            }
            function onConnectError(){
                fresh.off("connect",onConnect)
                // Stop retrying and drop the cache entry so the next call looks the instance up again.
                fresh.close()
                if(g_Socket[targetServer]===fresh)
                {
                    delete g_Socket[targetServer]
                }
                resolve(false)
            }
            fresh.once("connect",onConnect)
            fresh.once("connect_error",onConnectError)
        })
        if(!connected)
        {
            return null
        }
        socket=fresh
    }
    const ready=socket
    return new Promise<IServer_Common_Http_Proxy>(function(resolve){
        ready.emit("http-call",objProxy,(res:IServer_Common_Http_Proxy)=>{
            resolve(res)
        })
    })
}
/**
 * Method decorator factory that turns the decorated method into a remote call.
 *
 * The original method body is replaced. Calling the method emits an "rpc-call" event,
 * carrying the method's property name and the call arguments, to the service `rpcName`.
 * A connected socket cached for that service is reused; otherwise the stale socket is
 * closed, an instance is looked up through `Rpc.validInstance` and a new socket is
 * opened and cached.
 *
 * The replacement method returns a promise that
 *  - resolves with the remote result when the response code is 0,
 *  - resolves with `null` when no instance is registered for the service or the
 *    connection to it cannot be established,
 *  - rejects with `{code, msg}` when the remote side reports a non-zero code,
 *  - rejects when the instance lookup itself fails.
 *
 * @param rpcName Service that implements the method.
 */
export function DRPCSend(rpcName:ECommon_Services){
    return function (target: Object, propertyName: string, descriptor: TypedPropertyDescriptor<Function>) {
        descriptor.value = async function () {
            let args=Array.from(arguments)
            let socket=g_Socket[rpcName]
            if(!socket || !socket.connected)
            {
                if(socket)
                {
                    socket.close()
                }
                let instance=await Rpc.validInstance(rpcName)
                if(!instance)
                {
                    return null
                }
                const fresh=ioClient.io(`ws://${instance.ip}:${instance.port}`)
                g_Socket[rpcName]=fresh
                // Wait for the first outcome of the connection attempt. One-shot listeners
                // are used on purpose: a permanent "connect" listener would fire again on
                // every automatic reconnect and re-send this call.
                const connected=await new Promise<boolean>(function(resolve){
                    function onConnect(){
                        fresh.off("connect_error",onConnectError)
                        resolve(true)
                    }
                    function onConnectError(){
                        fresh.off("connect",onConnect)
                        // Stop retrying and drop the cache entry so the next call looks the instance up again.
                        fresh.close()
                        if(g_Socket[rpcName]===fresh)
                        {
                            delete g_Socket[rpcName]
                        }
                        resolve(false)
                    }
                    fresh.once("connect",onConnect)
                    fresh.once("connect_error",onConnectError)
                })
                if(!connected)
                {
                    return null
                }
                socket=fresh
            }
            const ready=socket
            return new Promise(function(resolve,reject){
                ready.emit("rpc-call",<ISocketStruct>{
                    name:propertyName,
                    args:args
                },(res:ISocketRPCResponseStruct)=>{
                    if(res.code==0)
                    {
                        resolve(res.data)
                    } else {
                        reject({
                            code:res.code,
                            msg:res.msg
                        })
                    }
                })
            })
        }
    }
}
/**
 * Method decorator that registers the decorated method as an RPC handler.
 *
 * The method is stored under its property name together with `target`, which the
 * "rpc-call" handler later uses as `this` when invoking it. An entry that already
 * exists for the name is kept, so the first registration wins.
 *
 * @param target       Object the decorator was applied on.
 * @param propertyName Name of the decorated method; doubles as the RPC method name.
 * @param descriptor   Property descriptor whose `value` is the handler function.
 */
export function DRPCRecieve(target: Object, propertyName: string, descriptor: TypedPropertyDescriptor<Function>) {
    // Keep whatever is already registered; only fill in missing entries.
    g_RevieveFunc[propertyName] = g_RevieveFunc[propertyName] || descriptor.value;
    g_RevieveInstance[propertyName] = g_RevieveInstance[propertyName] || target;
}