Golang分布式注册中心实现流程讲解
来源:脚本之家
2023-05-12 09:46:33
0浏览
收藏
编程并不是一个机械性的工作,而是需要有思考,有创新的工作,语法是固定的,但解决问题的思路则是依靠人的思维,这就需要我们坚持学习和更新自己的知识。今天golang学习网就整理分享《Golang分布式注册中心实现流程讲解》,文章讲解的知识点主要包括分布式、注册中心,如果你对Golang方面的知识点感兴趣,就不要错过golang学习网,在这可以对大家的知识积累有所帮助,助力开发能力的提升。
动手实现一个分布式注册中心
以一个日志微服务为例,将日志服务注册到注册中心展开!

日志服务
log/Server.go
其实这一个日志类的功能就是有基本的写文件功能,然后就是注册一个http的接口去写日志进去
package log
import (
"io/ioutil"
stlog "log"
"net/http"
"os"
)
var log *stlog.Logger
type fileLog string
// 编写日志的方法
func (fl fileLog) Write(data []byte) (int, error) {
f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return 0, err
}
defer f.Close()
return f.Write(data)
}
// 启动一个日志对象 参数为日志文件名
func Run(destination string) {
log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags)
}
// 自身注册的一个服务方法
func RegisterHandlers() {
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
msg, err := ioutil.ReadAll(r.Body)
if err != nil || len(msg) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
write(string(msg))
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
})
}
func write(message string) {
log.Printf("%v\n", message)
}log/Client.go
提供给外部服务的接口,定义好日志的命名格式,来显示调用接口去使用已经注册好的日志接口并且返回状态
package log
import (
"bytes"
"distributed/registry"
"fmt"
"net/http"
stlog "log"
)
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
stlog.SetFlags(0)
stlog.SetOutput(&clientLogger{url: serviceURL})
}
type clientLogger struct {
url string
}
func (cl clientLogger) Write(data []byte) (int, error) {
b := bytes.NewBuffer([]byte(data))
res, err := http.Post(cl.url+"/log", "text/plain", b)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status)
}
return len(data), nil
}主启动程序LogService
启动服务Logservice,主要执行start方法,里面有细节实现服务注册与服务发现
package main
import (
"context"
"distributed/log"
"distributed/registry"
"distributed/service"
"fmt"
stlog "log"
)
func main() {
// 初始化启动一个日志文件对象
log.Run("./distributed.log")
// 日志服务注册的端口和地址
host, port := "localhost", "4000"
serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
// 初始化注册对象
r := registry.Registration{
ServiceName: registry.LogService, // 自身服务名
ServiceURL: serviceAddress, // 自身服务地址
RequiredServices: make([]registry.ServiceName, 0),// 依赖服务
ServiceUpdateURL: serviceAddress + "/services", // 服务列表
HeartbeatURL: serviceAddress + "/heartbeat", // 心跳
}
// 启动日志服务包含服务注册,发现等细节
ctx, err := service.Start(
context.Background(),
host,
port,
r,
log.RegisterHandlers,
)
// 异常写入到日志中
if err != nil {
stlog.Fatalln(err)
}
// 超时停止退出服务
服务启动与注册
service/service.go
Start 启动服务的主方法
/*
host: 地址
port: 端口号
reg: 注册的服务对象
registerHandlersFunc: 注册方法
*/
func Start(ctx context.Context, host, port string,
reg registry.Registration,
registerHandlersFunc func()) (context.Context, error) {
registerHandlersFunc() // 启动注册方法
// 启动服务
ctx = startService(ctx, reg.ServiceName, host, port)
// 注册服务
err := registry.RegisterService(reg)
if err != nil {
return ctx, err
}
return ctx, nil
}startService
func startService(ctx context.Context, serviceName registry.ServiceName,
host, port string) context.Context {
ctx, cancel := context.WithCancel(ctx)
var srv http.Server
srv.Addr = ":" + port
// 该协程为监听http服务,并且停止服务的时候cancel
go func() {
log.Println(srv.ListenAndServe())
// 删除对应的服务
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
cancel()
}()
// 该协程为监听手动停止服务的信号
go func() {
fmt.Printf("%v started. Press any key to stop. \n", serviceName)
var s string
fmt.Scanln(&s)
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
srv.Shutdown(ctx)
cancel()
}()
return ctx
}服务注册与发现
registry/client.go
注册服务的时候会连着心跳以及服务更新的方法一起注册!
而服务更新里面的细节就是自己自定义了一个Handler然后ServeHttp方法里面去update全局的服务提供对象,
update主要是更新服务和删除服务的最新消息
然后就是提供一个注销服务的方法
package registry
import (
"bytes"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sync"
)
// 注册服务
func RegisterService(r Registration) error {
// 获得心跳地址并注册
heartbeatURL, err := url.Parse(r.HeartbeatURL)
if err != nil {
return err
}
http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
// 获得服务更新地址,并且自定义http服务的handler,因为每次更新服务的时候,可以在ServeHttp方法里面去维护
serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
if err != nil {
return err
}
http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{})
// 写入buf值将服务对象发送给注册中心的services地址
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
err = enc.Encode(r)
if err != nil {
return err
}
res, err := http.Post(ServicesURL, "application/json", buf)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to register service. Registry service "+
"responded with code %v", res.StatusCode)
}
return nil
}
type serviceUpdateHanlder struct{}
func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
dec := json.NewDecoder(r.Body)
var p patch
err := dec.Decode(&p)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
fmt.Printf("Updated received %v\n", p)
prov.Update(p) // 更新服务提供对象
}
// 删除对应注册中心的服务地址
func ShutdownService(url string) error {
req, err := http.NewRequest(http.MethodDelete, ServicesURL,
bytes.NewBuffer([]byte(url)))
if err != nil {
return err
}
req.Header.Add("Content-Type", "text/plain")
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to deregister service. Registry "+
"service responded with code %v", res.StatusCode)
}
return nil
}
// 更新服务列表
func (p *providers) Update(pat patch) {
p.mutex.Lock()
defer p.mutex.Unlock()
// 将patch中有新增的进行添加
for _, patchEntry := range pat.Added {
if _, ok := p.services[patchEntry.Name]; !ok {
p.services[patchEntry.Name] = make([]string, 0)
}
p.services[patchEntry.Name] = append(p.services[patchEntry.Name],
patchEntry.URL)
}
// 将patch中被标记删除的
for _, patchEntry := range pat.Removed {
if providerURLs, ok := p.services[patchEntry.Name]; ok {
for i := range providerURLs {
if providerURLs[i] == patchEntry.URL {
p.services[patchEntry.Name] = append(providerURLs[:i],
providerURLs[i+1:]...)
}
}
}
}
}
// 根据服务名负载均衡随机获取服务地址
func (p providers) get(name ServiceName) (string, error) {
providers, ok := p.services[name]
if !ok {
return "", fmt.Errorf("No providers available for service %v", name)
}
idx := int(rand.Float32() * float32(len(providers)))
return providers[idx], nil
}
// 对外暴露生产者的方法
func GetProvider(name ServiceName) (string, error) {
return prov.get(name)
}
type providers struct {
services map[ServiceName][]string
mutex *sync.RWMutex
}
// 服务提供对象
var prov = providers{
services: make(map[ServiceName][]string), // 服务列表 服务名->集群地址集合
mutex: new(sync.RWMutex), // 锁 防止服务注册更新时的并发情况
}registry/registration.go
主要是一些关于服务使用到的参数以及对象!
package registry
type Registration struct {
ServiceName ServiceName // 服务名
ServiceURL string // 服务地址
RequiredServices []ServiceName // 依赖的服务
ServiceUpdateURL string // 服务更新的地址
HeartbeatURL string // 心跳地址
}
type ServiceName string
// 服务名集合
const (
LogService = ServiceName("LogService")
GradingService = ServiceName("GradingService")
PortalService = ServiceName("Portald")
)
// 服务对象参数
type patchEntry struct {
Name ServiceName
URL string
}
// 更新的服务对象参数
type patch struct {
Added []patchEntry
Removed []patchEntry
}registry/server.go
服务端的注册中心服务的增删改查管理以及心跳检测,及时将最新的更新的服务消息通知回给客户端
package registry
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services" // 注册中心地址
// 服务对象集合
type registry struct {
registrations []Registration
mutex *sync.RWMutex
}
// 添加服务
func (r *registry) add(reg Registration) error {
r.mutex.Lock()
r.registrations = append(r.registrations, reg)
r.mutex.Unlock()
err := r.sendRequiredServices(reg)
r.notify(patch{
Added: []patchEntry{
patchEntry{
Name: reg.ServiceName,
URL: reg.ServiceURL,
},
},
})
return err
}
// 通知服务接口请求去刷新改变后到服务
func (r registry) notify(fullPatch patch) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, reg := range r.registrations {
go func(reg Registration) {
for _, reqService := range reg.RequiredServices {
p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
sendUpdate := false
for _, added := range fullPatch.Added {
if added.Name == reqService {
p.Added = append(p.Added, added)
sendUpdate = true
}
}
for _, removed := range fullPatch.Removed {
if removed.Name == reqService {
p.Removed = append(p.Removed, removed)
sendUpdate = true
}
}
if sendUpdate {
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
log.Println(err)
return
}
}
}
}(reg)
}
}
// 更新每个服务的依赖服务
func (r registry) sendRequiredServices(reg Registration) error {
r.mutex.RLock()
defer r.mutex.RUnlock()
var p patch
for _, serviceReg := range r.registrations {
for _, reqService := range reg.RequiredServices {
if serviceReg.ServiceName == reqService {
p.Added = append(p.Added, patchEntry{
Name: serviceReg.ServiceName,
URL: serviceReg.ServiceURL,
})
}
}
}
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
return err
}
return nil
}
// 告诉客户端更新,最新的服务列表是这个
func (r registry) sendPatch(p patch, url string) error {
d, err := json.Marshal(p)
if err != nil {
return err
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
if err != nil {
return err
}
return nil
}
// 注册中心删除服务对象
func (r *registry) remove(url string) error {
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
// 通知客户端更新对象信息
r.notify(patch{
Removed: []patchEntry{
{
Name: r.registrations[i].ServiceName,
URL: r.registrations[i].ServiceURL,
},
},
})
r.mutex.Lock()
reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
r.mutex.Unlock()
return nil
}
}
return fmt.Errorf("Service at URL %s not found", url)
}
// 心跳检测
func (r *registry) heartbeat(freq time.Duration) {
for {
var wg sync.WaitGroup
for _, reg := range r.registrations {
wg.Add(1)
go func(reg Registration) {
defer wg.Done()
success := true
for attemps := 0; attemps 本篇关于《Golang分布式注册中心实现流程讲解》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!
版本声明
本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
GoLang职责链模式代码实现介绍
- 上一篇
- GoLang职责链模式代码实现介绍
- 下一篇
- Go语言基础学习之Context的使用详解
查看更多
最新文章
-
- Golang · Go教程 | 12分钟前 |
- Golang并发队列实现与使用技巧
- 132浏览 收藏
-
- Golang · Go教程 | 14分钟前 |
- Go调用DLL传递数组指针方法解析
- 450浏览 收藏
-
- Golang · Go教程 | 20分钟前 |
- MacGo安装ld链接器失败解决方法
- 209浏览 收藏
-
- Golang · Go教程 | 21分钟前 |
- Golang模块自动同步技巧分享
- 237浏览 收藏
-
- Golang · Go教程 | 23分钟前 |
- Golang反向代理开发入门教程
- 188浏览 收藏
-
- Golang · Go教程 | 28分钟前 |
- Golang读写CSV文件教程详解
- 317浏览 收藏
-
- Golang · Go教程 | 33分钟前 |
- Golang微服务异常处理技巧分享
- 209浏览 收藏
-
- Golang · Go教程 | 51分钟前 | golang Goroutine channel sync.Mutex 高并发队列
- Golang高并发队列处理技巧解析
- 351浏览 收藏
-
- Golang · Go教程 | 57分钟前 |
- Golang协程等待技巧:WaitGroup使用详解
- 212浏览 收藏
-
- Golang · Go教程 | 58分钟前 |
- Golang字符串操作实战教程
- 126浏览 收藏
查看更多
课程推荐
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
查看更多
AI推荐
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3161次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3374次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3402次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4505次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3783次使用
查看更多
相关文章
-
- Go语言实战之实现一个简单分布式系统
- 2022-12-23 220浏览
-
- Go与Redis实现分布式互斥锁和红锁
- 2022-12-22 117浏览
-
- golang 基于 mysql 简单实现分布式读写锁
- 2023-01-07 384浏览
-
- Golang分布式应用之Redis示例详解
- 2023-01-07 113浏览
-
- Golang分布式应用定时任务示例详解
- 2022-12-24 269浏览

