Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
for {
select {
case <-rr.ctx.Done():
rr.mu.Lock()
rr.done = true
rr.mu.Unlock()
return
case <-rr.managerRetry:
return
Expand Down Expand Up @@ -1349,7 +1351,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
rr.mu.Unlock()
}

rr.mu.Lock()
rr.objectSize = size
rr.mu.Unlock()

go streamManager()
go streamReceiver()
Expand Down Expand Up @@ -1424,8 +1428,12 @@ func (r *gRPCBidiReader) reopenStream(failSpec []rangeSpec) error {

// Add will add current range to stream.
func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) {
if offset > mr.objectSize {
callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", mr.objectSize))
mr.mu.Lock()
objectSize := mr.objectSize
mr.mu.Unlock()

if offset > objectSize {
callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", objectSize))
return
}
if limit < 0 {
Expand Down Expand Up @@ -1463,8 +1471,10 @@ func (mr *gRPCBidiReader) close() error {
if mr.cancel != nil {
mr.cancel()
}
mr.mu.Lock()
mr.done = true
mr.activeTask = 0
mr.mu.Unlock()
mr.closeReceiver <- true
mr.closeManager <- true
return nil
Expand Down Expand Up @@ -1877,11 +1887,11 @@ type gRPCBidiReader struct {
closeManager chan bool
managerRetry chan bool
receiverRetry chan bool
mu sync.Mutex // protects all vars in gRPCBidiReader from concurrent access
mp map[int64]rangeSpec // always use the mutex when accessing the map
mu sync.Mutex // protects map from concurrent access.
done bool
activeTask int64
objectSize int64
done bool // always use the mutex when accessing this variable
activeTask int64 // always use the mutex when accessing this variable
objectSize int64 // always use the mutex when accessing this variable
retrier func(error, string)
streamRecreation bool // This helps us identify if stream recreation is in progress or not. If stream recreation gets called from two goroutine then this will stop second one.
}
Expand Down