Skip to content

HTTP 客户端

HTTP 客户端是 Spire 框架的核心基础设施,提供了高性能、可扩展的 HTTP 通信功能。通过 HttpClientFactory 模式,有效管理 HTTP 请求处理和资源分配,提高应用程序的网络通信性能和可维护性。

快速启动

cangjie
import spire_net_http.*

main() {
    let client = HttpClient()
    let request = HttpRequestMessage(HttpMethod.Get, "https://localhost:7246/js/jquery.min.js")
    let response = client.send(request)
    response.ensureSuccessStatusCode() // 确认状态码是否是2xx
    response.content.readAsString() |> println
    return 0
}

Http请求

GET请求

cangjie
let client = HttpClient()
// 设置基础地址
client.baseAddress = URL.parse("https://localhost:7246")
// 获取字符串
let js = client.getString("/userinfo")
// 获取流
let stream = client.getStream("/userinfo")
// 获取二进制数组
let bytes = client.getByteArray("/userinfo")
// 获取响应体
let response = client.get("/userinfo")

内容类型

FormUrl

cangjie
let client = HttpClient()
client.baseAddress = URL.parse("https://localhost:7246")
let content = FormUrlContent([
    ("grant_type", "password"),
    ("username", "admin"),
    ("password", "1024")
])
let response = client.post("/connect/token", content)
response.ensureSuccessStatusCode()

Json

cangjie
let client = HttpClient()
client.baseAddress = URL.parse("https://localhost:7246")
// TokenRequest需要实现序列化接口
let content = JsonContent.create(TokenRequest(username: "admin", password: "1024", grant_type: "password"))
let response = client.post("/connect/token", content)
response.ensureSuccessStatusCode()

String

