Procházet zdrojové kódy

Add loader service

徐启航 před 1 rokem
rodič
revize
cb514cabc9

+ 1 - 2
src/bin/progm/src/boot.rs

@@ -3,7 +3,6 @@ use core::{ffi::CStr, ptr::NonNull};
 
 use either::Either;
 use solvent::prelude::{Flags, Object, Phys, PAGE_MASK};
-use solvent_async::ipc::Channel;
 use solvent_fs::{
     entry::Entry,
     fs,
@@ -108,7 +107,7 @@ pub fn mount() {
                 Default::default(),
                 Path::new(""),
                 OpenOptions::READ | OpenOptions::EXECUTE,
-                Channel::try_from(server).unwrap().into(),
+                server.try_into().unwrap(),
             )
             .expect("Failed to open a connection");
         fs::local()

+ 2 - 0
src/lib/h2o_fs/src/lib.rs

@@ -19,6 +19,8 @@ pub mod entry;
 pub mod file;
 pub mod fs;
 #[cfg(feature = "runtime")]
+pub mod loader;
+#[cfg(feature = "runtime")]
 pub mod mem;
 #[cfg(feature = "runtime")]
 pub mod rpc;

+ 81 - 0
src/lib/h2o_fs/src/loader.rs

@@ -0,0 +1,81 @@
+use alloc::vec::Vec;
+
+use futures::StreamExt;
+use solvent::prelude::Phys;
+use solvent_core::{ffi::OsStr, path::Path};
+use solvent_rpc::{
+    io::{
+        dir::DirectoryClient,
+        file::{File, PhysOptions},
+        Error, OpenOptions,
+    },
+    loader::{LoaderRequest, LoaderServer},
+    Protocol, Server,
+};
+
+pub async fn get_object_from_dir<D: AsRef<DirectoryClient>, P: AsRef<Path>>(
+    dir: D,
+    path: P,
+) -> Result<Phys, Error> {
+    let (file, server) = File::channel();
+    let dir = dir.as_ref();
+    dir.open(
+        path.as_ref().into(),
+        OpenOptions::READ,
+        server.try_into().unwrap(),
+    )
+    .await??;
+    file.phys(PhysOptions::Copy).await?
+}
+
+pub async fn get_object<D: AsRef<DirectoryClient>, P: AsRef<Path>>(
+    dir: impl Iterator<Item = D>,
+    path: P,
+) -> Option<Phys> {
+    let path = path.as_ref();
+    for dir in dir {
+        match get_object_from_dir(dir, path).await {
+            Ok(phys) => return Some(phys),
+            Err(err) => log::warn!("Failed to get object from {path:?}: {err}"),
+        }
+    }
+    None
+}
+
+pub async fn serve<D: AsRef<DirectoryClient>>(
+    server: LoaderServer,
+    dir: impl Iterator<Item = D> + Clone,
+) {
+    let (mut request, _) = server.serve();
+    while let Some(request) = request.next().await {
+        let request = match request {
+            Ok(request) => request,
+            Err(err) => {
+                log::warn!("RPC receive error: {err}");
+                continue;
+            }
+        };
+        match request {
+            LoaderRequest::GetObject { path, responder } => {
+                let dir = dir.clone();
+                let fut = async move {
+                    let mut ret = Vec::new();
+                    for (index, path) in path.into_iter().enumerate() {
+                        match get_object(dir.clone(), OsStr::from_bytes(path.as_bytes())).await {
+                            Some(obj) => ret.push(obj),
+                            None => return Err(index),
+                        }
+                    }
+                    Ok(ret)
+                };
+                let res = responder.send(fut.await);
+                if let Err(err) = res {
+                    log::warn!("RPC send error: {err}");
+                }
+            }
+            LoaderRequest::Unknown(_) => {
+                log::warn!("RPC received unknown request")
+            }
+        }
+    }
+}

+ 17 - 0
src/lib/h2o_rpc/gen/src/types.rs

@@ -559,6 +559,16 @@ impl Protocol {
                     }
                 }
 
+                impl TryFrom<#server> for solvent::ipc::Channel {
+                    type Error = #server;
+
+                    #[inline]
+                    fn try_from(server: #server) -> Result<Self, Self::Error> {
+                        solvent::ipc::Channel::try_from(server.inner)
+                            .map_err(|inner| #server { inner })
+                    }
+                }
+
                 #vis enum #request {
                     #(#requests,)*
                     Unknown(solvent_rpc::Request),
@@ -758,12 +768,19 @@ impl Protocol {
 
                 impl solvent_rpc::sync::Client for #client {
                     type EventReceiver = #event_receiver;
+                    #[cfg(feature = "runtime")]
+                    type Async = super::#client;
 
                     #[inline]
                     fn from_inner(inner: solvent_rpc::sync::ClientImpl) -> Self {
                         #client { inner }
                     }
 
+                    #[inline]
+                    fn into_inner(this: Self) -> solvent_rpc::sync::ClientImpl {
+                        this.inner
+                    }
+
                     #[inline]
                     fn event_receiver(&self, timeout: Option<Duration>) -> Option<#event_receiver> {
                         self.inner

+ 9 - 0
src/lib/h2o_rpc/src/server.rs

@@ -75,6 +75,15 @@ impl TryFrom<ServerImpl> for Channel {
     }
 }
 
+impl TryFrom<ServerImpl> for solvent::ipc::Channel {
+    type Error = ServerImpl;
+
+    #[inline]
+    fn try_from(value: ServerImpl) -> Result<Self, Self::Error> {
+        Channel::try_from(value).map(Into::into)
+    }
+}
+
 impl SerdePacket for ServerImpl {
     fn serialize(self, ser: &mut packet::Serializer) -> Result<(), Error> {
         match Channel::try_from(self) {

+ 20 - 0
src/lib/h2o_rpc/src/sync/client.rs

@@ -38,6 +38,13 @@ impl ClientImpl {
         }
     }
 
+    #[cfg(feature = "runtime")]
+    fn into_async(self) -> Result<crate::ClientImpl, Self> {
+        let channel = Channel::try_from(self)?;
+        let channel = solvent_async::ipc::Channel::new(channel);
+        Ok(crate::ClientImpl::from(channel))
+    }
+
     #[inline]
     pub fn call(&self, packet: Packet) -> Result<Packet, Error> {
         self.inner.call(packet)
@@ -271,9 +278,22 @@ impl Inner {
 
 pub trait Client: SerdePacket + From<Channel> + AsRef<Channel> {
     type EventReceiver: EventReceiver;
+    #[cfg(feature = "runtime")]
+    type Async: crate::Client;
 
     fn from_inner(inner: ClientImpl) -> Self;
 
+    fn into_inner(this: Self) -> ClientImpl;
+
+    #[inline]
+    #[cfg(feature = "runtime")]
+    fn into_async(self) -> Result<Self::Async, Self> {
+        Self::into_inner(self)
+            .into_async()
+            .map(<Self::Async as crate::Client>::from_inner)
+            .map_err(Self::from_inner)
+    }
+
     fn event_receiver(&self, timeout: Option<Duration>) -> Option<Self::EventReceiver>;
 }