mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix data race on err result in FetchAndWriteNeedle
The local-write and replica-write goroutines all wrote the named err return under an unsynchronized err==nil check. Give each goroutine its own error slot and combine after wg.Wait(): local error wins, then the first replica failure.
This commit is contained in:
@@ -237,6 +237,8 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var localErr error
|
||||
replicaErrs := make([]error, len(req.Replicas))
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@@ -250,18 +252,16 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
|
||||
n.LastModified = uint64(time.Now().Unix())
|
||||
n.SetHasLastModifiedDate()
|
||||
if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
|
||||
if err == nil {
|
||||
err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, localWriteErr)
|
||||
}
|
||||
localErr = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, localWriteErr)
|
||||
} else {
|
||||
resp.ETag = n.Etag()
|
||||
}
|
||||
}()
|
||||
if len(req.Replicas) > 0 {
|
||||
fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
|
||||
for _, replica := range req.Replicas {
|
||||
for i, replica := range req.Replicas {
|
||||
wg.Add(1)
|
||||
go func(targetVolumeServer string) {
|
||||
go func(idx int, targetVolumeServer string) {
|
||||
defer wg.Done()
|
||||
uploadOption := &operation.UploadOption{
|
||||
UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
|
||||
@@ -275,19 +275,27 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
|
||||
}
|
||||
|
||||
uploader, uploaderErr := operation.NewUploader()
|
||||
if uploaderErr != nil && err == nil {
|
||||
err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr)
|
||||
if uploaderErr != nil {
|
||||
replicaErrs[idx] = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr)
|
||||
return
|
||||
}
|
||||
|
||||
if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil {
|
||||
err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
|
||||
if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil {
|
||||
replicaErrs[idx] = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
|
||||
}
|
||||
}(replica.Url)
|
||||
}(i, replica.Url)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// local write error wins; otherwise surface the first replica failure
|
||||
err = localErr
|
||||
for _, replicaErr := range replicaErrs {
|
||||
if err == nil {
|
||||
err = replicaErr
|
||||
}
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user