【最后203篇系列】004 -Smarklink

news/2025/2/3 23:29:19 标签: python

说明

这个用来替代nginx。

最初是希望用nginx进行故障检测和负载均衡,花了很多时间,大致的结论是:nginx可以实现,但是是在商业版里。非得要找替代肯定可以搞出来,但是太麻烦了(即使是nginx本身的配置也很烦)。更重要的是,我突然意识到其实我对这方面的需求大幅提升了,nginx无法实现我最核心的功能。

于是我打算自己做一个网关。以下是两者的差别:
在这里插入图片描述

我并不是要做一个新的产品来替代nginx,只是需要对我的应用服务进行非常精细的控制,然后我问了下大模型,是否可行:

使用 Tornado 构建自定义应用网关 是一个非常有挑战性但也非常灵活的选择。通过 Tornado 的异步特性和强大的编程能力,你可以实现非常精细的控制,满足特定的业务需求。以下是实现这一目标的思路和建议:


1. Tornado 作为应用网关的核心功能

你可以利用 Tornado 实现以下网关功能:

反向代理
  • 使用 Tornado 的 AsyncHTTPClienthttpclient 模块,将客户端请求转发到后端服务。
  • 示例:
    python">from tornado.httpclient import AsyncHTTPClient
    
    class ProxyHandler(tornado.web.RequestHandler):
        async def get(self):
            http_client = AsyncHTTPClient()
            response = await http_client.fetch("http://backend-service" + self.request.uri)
            self.write(response.body)
    
负载均衡
  • 实现自定义的负载均衡算法(如轮询、加权轮询、最小连接数等)。
  • 示例:
    python">backend_servers = ["http://backend1", "http://backend2", "http://backend3"]
    current_server = 0
    
    class LoadBalancerHandler(tornado.web.RequestHandler):
        async def get(self):
            global current_server
            backend_url = backend_servers[current_server]
            current_server = (current_server + 1) % len(backend_servers)
            http_client = AsyncHTTPClient()
            response = await http_client.fetch(backend_url + self.request.uri)
            self.write(response.body)
    
动态路由
  • 根据请求的路径、头部或其他条件,将请求路由到不同的后端服务。
  • 示例:
    python">class RouterHandler(tornado.web.RequestHandler):
        async def get(self):
            if self.request.path.startswith("/api/v1"):
                backend_url = "http://api-v1-service"
            elif self.request.path.startswith("/api/v2"):
                backend_url = "http://api-v2-service"
            else:
                backend_url = "http://default-service"
            http_client = AsyncHTTPClient()
            response = await http_client.fetch(backend_url + self.request.uri)
            self.write(response.body)
    
限流和限速
  • 使用 Tornado 的异步特性实现请求速率限制。
  • 示例:
    python">from tornado.ioloop import IOLoop
    from tornado.web import RequestHandler
    from tornado.queues import Queue
    
    request_queue = Queue(maxsize=100)  # 限制并发请求数
    
    class RateLimitHandler(RequestHandler):
        async def get(self):
            await request_queue.put(None)
            try:
                # 处理请求
                self.write("Hello, world!")
            finally:
                request_queue.get_nowait()
    
身份验证和授权
  • 在网关层实现统一的身份验证(如 JWT 验证、OAuth 等)。
  • 示例:
    python">import jwt
    
    class AuthHandler(tornado.web.RequestHandler):
        def prepare(self):
            token = self.request.headers.get("Authorization")
            if not token:
                self.set_status(401)
                self.finish("Unauthorized")
            try:
                payload = jwt.decode(token, "secret_key", algorithms=["HS256"])
                self.current_user = payload["user"]
            except jwt.InvalidTokenError:
                self.set_status(401)
                self.finish("Invalid token")
    
