当前位置:首页 > 文章列表 > Golang > Go问答 > 使用 Go TCP 客户端-服务器实现高吞吐量

使用 Go TCP 客户端-服务器实现高吞吐量

来源:stackoverflow 2024-04-09 15:27:32 0浏览 收藏

一分耕耘,一分收获!既然打开了这篇文章《使用 Go TCP 客户端-服务器实现高吞吐量》,就坚持看下去吧!文中内容包含等等知识点...希望你能在阅读本文后,能真真实实学到知识或者帮你解决心中的疑惑,也欢迎大佬或者新人朋友们多留言评论,多给建议!谢谢!

问题内容

我将开发一个简单的 tcp 客户端和服务器,我希望实现高吞吐量(300000 个请求/秒),这很容易通过服务器硬件上的 cpp 或 c tcp 客户端和服务器达到。我的意思是 48 核和 64g 内存的服务器。

在我的测试台上,客户端和服务器都有 10g 网络接口卡,并且我在服务器端启用了接收端缩放,并在客户端启用了传输数据包引导。

我将客户端配置为每秒发送 10,000 个请求。我只是从 bash 脚本运行 go go run client.go 的多个实例来提高吞吐量。然而,这样一来,go 就会在操作系统上创建大量的线程,大量的线程会导致上下文切换成本很高,而我无法达到这样的吞吐量。我怀疑我从命令行运行的 go 实例的数量。下面的代码是该方法中客户端的代码片段:

func main(cmd_rate_int int, cmd_port string) {

   //runtime.gomaxprocs(2) // set maximum number of processes to be used by this applications

   //var rate float64 = float64(rate_int)

   rate := float64(cmd_rate_int)

   port = cmd_port

   conn, err := net.dial("tcp", port)
   if err != nil {
       fmt.println("error", err)
       os.exit(1)
   }

   var my_random_number float64 = nexttime(rate) * 1000000
   var my_random_int int = int(my_random_number)
   var int_message int64 = time.now().unixnano()
   byte_message := make([]byte, 8)

   go func(conn net.conn) {
       buf := make([]byte, 8)

       for true {
           _, err = io.readfull(conn, buf)
           now := time.now().unixnano()

           if err != nil {
               return
           }

           last := int64(binary.littleendian.uint64(buf))
           fmt.println((now - last) / 1000)
       }
       return

   }(conn)

   for true {
       my_random_number = nexttime(rate) * 1000000
       my_random_int = int(my_random_number)
       time.sleep(time.microsecond * time.duration(my_random_int))
       int_message = time.now().unixnano()
       binary.littleendian.putuint64(byte_message, uint64(int_message))
       conn.write(byte_message)
   }
}

因此,我尝试通过在 main 中调用 go client() 来运行所有 go 线程,这样我就不会在 linux 命令行中运行多个实例。我认为这可能是一个更好的主意。基本上,这确实是一个更好的想法,并且操作系统中的线程数量不会增加到 700 个左右。但吞吐量仍然很低,而且似乎没有利用底层硬件的所有功能。实际上,您可能想查看我在第二种方法中运行的代码:

func main() {

   //runtime.GOMAXPROCS(2) // set maximum number of processes to be used by this applications
   args := os.Args[1:]
   rate_int, _ := strconv.Atoi(args[0])
   client_size, _ := strconv.Atoi(args[1])
   port := args[2]

   i := 0
   for i <= client_size {
       go client.Main(rate_int, port)
       i = i + 1
   }

   for true {

   }
}

我想知道为了达到高吞吐量的最佳实践是什么?我一直听说 go 是轻量级的、高性能的,可以与 c/cpp pthread 相媲美。不过,我认为就性能而言,c/cpp 仍然远远优于 go。在这个问题上我可能会做一些非常错误的事情,所以如果有人可以帮助使用 go 实现高吞吐量,我会很高兴。


解决方案


这是对操作代码的快速修改。 由于原始源代码正在运行,它没有提供解决方案,但它说明了存储桶令牌的用法,以及其他一些小技巧。

它确实重新使用了与 op 源代码类似的默认值。

它表明您不需要两个文件/程序来提供客户端和服务器。

它演示了flag包的用法。

它展示了如何使用 time.unix(x,y) 适当地解析 unix 纳米时间戳

它展示了如何利用 io.copy 在同一个 net.conn 上写入您所读的内容。而不是手动编写。

不过,这对于生产交付来说是不合适的。

package main

