Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Golang message streaming practice in Eleme

Golang message streaming practice in Eleme

halfrost

March 15, 2019
Tweet

More Decks by halfrost

Other Decks in Programming

Transcript

  1. Taco⽴项背景及⺫标 • 背景 • ⽼项⺫moses系统的缺陷 • 没有⾃有通道,当个推通道出现问题时,推送功能不可⽤ • 国内android⼤⼚禁⽌app链式启动,个推失去了相互唤醒的保活机制,推送触达下降 严重

    • APNS通道在美国布署,推送质量⽆法保证 • 项⺫由Python编写,占⽤资源较多 • 需要⼀个性能更⾼、低延迟、⾃建⻓连接的推送系统 • 愿景 • 建⽴推送平台,统⼀公司的所有推送系统 • 功能: • 数据的上⾏(⽀持Android, iOS, H5) • 数据的下⾏(⽀持Android, iOS, H5) • 配置下发(⽀持Android, iOS)
  2. 客户端的展⽰模型 • 客户端状态 • Taco SDK不在线 • Taco SDK在线,⽤户App在后台 •

    Taco SDK在线,⽤户App在前台 • 可靠推送 VS ⽤户体验 • 在优先保证可靠推送的情况下,保证⽤户体验 • Taco SDK在线时,优化使⽤⾃建的⻓连通道 • Taco服务端推送多条消息时,由客户端做去重处理 • ⽤户App在前台时,不弹出系统通知
  3. App状态 ⾏为 Taco SDK不在 线 由第三⽅通道SDK展⽰通知,⽤户点击后(App/Taco SDK激 活,转到前台),再将业务数据传到App。 Taco SDK在线,

    App在后台 Taco SDK弹出系统通知,⽤户点击通知拉起应⽤或主动切换到 前台时,将业务数据传到App Taco SDK在线, App在前台 将业务数据传到App 通知类型 App状态 ⾏为 Taco SDK不在 线 不展⽰系统通知。第三⽅通道SDK接收到数据后,若此时App 在线,将数据传给App,否则,丢弃。 Taco SDK在线, App在后台 不展⽰系统通知,Taco SDK将业务数据透传给App Taco SDK在线, App在前台 不展⽰系统通知,Taco SDK将业务数据透传给App 透传类型
  4. 如何将客户端路由数据同步到多个机房? • 现状 • ⽤户链接信息的存储在Redis,但Redis不⽀持多机房同 步 • 解决思路 a. 将路由信息存储在MySQL中,由db负责同步。

    Redis只做cache提⾼查询速度。 b. 调研使⽤⼀个⽀持多数据中⼼复制的Cache系统, 如Acrospike, 携程开源的x-pipe c. 程序⾃⼰处理多数据中⼼的Redis的同步。 • 写多份、删多份 • ⽤⼀个proxy来处理多个数据中⼼的Redis读写操作,⽐如改造 Codis d. 根据⽤户的地理位置,对⽤户进⾏分区。对某个⽤户 的连接、推送、上⾏都转到其归属idc处理。
  5. 期望的 Replicator 组件 Redis 集群 应用 Keeper DC1 伪Slave,实 现Redis复制

    协议 Redis 集群 应用 Replicator DC2 持久化并推送变更到 多个机房 Keeper 复制变更 Replicator 复制变更 缓存复制组件的需求: • ⾼可靠,数据不丢 • 对应⽤的低耦合 • 秒级同步 处理的情况 • 相同key更新的冲突 处理 • 循环复制
  6. Redis DRC 组件 Redis 集群 应用 Replicator Redis 集群 应用

    Replicator DC1 DC2 Kafka Kafka 同步流程 • 各机房的Replicator订阅 别的机房的Kafka消息 • 应⽤写本机房Redis(另外 写⼀条记录操作的时间 戳),写⼀条操作数据到 本机房Kafka • 异地机房的Replicator收 到Kafka消息后,与机房 的操作记录⽐对,如果 key相同且⽐它操作时间 早,则执⾏复制操作,否 则,丢弃。 复制变更 复制变更
  7. • 我们的做法 • 实现了⼀个Redis DRC组件(利⽤MQ做跨机房同步) • 感想 • 对程序侵⼊性⽐较⼤,不合适 •

    后续的改进 • 推动公司的缓存组件EKV⽀持跨机房同步 • ⾃研跨IDC复制的缓存组件 • 多活环境下,理想的存储组件 (Yahoo!Pnuts) • ⽀持多IDC复制 • 可指定某个key/记录的归属IDC • 读操作:只从本key归属的IDC查询 • 写操作:当写的key不是它归属的IDC时,写到IDC后, 需同步复制到其归属的IDC,再返回。当写的key是其 归属IDC时,就写IDC,返回,再异步复制到其它IDC • ⽀持key/记录的归属IDC变更
  8. 多活 • 多活的⺫标 • 突破单机房的物理容量限制 • ⽤户就近接⼊,提升服务品质 • 异常切换: 机房发⽣灾难/容量受限时,切部分流量到另⼀个机房

    • 多活不等于多写 • IDC流量切分(uid\loc\shopid) • 订单在同⼀个IDC中完成流转、避免IDC多写 • 避免了数据冲突的发⽣ • 每个机房保存全量(所有IDC)的数据(通过复制组件保证) • 限制 • 复制组件可能延迟或者中断,故业务逻辑不应该强依赖于复制
  9. • 客户端的多活路由如何做? • ⽅案1: • 客户端选择连接不同地域的⺴关服务 • ⺴关选择就近的内⺴机房 • shard切换时,通知客户端。客户端断开连接,并连到新的服务器

    • ⽅案2: • 通过DNS将各地域的客户端解析到相应的机房 • 根据shard key/value连接到相应的内⺴机房 • shard切换时,只修改⺴关到内⺴机房的路由 • 服务端MySQL的多活路由 • 特点:推送是由服务端发起的,没法知道客户端路由到哪个机房。 • ⽅案: • 每个消息有⼀个全局的MsgID,其中嵌⼊了Shard ID • 推送时,选择就近机房操作MySQL,并随机选择⼀个本机房的ShardID嵌⼊MsgID。 • ⺴关在收到ACK时,从MsgID中提取出ShardID,根据GZS查询到它对应的EZONE,然 后调⽤对应机房的服务。 • 多活切换 • 若某个内部机房挂掉 • 通知GZS修改指向: 将受影响的shard切到另⼀个内部机房 • 若某个云机房挂掉 • DNS切换: 将域名映射切到另⼀个云机房
  10. 后端技术栈 • 语⾔: Golang • RPC框架: gRPC • 存储: MySQL/ElasticSearch/Redis/Hive/MaxQ

    • ⽇志收集: ELK • 链路跟踪: ETrace • 服务注册/更新/治理: Huskar • 埋点监控: Statsd/Influxdb/Grafana • 报警: watch dogger
  11. • 崩溃原因/现象:close_wait状态的连接太多,内存占⽤过⼤,被 OS杀了 • 触发时机:断⺴演练时切DNS(全量到北京机房) • 处理过程 (1) 调整了⼏个参数(keepalive相关),观察了⼀会,close_wait下降很快,内存下 来了。

    (2) (第⼆天)观察到问题没有改善,现象仍旧存在 (3) 把DNS切回上海机房 (4) 回滚到前⼀个版本,没有解决 (5) 为了减少影响,在低峰期重启了所有⺴关进程 (6) 基本定位close_wait的原因是⼼跳定时管理器的bug导致(连接没有正常关闭)。 改了⼏版,快速上线。 (7) 观察发现ack曲线不正常(没有数据),go routine数也⼀直在增⻓。怀疑是⺴关 ack/openack的逻辑有问题,验证了下,的确有问题。 (8) 梳理ack流程的代码,将有可能导致ack失败的代码修改(异步调⽤->同步调⽤), 上线。 (9) 现象没有改善(ack曲线不正常),⾼峰期来了,降级(⻓连接->第三⽅通道),紧 急扩容了4台机器。 (10) 通过观察协程栈信息,发现有⼤量协程阻塞在gRPC内部函数中。给RPC调⽤加 上了超时时间,上线。
  12. 阻塞在 func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream,

    err error) { …… sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) …… gRPC基础知识 • 传输协议 HTTP2 • (默认)与对端服务只建⼀个TCP连接 • 连接的多路复⽤:每发起⼀个RPC调⽤,会创建⼀个stream,然后 ⼀个⼀个Frame发过去 配额streamsQuota(默认100) • gRPC client通过streamsQuota 限制同⼀时候并发创建stream的个数 (即限制了并⾏发起RPC调⽤的个数)。 原因 • RPC调⽤的处理耗时⾼,⾼峰期的并发请求多,导致并发创建stream 太多,所以在Newstream()时⼀直阻塞着。 解决⽅法 • RPC调⽤加上超时时间 • 优化后端的处理耗时
  13. • 事故改进措施 • 重写⼼跳定时管理器模块,严格测试 • 提⾼系统的⾃我保护能⼒。对每个gRPC调⽤都设置超 时(默认5秒,可配置),加上监控 • 制定⺴关异常的的处理措施,撰写SOP⽂档 •

    应⽤机器加上报警(cpu,memory,disk,net in/out) • 感想 • 系统的核⼼模块要经过严格测试 • 发⽣故障时,测试必须在低峰期进⾏,减少对业务的影响 • 线上出现重⼤故障时,代码、架构都是次要的,主要看运维操作
  14. • 故障影响:消息推送延迟超过 30 分钟 • 触发时机:多活切换时把所有流量打到了上海机房 • 处理过程 (1) 开始定位问题,由于近期上线了⼀个Q消费限流的功

    能,怀疑此变更导致的问题。再三确认后,实现逻辑 没有问题。 (2) 通知多活切换切回来 (3) 确认时光机发现⽆变更,怀疑是依赖存储的容量问题, 确认了下,没问题。 (4) 观察Q的消费曲线:堆积量⼀直在增加,但是消费速 度没涨。和Q的同事确认后,找到Root Cause。 (5) 堆积的消息慢慢消费完了,服务恢复。
  15. • Root Cause • 核⼼Queue的消费者数只有2个(太少),并发消费能⼒ 不⾜,触发Q的磁盘存储模式(为了换取更⼤消息堆积 量),使消费者进⼊慢消费状态,加剧了消息的堆积 • 改进措施 •

    Q的使⽤姿势改造: 扩⼤关键Queue的连接数(动态可 配置) • 重要的业务⽅分Queue隔离,量⼩的共⽤⼀个Queue • 对消息分级(amqp优先级)处理 • 制定SOP • 改进⽅向 • 业务⽅对我们服务的调⽤量是不可预知的,根据当前的负载动态 调整处理能⼒(机器数,docker动态扩容)
  16. 排查过程 (1) 根据goroutine stack上的信息,初步判断是多协程操作 同⼀个变量,导致slice跨界访问(slice out of bound) (2) 检查了⼏遍代码,没找到多协程race

    condition的迹象, 怀疑是sync.pool中重复利⽤bufio.Reader时,没有将 buf索引置0。修改完,上线。 (3) 现象仍在,尝试在测试环境复现,复现不出来。 (4) 在索引跨界访问的代码处,加log,上报埋点。了解在崩 溃前的细节(每个索引的值/由哪些函数变更了索引)。修改 完,上线。 (5) 在测试环境运⾏-race编译的⽂件,来记录崩溃时的细节, 但-race会导致内存/cpu使⽤增⼤数倍,导致机器内存耗 光僵死,⽆奈放弃。 (6) 源码啃了⼏遍,⺴上查询相关的pr,⽆果。 (7) 线上复现后,拿到log分析,发现经常有连续两次的跨界 访问导致的崩溃,怀疑使⽤sync.pool时存在race condition(⼀个协程归还对象到sync.pool,但另⼀个协 程还在使⽤此对象)。去掉sync.pool,修改上线。
  17. • Root Cause 在serve websocket连接时,先从sync.pool中拿到 bufio.Reader,再启⼀个协程来处理这个连接。 readBufPool = &sync.Pool{ New:

    func() interface{} { return bufio.NewReaderSize(nil, BufSize) }, } func (s Server) serveWebSocket(w http.ResponseWriter, req *http.Request) { rwc, _, err := w.(http.Hijacker).Hijack() rb := readBufPool.Get().(*bufio.Reader) wb := writeBufPool.Get().(*bufio.Writer) rb.Reset(rwc) wb.Reset(rwc) buf := bufio.NewReadWriter(rb, wb) conn, err := newServerConn(rwc, buf, req, &s.Config, s.Handshake) …… conn = newHybiConn(&s.Config, buf, rwc, req) go func() { s.Handler(conn) rwc.Close() readBufPool.Put(rb) }() 在s.Handler函数中,先设置⼼跳定时器,登录,启⼀个 读协程,本协程进⾏写操作。 func (sess *session) Serve() { // 设置⼼跳超时定时器…... go sess.SignIn() // 登录 <-sess.signInDone if sess.id == 0 { goto done } if sess.s.opts.Debug || sess.debug { log.Infof(“sess: sign in success, %s”, sess) } sess.resetHB() sess.s.putSession(sess) go sess.loopRead() // 读协程 sess.loopWrite() // 写协程 done: sess.clean() } 注:读协程会⼀直使⽤传⼊的bufio.Reader。 虽然读协程和写协程使⽤了同步设施,保证有⼀⽅退出, 另⼀⽅也会退出,但不保证退出的时序。 如果本协程(写协程)先退出,会先将rb归还到sync.pool, ⽽读协程可能还在使⽤rb对象,就会产⽣Race Condition。
  18. Context 接⼝原型: type Context interface { Deadline() (deadline time.Time, ok

    bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} } context package的public function: func WithCancel(parent Context) (ctx Context, cancel CancelFunc) func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) func WithValue(parent Context, key interface{}, val interface{}) Context
  19. Context 原理-key/value存取 func doFunc(ctx context.Context, …) { …… newCtx :=

    context.WithValue(ctx, key, value) doChildFunc(newCtx, …) ...... } • Context上下⽂数据是⼀个树结 构,每个结点只保存⼀个key- value对。 • 查询时,如果本结点查不到,则 递归向⽗结点查询,直到查询成 功或失败。
  20. Context 取消控制 Ctx1 Ctx2 Ctx3 Ctx5 Ctx6 Ctx7 Ctx4 Ctx8

    Ctx9 当cancel掉某个结点cancelCtx,会cancel本结点和其⼦树的所有Context cancel
  21. 某天,为了⽀持ETrace,需要在gRPC调⽤过 程中传递RequestID, RpcID,我们的解决⽅ 案是Context。修改后,遇到⼀系列的坑...... 补充背景 所有mysql、maxq、redis的操作接⼝前都会加上context,如果这个 context(或其⽗context)被cancel了,则操作会失败。 func (tx *Tx)

    QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) func(process func(context.Context, redis.Cmder) error) func(context.Context, redis.Cmder) error func (ch *Channel) Consume(ctx context.Context, handler Handler, queue string, dc <-chan amqp.Delivery) error func (ch *Channel) Publish(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (err error)
  22. Case 1 • 现象 上线后,5分钟后所有⽤户登录失败,不断收到报警。 • 原因 内部有⼀个localCache,每5分钟Refresh(调⽤注册的回调函数)⼀ 次。 回调函数原型

    func getAppIDAndAlias(ctx context.Context, appKey, appSecret string) (string, string, error) 第一次cache.Get(ctx, appKey, appSeret)传的ctx是gRPC call传进来的 context,而gRPC在请求结束或失败时会cancel掉context,导致之后 cache Refresh()时,执行失败。 • 解决⽅法 在Refresh时不使⽤缓存的context,使⽤⼀个不会cancel的 context
  23. Case 2 • 现象 上线后,不断收到报警(sys err过多)。看log/etrace产⽣2种sys err: l context canceled

    l sql: Transaction has already been committed or rolled back • 原因 出现context canceled的场景: gRPC handler处理过程中连接被 断开。
  24. 客户端 gRPC-gateway gRPC-client gRPC-server 3 2 1 4 • 客户端发送http

    restful请求 • gRPC-gateway与客户端建⽴连接,接收请求,转换参数,调⽤后⾯的grpc- server。 • gRPC-server处理请求。其中,gRPC-server会对每个请求启⼀个stream,由 这个stream创建context。 • 客户端连接断开 • gRPC-gateway收到连接断开的信号,导致context cancel。gRPC client 在发送rpc请求后由于外部异常使它的请求终⽌了(即它的context被cancel), 会发⼀个RST_STREAM。 • gRPC server收到后,⻢上终⽌请求(即gRPC server的stream context被 cancel)。 连接断开 RST_STREAM
  25. Root Cause • 出现sql: Transaction has already been committed or

    rolled back的原因: 在db.BeginTx时,会启⼀个协程awaitDone: func (tx *Tx) awaitDone() { // Wait for either the transaction to be committed or rolled // back, or for the associated context to be closed. <-tx.ctx.Done() // Discard and close the connection used to ensure the // transaction is closed and the resources are released. This // rollback does nothing if the transaction has already been // committed or rolled back. tx.rollback(true) } 在context被cancel时,会进⾏rollback(),⽽rollback时,会操作原⼦变量。 之后,在另⼀个协程中tx.Commit()时,会判断原⼦变量,如果变了,会抛出此错误。 • 解决⽅法 这2个error都是由连接断开导致的,是正常的。将这2个错误转成user error。
  26. 处理过程 (1) 初步怀疑是跨机房的多个事务操作同⼀条记录导致的。 (2) 将某个接⼝降级,降低多个事务操作同⼀记录的概率。 (3) 减少事务的个数 • 将单条sql的事务去掉 •

    通过业务逻辑的转移减少不必要的事务 (4) 调整MySQL Innodb参数: innodb_lock_wait_timeout(120s->50s) • 考虑调整单个事务的超时时间 (5) 使⽤Redis实现⼀个分布式锁:同⼀时间,只能有⼀个事务操作这条记录,操作完后 再放下⼀个事务。 (6) DAL同学报有事务没提交,查看代码,找到root cause
  27. Root Cause 在开始事务BeginTxx() 会启⼀个协程: https://github.com/golang/go/blob/master/src/database/sql/sql.go# L1595 // awaitDone blocks until

    the context in Tx is canceled and rolls back // the transaction if it's not already done. func (tx *Tx) awaitDone() { // Wait for either the transaction to be committed or rolled // back, or for the associated context to be closed. <-tx.ctx.Done() // Discard and close the connection used to ensure the // transaction is closed and the resources are released. This // rollback does nothing if the transaction has already been // committed or rolled back. tx.rollback(true) } 在rollback(true)中,会先判断原⼦变量tx.done是否为1,如果1, 则返回;如果是0,则加1,并进⾏rollback操作。 在提交事务Commit()时,会先操作原⼦变量 tx.done,然后判断context是否被cancel了,如 果被cancel,则返回;如果没有,则进⾏commit 操作。 // Commit commits the transaction. func (tx *Tx) Commit() error { if !atomic.CompareAndSwapInt32(&tx.do ne, 0, 1) { return ErrTxDone } select { default: case <-tx.ctx.Done(): return tx.ctx.Err() } var err error withLock(tx.dc, func() { err = tx.txi.Commit() }) if err != driver.ErrBadConn { tx.closePrepared() } tx.close(err) return err } 如果先进⾏commit()过程中,先操作原⼦变量,然 后context cancel,之后 另⼀个协程在进⾏ rollback()会因为原⼦变量置为1⽽返回。导致 commit()没有执⾏,rollback()也没有执⾏。
  28. • 处理⽅法 • 推荐做法: 给官⽅go/src/database/driver/sql提pr (已提pr,合⼊Go 1.9.2) • 不⽤transaction •

    不⽤gRPC handler传进来的context,使⽤⼀个不会 取消的context • 让⼯具组修改源码后编译⼀个go的基础镜像,在CI构 建时指定使⽤这个镜像 • 事务阻塞的原因 • 某个事务执⾏的时候太⻓,阻塞了其他事务 • 某个事务没提交,也没回滚
  29. Context 使⽤姿势 • 不要把Context存在⼀个结构体中,显式地传⼊函数。Context变量需要 作为第⼀个参数使⽤,⼀般命名为ctx • 即使⽅法允许,也不要传⼊⼀个nil的Context,如果你不确定你要⽤什么 Context的时候传⼀个context.TODO • 使⽤context的Value相关⽅法只应该⽤于在程序和接⼝中传递和请求相

    关的元数据,不要⽤它来传递⼀些可选的参数 • Context是协程安全的: 同样的Context可以传递到不同的routine中 • 最重要 • 当下游routine依赖传⼊的context做取消控制时,要搞清楚传⼊ 的ctx何时会被Cancel。如果还没搞清楚,保险起⻅,使⽤⼀个 默认的context(Background,TODO)传⼊更好!
  30. gRPC Context FAQ Q: WithValue()只能修改并⽣成⼀个新的⼦结点,但是我想让我做的值修改在多个routine(包括⽗结点)中可⻅,怎么做呢? A:⺫前这个做不到。 Q: gRPC可以通过ctx传递跨进程 上下⽂数据吗?cancel信号? A:可以传k-v对(通过incoming,

    outGoing metadata),但不能传cancel信号。 Q: gRPC 传到内部的ctx是外部传过来的ctx,还是stream⽣成的ctx? A: gRPC server的context是由内部stream⽣成的,⽽gRPC client的内部context是 由外部(handler)的context传递进去的。 Q: gRPC client调⽤过程中context被cancel了,会导致gRPC server请求终⽌? A: gRPC client在发送RPC请求后由于外部异常使它的请求终⽌了(即它的context被cancel),它会发⼀个RST_STREAM,对⽅ gRPC server收到后,会⻢上终⽌请求(即导致gRPC server的stream context被cancel)。 gRPC serverstream.ctx被cancel的时机: 1)连接异常断开 2)数据格式读取不对 3)请求处理完(不管成功或失败) 4)收到⼀个RST_STREAM,即通知这个请求终⽌了。
  31. 总结: 经验/最佳实践 • 理解系统中各个组件的原理、特点 • 在引⽤外部库时持谨慎的态度。若引⼊,原理要吃透 (啃源码,pr,issue) • 善⽤pprof •

    异常处理 • 梳理系统中关键路径,Fatal Error加监控、报警,输出runtime stack • 在程序(主逻辑)⼊⼝处捕捉panic(提⾼系统健壮性),加监控、报警,输出runtime stack • 对错误/异常进⾏分级(user/sys/unknown) • 善⽤defer • 并发 • 留⼼协程是否正常退出、资源是否正常释放(协程泄露) • 善⽤并发设施(errgroup,waitgroup,select,atomic,sync.once) • Go的多协程运⾏有调度的开销,要综合考量。性能关键处,还是⽤共享内存+同步 • 完善的监控、报警,各种粒度的限流、降级、熔断 • ⾃动化测试(单元测试、集成测试、压⼒测试) • 编码规范、提交规范、Code Review
  32. Q&A