C# HttpClient 使用 Consul 发现服务

Stella981
• 阅读 632

  试用了Overt.Core.Grpc, 把 GRPC 的使用改造得像 WCF, 性能测试也非常不错, 非常推荐各位使用.
  但已有项目大多是 http 请求, 改造成 GRPC 的话, 工作量比较大, 于是又找到了 Steeltoe.Discovery, 在 Startup 给 HttpClient 添加 DelegatingHandler, 动态改变请求url中的 host 和 port, 将http请求指向consul 发现的服务实例, 这样就实现了服务的动态发现.
  经过性能测试, Steeltoe.Discovery 只有 Overt.Core.Grpc 的20%, 非常难以接受, 于是自己实现了一套基于 consul 的服务发现工具. 嗯, 名字好难取啊, 暂定为 ConsulDiscovery.HttpClient 吧
  功能很简单:

  1. webapi 从json中读取配置信息 ConsulDiscoveryOptions;
  2. 如果自己是一个服务, 则将自己注册到consul中并设置健康检查Url;
  3. ConsulDiscovery.HttpClient 内有一个consul client 定时刷新所有服务的url访问地址.

  比较核心的两个类

C# HttpClient 使用 Consul 发现服务 C# HttpClient 使用 Consul 发现服务

using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsulDiscovery.HttpClient
{
    public class DiscoveryClient : IDisposable
    {
        private readonly ConsulDiscoveryOptions consulDiscoveryOptions;
        private readonly Timer timer;
        private readonly ConsulClient consulClient;
        private readonly string serviceIdInConsul;

        public Dictionary<string, List<string>> AllServices { get; private set; } = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);


        public DiscoveryClient(IOptions<ConsulDiscoveryOptions> options)
        {
            consulDiscoveryOptions = options.Value;
            consulClient = new ConsulClient(x => x.Address = new Uri($"http://{consulDiscoveryOptions.ConsulServerSetting.IP}:{consulDiscoveryOptions.ConsulServerSetting.Port}"));
            timer = new Timer(Refresh);

            if (consulDiscoveryOptions.ServiceRegisterSetting != null)
            {
                serviceIdInConsul = Guid.NewGuid().ToString();
            }
        }

        public void Start()
        {
            var checkErrorMsg = CheckParams();
            if (checkErrorMsg != null)
            {
                throw new ArgumentException(checkErrorMsg);
            }
            RegisterToConsul();
            timer.Change(0, consulDiscoveryOptions.ConsulServerSetting.RefreshIntervalInMilliseconds);
        }

        public void Stop()
        {
            Dispose();
        }

        private string CheckParams()
        {
            if (string.IsNullOrWhiteSpace(consulDiscoveryOptions.ConsulServerSetting.IP))
            {
                return "Consul服务器地址 ConsulDiscoveryOptions.ConsulServerSetting.IP 不能为空";
            }

            if (consulDiscoveryOptions.ServiceRegisterSetting != null)
            {
                var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;
                if (string.IsNullOrWhiteSpace(registerSetting.ServiceName))
                {
                    return "服务名称 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceName 不能为空";
                }
                if (string.IsNullOrWhiteSpace(registerSetting.ServiceIP))
                {
                    return "服务地址 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceIP 不能为空";
                }
            }
            return null;
        }

        private void RegisterToConsul()
        {
            if (string.IsNullOrEmpty(serviceIdInConsul))
            {
                return;
            }

            var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;
            var httpCheck = new AgentServiceCheck()
            {
                HTTP = $"{registerSetting.ServiceScheme}{Uri.SchemeDelimiter}{registerSetting.ServiceIP}:{registerSetting.ServicePort}/{registerSetting.HealthCheckRelativeUrl.TrimStart('/')}",
                Interval = TimeSpan.FromMilliseconds(registerSetting.HealthCheckIntervalInMilliseconds),
                Timeout = TimeSpan.FromMilliseconds(registerSetting.HealthCheckTimeOutInMilliseconds),
                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(10),
            };
            var registration = new AgentServiceRegistration()
            {
                ID = serviceIdInConsul,
                Name = registerSetting.ServiceName,
                Address = registerSetting.ServiceIP,
                Port = registerSetting.ServicePort,
                Check = httpCheck,
                Meta = new Dictionary<string, string>() { ["scheme"] = registerSetting.ServiceScheme },
            };
            consulClient.Agent.ServiceRegister(registration).Wait();
        }

        private void DeregisterFromConsul()
        {
            if (string.IsNullOrEmpty(serviceIdInConsul))
            {
                return;
            }
            try
            {
                consulClient.Agent.ServiceDeregister(serviceIdInConsul).Wait();
            }
            catch
            { }
        }

        private void Refresh(object state)
        {
            Dictionary<string, AgentService>.ValueCollection serversInConsul;
            try
            {
                serversInConsul = consulClient.Agent.Services().Result.Response.Values;
            }
            catch // (Exception ex)
            {
                // 如果连接consul出错, 则不更新服务列表. 继续使用以前获取到的服务列表
                // 但是如果很长时间都不能连接consul, 服务列表里的一些实例已经不可用了, 还一直提供这样旧的列表也不合理, 所以要不要在这里实现 健康检查? 这样的话, 就得把检查地址变成不能设置的
                return;
            }

            // 1. 更新服务列表
            // 2. 如果这个程序提供了服务, 还要检测 服务Id 是否在服务列表里
            var tempServices = new Dictionary<string, HashSet<string>>();
            bool needReregisterToConsul = true;
            foreach (var service in serversInConsul)
            {
                var serviceName = service.Service;
                if (!service.Meta.TryGetValue("scheme", out var serviceScheme))
                {
                    serviceScheme = Uri.UriSchemeHttp;
                }
                var serviceHost = $"{serviceScheme}{Uri.SchemeDelimiter}{service.Address}:{service.Port}";
                if (!tempServices.TryGetValue(serviceName, out var serviceHosts))
                {
                    serviceHosts = new HashSet<string>();
                    tempServices[serviceName] = serviceHosts;
                }
                serviceHosts.Add(serviceHost);

                if (needReregisterToConsul && !string.IsNullOrEmpty(serviceIdInConsul) && serviceIdInConsul == service.ID)
                {
                    needReregisterToConsul = false;
                }
            }

            if (needReregisterToConsul)
            {
                RegisterToConsul();
            }

            var tempAllServices = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
            foreach (var item in tempServices)
            {
                tempAllServices[item.Key] = item.Value.ToList();
            }
            AllServices = tempAllServices;
        }


        public void Dispose()
        {
            DeregisterFromConsul();
            consulClient.Dispose();
            timer.Dispose();
        }
    }
}