import (
    "encoding/binary"
    "flag"
    "fmt"
    "io"
    "log"
    "math"
    "math/rand"
    "net"
    "os"
    "sync/atomic"
    "time"

    "github.com/juju/ratelimit"
)

var total_rcv int64

func main() {

    var cmd_rate_int float64
    var cmd_port string
    var client_size int

    flag.float64var(&cmd_rate_int, "rate", 400000, "change rate of message reading")
    flag.stringvar(&cmd_port, "port", ":9090", "port to listen")
    flag.intvar(&client_size, "size", 20, "number of clients")

    flag.parse()

    t := flag.arg(0)

    if t == "server" {
        server(cmd_port)

    } else if t == "client" {
        for i := 0; i < client_size; i++ {
            go client(cmd_rate_int, cmd_port)
        }
        // <-make(chan bool) // infinite wait.
        <-time.after(time.second * 2)
        fmt.println("total exchanged", total_rcv)

    } else if t == "client_ratelimit" {
        bucket := ratelimit.newbucketwithquantum(time.second, int64(cmd_rate_int), int64(cmd_rate_int))
        for i := 0; i < client_size; i++ {
            go clientratelimite(bucket, cmd_port)
        }
        // <-make(chan bool) // infinite wait.
        <-time.after(time.second * 3)
        fmt.println("total exchanged", total_rcv)
    }
}

func server(cmd_port string) {
    ln, err := net.listen("tcp", cmd_port)
    if err != nil {
        panic(err)
    }

    for {
        conn, err := ln.accept()
        if err != nil {
            panic(err)
        }
        go io.copy(conn, conn)
    }
}

func client(cmd_rate_int float64, cmd_port string) {

    conn, err := net.dial("tcp", cmd_port)
    if err != nil {
        log.println("error", err)
        os.exit(1)
    }
    defer conn.close()

    go func(conn net.conn) {
        buf := make([]byte, 8)
        for {
            _, err := io.readfull(conn, buf)
            if err != nil {
                break
            }
            // int_message := int64(binary.littleendian.uint64(buf))
            // t2 := time.unix(0, int_message)
            // fmt.println("roudntrip", time.now().sub(t2))
            atomic.addint64(&total_rcv, 1)
        }
        return
    }(conn)

    byte_message := make([]byte, 8)
    for {
        wait := time.microsecond * time.duration(nexttime(cmd_rate_int))
        if wait > 0 {
            time.sleep(wait)
            fmt.println("wait", wait)
        }
        int_message := time.now().unixnano()
        binary.littleendian.putuint64(byte_message, uint64(int_message))
        _, err := conn.write(byte_message)
        if err != nil {
            log.println("error", err)
            return
        }
    }
}

func clientratelimite(bucket *ratelimit.bucket, cmd_port string) {

    conn, err := net.dial("tcp", cmd_port)
    if err != nil {
        log.println("error", err)
        os.exit(1)
    }
    defer conn.close()

    go func(conn net.conn) {
        buf := make([]byte, 8)
        for {
            _, err := io.readfull(conn, buf)
            if err != nil {
                break
            }
            // int_message := int64(binary.littleendian.uint64(buf))
            // t2 := time.unix(0, int_message)
            // fmt.println("roudntrip", time.now().sub(t2))
            atomic.addint64(&total_rcv, 1)
        }
        return
    }(conn)

    byte_message := make([]byte, 8)
    for {
        bucket.wait(1)
        int_message := time.now().unixnano()
        binary.littleendian.putuint64(byte_message, uint64(int_message))
        _, err := conn.write(byte_message)
        if err != nil {
            log.println("error", err)
            return
        }
    }
}

func nexttime(rate float64) float64 {
    return -1 * math.log(1.0-rand.float64()) / rate
}

编辑这是一个非常糟糕的答案。检查 mh-cbon 评论以了解原因。

我不完全理解你是如何尝试这样做的,但是如果我想控制 go 上的速率,我通常会执行 2 个嵌套的 for 循环:

