Skip to content

Commit 7d21f4f

Browse files
committed
grpc: support channel idleness
1 parent 1db474c commit 7d21f4f

12 files changed

+1459
-146
lines changed

balancer_conn_wrappers.go

Lines changed: 191 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ import (
3232
"google.golang.org/grpc/resolver"
3333
)
3434

35+
type ccbMode int
36+
37+
const (
38+
ccbModeActive = iota
39+
ccbModeIdle
40+
ccbModeClosed
41+
ccbModeExitingIdle
42+
)
43+
3544
// ccBalancerWrapper sits between the ClientConn and the Balancer.
3645
//
3746
// ccBalancerWrapper implements methods corresponding to the ones on the
@@ -46,16 +55,25 @@ import (
4655
// It uses the gracefulswitch.Balancer internally to ensure that balancer
4756
// switches happen in a graceful manner.
4857
type ccBalancerWrapper struct {
49-
cc *ClientConn
58+
// The following fields are initialized when the wrapper is created and are
59+
// read-only afterwards, and therefore can be accessed without a mutex.
60+
cc *ClientConn
61+
opts balancer.BuildOptions
5062

5163
// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
52-
// mutually exclusive manner as they are scheduled on the
53-
// CallbackSerializer. Fields accessed *only* in serializer callbacks, can
54-
// therefore be accessed without a mutex.
55-
serializer *grpcsync.CallbackSerializer
56-
serializerCancel context.CancelFunc
57-
balancer *gracefulswitch.Balancer
58-
curBalancerName string
64+
// mutually exclusive manner as they are scheduled in the serializer. Fields
65+
// accessed *only* in these serializer callbacks, can therefore be accessed
66+
// without a mutex.
67+
balancer *gracefulswitch.Balancer
68+
curBalancerName string
69+
70+
// mu guards access to the below fields. Access to the serializer and its
71+
// cancel function needs to be mutex protected because they are overwritten
72+
// when the wrapper exits idle mode.
73+
mu sync.Mutex
74+
serializer *grpcsync.CallbackSerializer // To serialize all outgoing calls.
75+
serializerCancel context.CancelFunc // To close the serializer at close/enterIdle time.
76+
mode ccbMode // Tracks the current mode of the wrapper.
5977
}
6078

6179
// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
@@ -64,6 +82,7 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc
6482
ctx, cancel := context.WithCancel(context.Background())
6583
ccb := &ccBalancerWrapper{
6684
cc: cc,
85+
opts: bopts,
6786
serializer: grpcsync.NewCallbackSerializer(ctx),
6887
serializerCancel: cancel,
6988
}
@@ -74,8 +93,12 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc
7493
// updateClientConnState is invoked by grpc to push a ClientConnState update to
7594
// the underlying balancer.
7695
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
96+
ccb.mu.Lock()
7797
errCh := make(chan error, 1)
78-
ccb.serializer.Schedule(func(_ context.Context) {
98+
// Here and everywhere else where Schedule() is called, it is done with the
99+
// lock held. But the lock guards only the scheduling part. The actual
100+
// callback is called asynchronously without the lock being held.
101+
ok := ccb.serializer.Schedule(func(_ context.Context) {
79102
// If the addresses specified in the update contain addresses of type
80103
// "grpclb" and the selected LB policy is not "grpclb", these addresses
81104
// will be filtered out and ccs will be modified with the updated
@@ -92,16 +115,19 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
92115
}
93116
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
94117
})
95-
96-
// If the balancer wrapper is closed when waiting for this state update to
97-
// be handled, the callback serializer will be closed as well, and we can
98-
// rely on its Done channel to ensure that we don't block here forever.
99-
select {
100-
case err := <-errCh:
101-
return err
102-
case <-ccb.serializer.Done:
103-
return nil
118+
if !ok {
119+
// If we are unable to schedule a function with the serializer, it
120+
// indicates that it has been closed. A serializer is only closed when
121+
// the wrapper is closed or is in idle.
122+
ccb.mu.Unlock()
123+
return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
104124
}
125+
ccb.mu.Unlock()
126+
127+
// We get here only if the above call to Schedule succeeds, in which case it
128+
// is guaranteed that the scheduled function will run. Therefore it is safe
129+
// to block on this channel.
130+
return <-errCh
105131
}
106132

107133
// updateSubConnState is invoked by grpc to push a subConn state update to the
@@ -120,21 +146,19 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti
120146
if sc == nil {
121147
return
122148
}
149+
ccb.mu.Lock()
123150
ccb.serializer.Schedule(func(_ context.Context) {
124151
ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
125152
})
126-
}
127-
128-
func (ccb *ccBalancerWrapper) exitIdle() {
129-
ccb.serializer.Schedule(func(_ context.Context) {
130-
ccb.balancer.ExitIdle()
131-
})
153+
ccb.mu.Unlock()
132154
}
133155

134156
func (ccb *ccBalancerWrapper) resolverError(err error) {
157+
ccb.mu.Lock()
135158
ccb.serializer.Schedule(func(_ context.Context) {
136159
ccb.balancer.ResolverError(err)
137160
})
161+
ccb.mu.Unlock()
138162
}
139163

140164
// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
@@ -148,42 +172,141 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
148172
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
149173
// the graceful balancer switching process if the name does not change.
150174
func (ccb *ccBalancerWrapper) switchTo(name string) {
175+
ccb.mu.Lock()
151176
ccb.serializer.Schedule(func(_ context.Context) {
152177
// TODO: Other languages use case-sensitive balancer registries. We should
153178
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
154179
if strings.EqualFold(ccb.curBalancerName, name) {
155180
return
156181
}
182+
ccb.buildLoadBalancingPolicy(name)
183+
})
184+
ccb.mu.Unlock()
185+
}
157186

158-
// Use the default LB policy, pick_first, if no LB policy with name is
159-
// found in the registry.
160-
builder := balancer.Get(name)
161-
if builder == nil {
162-
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
163-
builder = newPickfirstBuilder()
164-
} else {
165-
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
166-
}
187+
// buildLoadBalancingPolicy performs the following:
188+
// - retrieve a balancer builder for the given name. Use the default LB
189+
// policy, pick_first, if no LB policy with name is found in the registry.
190+
// - instruct the gracefulswitch balancer to switch to the above builder. This
191+
// will actually build the new balancer.
192+
// - update the `curBalancerName` field
193+
//
194+
// Must be called from a serializer callback.
195+
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
196+
builder := balancer.Get(name)
197+
if builder == nil {
198+
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
199+
builder = newPickfirstBuilder()
200+
} else {
201+
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
202+
}
203+
204+
if err := ccb.balancer.SwitchTo(builder); err != nil {
205+
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
206+
return
207+
}
208+
ccb.curBalancerName = builder.Name()
209+
}
210+
211+
func (ccb *ccBalancerWrapper) close() {
212+
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
213+
ccb.handleCloseAndEnterIdle(ccbModeClosed)
214+
}
215+
216+
// enterIdleMode is invoked by grpc when the channel enters idle mode upon
217+
// expiry of idle_timeout. This call blocks until the balancer is closed.
218+
func (ccb *ccBalancerWrapper) enterIdleMode() {
219+
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
220+
ccb.handleCloseAndEnterIdle(ccbModeIdle)
221+
}
222+
223+
// handleCloseAndEnterIdle is invoked when the channel is being closed or when
224+
// it enters idle mode upon expiry of idle_timeout.
225+
//
226+
// This call is not scheduled on the serializer because we need to ensure that
227+
// the current serializer is completely shut down before the next one is created
228+
// (when exiting idle).
229+
func (ccb *ccBalancerWrapper) handleCloseAndEnterIdle(m ccbMode) {
230+
ccb.mu.Lock()
231+
if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
232+
ccb.mu.Unlock()
233+
return
234+
}
235+
236+
// Close the serializer to ensure that no more calls from gRPC are sent
237+
// to the balancer.
238+
ccb.serializerCancel()
239+
ccb.mode = m
240+
done := ccb.serializer.Done
241+
b := ccb.balancer
242+
ccb.mu.Unlock()
167243

