mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-13 23:36:45 +03:00
fix(nfs): accept dirpath any-where under the export, mirroring rclone (#9291)
* fix(nfs): accept any MOUNT3 dirpath, mirroring rclone's permissive policy
weed nfs has exactly one export per process, so the MOUNT3 dirpath
argument has no second export to disambiguate against. Strict
comparison only translated PV-path typos into the inconsistent
"mount succeeds but empty" / "mount fails completely" split that
operators see.
Match rclone's serve nfs Handler.Mount: ignore the dirpath, log an INFO
line when it differs from the configured export, and always serve the
export root. Apply the same change to the UDP MOUNT3 path so kernel
clients defaulting to mountproto=udp see identical behaviour. Access
control still goes through -allowedClients / -ip.bind, and file-handle
scoping in FromHandle is unchanged so handles still cannot escape the
export.
Replace the prior single-path reject tests with table tests covering
the shapes operators commonly hit: root, parent, sibling, deeper child,
unrelated, empty, relative form, exact match, and trailing slash, at
the Handler.Mount, UDP MOUNT3, and full RPC layers.
* feat(nfs): mount at subdirectory when MOUNT3 dirpath is under the export
Make the dirpath argument meaningful when the client asks for a subtree
of the configured export. With -filer.path=/buckets, a client mounting
<server>:/buckets/data lands directly inside /buckets/data instead of
at the export root.
- dirpath equals the export root: serve the export root.
- dirpath strictly under the export, directory entry: serve that
subdirectory; the returned filehandle encodes its inode.
- dirpath strictly under the export, missing or non-directory: reject
with NoEnt or NotDir.
- dirpath outside the export: keep the rclone-style fallback to the
export root.
TCP returns a sub-rooted seaweedFileSystem and lets go-nfs's onMount
call ToHandle to encode the FH; UDP encodes the FH itself. FromHandle
is unchanged: handles are content-addressed by inode and resolve via
the inode index, so they remain stable across mounts and across
process restarts.
The trimmed permissive tests keep their outside-export shapes; new
subexport tests cover under-export directories, missing entries, and
non-directory entries on Handler.Mount, the UDP MOUNT3 wire, and
through the full RPC stack.
* nfs: propagate request context through MOUNT3 resolution
Mount now accepts the gonfs context and threads it through
resolveMountFilesystem and lstatExportStatus so a slow filer call
during MOUNT cannot outlive a cancelled or timed-out request.
lstatExportStatus uses fileInfoForVirtualPath(ctx, "/") directly
instead of billy.Filesystem.Lstat, which would otherwise drop the
context on the floor by calling fileInfoForVirtualPathWithOptions
with context.Background().
Lower the successful subexport-mount log from V(0) to V(1). The
fallback log stays at V(0) so operator typos still surface; the
success line is per-mount churn that adds up on NFS-CSI deployments.
* nfs: mirror TCP defensive checks on the UDP MOUNT3 path
Two transport-parity bugs the rabbit caught:
(1) The exact-export-root and outside-export branches were returning
MNT3_OK unconditionally, while the TCP handler runs lstatExportStatus
on those same branches. If the configured -filer.path has been
removed from the filer, TCP returns NoEnt/ServerFault but UDP would
still hand out a synthetic root handle pointing at nothing. Add
rootMountStatus as the UDP analogue and call it on both branches.
(2) resolveSubexportFileHandle did filer I/O on the single UDP serve
loop with context.Background(). One slow filer round-trip would
block every later MOUNT packet. Wrap each MOUNT call's filer work in
context.WithTimeout(mountUDPLookupTimeout) and thread that ctx
through both rootMountStatus and resolveSubexportFileHandle.
Lower the successful subexport log to V(1) to match the TCP side.
* nfs: assert TCP/UDP MOUNT3 produce byte-identical filehandles
The existing UDP subexport assertions only checked the decoded inode
and kind. A regression that drifted the generation, exportID, or
encoding format on one transport but not the other would have slipped
through. Build the TCP Handler from the same Server, drive its Mount
with the same dirpath, and require ToHandle to match the raw UDP FH
bytes for every OK case.
* nfs: take MOUNT3 dirpath as string in resolveMountFilesystem
Convert req.Dirpath to string once at the call site instead of
sprinkling string(...) casts through every log line and conversion
inside the function. Behavior unchanged.
* nfs: share rootFS lifecycle between TCP and UDP MOUNT handlers
Server.rootFilesystem() lazily constructs the seaweedFileSystem rooted
at the configured export the first time anything asks for it, then
hands the same instance to every subsequent caller. newHandler() and
mountUDPServer.rootMountStatus() now both go through it, so:
- Both transports observe the same chunk reader cache and chunk
invalidator without depending on call order during startup.
- The UDP defensive Lstat doesn't allocate a fresh wrapper per
MOUNT request anymore; one struct lives for the life of the
Server.
The sub-rooted seaweedFileSystem the subexport branch builds in
resolveSubexportFileHandle is still per-request because actualRoot
varies with the requested dirpath.
* nfs: drive rootFilesystem before reading sharedReaderCache on UDP
The UDP listener is started before serve() calls newHandler(), so an
under-export MOUNT3 request can reach resolveSubexportFileHandle before
Server.sharedReaderCache has been assigned. Reading it directly would
hand newSeaweedFileSystem a nil cache and the sub-fs would build a
throwaway ReaderCache that never gets shared with the TCP path.
Take rootFS off Server.rootFilesystem() (which drives the sync.Once
that initializes the shared cache) and read readerCache off that
instead, so subexport sub-fs instances always share the same reader
cache as rootFS regardless of which transport sees the first MOUNT.
* nfs: collapse exact-match and outside-export MOUNT branches
The two branches return the same filesystem (export root) and the
same status; only the log line differs. Combine the conditions and
guard the fallback log inline. Behavior unchanged.
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
billy "github.com/go-git/go-billy/v5"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
gonfs "github.com/willscott/go-nfs"
|
||||
@@ -20,21 +21,75 @@ type Handler struct {
|
||||
|
||||
var _ gonfs.Handler = (*Handler)(nil)
|
||||
|
||||
func (h *Handler) Mount(_ context.Context, conn net.Conn, req gonfs.MountRequest) (gonfs.MountStatus, billy.Filesystem, []gonfs.AuthFlavor) {
|
||||
func (h *Handler) Mount(ctx context.Context, conn net.Conn, req gonfs.MountRequest) (gonfs.MountStatus, billy.Filesystem, []gonfs.AuthFlavor) {
|
||||
if h.server.clientAuthorizer != nil && !h.server.clientAuthorizer.isAllowedConn(conn) {
|
||||
return gonfs.MountStatusErrAcces, nil, []gonfs.AuthFlavor{gonfs.AuthFlavorNull}
|
||||
}
|
||||
requestedPath := normalizeExportRoot(util.FullPath(req.Dirpath))
|
||||
if requestedPath != h.server.exportRoot {
|
||||
return gonfs.MountStatusErrNoEnt, nil, []gonfs.AuthFlavor{gonfs.AuthFlavorNull}
|
||||
fs, status := h.resolveMountFilesystem(ctx, string(req.Dirpath))
|
||||
if status != gonfs.MountStatusOk {
|
||||
return status, nil, []gonfs.AuthFlavor{gonfs.AuthFlavorNull}
|
||||
}
|
||||
if _, err := h.rootFS.Lstat("/"); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return gonfs.MountStatusErrNoEnt, nil, []gonfs.AuthFlavor{gonfs.AuthFlavorNull}
|
||||
return gonfs.MountStatusOk, fs, []gonfs.AuthFlavor{gonfs.AuthFlavorNull, gonfs.AuthFlavorUnix}
|
||||
}
|
||||
|
||||
// resolveMountFilesystem resolves the MOUNT3 dirpath to a filesystem:
|
||||
// exact match serves the export root; a path strictly under the export
|
||||
// is mounted at that subdirectory (NoEnt/NotDir if missing or not a
|
||||
// directory); anything else falls back to the export root with an INFO
|
||||
// log. The UDP MOUNT path mirrors this in mount_udp.go.
|
||||
func (h *Handler) resolveMountFilesystem(ctx context.Context, requestedPath string) (*seaweedFileSystem, gonfs.MountStatus) {
|
||||
requested := normalizeExportRoot(util.FullPath(requestedPath))
|
||||
// Exact match and outside-export both fall back to the export root.
|
||||
// Only the second case logs; the first is the boring common path.
|
||||
if requested == h.server.exportRoot || !requested.IsUnder(h.server.exportRoot) {
|
||||
if requested != h.server.exportRoot {
|
||||
glog.V(0).Infof("nfs mount: client requested %q (outside export %q); serving configured export", requestedPath, h.server.exportRoot)
|
||||
}
|
||||
return gonfs.MountStatusErrServerFault, nil, []gonfs.AuthFlavor{gonfs.AuthFlavorNull}
|
||||
return h.rootFS, h.lstatExportStatus(ctx)
|
||||
}
|
||||
return gonfs.MountStatusOk, h.rootFS, []gonfs.AuthFlavor{gonfs.AuthFlavorNull, gonfs.AuthFlavorUnix}
|
||||
entry, err := h.lookupSubexportEntry(ctx, requested)
|
||||
switch {
|
||||
case err != nil && isLookupNotFound(err):
|
||||
return nil, gonfs.MountStatusErrNoEnt
|
||||
case err != nil:
|
||||
glog.Errorf("nfs mount: lookup %q under export %q failed: %v", requested, h.server.exportRoot, err)
|
||||
return nil, gonfs.MountStatusErrServerFault
|
||||
case entry == nil:
|
||||
return nil, gonfs.MountStatusErrNoEnt
|
||||
case !entry.IsDirectory:
|
||||
return nil, gonfs.MountStatusErrNotDir
|
||||
}
|
||||
glog.V(1).Infof("nfs mount: client requested %q under export %q; mounting at subdirectory", requestedPath, h.server.exportRoot)
|
||||
return newSeaweedFileSystem(h.server, requested, h.server.sharedReaderCache), gonfs.MountStatusOk
|
||||
}
|
||||
|
||||
func (h *Handler) lstatExportStatus(ctx context.Context) gonfs.MountStatus {
|
||||
if _, err := h.rootFS.fileInfoForVirtualPath(ctx, "/"); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return gonfs.MountStatusErrNoEnt
|
||||
}
|
||||
return gonfs.MountStatusErrServerFault
|
||||
}
|
||||
return gonfs.MountStatusOk
|
||||
}
|
||||
|
||||
func (h *Handler) lookupSubexportEntry(ctx context.Context, p util.FullPath) (*filer_pb.Entry, error) {
|
||||
var entry *filer_pb.Entry
|
||||
err := h.server.withInternalClient(false, func(client nfsFilerClient) error {
|
||||
dir, name := p.DirAndName()
|
||||
resp, lerr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: dir,
|
||||
Name: name,
|
||||
})
|
||||
if lerr != nil {
|
||||
return lerr
|
||||
}
|
||||
if resp != nil {
|
||||
entry = resp.Entry
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return entry, err
|
||||
}
|
||||
|
||||
func (h *Handler) Change(filesystem billy.Filesystem) billy.Change {
|
||||
|
||||
@@ -330,6 +330,168 @@ func nfsLink(target *nfsclient.Target, sourceHandle []byte, linkPath string) err
|
||||
return nfsclient.NFS3Error(status)
|
||||
}
|
||||
|
||||
func TestSeaweedNFSAcceptsAnyMountPathOverRPC(t *testing.T) {
|
||||
const exportRoot = "/buckets/data"
|
||||
|
||||
client := &fakeNFSFilerClient{
|
||||
entries: map[util.FullPath]*filer_pb.Entry{
|
||||
"/buckets": testEntry("buckets", true, 100, uint32(0755), nil),
|
||||
"/buckets/data": testEntry("data", true, 101, uint32(0755), nil),
|
||||
},
|
||||
kv: map[string][]byte{
|
||||
string(filer.InodeIndexKey(100)): testIndexRecord(t, 100, 1, "/buckets"),
|
||||
string(filer.InodeIndexKey(101)): testIndexRecord(t, 101, 1, "/buckets/data"),
|
||||
},
|
||||
}
|
||||
|
||||
server := newTestServer(t, exportRoot, client)
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
handler, err := server.newHandler()
|
||||
require.NoError(t, err)
|
||||
|
||||
serveDone := make(chan error, 1)
|
||||
go func() {
|
||||
serveDone <- gonfs.Serve(listener, handler)
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
_ = listener.Close()
|
||||
select {
|
||||
case err := <-serveDone:
|
||||
if err != nil && !isClosedNetworkErr(err) {
|
||||
t.Errorf("nfs server exited with error: %v", err)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Errorf("timed out waiting for nfs server shutdown")
|
||||
}
|
||||
})
|
||||
|
||||
dirpaths := []string{
|
||||
"/",
|
||||
"/buckets",
|
||||
"/buckets/other",
|
||||
"/wrong/path",
|
||||
exportRoot,
|
||||
exportRoot + "/",
|
||||
}
|
||||
for _, dirpath := range dirpaths {
|
||||
t.Run(dirpath, func(t *testing.T) {
|
||||
var rpcClient *rpc.Client
|
||||
var dialErr error
|
||||
for attempt := 0; attempt < 10; attempt++ {
|
||||
rpcClient, dialErr = rpc.DialTCP(listener.Addr().Network(), listener.Addr().String(), false)
|
||||
if dialErr == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
require.NoError(t, dialErr)
|
||||
defer rpcClient.Close()
|
||||
|
||||
mounter := &nfsclient.Mount{Client: rpcClient}
|
||||
target, err := mounter.Mount(dirpath, rpc.AuthNull)
|
||||
require.NoErrorf(t, err, "Mount(%q)", dirpath)
|
||||
defer target.Close()
|
||||
|
||||
entries, err := target.ReadDirPlus("/")
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, entries, "Mount(%q) should land at the empty export root", dirpath)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeaweedNFSSubexportMountOverRPC(t *testing.T) {
|
||||
const exportRoot = "/buckets"
|
||||
|
||||
client := &fakeNFSFilerClient{
|
||||
entries: map[util.FullPath]*filer_pb.Entry{
|
||||
"/buckets": testEntry("buckets", true, 100, uint32(0755), nil),
|
||||
"/buckets/data": testEntry("data", true, 101, uint32(0755), nil),
|
||||
"/buckets/data/inner": testEntry("inner", false, 104, uint32(0644), []byte("payload")),
|
||||
"/buckets/other": testEntry("other", true, 105, uint32(0755), nil),
|
||||
"/buckets/file.txt": testEntry("file.txt", false, 103, uint32(0644), []byte("hi")),
|
||||
},
|
||||
kv: map[string][]byte{
|
||||
string(filer.InodeIndexKey(100)): testIndexRecord(t, 100, 1, "/buckets"),
|
||||
string(filer.InodeIndexKey(101)): testIndexRecord(t, 101, 1, "/buckets/data"),
|
||||
string(filer.InodeIndexKey(103)): testIndexRecord(t, 103, 1, "/buckets/file.txt"),
|
||||
string(filer.InodeIndexKey(104)): testIndexRecord(t, 104, 1, "/buckets/data/inner"),
|
||||
string(filer.InodeIndexKey(105)): testIndexRecord(t, 105, 1, "/buckets/other"),
|
||||
},
|
||||
}
|
||||
|
||||
server := newTestServer(t, exportRoot, client)
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
handler, err := server.newHandler()
|
||||
require.NoError(t, err)
|
||||
|
||||
serveDone := make(chan error, 1)
|
||||
go func() {
|
||||
serveDone <- gonfs.Serve(listener, handler)
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
_ = listener.Close()
|
||||
select {
|
||||
case err := <-serveDone:
|
||||
if err != nil && !isClosedNetworkErr(err) {
|
||||
t.Errorf("nfs server exited with error: %v", err)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Errorf("timed out waiting for nfs server shutdown")
|
||||
}
|
||||
})
|
||||
|
||||
dial := func(t *testing.T) *rpc.Client {
|
||||
t.Helper()
|
||||
var rpcClient *rpc.Client
|
||||
var dialErr error
|
||||
for attempt := 0; attempt < 10; attempt++ {
|
||||
rpcClient, dialErr = rpc.DialTCP(listener.Addr().Network(), listener.Addr().String(), false)
|
||||
if dialErr == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
require.NoError(t, dialErr)
|
||||
t.Cleanup(func() { rpcClient.Close() })
|
||||
return rpcClient
|
||||
}
|
||||
|
||||
t.Run("mounts_under_export_at_subdirectory", func(t *testing.T) {
|
||||
mounter := &nfsclient.Mount{Client: dial(t)}
|
||||
target, err := mounter.Mount("/buckets/data", rpc.AuthNull)
|
||||
require.NoError(t, err)
|
||||
defer target.Close()
|
||||
|
||||
entries, err := target.ReadDirPlus("/")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 1)
|
||||
assert.Equal(t, "inner", entries[0].Name())
|
||||
|
||||
readFile, err := target.Open("/inner")
|
||||
require.NoError(t, err)
|
||||
defer readFile.Close()
|
||||
data, err := io.ReadAll(readFile)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []byte("payload"), data)
|
||||
})
|
||||
|
||||
t.Run("missing_entry_under_export_rejects", func(t *testing.T) {
|
||||
mounter := &nfsclient.Mount{Client: dial(t)}
|
||||
_, err := mounter.Mount("/buckets/missing", rpc.AuthNull)
|
||||
require.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("regular_file_under_export_rejects", func(t *testing.T) {
|
||||
mounter := &nfsclient.Mount{Client: dial(t)}
|
||||
_, err := mounter.Mount("/buckets/file.txt", rpc.AuthNull)
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSeaweedNFSServesInlineRoundTripOverRPC(t *testing.T) {
|
||||
client := &fakeNFSFilerClient{
|
||||
kv: map[string][]byte{
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package nfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -38,6 +40,12 @@ const (
|
||||
// listening goroutines back off identically under host pressure.
|
||||
mountUDPRetryBackoff = 50 * time.Millisecond
|
||||
|
||||
// mountUDPLookupTimeout bounds any filer round-trip the UDP MOUNT
|
||||
// path makes (export-root existence check, subexport lookup). The
|
||||
// UDP serve loop is single-threaded, so a stalled filer call would
|
||||
// otherwise block every later MOUNT packet.
|
||||
mountUDPLookupTimeout = 5 * time.Second
|
||||
|
||||
mountVersion = 3
|
||||
|
||||
mountProcNull = 0
|
||||
@@ -45,10 +53,11 @@ const (
|
||||
mountProcUmnt = 3
|
||||
|
||||
// MOUNT v3 status codes (mountstat3 in RFC 1813 §5.1.1).
|
||||
mnt3StatOK uint32 = 0
|
||||
mnt3ErrAcces uint32 = 13
|
||||
mnt3ErrNoEnt uint32 = 2
|
||||
mnt3ErrNotDir uint32 = 20
|
||||
mnt3StatOK uint32 = 0
|
||||
mnt3ErrAcces uint32 = 13
|
||||
mnt3ErrNoEnt uint32 = 2
|
||||
mnt3ErrNotDir uint32 = 20
|
||||
mnt3ErrServerFault uint32 = 10006
|
||||
|
||||
// XDR opaque length cap for dirpath. RFC 1813 §5.1 limits MNTPATHLEN
|
||||
// to 1024; cap a bit higher for headroom and reject anything beyond.
|
||||
@@ -198,14 +207,14 @@ func (m *mountUDPServer) handleCall(callBuf []byte, addr *net.UDPAddr) []byte {
|
||||
}
|
||||
}
|
||||
|
||||
// handleMount implements MOUNT v3 MNT. The wire format is RFC 1813 §5.1.4:
|
||||
// handleMount implements MOUNT v3 MNT. RFC 1813 §5.1.4:
|
||||
//
|
||||
// MOUNT3args { dirpath3 dirpath; } // XDR opaque
|
||||
// MOUNT3res { mountstat3 status; if OK { handle, auth_flavors[] } }
|
||||
//
|
||||
// We mirror handler.go's Mount(): export-path mismatch returns NoEnt; root
|
||||
// inode is encoded as a synthetic directory filehandle so it round-trips with
|
||||
// the TCP MOUNT path without an extra filer round-trip per UDP MOUNT call.
|
||||
// Mirrors Handler.resolveMountFilesystem: exact match returns the
|
||||
// synthetic root handle; under-export resolves to the subdirectory's
|
||||
// handle; outside-export falls back to the synthetic root.
|
||||
func (m *mountUDPServer) handleMount(xid uint32, args []byte, addr *net.UDPAddr) []byte {
|
||||
if len(args) < 4 {
|
||||
return encodeAcceptedReply(xid, rpcAcceptGarbageArgs, nil)
|
||||
@@ -219,16 +228,85 @@ func (m *mountUDPServer) handleMount(xid uint32, args []byte, addr *net.UDPAddr)
|
||||
return encodeAcceptedReply(xid, rpcAcceptGarbageArgs, nil)
|
||||
}
|
||||
dirpath := string(args[4 : 4+pathLen])
|
||||
|
||||
requestedPath := normalizeExportRoot(util.FullPath(dirpath))
|
||||
if requestedPath != m.server.exportRoot {
|
||||
glog.V(1).Infof("mount udp: client %s requested %q but export is %q", addr, dirpath, m.server.exportRoot)
|
||||
return encodeMountStatus(xid, mnt3ErrNoEnt)
|
||||
}
|
||||
|
||||
rootHandle := NewFileHandle(m.server.exportID, FileHandleKindDirectory, 0, filer.InodeIndexInitialGeneration).Encode()
|
||||
requested := normalizeExportRoot(util.FullPath(dirpath))
|
||||
flavors := []uint32{authFlavorNull, authFlavorUnix}
|
||||
return encodeMountSuccess(xid, rootHandle, flavors)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), mountUDPLookupTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Exact match and outside-export both fall back to the synthetic root
|
||||
// handle. Only the second case logs; the first is the common path.
|
||||
if requested == m.server.exportRoot || !requested.IsUnder(m.server.exportRoot) {
|
||||
if requested != m.server.exportRoot {
|
||||
glog.V(0).Infof("mount udp: client %s requested %q (outside export %q); serving configured export", addr, dirpath, m.server.exportRoot)
|
||||
}
|
||||
if status := m.rootMountStatus(ctx); status != mnt3StatOK {
|
||||
return encodeMountStatus(xid, status)
|
||||
}
|
||||
return encodeMountSuccess(xid, syntheticRootHandle(m.server), flavors)
|
||||
}
|
||||
fh, status := m.resolveSubexportFileHandle(ctx, requested)
|
||||
if status != mnt3StatOK {
|
||||
return encodeMountStatus(xid, status)
|
||||
}
|
||||
glog.V(1).Infof("mount udp: client %s requested %q under export %q; mounting at subdirectory", addr, dirpath, m.server.exportRoot)
|
||||
return encodeMountSuccess(xid, fh, flavors)
|
||||
}
|
||||
|
||||
// rootMountStatus is the UDP analogue of Handler.lstatExportStatus:
|
||||
// confirms the configured export root still exists in the filer so the
|
||||
// transport-OK branches can't hand out a handle pointing at a deleted
|
||||
// directory. Reuses the Server's shared rootFS instance so we don't
|
||||
// construct a wrapper per MOUNT request.
|
||||
func (m *mountUDPServer) rootMountStatus(ctx context.Context) uint32 {
|
||||
if m.server.withInternalClient == nil {
|
||||
return mnt3StatOK
|
||||
}
|
||||
switch _, err := m.server.rootFilesystem().fileInfoForVirtualPath(ctx, "/"); {
|
||||
case err == nil:
|
||||
return mnt3StatOK
|
||||
case os.IsNotExist(err):
|
||||
return mnt3ErrNoEnt
|
||||
default:
|
||||
glog.Errorf("mount udp: export root %q lookup failed: %v", m.server.exportRoot, err)
|
||||
return mnt3ErrServerFault
|
||||
}
|
||||
}
|
||||
|
||||
// resolveSubexportFileHandle is the UDP analogue of the sub-fs branch in
|
||||
// Handler.resolveMountFilesystem. The TCP path lets go-nfs's onMount call
|
||||
// ToHandle on the returned filesystem; UDP encodes the FH itself, so the
|
||||
// inode/generation lookup happens explicitly here.
|
||||
//
|
||||
// The UDP listener is up before serve() runs newHandler(), so a subexport
|
||||
// MOUNT can land here before sharedReaderCache has been assigned. Resolve
|
||||
// the rootFS first to drive Server.rootFilesystem's sync.Once and read
|
||||
// the cache directly off it, so the new sub-fs always shares the same
|
||||
// reader cache the TCP path uses.
|
||||
func (m *mountUDPServer) resolveSubexportFileHandle(ctx context.Context, requested util.FullPath) ([]byte, uint32) {
|
||||
if m.server.withInternalClient == nil {
|
||||
return nil, mnt3ErrServerFault
|
||||
}
|
||||
rootFS := m.server.rootFilesystem()
|
||||
subFS := newSeaweedFileSystem(m.server, requested, rootFS.readerCache)
|
||||
info, err := subFS.fileInfoForVirtualPath(ctx, "/")
|
||||
switch {
|
||||
case err == nil:
|
||||
case os.IsNotExist(err):
|
||||
return nil, mnt3ErrNoEnt
|
||||
default:
|
||||
glog.Errorf("mount udp: subexport lookup %q failed: %v", requested, err)
|
||||
return nil, mnt3ErrServerFault
|
||||
}
|
||||
if !info.entry.IsDirectory {
|
||||
return nil, mnt3ErrNotDir
|
||||
}
|
||||
inode := info.entry.GetAttributes().GetInode()
|
||||
return NewFileHandle(m.server.exportID, FileHandleKindDirectory, inode, info.generation).Encode(), mnt3StatOK
|
||||
}
|
||||
|
||||
func syntheticRootHandle(s *Server) []byte {
|
||||
return NewFileHandle(s.exportID, FileHandleKindDirectory, 0, filer.InodeIndexInitialGeneration).Encode()
|
||||
}
|
||||
|
||||
// encodeMountStatus returns a MOUNT MNT reply carrying just an error status.
|
||||
|
||||
@@ -1,12 +1,18 @@
|
||||
package nfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
gonfs "github.com/willscott/go-nfs"
|
||||
)
|
||||
|
||||
// buildMountCallFrame constructs a MOUNT v3 RPC CALL with an opaque dirpath
|
||||
@@ -32,9 +38,15 @@ func buildMountCallFrame(xid, prog, vers, proc uint32, dirpath string) []byte {
|
||||
|
||||
func newMountUDPTestServer(t *testing.T, exportPath string) (*mountUDPServer, *net.UDPConn) {
|
||||
t.Helper()
|
||||
return newMountUDPTestServerWithClient(t, exportPath, nil)
|
||||
}
|
||||
|
||||
// newMountUDPTestServerWithClient wires Server.withInternalClient when
|
||||
// client is non-nil, so the under-export lookup branch in handleMount
|
||||
// can find directory entries.
|
||||
func newMountUDPTestServerWithClient(t *testing.T, exportPath string, client *fakeNFSFilerClient) (*mountUDPServer, *net.UDPConn) {
|
||||
t.Helper()
|
||||
|
||||
// Build a minimal Server with just the fields the UDP MOUNT path
|
||||
// uses: exportRoot, exportID, and a permissive clientAuthorizer.
|
||||
exportRoot := normalizeExportRoot(util.FullPath(exportPath))
|
||||
authz, err := newClientAuthorizer(nil)
|
||||
if err != nil {
|
||||
@@ -46,6 +58,11 @@ func newMountUDPTestServer(t *testing.T, exportPath string) (*mountUDPServer, *n
|
||||
exportID: exportIDForRoot(exportRoot),
|
||||
clientAuthorizer: authz,
|
||||
}
|
||||
if client != nil {
|
||||
srv.withInternalClient = func(_ bool, fn func(nfsFilerClient) error) error {
|
||||
return fn(client)
|
||||
}
|
||||
}
|
||||
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
@@ -182,24 +199,132 @@ func TestMountUDPMntReturnsHandleAndFlavors(t *testing.T) {
|
||||
_ = m
|
||||
}
|
||||
|
||||
func TestMountUDPMntRejectsWrongPath(t *testing.T) {
|
||||
_, conn := newMountUDPTestServer(t, "/exports")
|
||||
func TestMountUDPMntAcceptsAnyPath(t *testing.T) {
|
||||
const exportRoot = "/buckets/data"
|
||||
|
||||
_, conn := newMountUDPTestServer(t, exportRoot)
|
||||
target := conn.LocalAddr().(*net.UDPAddr)
|
||||
|
||||
reply := sendMountUDP(t, target, buildMountCallFrame(99, mountProgram, 3, mountProcMnt, "/somewhere/else"))
|
||||
_, astat, body := parseRPCReply(t, reply)
|
||||
dirpaths := []string{
|
||||
"/",
|
||||
"/buckets",
|
||||
"/buckets/other",
|
||||
"/wrong/path",
|
||||
"",
|
||||
"buckets/data",
|
||||
exportRoot,
|
||||
exportRoot + "/",
|
||||
}
|
||||
for i, dirpath := range dirpaths {
|
||||
t.Run(dirpath, func(t *testing.T) {
|
||||
xid := uint32(1000 + i)
|
||||
reply := sendMountUDP(t, target, buildMountCallFrame(xid, mountProgram, 3, mountProcMnt, dirpath))
|
||||
_, astat, body := parseRPCReply(t, reply)
|
||||
if astat != rpcAcceptSuccess {
|
||||
t.Fatalf("accept_stat=%d want SUCCESS(0)", astat)
|
||||
}
|
||||
if len(body) < 4 {
|
||||
t.Fatalf("body too short: %d bytes", len(body))
|
||||
}
|
||||
if got := binary.BigEndian.Uint32(body[0:4]); got != mnt3StatOK {
|
||||
t.Errorf("MNT(%q): mountstat3=%d want OK(0)", dirpath, got)
|
||||
}
|
||||
if len(body) <= 4 {
|
||||
t.Errorf("MNT(%q) success body must include handle and flavors", dirpath)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if astat != rpcAcceptSuccess {
|
||||
t.Fatalf("accept_stat=%d want SUCCESS(0); MNT3ERR is in the body, not at the RPC layer", astat)
|
||||
func TestMountUDPSubexportMount(t *testing.T) {
|
||||
const exportRoot = "/buckets"
|
||||
|
||||
client := &fakeNFSFilerClient{
|
||||
entries: map[util.FullPath]*filer_pb.Entry{
|
||||
"/buckets": testEntry("buckets", true, 100, uint32(0755), nil),
|
||||
"/buckets/data": testEntry("data", true, 101, uint32(0755), nil),
|
||||
"/buckets/data/nested": testEntry("nested", true, 102, uint32(0755), nil),
|
||||
"/buckets/file.txt": testEntry("file.txt", false, 103, uint32(0644), []byte("hi")),
|
||||
},
|
||||
kv: map[string][]byte{
|
||||
string(filer.InodeIndexKey(100)): testIndexRecord(t, 100, 1, "/buckets"),
|
||||
string(filer.InodeIndexKey(101)): testIndexRecord(t, 101, 1, "/buckets/data"),
|
||||
string(filer.InodeIndexKey(102)): testIndexRecord(t, 102, 1, "/buckets/data/nested"),
|
||||
string(filer.InodeIndexKey(103)): testIndexRecord(t, 103, 1, "/buckets/file.txt"),
|
||||
},
|
||||
}
|
||||
if len(body) < 4 {
|
||||
t.Fatalf("body too short: %d bytes", len(body))
|
||||
|
||||
m, conn := newMountUDPTestServerWithClient(t, exportRoot, client)
|
||||
target := conn.LocalAddr().(*net.UDPAddr)
|
||||
|
||||
// Build a TCP Handler from the same Server so we can compare the
|
||||
// raw FH bytes both transports produce for the same subdirectory.
|
||||
tcpHandler, err := m.server.newHandler()
|
||||
require.NoError(t, err)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
dirpath string
|
||||
wantStatus uint32
|
||||
wantInode uint64
|
||||
}{
|
||||
{name: "subdirectory_one_level", dirpath: "/buckets/data", wantStatus: mnt3StatOK, wantInode: 101},
|
||||
{name: "subdirectory_two_levels", dirpath: "/buckets/data/nested", wantStatus: mnt3StatOK, wantInode: 102},
|
||||
{name: "subdirectory_trailing_slash", dirpath: "/buckets/data/", wantStatus: mnt3StatOK, wantInode: 101},
|
||||
{name: "missing_under_export", dirpath: "/buckets/missing", wantStatus: mnt3ErrNoEnt},
|
||||
{name: "deep_missing_under_export", dirpath: "/buckets/data/no-such-thing", wantStatus: mnt3ErrNoEnt},
|
||||
{name: "regular_file_not_directory", dirpath: "/buckets/file.txt", wantStatus: mnt3ErrNotDir},
|
||||
}
|
||||
if status := binary.BigEndian.Uint32(body[0:4]); status != mnt3ErrNoEnt {
|
||||
t.Errorf("mountstat3=%d want NoEnt(2)", status)
|
||||
}
|
||||
if len(body) != 4 {
|
||||
t.Errorf("error reply should carry only the status; got %d trailing bytes", len(body)-4)
|
||||
for i, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
xid := uint32(2000 + i)
|
||||
reply := sendMountUDP(t, target, buildMountCallFrame(xid, mountProgram, 3, mountProcMnt, tc.dirpath))
|
||||
_, astat, body := parseRPCReply(t, reply)
|
||||
if astat != rpcAcceptSuccess {
|
||||
t.Fatalf("accept_stat=%d want SUCCESS(0)", astat)
|
||||
}
|
||||
if len(body) < 4 {
|
||||
t.Fatalf("body too short: %d bytes", len(body))
|
||||
}
|
||||
got := binary.BigEndian.Uint32(body[0:4])
|
||||
if got != tc.wantStatus {
|
||||
t.Fatalf("MNT(%q) status=%d want %d", tc.dirpath, got, tc.wantStatus)
|
||||
}
|
||||
if tc.wantStatus != mnt3StatOK {
|
||||
if len(body) != 4 {
|
||||
t.Errorf("MNT(%q) error body should carry only the status; got %d trailing bytes", tc.dirpath, len(body)-4)
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(body) < 8 {
|
||||
t.Fatalf("MNT(%q) success body missing handle length", tc.dirpath)
|
||||
}
|
||||
handleLen := binary.BigEndian.Uint32(body[4:8])
|
||||
if uint32(len(body)) < 8+handleLen {
|
||||
t.Fatalf("MNT(%q) success body truncated", tc.dirpath)
|
||||
}
|
||||
udpHandleBytes := body[8 : 8+handleLen]
|
||||
handle, err := DecodeFileHandle(udpHandleBytes)
|
||||
if err != nil {
|
||||
t.Fatalf("MNT(%q) handle decode: %v", tc.dirpath, err)
|
||||
}
|
||||
if handle.Inode != tc.wantInode {
|
||||
t.Errorf("MNT(%q) FH inode=%d want %d", tc.dirpath, handle.Inode, tc.wantInode)
|
||||
}
|
||||
if handle.Kind != FileHandleKindDirectory {
|
||||
t.Errorf("MNT(%q) FH kind=%d want directory", tc.dirpath, handle.Kind)
|
||||
}
|
||||
|
||||
// Transport parity: drive the TCP Handler with the same dirpath
|
||||
// and confirm the bytes go-nfs's onMount would write match the
|
||||
// UDP responder's bytes exactly. A regression that drifts the
|
||||
// generation, exportID, or kind on one transport would fail here.
|
||||
tcpStatus, tcpFS, _ := tcpHandler.Mount(context.Background(), nil, gonfs.MountRequest{Dirpath: []byte(tc.dirpath)})
|
||||
require.Equal(t, gonfs.MountStatusOk, tcpStatus, "TCP Mount(%q)", tc.dirpath)
|
||||
tcpHandleBytes := tcpHandler.ToHandle(tcpFS, nil)
|
||||
require.NotEmpty(t, tcpHandleBytes, "TCP Mount(%q) ToHandle returned empty", tc.dirpath)
|
||||
assert.Equal(t, tcpHandleBytes, udpHandleBytes, "TCP/UDP FH bytes diverge for %q", tc.dirpath)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -46,6 +47,9 @@ type Server struct {
|
||||
newUploader func() (chunkUploader, error)
|
||||
withFilerClient filerClientExecutor
|
||||
withInternalClient internalClientExecutor
|
||||
|
||||
rootFSOnce sync.Once
|
||||
rootFS *seaweedFileSystem
|
||||
}
|
||||
|
||||
func NewServer(option *Option) (*Server, error) {
|
||||
@@ -199,19 +203,30 @@ func (s *Server) newHandler() (*Handler, error) {
|
||||
if s == nil {
|
||||
return nil, errors.New("nfs server is not configured")
|
||||
}
|
||||
rootFS := newSeaweedFileSystem(s, s.exportRoot, s.sharedReaderCache)
|
||||
if s.sharedReaderCache == nil {
|
||||
s.sharedReaderCache = rootFS.readerCache
|
||||
}
|
||||
if s.chunkInvalidator == nil {
|
||||
s.chunkInvalidator = s.sharedReaderCache
|
||||
}
|
||||
return &Handler{
|
||||
server: s,
|
||||
rootFS: rootFS,
|
||||
rootFS: s.rootFilesystem(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// rootFilesystem returns a single seaweedFileSystem rooted at the
|
||||
// configured export, building it on first call. Both the TCP handler
|
||||
// (via newHandler) and the UDP MOUNT path use the same instance so
|
||||
// they share the chunk reader cache and don't reconstruct a wrapper
|
||||
// per request.
|
||||
func (s *Server) rootFilesystem() *seaweedFileSystem {
|
||||
s.rootFSOnce.Do(func() {
|
||||
s.rootFS = newSeaweedFileSystem(s, s.exportRoot, s.sharedReaderCache)
|
||||
if s.sharedReaderCache == nil {
|
||||
s.sharedReaderCache = s.rootFS.readerCache
|
||||
}
|
||||
if s.chunkInvalidator == nil {
|
||||
s.chunkInvalidator = s.sharedReaderCache
|
||||
}
|
||||
})
|
||||
return s.rootFS
|
||||
}
|
||||
|
||||
func (s *Server) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||
if s == nil || s.withFilerClient == nil {
|
||||
return errors.New("nfs filer client is not configured")
|
||||
|
||||
@@ -507,23 +507,92 @@ func TestHandlerMountAndFileHandleRoundTrip(t *testing.T) {
|
||||
assert.Equal(t, []string{"demo.txt"}, path)
|
||||
}
|
||||
|
||||
func TestHandlerRejectsUnexpectedMountPath(t *testing.T) {
|
||||
func TestHandlerAcceptsAnyMountPath(t *testing.T) {
|
||||
const exportRoot = "/buckets/data"
|
||||
|
||||
client := &fakeNFSFilerClient{
|
||||
entries: map[util.FullPath]*filer_pb.Entry{
|
||||
"/exports": testEntry("exports", true, 101, uint32(0755), nil),
|
||||
"/buckets": testEntry("buckets", true, 100, uint32(0755), nil),
|
||||
"/buckets/data": testEntry("data", true, 101, uint32(0755), nil),
|
||||
},
|
||||
kv: map[string][]byte{
|
||||
string(filer.InodeIndexKey(101)): testIndexRecord(t, 101, 1, "/exports"),
|
||||
string(filer.InodeIndexKey(100)): testIndexRecord(t, 100, 1, "/buckets"),
|
||||
string(filer.InodeIndexKey(101)): testIndexRecord(t, 101, 1, "/buckets/data"),
|
||||
},
|
||||
}
|
||||
|
||||
server := newTestServer(t, "/exports", client)
|
||||
server := newTestServer(t, exportRoot, client)
|
||||
handler, err := server.newHandler()
|
||||
require.NoError(t, err)
|
||||
|
||||
status, filesystem, _ := handler.Mount(context.Background(), nil, gonfs.MountRequest{Dirpath: []byte("/wrong")})
|
||||
assert.Equal(t, gonfs.MountStatusErrNoEnt, status)
|
||||
assert.Nil(t, filesystem)
|
||||
dirpaths := []string{
|
||||
"/",
|
||||
"/buckets",
|
||||
"/buckets/other",
|
||||
"/wrong/path",
|
||||
"",
|
||||
"buckets/data",
|
||||
exportRoot,
|
||||
exportRoot + "/",
|
||||
}
|
||||
for _, dirpath := range dirpaths {
|
||||
t.Run(dirpath, func(t *testing.T) {
|
||||
status, fs, _ := handler.Mount(context.Background(), nil, gonfs.MountRequest{Dirpath: []byte(dirpath)})
|
||||
assert.Equal(t, gonfs.MountStatusOk, status, "Mount(%q)", dirpath)
|
||||
assert.NotNil(t, fs, "Mount(%q)", dirpath)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlerSubexportMount(t *testing.T) {
|
||||
const exportRoot = "/buckets"
|
||||
|
||||
client := &fakeNFSFilerClient{
|
||||
entries: map[util.FullPath]*filer_pb.Entry{
|
||||
"/buckets": testEntry("buckets", true, 100, uint32(0755), nil),
|
||||
"/buckets/data": testEntry("data", true, 101, uint32(0755), nil),
|
||||
"/buckets/data/nested": testEntry("nested", true, 102, uint32(0755), nil),
|
||||
"/buckets/file.txt": testEntry("file.txt", false, 103, uint32(0644), []byte("hi")),
|
||||
},
|
||||
kv: map[string][]byte{
|
||||
string(filer.InodeIndexKey(100)): testIndexRecord(t, 100, 1, "/buckets"),
|
||||
string(filer.InodeIndexKey(101)): testIndexRecord(t, 101, 1, "/buckets/data"),
|
||||
string(filer.InodeIndexKey(102)): testIndexRecord(t, 102, 1, "/buckets/data/nested"),
|
||||
string(filer.InodeIndexKey(103)): testIndexRecord(t, 103, 1, "/buckets/file.txt"),
|
||||
},
|
||||
}
|
||||
|
||||
server := newTestServer(t, exportRoot, client)
|
||||
handler, err := server.newHandler()
|
||||
require.NoError(t, err)
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
dirpath string
|
||||
wantStatus gonfs.MountStatus
|
||||
wantSub util.FullPath
|
||||
}{
|
||||
{name: "subdirectory_one_level", dirpath: "/buckets/data", wantStatus: gonfs.MountStatusOk, wantSub: "/buckets/data"},
|
||||
{name: "subdirectory_two_levels", dirpath: "/buckets/data/nested", wantStatus: gonfs.MountStatusOk, wantSub: "/buckets/data/nested"},
|
||||
{name: "subdirectory_trailing_slash", dirpath: "/buckets/data/", wantStatus: gonfs.MountStatusOk, wantSub: "/buckets/data"},
|
||||
{name: "missing_under_export", dirpath: "/buckets/missing", wantStatus: gonfs.MountStatusErrNoEnt},
|
||||
{name: "deep_missing_under_export", dirpath: "/buckets/data/no-such-thing", wantStatus: gonfs.MountStatusErrNoEnt},
|
||||
{name: "regular_file_not_directory", dirpath: "/buckets/file.txt", wantStatus: gonfs.MountStatusErrNotDir},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
status, fs, _ := handler.Mount(context.Background(), nil, gonfs.MountRequest{Dirpath: []byte(tc.dirpath)})
|
||||
assert.Equal(t, tc.wantStatus, status, "Mount(%q)", tc.dirpath)
|
||||
if tc.wantStatus != gonfs.MountStatusOk {
|
||||
assert.Nil(t, fs)
|
||||
return
|
||||
}
|
||||
require.NotNil(t, fs)
|
||||
subFS, ok := fs.(*seaweedFileSystem)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, tc.wantSub, subFS.actualRoot)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandlerRejectsMountFromUnauthorizedClient(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user