for ;; time.sleep(time.second) {
  go func (){
    for i:=0; i

我在每个循环中启动一个 goroutine 来:

  • 在外部循环上,确保迭代之间只有 1 秒
  • 在内循环上,确保我可以启动所有我想要的请求

把它放在像你这样的问题上,它看起来像:

package main

import (
        "net"
        "os"
        "time"
)

const (
        rate    = 100000
        address = "localhost:8090"
)

func main() {
        conn, err := net.dial("tcp", address)
        if err != nil {
                os.stderr.write([]byte(err.error() + "\n"))
                os.exit(1)
        }

        for ; err == nil; time.sleep(time.second) {
                go func() {
                        for i := 0; i < rate; i++ {
                                go func(conn net.conn) {
                                        if _, err := conn.write([]byte("01234567")); err != nil {
                                                os.stderr.write([]byte("\nconnection closed: " + err.error() + "\n"))
                                        }
                                }(conn)
                        }
                }()
        }
}

要验证这是否确实发送了目标请求速率,您可以使用如下所示的测试 tcp 侦听器:

package main

import (
        "fmt"
        "net"
        "os"
        "time"
)

const (
        address = ":8090"
        payloadSize = 8
)
func main() {
        count := 0
        b := make([]byte, payloadSize)
        l, err := net.Listen("tcp", address)
        if err != nil {
                fmt.Fprintf(os.Stdout, "\nCan't listen to address %v: %v\n", address, err)
                return
        }


   defer l.Close()
    go func() {
            for ; ; time.Sleep(time.Second) {
                    fmt.Fprintf(os.Stdout, "\rRate: %v/s       ", count)
                    count = 0
            }
    }()
    for {
            conn, err := l.Accept()
            if err != nil {
                    fmt.Fprintf(os.Stderr, "\nFailed to accept connection: %v\n", err)
            }
            for {
                    _, err := conn.Read(b)
                    if err != nil {
                            fmt.Fprintf(os.Stderr, "\nConnection closed: %v\n", err)
                            break
                    }
                    count = count + 1
            }
    }

}

我发现了一些问题,因为无法同时写入连接,并出现错误 inconsistent fdmutex。这是由于达到了 0xfffff 并发写入,而 fdmutex 不支持该并发写入。为了缓解此问题,请确保不要超过该并发写入数量。在我的系统中,它是 >100k/s。这不是您期望的 300k/s,但我的系统还没有为此做好准备。

好了,本文到此结束,带大家了解了《使用 Go TCP 客户端-服务器实现高吞吐量》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多Golang知识!

版本声明
本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
PHP 服务器环境详解:必备组件一览PHP 服务器环境详解:必备组件一览
上一篇
PHP 服务器环境详解:必备组件一览
取消WIN10睡眠模式下的密码保护的简单教程
下一篇
取消WIN10睡眠模式下的密码保护的简单教程
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • AI Make Song:零门槛AI音乐创作平台,助你轻松制作个性化音乐
    AI Make Song
    AI Make Song是一款革命性的AI音乐生成平台,提供文本和歌词转音乐的双模式输入,支持多语言及商业友好版权体系。无论你是音乐爱好者、内容创作者还是广告从业者,都能在这里实现“用文字创造音乐”的梦想。平台已生成超百万首原创音乐,覆盖全球20个国家,用户满意度高达95%。
    18次使用
  • SongGenerator.io:零门槛AI音乐生成器,快速创作高质量音乐
    SongGenerator
    探索SongGenerator.io,零门槛、全免费的AI音乐生成器。无需注册,通过简单文本输入即可生成多风格音乐,适用于内容创作者、音乐爱好者和教育工作者。日均生成量超10万次,全球50国家用户信赖。
    14次使用
  •  BeArt AI换脸:免费在线工具,轻松实现照片、视频、GIF换脸
    BeArt AI换脸
    探索BeArt AI换脸工具,免费在线使用,无需下载软件,即可对照片、视频和GIF进行高质量换脸。体验快速、流畅、无水印的换脸效果,适用于娱乐创作、影视制作、广告营销等多种场景。
    14次使用
  • SEO标题协启动:AI驱动的智能对话与内容生成平台 - 提升创作效率
    协启动
    SEO摘要协启动(XieQiDong Chatbot)是由深圳协启动传媒有限公司运营的AI智能服务平台,提供多模型支持的对话服务、文档处理和图像生成工具,旨在提升用户内容创作与信息处理效率。平台支持订阅制付费,适合个人及企业用户,满足日常聊天、文案生成、学习辅助等需求。
    17次使用
  • Brev AI:零注册门槛的全功能免费AI音乐创作平台
    Brev AI
    探索Brev AI,一个无需注册即可免费使用的AI音乐创作平台,提供多功能工具如音乐生成、去人声、歌词创作等,适用于内容创作、商业配乐和个人创作,满足您的音乐需求。
    19次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码