cangjie
let client = HttpClient()
client.baseAddress = URL.parse("https://localhost:7246")
let content = StringContent(#"
{
    "username": "admin",
    "password": "1024",
    "grant_type": "password"
}
"#,"application/json")
let response = client.post("/connect/token", content)
response.ensureSuccessStatusCode()

ByteArray

cangjie
let client = HttpClient()
client.baseAddress = URL.parse("https://localhost:7246")
let content = ByteArrayContent(#"
{
    "username": "admin",
    "password": "1024",
    "grant_type": "password"
}
"#.toArray())
content.headers.add("content-type", "application/json")
let response = client.post("/connect/token", content)
response.ensureSuccessStatusCode()

Stream

cangjie
let client = HttpClient()
client.baseAddress = URL.parse("https://localhost:7246")
let buffer = ByteBuffer()
buffer.write(#"
{
    "username": "admin",
    "password": "1024",
    "grant_type": "password"
}
"#.toArray())
let content = StreamContent(buffer)
content.headers.add("content-type", "application/json")
let response = client.post("/connect/token", content)
response.ensureSuccessStatusCode()

MultipartForm

cangjie
let client = HttpClient()
client.baseAddress = URL.parse("https://localhost:7246")
let content = MultipartFormDataContent()
let file = File("cjpm.toml", OpenMode.Read)
content.add(StreamContent(file), "file", "cjpm.tom")
let response = client.post("/upload/file", content)
response.ensureSuccessStatusCode()

请求管道

我们可以通过实现DelegatingHandler抽象类来定义自定义消息处理处理器。通过链表的设计模式来实现的AOP编程思想。

请求流程

  • HttpClient:头节点,负责前端交互。
  • DelegatingHandler:用户自定义处理器,如身份认证,日志记录,错误处理等等。
  • HttpClientHandler:尾节点,负责最终的http处理。

代码演示

cangjie
main() {
    let client = HttpClientHandler() |>
        {f => Custom2Handler(f)} |>
        {f => Custom1Handler(f)} |>
        {f => HttpClient(f)}
    // 可以简写为:
    // let client = HttpClient(Custom1Handler(Custom2Handler(HttpClientHandler())))    
    let content = client.getString("https://localhost:7246/hello")
    return 0
}

public class Custom1Handler <: DelegatingHandler {
    public init(handler: HttpMessageHandler) {
        super(handler)
    }

    protected override func send(request: HttpRequestMessage): HttpResponseMessage {
        println("handler1:start")
        let response =  super.send(request)
        println("handler1:endle")
        return response
    }
}

public class Custom2Handler <: DelegatingHandler {
    public init(handler: HttpMessageHandler) {
        super(handler)
    }

    protected override func send(request: HttpRequestMessage): HttpResponseMessage {
        println("handler2:start")
        let response =  super.send(request)
        println("handler2:endle")
        return response
    }
}

运行结果

bash
handler1:start
handler2:start
handler2:endle
handler1:endle

实用案例

我们可以通过请求管道来实现日志打印能力。

cangjie
main() {
    let client = HttpClientHandler() |>
        {f => LoggingHandler(f)} |> 
        {f => HttpClient(f)}
    let content = client.getString("https://localhost:7246/hello")
    return 0
}

public class LoggingHandler <: DelegatingHandler {
    public init(handler: HttpMessageHandler) {
        super(handler)
    }

    protected override func send(request: HttpRequestMessage): HttpResponseMessage {
        println("requestUri: ${request.requestUri}")
        // 发送请求
        let response = super.send(request)
        println("response:${response.content.readAsString()}")
        return response
    }
}

运行结果

bash
root@localhost:~# cjpm run
requestUri: https://localhost:7246/hello
response: hello

容器集成

我们提供了容器集成方案,并围绕生产环境需求做了大量优化工作。在生产环境中,我们强烈推荐采用容器集成方案,该方案具备以下生产级别的核心能力:

  • DNS刷新与连接池管理:当HttpClient以单例模式使用时,由于底层连接池的缓存机制,可能会出现因DNS解析结果更新而无法及时感知的问题。一旦域名对应的IP地址发生变更,原有连接池仍可能继续使用已缓存的无效连接,从而导致服务不可用。通过容器集成,我们可以主动配置连接池的缓存超时时间,确保DNS变化能够被及时识别并更新,保障服务的高可用性。

  • 依赖注入支持:集成容器后,自定义处理器等组件能够无缝融入依赖注入体系,轻松获取所需服务与组件实例。这不仅提升了代码的可维护性和可测试性,也使得应用架构更加清晰和松耦合。

  • 资源管理与弹性伸缩:容器集成方案还提供了完善的资源管控能力,支持对内存、线程、连接等关键资源进行统一管理,并能够根据负载情况实现弹性扩缩容,更好地适应生产环境中的流量波动与高并发场景。

默认客户端

我们可以注册一个非命名客户端到容器中,并设置缓存时间。

cangjie
import spire_net_http.*
import spire_extensions_http.*
import spire_extensions_injection.*

main() {
    let services = ServiceCollection()
    // 注册默认客户端
    services.addHttpClientCore()
    // 配置默认客户端
    services.configureHttpClientDefaults {
        c =>
            // 设置处理器生命周期为1s
            c.setHandlerLifetime(Duration.second * 1)
    }
    let provider = services.build()
    let client = provider.getOrThrow<HttpClient>()
    client.getString("https://localhost:7246/hello") |> println
    return 0
}
  • 说明,我们强烈建议使用IHttpClientFactory来创建HttpClient,而不是直接从容器解析HttpClient,否则将失去容器集成的所有优势。

命名客户端

默认客户端共享同一个http处理器,我们更加推荐使用命名化客户端,专项专用。

cangjie
let services = ServiceCollection()
services.addHttpClient("obs")
    // 设置处理器销毁时间,默认就是2分钟
    .setHandlerLifetime(Duration.minute * 2) 
let provider = services.build()
let clientFactory = provider.getOrThrow<IHttpClientFactory>()
let obsClient = clientFactory.createClient("obs")
obsClient.getString("https://localhost:7246/hello") |> println
  • 2分钟之内,同名客户端共享处理器(dns,连接池等等)。
  • 2分钟之后,处理器会被加入到过期队列中,等待gc确认销毁。一旦被gc标记为可销毁,将释放所有缓存(dns,连接池等等)。

类型化客户端

类型化客户端即命名客户端,名称为类型的完全限定名。并同时将该类型注册到容器。

cangjie
main() {
    let services = ServiceCollection()
    services.addHttpClient<ObsClient, ObsClient>()
    let provider = services.build()
    let obsClient = provider.getOrThrow<ObsClient>()
    obsClient.healthCheck() |> println
    return 0
}

public class ObsClient {
    public ObsClient(let client: HttpClient) {
        
    }

    public func healthCheck() {
        client.getString("https://localhost:7246/hello")
    }
}

上面我们提到,不能直接注入HttpClient,但是类型化客户端,我们使用的是addHttpClient<ObsClient, ObsClient>()注册的,底层通过容器协调器来直接创建的,因此不会被根容器缓存。

请求管道

cangjie
main() {
    let services = ServiceCollection()
    // 注册处理器,不能注册为单例的。
    services.addTransient<ObsAuthHandler, ObsAuthHandler>()
    // 注册类型客户端
    services.addHttpClient<ObsClient, ObsClient>()
        // 关联到该客户端
        .addHttpMessageHandler<ObsAuthHandler>()
    let provider = services.build()
    let obsClient = provider.getOrThrow<ObsClient>()
    obsClient.healthCheck() |> println
    return 0
}

public class ObsClient {
    public ObsClient(let client: HttpClient) {
        
    }

    public func healthCheck() {
        client.getString("https://localhost:7246/hello")
    }
}
// 区别于非容器化,我们无需编写构造器,向下传递处理器链。
public class ObsAuthHandler <: DelegatingHandler {
    protected override func send(request: HttpRequestMessage): HttpResponseMessage {
        request.headers.add("authorization", "Beaerer...")
        println("已处理身份认证")
        let response = super.send(request)
        return response
    }
}

生命周期

通过该案例,我们可以发现:

  1. 没有释放之前会共用处理器,资源复用
  2. 处理器在指定生命周期到期后会安全的释放,节约资源,刷新dns
  3. 处理器释放之后可以再次创建。
cangjie
import std.runtime.*
import spire_net_http.*
import spire_extensions_http.*
import spire_extensions_injection.*

main() {
    let services = ServiceCollection()
    // 注册处理器
    services.addTransient<ObsAuthHandler, ObsAuthHandler>()
    services.addHttpClientCore()
    services.configureHttpClientDefaults {
        c =>
            // 设置处理器生命周期为1s
            c.setHandlerLifetime(Duration.second * 3)
            c.addHttpMessageHandler<ObsAuthHandler>()
    }
    let provider = services.build()
    var clientFactory = provider.getOrThrow<IHttpClientFactory>()
    run(clientFactory)
    // 如果本次请求小于指定的生命周期,那么不再创建新的处理器。
    run(clientFactory) 
    for (pattern in 0..5) {
        // 等待gc确认可以安全销毁
        gc(heavy: true)
        sleep(Duration.second * 3)
    }
    run(clientFactory)
    while (true) {
        sleep(Duration.second * 1)
        gc(heavy: true)
    }
    return 0
}

public func run(clientFactory: IHttpClientFactory) {
    let client = clientFactory.createClient()
    client.getString("https://localhost:7246/hello") |> println
}

public class ObsAuthHandler <: DelegatingHandler {
    public init() {
        println("创建处理器")
    }

    protected override func send(request: HttpRequestMessage): HttpResponseMessage {
        request.requestUri
        let response = super.send(request)
        response.content.readAsString()
        return response
    }

    public override func close(_: Bool) {
        println("释放处理器")
    }
}