168-
if err := ccb.balancer.SwitchTo(builder); err != nil {
169-
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
244+
b.Close()
245+
<-done
246+
}
247+
248+
// exitIdleMode is invoked by grpc when the channel exits idle mode either
249+
// because of an RPC or because of an invocation of the Connect() API. This
250+
// recreates the balancer that was closed previously when entering idle mode.
251+
//
252+
// If the channel is not in idle mode, we know for a fact that we are here as a
253+
// result of the user calling the Connect() method on the ClientConn. In this
254+
// case, we can simply forward the call to the underlying balancer, instructing
255+
// it to reconnect to the backends.
256+
func (ccb *ccBalancerWrapper) exitIdleMode() {
257+
ccb.mu.Lock()
258+
if ccb.mode == ccbModeClosed {
259+
// Request to exit idle is a no-op when wrapper is already closed.
260+
ccb.mu.Unlock()
261+
return
262+
}
263+
264+
if ccb.mode == ccbModeIdle {
265+
// Recreate the serializer which was closed when we entered idle.
266+
ctx, cancel := context.WithCancel(context.Background())
267+
ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
268+
ccb.serializerCancel = cancel
269+
}
270+
271+
// The ClientConn guarantees mutual exclusion between close() and
272+
// exitIdleMode(), and since we just created a new serializer, we can be
273+
// sure that the below function will be scheduled.
274+
done := make(chan struct{})
275+
ccb.serializer.Schedule(func(_ context.Context) {
276+
defer close(done)
277+
278+
ccb.mu.Lock()
279+
defer ccb.mu.Unlock()
280+
281+
if ccb.mode != ccbModeIdle {
282+
ccb.balancer.ExitIdle()
170283
return
171284
}
172-
ccb.curBalancerName = builder.Name()
285+
286+
// Gracefulswitch balancer does not support a switchTo operation after
287+
// being closed. Hence we need to create a new one here.
288+
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
289+
ccb.buildLoadBalancingPolicy(ccb.curBalancerName)
290+
ccb.mode = ccbModeActive
291+
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
292+
173293
})
294+
ccb.mu.Unlock()
295+
296+
<-done
174297
}
175298

