Golang分布式注册中心实现流程讲解
来源:脚本之家
2023-05-12 09:46:33
0浏览
收藏
编程并不是一个机械性的工作,而是需要有思考,有创新的工作,语法是固定的,但解决问题的思路则是依靠人的思维,这就需要我们坚持学习和更新自己的知识。今天golang学习网就整理分享《Golang分布式注册中心实现流程讲解》,文章讲解的知识点主要包括分布式、注册中心,如果你对Golang方面的知识点感兴趣,就不要错过golang学习网,在这可以对大家的知识积累有所帮助,助力开发能力的提升。
动手实现一个分布式注册中心
以一个日志微服务为例,将日志服务注册到注册中心展开!
日志服务
log/Server.go
其实这一个日志类的功能就是有基本的写文件功能,然后就是注册一个http的接口去写日志进去
package log import ( "io/ioutil" stlog "log" "net/http" "os" ) var log *stlog.Logger type fileLog string // 编写日志的方法 func (fl fileLog) Write(data []byte) (int, error) { f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) if err != nil { return 0, err } defer f.Close() return f.Write(data) } // 启动一个日志对象 参数为日志文件名 func Run(destination string) { log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags) } // 自身注册的一个服务方法 func RegisterHandlers() { http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: msg, err := ioutil.ReadAll(r.Body) if err != nil || len(msg) == 0 { w.WriteHeader(http.StatusBadRequest) return } write(string(msg)) default: w.WriteHeader(http.StatusMethodNotAllowed) return } }) } func write(message string) { log.Printf("%v\n", message) }
log/Client.go
提供给外部服务的接口,定义好日志的命名格式,来显示调用接口去使用已经注册好的日志接口并且返回状态
package log import ( "bytes" "distributed/registry" "fmt" "net/http" stlog "log" ) func SetClientLogger(serviceURL string, clientService registry.ServiceName) { stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService)) stlog.SetFlags(0) stlog.SetOutput(&clientLogger{url: serviceURL}) } type clientLogger struct { url string } func (cl clientLogger) Write(data []byte) (int, error) { b := bytes.NewBuffer([]byte(data)) res, err := http.Post(cl.url+"/log", "text/plain", b) if err != nil { return 0, err } if res.StatusCode != http.StatusOK { return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status) } return len(data), nil }
主启动程序LogService
启动服务Logservice
,主要执行start方法,里面有细节实现服务注册与服务发现
package main import ( "context" "distributed/log" "distributed/registry" "distributed/service" "fmt" stlog "log" ) func main() { // 初始化启动一个日志文件对象 log.Run("./distributed.log") // 日志服务注册的端口和地址 host, port := "localhost", "4000" serviceAddress := fmt.Sprintf("http://%s:%s", host, port) // 初始化注册对象 r := registry.Registration{ ServiceName: registry.LogService, // 自身服务名 ServiceURL: serviceAddress, // 自身服务地址 RequiredServices: make([]registry.ServiceName, 0),// 依赖服务 ServiceUpdateURL: serviceAddress + "/services", // 服务列表 HeartbeatURL: serviceAddress + "/heartbeat", // 心跳 } // 启动日志服务包含服务注册,发现等细节 ctx, err := service.Start( context.Background(), host, port, r, log.RegisterHandlers, ) // 异常写入到日志中 if err != nil { stlog.Fatalln(err) } // 超时停止退出服务
服务启动与注册
service/service.go
Start 启动服务的主方法
/* host: 地址 port: 端口号 reg: 注册的服务对象 registerHandlersFunc: 注册方法 */ func Start(ctx context.Context, host, port string, reg registry.Registration, registerHandlersFunc func()) (context.Context, error) { registerHandlersFunc() // 启动注册方法 // 启动服务 ctx = startService(ctx, reg.ServiceName, host, port) // 注册服务 err := registry.RegisterService(reg) if err != nil { return ctx, err } return ctx, nil }
startService
func startService(ctx context.Context, serviceName registry.ServiceName, host, port string) context.Context { ctx, cancel := context.WithCancel(ctx) var srv http.Server srv.Addr = ":" + port // 该协程为监听http服务,并且停止服务的时候cancel go func() { log.Println(srv.ListenAndServe()) // 删除对应的服务 err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port)) if err != nil { log.Println(err) } cancel() }() // 该协程为监听手动停止服务的信号 go func() { fmt.Printf("%v started. Press any key to stop. \n", serviceName) var s string fmt.Scanln(&s) err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port)) if err != nil { log.Println(err) } srv.Shutdown(ctx) cancel() }() return ctx }
服务注册与发现
registry/client.go
注册服务的时候会连着心跳以及服务更新的方法一起注册!
而服务更新里面的细节就是自己自定义了一个Handler然后ServeHttp方法里面去update
全局的服务提供对象,
update主要是更新服务和删除服务的最新消息
然后就是提供一个注销服务的方法
package registry import ( "bytes" "encoding/json" "fmt" "log" "math/rand" "net/http" "net/url" "sync" ) // 注册服务 func RegisterService(r Registration) error { // 获得心跳地址并注册 heartbeatURL, err := url.Parse(r.HeartbeatURL) if err != nil { return err } http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) // 获得服务更新地址,并且自定义http服务的handler,因为每次更新服务的时候,可以在ServeHttp方法里面去维护 serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL) if err != nil { return err } http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{}) // 写入buf值将服务对象发送给注册中心的services地址 buf := new(bytes.Buffer) enc := json.NewEncoder(buf) err = enc.Encode(r) if err != nil { return err } res, err := http.Post(ServicesURL, "application/json", buf) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("Failed to register service. Registry service "+ "responded with code %v", res.StatusCode) } return nil } type serviceUpdateHanlder struct{} func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } dec := json.NewDecoder(r.Body) var p patch err := dec.Decode(&p) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } fmt.Printf("Updated received %v\n", p) prov.Update(p) // 更新服务提供对象 } // 删除对应注册中心的服务地址 func ShutdownService(url string) error { req, err := http.NewRequest(http.MethodDelete, ServicesURL, bytes.NewBuffer([]byte(url))) if err != nil { return err } req.Header.Add("Content-Type", "text/plain") res, err := http.DefaultClient.Do(req) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("Failed to deregister service. Registry "+ "service responded with code %v", res.StatusCode) } return nil } // 更新服务列表 func (p *providers) Update(pat patch) { p.mutex.Lock() defer p.mutex.Unlock() // 将patch中有新增的进行添加 for _, patchEntry := range pat.Added { if _, ok := p.services[patchEntry.Name]; !ok { p.services[patchEntry.Name] = make([]string, 0) } p.services[patchEntry.Name] = append(p.services[patchEntry.Name], patchEntry.URL) } // 将patch中被标记删除的 for _, patchEntry := range pat.Removed { if providerURLs, ok := p.services[patchEntry.Name]; ok { for i := range providerURLs { if providerURLs[i] == patchEntry.URL { p.services[patchEntry.Name] = append(providerURLs[:i], providerURLs[i+1:]...) } } } } } // 根据服务名负载均衡随机获取服务地址 func (p providers) get(name ServiceName) (string, error) { providers, ok := p.services[name] if !ok { return "", fmt.Errorf("No providers available for service %v", name) } idx := int(rand.Float32() * float32(len(providers))) return providers[idx], nil } // 对外暴露生产者的方法 func GetProvider(name ServiceName) (string, error) { return prov.get(name) } type providers struct { services map[ServiceName][]string mutex *sync.RWMutex } // 服务提供对象 var prov = providers{ services: make(map[ServiceName][]string), // 服务列表 服务名->集群地址集合 mutex: new(sync.RWMutex), // 锁 防止服务注册更新时的并发情况 }
registry/registration.go
主要是一些关于服务使用到的参数以及对象!
package registry type Registration struct { ServiceName ServiceName // 服务名 ServiceURL string // 服务地址 RequiredServices []ServiceName // 依赖的服务 ServiceUpdateURL string // 服务更新的地址 HeartbeatURL string // 心跳地址 } type ServiceName string // 服务名集合 const ( LogService = ServiceName("LogService") GradingService = ServiceName("GradingService") PortalService = ServiceName("Portald") ) // 服务对象参数 type patchEntry struct { Name ServiceName URL string } // 更新的服务对象参数 type patch struct { Added []patchEntry Removed []patchEntry }
registry/server.go
服务端的注册中心服务的增删改查管理以及心跳检测,及时将最新的更新的服务消息通知回给客户端
package registry import ( "bytes" "encoding/json" "fmt" "io/ioutil" "log" "net/http" "sync" "time" ) const ServerPort = ":3000" const ServicesURL = "http://localhost" + ServerPort + "/services" // 注册中心地址 // 服务对象集合 type registry struct { registrations []Registration mutex *sync.RWMutex } // 添加服务 func (r *registry) add(reg Registration) error { r.mutex.Lock() r.registrations = append(r.registrations, reg) r.mutex.Unlock() err := r.sendRequiredServices(reg) r.notify(patch{ Added: []patchEntry{ patchEntry{ Name: reg.ServiceName, URL: reg.ServiceURL, }, }, }) return err } // 通知服务接口请求去刷新改变后到服务 func (r registry) notify(fullPatch patch) { r.mutex.RLock() defer r.mutex.RUnlock() for _, reg := range r.registrations { go func(reg Registration) { for _, reqService := range reg.RequiredServices { p := patch{Added: []patchEntry{}, Removed: []patchEntry{}} sendUpdate := false for _, added := range fullPatch.Added { if added.Name == reqService { p.Added = append(p.Added, added) sendUpdate = true } } for _, removed := range fullPatch.Removed { if removed.Name == reqService { p.Removed = append(p.Removed, removed) sendUpdate = true } } if sendUpdate { err := r.sendPatch(p, reg.ServiceUpdateURL) if err != nil { log.Println(err) return } } } }(reg) } } // 更新每个服务的依赖服务 func (r registry) sendRequiredServices(reg Registration) error { r.mutex.RLock() defer r.mutex.RUnlock() var p patch for _, serviceReg := range r.registrations { for _, reqService := range reg.RequiredServices { if serviceReg.ServiceName == reqService { p.Added = append(p.Added, patchEntry{ Name: serviceReg.ServiceName, URL: serviceReg.ServiceURL, }) } } } err := r.sendPatch(p, reg.ServiceUpdateURL) if err != nil { return err } return nil } // 告诉客户端更新,最新的服务列表是这个 func (r registry) sendPatch(p patch, url string) error { d, err := json.Marshal(p) if err != nil { return err } _, err = http.Post(url, "application/json", bytes.NewBuffer(d)) if err != nil { return err } return nil } // 注册中心删除服务对象 func (r *registry) remove(url string) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { // 通知客户端更新对象信息 r.notify(patch{ Removed: []patchEntry{ { Name: r.registrations[i].ServiceName, URL: r.registrations[i].ServiceURL, }, }, }) r.mutex.Lock() reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...) r.mutex.Unlock() return nil } } return fmt.Errorf("Service at URL %s not found", url) } // 心跳检测 func (r *registry) heartbeat(freq time.Duration) { for { var wg sync.WaitGroup for _, reg := range r.registrations { wg.Add(1) go func(reg Registration) { defer wg.Done() success := true for attemps := 0; attemps
本篇关于《Golang分布式注册中心实现流程讲解》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!
版本声明
本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除

- 上一篇
- GoLang职责链模式代码实现介绍

- 下一篇
- Go语言基础学习之Context的使用详解
查看更多
最新文章
-
- Golang · Go教程 | 33分钟前 |
- DebianSyslog在虚拟机中的实用攻略
- 467浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- DebianOpenSSL安装失败的终极解决方案
- 501浏览 收藏
-
- Golang · Go教程 | 9小时前 |
- Debian数据快速提取技巧
- 216浏览 收藏
-
- Golang · Go教程 | 12小时前 |
- Debian系统JS依赖管理终极攻略
- 218浏览 收藏
-
- Golang · Go教程 | 14小时前 |
- Debian上Hadoop作业调度实用技巧
- 100浏览 收藏
-
- Golang · Go教程 | 14小时前 |
- Go语言闭包误区与匿名函数深度解析
- 222浏览 收藏
-
- Golang · Go教程 | 14小时前 |
- Debian系统安全回收数据的正确攻略
- 111浏览 收藏
-
- Golang · Go教程 | 16小时前 |
- Debian高效fetch技巧与使用攻略
- 125浏览 收藏
查看更多
课程推荐
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
查看更多
AI推荐
-
- 笔灵AI生成答辩PPT
- 探索笔灵AI生成答辩PPT的强大功能,快速制作高质量答辩PPT。精准内容提取、多样模板匹配、数据可视化、配套自述稿生成,让您的学术和职场展示更加专业与高效。
- 15次使用
-
- 知网AIGC检测服务系统
- 知网AIGC检测服务系统,专注于检测学术文本中的疑似AI生成内容。依托知网海量高质量文献资源,结合先进的“知识增强AIGC检测技术”,系统能够从语言模式和语义逻辑两方面精准识别AI生成内容,适用于学术研究、教育和企业领域,确保文本的真实性和原创性。
- 24次使用
-
- AIGC检测-Aibiye
- AIbiye官网推出的AIGC检测服务,专注于检测ChatGPT、Gemini、Claude等AIGC工具生成的文本,帮助用户确保论文的原创性和学术规范。支持txt和doc(x)格式,检测范围为论文正文,提供高准确性和便捷的用户体验。
- 30次使用
-
- 易笔AI论文
- 易笔AI论文平台提供自动写作、格式校对、查重检测等功能,支持多种学术领域的论文生成。价格优惠,界面友好,操作简便,适用于学术研究者、学生及论文辅导机构。
- 42次使用
-
- 笔启AI论文写作平台
- 笔启AI论文写作平台提供多类型论文生成服务,支持多语言写作,满足学术研究者、学生和职场人士的需求。平台采用AI 4.0版本,确保论文质量和原创性,并提供查重保障和隐私保护。
- 35次使用
查看更多
相关文章
-
- Go语言实战之实现一个简单分布式系统
- 2022-12-23 220浏览
-
- Go与Redis实现分布式互斥锁和红锁
- 2022-12-22 117浏览
-
- golang 基于 mysql 简单实现分布式读写锁
- 2023-01-07 384浏览
-
- Golang分布式应用之Redis示例详解
- 2023-01-07 113浏览
-
- Golang分布式应用定时任务示例详解
- 2022-12-24 269浏览