View Code

C# HttpClient 使用 Consul 发现服务 C# HttpClient 使用 Consul 发现服务

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace ConsulDiscovery.HttpClient
{
    public class DiscoveryHttpMessageHandler : DelegatingHandler
    {
        private static readonly Random random = new Random((int)DateTime.Now.Ticks);

        private readonly DiscoveryClient discoveryClient;

        public DiscoveryHttpMessageHandler(DiscoveryClient discoveryClient)
        {
            this.discoveryClient = discoveryClient;
        }

        protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
        {
            if (discoveryClient.AllServices.TryGetValue(request.RequestUri.Host, out var serviceHosts))
            {
                if (serviceHosts.Count > 0)
                {
                    var index = random.Next(serviceHosts.Count);
                    request.RequestUri = new Uri(new Uri(serviceHosts[index]), request.RequestUri.PathAndQuery);
                }
            }
            return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
        }
    }
}

View Code

  使用方法

  为了简单, 我为新建的WebApi 增加了一个 HelloController, 提供 SayHelloService 服务, 并把自己注册到Consul.

  当我们访问这个WebApi的 /WeatherForecast 时, 其Get()方法会访问 http://SayHelloService/Hello/NetCore, 这就相当于一次远程调用, 只是调用的就是这个WebApi的/Hello/NetCore

  1. appsettings.json 增加

"ConsulDiscoveryOptions": {
    "ConsulServerSetting": {
      "IP": "127.0.0.1", // 必填
      "Port": 8500, // 必填
      "RefreshIntervalInMilliseconds": 1000
    },
    "ServiceRegisterSetting": {
      "ServiceName": "SayHelloService", // 必填
      "ServiceIP": "127.0.0.1", // 必填
      "ServicePort": 5000, // 必填
      "ServiceScheme": "http", // 只能是http 或者 https, 默认http, 
      "HealthCheckRelativeUrl": "/HealthCheck",
      "HealthCheckIntervalInMilliseconds": 500,
      "HealthCheckTimeOutInMilliseconds": 2000
    }
  }

  2.修改Startup.cs

using ConsulDiscovery.HttpClient;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

namespace WebApplication1
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();

            // 注册 ConsulDiscovery 相关配置
            services.AddConsulDiscovery(Configuration);
            // 配置 SayHelloService 的HttpClient
            services.AddHttpClient("SayHelloService", c =>
                {
                    c.BaseAddress = new Uri("http://SayHelloService");
                })
                .AddHttpMessageHandler<DiscoveryHttpMessageHandler>();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            // 启动 ConsulDiscovery
            app.StartConsulDiscovery(lifetime);
        }
    }
}

  3. 添加 HelloController

using Microsoft.AspNetCore.Mvc;

namespace WebApplication1.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class HelloController : ControllerBase
    {
        [HttpGet]
        [Route("{name}")]
        public string Get(string name)
        {
            return $"Hello {name}";
        }
    }
}

  4. 修改WeatherForecast

using Microsoft.AspNetCore.Mvc;
using System.Net.Http;
using System.Threading.Tasks;

namespace WebApplication1.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {
        private readonly IHttpClientFactory httpClientFactory;

        public WeatherForecastController(IHttpClientFactory httpClientFactory)
        {
            this.httpClientFactory = httpClientFactory;
        }

        [HttpGet]
        public async Task<string> Get()
        {
            var httpClient = httpClientFactory.CreateClient("SayHelloService");
            var result = await httpClient.GetStringAsync("Hello/NetCore");
            return $"WeatherForecast return:           {result}";
        }
    }
}

  5. 启动consul

consul agent -dev

  6. 启动 WebApplication1 并访问 http://localhost:5000/weatherforecast

C# HttpClient 使用 Consul 发现服务

  以上示例可以到 https://github.com/zhouandke/ConsulDiscovery.HttpClient 下载, 请记住一定要 启动consul: consul agent -dev

  End

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Easter79 Easter79
2年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这