176-
func (ccb *ccBalancerWrapper) close() {
177-
// Close the serializer to ensure that no more calls from gRPC are sent to
178-
// the balancer. We don't have to worry about suppressing calls from a
179-
// closed balancer because these are handled by the ClientConn (balancer
180-
// wrapper is only ever closed when the ClientConn is closed).
181-
ccb.serializerCancel()
182-
<-ccb.serializer.Done
183-
ccb.balancer.Close()
299+
func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
300+
ccb.mu.Lock()
301+
defer ccb.mu.Unlock()
302+
return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
184303
}
185304

186305
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
306+
if ccb.isIdleOrClosed() {
307+
return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
308+
}
309+
187310
if len(addrs) <= 0 {
188311
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
189312
}
@@ -200,6 +323,18 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
200323
}
201324

202325
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
326+
if ccb.isIdleOrClosed() {
327+
// It is safe to ignore this call when the balancer is closed or in idle
328+
// because the ClientConn takes care of closing the connections.
329+
//
330+
// Not returning early from here when the balancer is closed or in idle
331+
// leads to a deadlock though, because of the following sequence of
332+
// calls when holding cc.mu:
333+
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
334+
// ccb.RemoveSubConn --> cc.removeAddrConn
335+
return
336+
}
337+
203338
acbw, ok := sc.(*acBalancerWrapper)
204339
if !ok {
205340
return
@@ -208,6 +343,10 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
208343
}
209344

210345
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
346+
if ccb.isIdleOrClosed() {
347+
return
348+
}
349+
211350
acbw, ok := sc.(*acBalancerWrapper)
212351
if !ok {
213352
return
@@ -216,6 +355,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
216355
}
217356

218357
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
358+
if ccb.isIdleOrClosed() {
359+
return
360+
}
361+
219362
// Update picker before updating state. Even though the ordering here does
220363
// not matter, it can lead to multiple calls of Pick in the common start-up
221364
// case where we wait for ready and then perform an RPC. If the picker is
@@ -226,6 +369,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
226369
}
227370

228371
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
372+
if ccb.isIdleOrClosed() {
373+
return
374+
}
375+
229376
ccb.cc.resolveNow(o)
230377
}
231378

call.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,22 @@ import (
2727
//
2828
// All errors returned by Invoke are compatible with the status package.
2929
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
30+
if err := cc.idlenessMgr.onCallBegin(); err != nil {
31+
return err
32+
}
33+
3034
// allow interceptor to see all applicable call options, which means those
3135
// configured as defaults from dial option as well as per-call options
3236
opts = combine(cc.dopts.callOptions, opts)
3337

38+
var err error
3439
if cc.dopts.unaryInt != nil {
35-
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
40+
err = cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
41+
} else {
42+
err = invoke(ctx, method, args, reply, cc, opts...)
3643
}
37-
return invoke(ctx, method, args, reply, cc, opts...)
44+
cc.idlenessMgr.onCallEnd()
45+
return err
3846
}
3947

4048
func combine(o1 []CallOption, o2 []CallOption) []CallOption {

0 commit comments

Comments
 (0)