日志和监控
  • 在网关层记录请求日志,并集成监控工具(如 Prometheus)。
  • 示例:
    python">class LoggingHandler(tornado.web.RequestHandler):
        def on_finish(self):
            print(f"Request: {self.request.method} {self.request.uri} - Status: {self.get_status()}")
    

2. Tornado 网关的优势

  • 高度定制化:你可以完全控制网关的行为,实现特定的业务逻辑。
  • 灵活性:Tornado 的异步特性使其能够高效处理高并发请求。
  • 与 Python 生态无缝集成:可以轻松集成其他 Python 库(如数据库驱动、消息队列等)。

3. Tornado 网关的挑战

  • 开发成本高:需要从头实现许多功能(如负载均衡、健康检查、限流等)。
  • 性能瓶颈:Tornado 是 Python 编写的,性能可能不如 Nginx 这样的 C 语言实现。
  • 维护复杂性:自定义网关需要更多的测试和维护工作。

4. 建议的架构

如果你决定使用 Tornado 构建网关,建议采用以下架构:

  1. Nginx 作为前置网关

    • 处理 SSL/TLS 终止、静态文件缓存、DDoS 防护等。
    • 将动态请求转发到 Tornado 网关。
  2. Tornado 作为应用网关

    • 实现精细的流量控制、动态路由、身份验证等功能。
    • 将请求转发到后端的微服务或应用服务器。
  3. 监控和日志

    • 集成 Prometheus、Grafana 等工具,实时监控网关性能。
    • 使用 ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志分析。

5. 示例代码

以下是一个简单的 Tornado 网关示例:

python">import tornado.ioloop
import tornado.web
from tornado.httpclient import AsyncHTTPClient

class ProxyHandler(tornado.web.RequestHandler):
    async def get(self):
        backend_url = "http://backend-service" + self.request.uri
        http_client = AsyncHTTPClient()
        response = await http_client.fetch(backend_url)
        self.write(response.body)

def make_app():
    return tornado.web.Application([
        (r"/.*", ProxyHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

总结

使用 Tornado 构建自定义应用网关是可行的,尤其适合需要高度定制化的场景。你可以实现反向代理、负载均衡、动态路由、限流、身份验证等功能,但需要注意开发成本和性能瓶颈。如果可能,建议将 Nginx 作为前置网关,Tornado 作为应用网关,结合两者的优势,构建高性能、高可用的系统。

看看性能是不是ok:
在这里插入图片描述

在这里插入图片描述

所以,总体上tornado在性能上可能会比nginx低一个量级:

  • 1 简单的基础服务可以继续走nginx,或者直连,这部分假设无问题。
  • 2 复杂的、核心的服务走smartlink。这部分的处理本身就非常“昂贵”,比如用大模型处理数据,天然的TPS就比较低,smartlink额外的开销比起来不算啥,是可行的(或者说现在的计算资源太多了,费点没啥)。即使是较低配的生产机也应该有8个核,花1个核做网关在我看来是值得的。

smartlink 可以带来什么:

  • 1 权限控制。过去我一般不去设置权限,这样开发和使用比较方便,但这一步肯定是要做的。通过smartlink,仅开放一个对外端口,访问其他服务时必须提供对应口令,在smartlink完成鉴权。
  • 2 流量及质量。现在有一类服务要求比较高,按照固定周期统计可能会有些滞后。通过smartlink(结合redis), 可以实时的对流量进行控制,获取请求的响应头,将成功(2xx)、失败(5xx)等不同类型的错误进行实时统计和通知。另外可以统计实时的影响速度,由别的算法应用读取,来进行分发优化。
  • 3 优化与强化。这个问题是最为核心且重要的,当我们花费成千上万的人民币在进行数据的处理时,我们知道做的不够好。通常来说,算法很难在第一版达到完美。然而当服务进入生产状态时,我们又很难去停下来再次改进;即使我们离线改好了新版算法,又担心上线时出bug。这样的最终结果是:不出问题不改进。smartlink在增加cpu开销时,也给了足够强大的控制逻辑,可以解决这些问题。 这里不讨论细节,只看机制上的可行性。当请求到达时,smarklink会按照实时加载的策略(通过redis)决定请求是否可以多发。比如除了将请求实时转向对应的稳定版服务,也发到kafka。收到请求的处理结果后,除了返回用户,也发一份到kafka。这个过程会增加2次写kafka的操作,对于秒级级的请求来说是可以接受的(假设请求30秒超时)。数据到了kafka之后,可以有多个消费者准实时的执行新版本服务的请求,然后和当前服务的结果一并进行效果评估与比对。首先会排除的问题是bug引起的请求失败,然后新版本服务可以准实时的进行评估、打标(RLHF)。评估结束后,可以采用手动/自动的方法,修改redis中的数据,从而使请求平滑切换到新的服务。这个服务是经过了效果评估和上线测试的,不会有低级错误。

内容

本次的目标先实现几个基本功能

功能:

  • 1 转发
  • 2 health-check
  • 3 负载均衡

1 测试样例

  • 1 转发

在一台主机上,启动一个微服务(实体识别),通过smartlink转发服务到此位置

  • 2 health-check

当请求报连接错误时,smartlink会将对应服务记为不可用;当服务恢复时,会将其标记为可用。

  • 3 负载均衡(least time)

再启动一个服务,加入并在服务中进行时间扰动。要求smartlink在将服务转发时会依据其响应时间动态分配。

2 实验

我发现今天可能写不完全部了,内容有点多,我先把完成的有部分整理上。

2.1 基础配置部分 base_config.py

这里先手动配置了一个测试服务。假设每个服务的可用资源是一个字典, 并且认为一定有一个默认的服务版本(base)

python">...
base_config.server_dict = {}
base_config.server_dict['test_server'] = {}
base_config.server_dict['test_server']['base'] = "http://IP:PORT/get_lng_lat/" 

2.2 数据模型 data_model.py

约定(建议性质)了模型的输入和输出。本来是想强制的,但那样难度太高了,而且作为一个网关,目前的主要任务还是能够进行有效转发。未来可以尽量往这个标准靠。

python"># 服务输入
class StandardInput(BaseModel):
    server_name : str  # 要请求的服务名
    params : dict = {} #  请求参数
    data : list = []# 请求数据 - 一般以lod的方式输入


# 服务输出
class StandardOutput(BaseModel):
    server_name: str = 'default'
    status : bool  = True
    msg : str  = 'ok'
    data : list = [] # 返回数据 - 以lod的方式返回
    duration : int = 0 # ms 

2.3 服务 server.py

2.3.1 基本部分导入部分

python">import tornado.httpserver  # http服务器
import tornado.ioloop  # ?
import tornado.options  # 指定服务端口和路径解析
import tornado.web  # web模块
from tornado.options import define, options
import os.path  # 获取和生成template文件路径
import tornado.httpclient  
from Basefuncs import * 
from base_config import base_config
from data_model import * 
# 配置日志
logger = get_base_logger()

import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, datetime):
            return obj.__str__()
        if isinstance(obj, dd.timedelta):
            return obj.__str__()
        else:
            return super(MyEncoder, self).default(obj)

2.3.2 基础转发

python"># 基础转发处理器
class BaseForwardHandler(tornado.web.RequestHandler):
    def set_default_headers(self):
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header("Access-Control-Allow-Headers", "Content-Type")
        self.set_header("Access-Control-Allow-Methods", "POST, OPTIONS")

    def options(self):
        self.set_status(204)
        self.finish()

    # 被转发的请求
    async def forward_request(self, target_url, body=None, headers=None):
        # 获取原始请求的 body 和 headers
        # 如果没有传递 body 和 headers,则使用原始请求的 body 和 headers
        if body is None:
            body = self.request.body
        if headers is None:
            headers = self.request.headers

        # 使用 Tornado 的异步 HTTP 客户端转发请求
        http_client = tornado.httpclient.AsyncHTTPClient()
        try:
            logger.info(f"Forwarding request to {target_url}")
            response = await http_client.fetch(
                target_url,
                method="POST",
                headers=headers,
                body=body
            )
            self.write(response.body)

        except tornado.httpclient.HTTPError as e:
            logger.error(f"HTTP Error: {e.code} - {str(e)}")
            self.set_status(e.code)
            self.write(json.dumps({"error": str(e)}))
            # 触发告警通知
            await self.trigger_alert(e)
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            self.set_status(500)
            self.write(json.dumps({"error": "Internal Server Error"}))
            # 触发告警通知
            await self.trigger_alert(e)


    async def trigger_alert(self, error):
        """
        触发告警通知的逻辑
        """
        alert_message = f"Service Error: {str(error)}"
        logger.error(f"Sending alert: {alert_message}")
        
        # 示例:发送告警到内部系统(如邮件、Slack、Webhook 等)
        try:
            # 假设我们有一个发送告警的函数
            await self.send_alert_to_internal_system(alert_message)
        except Exception as e:
            logger.error(f"Failed to send alert: {str(e)}")

    async def send_alert_to_internal_system(self, message):
        """
        发送告警到内部系统的逻辑
        """
        # 示例:使用 HTTP 客户端发送告警到 Webhook
        http_client = tornado.httpclient.AsyncHTTPClient()
        alert_url = "http://internal-alert-system/endpoint"
        body = json.dumps({"message": message})
        try:
            await http_client.fetch(
                alert_url,
                method="POST",
                headers={"Content-Type": "application/json"},
                body=body
            )
            logger.info("Alert sent successfully")
        except Exception as e:
            logger.error(f"Failed to send alert: {str(e)}")
            raise e

2.3.3 应用的转发逻辑

python">import time 
class AIServer(BaseForwardHandler):
    print('AIServer Working ', get_time_str1())
    async def post(self):
        standard_output = StandardOutput()
        print('解析请求体')
        # 1. 解析请求体
        try:
            request_data = json.loads(self.request.body)
            print("Parsed request data:", request_data)
        except json.JSONDecodeError:
            self.set_status(400)
            standard_output.status = False 
            standard_output.msg  = "Invalid JSON format"

            self.write(json.dumps(standard_output.dict()))
            return

        # 2. 验证请求格式
        try:
            standard_input = StandardInput(**request_data)
        except (TypeError, ValueError) as e:
            # 如果 request_data 不符合 StandardInput 的要求,捕获 TypeError 或 ValueError
            self.set_status(400)
            standard_output.status = False 
            standard_output.msg  = f"Invalid input data: {str(e)}"

            self.write(json.dumps(standard_output.dict()))
            return

        # 3.转发请求
        target_server_dict =  base_config.server_dict.get(standard_input.server_name)
        if target_server_dict is None:
            self.set_status(400)
            standard_output.status = False 
            standard_output.msg  = f"Invalid input data: No Such Service {standard_input.server_name}"
            self.write(json.dumps(standard_output.dict()))
            return

        # 这里控制版本
        await self.forward_request(target_server_dict['base'])

然后进行调用

请求端:

python">import httpx
data_dict =   {'server_name':'test_server', 'address': '上海九六广场'}
resp = httpx.post('http://localhost:9001/ai_server/' ,json = data_dict)
In [31]: resp.json()
Out[31]:
{'name': 'get_lng_lat',
 'status': True,
 'msg': 'ok',
 'duration': 86,
 'data': {'status': 0,
  'lng': 121.5312514401146,
  'lat': 31.23340608690243,
  'precise': 1,
  'confidence': 75,
  'comprehension': 100,
  'level': '购物'}}

服务端:

python">...
解析请求体
Parsed request data: {'server_name': 'test_server', 'address': '上海九六广场'}
2025-02-02 01:09:04 - my_logger - INFO - Forwarding request to http://IP/get_lng_lat/
2025-02-02 01:09:04 - tornado.access - INFO - 200 POST /ai_server/ (127.0.0.1) 158.67ms
...

可以看到转发成功了,原始的请求大约80ms,因为转发带来的额外延时大约也是80ms。所以如果对于实时性要求特别高的可能还需要注意。后续再加上其他的额外处理,单个请求最终的延时可能到达200ms,甚至500ms。对于类似大模型之类的调用来说,一般会花费数秒到数十秒来处理一个请求,这种开销就可以接受。


http://www.niftyadmin.cn/n/5841115.html

相关文章

【Qt】常用的容器

Qt提供了多个基于模板的容器类&#xff0c;这些容器类可用于存储指定类型的数据项。例如常用的字符串列表类 QStringList 可用来操作一个 QList<QString>列表。 Qt的容器类比标准模板库(standard template library&#xff0c;STL)中的容器类更轻巧、使用更安全且更易于使…

【论文复现】基于Otsu方法的多阈值图像分割改进鲸鱼优化算法

目录 1.摘要2.鲸鱼优化算法WOA原理3.改进策略4.结果展示5.参考文献6.代码获取 1.摘要 本文提出了一种基于Otsu方法的多阈值图像分割改进鲸鱼优化算法&#xff08;RAV-WOA&#xff09;。RAV-WOA算法能够在分割灰度图像和彩色图像时&#xff0c;自动选择最优阈值&#xff0c;并确…

【TypeScript】基础:数据类型

文章目录 TypeScript一、简介二、类型声明三、数据类型anyunknownnervervoidobjecttupleenumType一些特殊情况 TypeScript 是JavaScript的超集&#xff0c;代码量比JavaScript复杂、繁多&#xff1b;但是结构更清晰 一、简介 为什么需要TypeScript&#xff1f; JavaScript的…

【大数据技术】教程01:搭建完全分布式高可用大数据集群(VMware+CentOS+FinalShell)

搭建完全分布式高可用大数据集群&#xff08;VMwareCentOSFinalShell&#xff09; 资源下载 VMware Workstation Pro 16CentOS-Stream-10-latest-x86_64-dvd1.isoFinalShell 4.5.12 注&#xff1a;请在阅读本篇文章前&#xff0c;将以上资源下载下来。 写在前面 本章主要介…

Elasticsearch 指南 [8.17] | Search APIs

Search API 返回与请求中定义的查询匹配的搜索结果。 http GET /my-index-000001/_search Request GET /<target>/_search GET /_search POST /<target>/_search POST /_search Prerequisites 如果启用了 Elasticsearch 安全功能&#xff0c;针对目标数据流…

【tiktok 国际版抖抖♬♬ __ac_signature算法】逆向分析

一开始的参数是没有X-Bogus和 __ac_signature的 先是加密请求参数得到乱码。最终得到X-Bogus 然后请求参数添加了X-Bogus之后再去生成__ac_signature __ac_signature的生成需要用到X-Bogus

自定义数据集 使用scikit-learn中svm的包实现svm分类

代码&#xff1a; import numpy as np # 导入用于数值计算的库 import matplotlib.pyplot as plt # 导入用于绘图的库# class1_points 和 class2_points 分别定义了两个类别的数据点&#xff0c;二维坐标 class1_points np.array([[1.9, 1.2],[1.5, 2.1],[1.9, 0.5],[1.5, …

影视文件大数据高速分发方案

在当今的数字时代&#xff0c;影视行业的内容创作和传播方式经历了翻天覆地的变化。随着4K、8K高清视频的普及&#xff0c;以及虚拟现实(VR)和增强现实(AR)技术的发展&#xff0c;影视文件的数据量正以前所未有的速度增长。这就要求行业内的参与者必须拥有高效的大数据传输解决…