Parcourir la source

Kernel timer syscalls

徐启航 il y a 2 ans
Parent
commit
ba734420ce

+ 1 - 3
h2o/kernel/src/cpu/time.rs

@@ -6,9 +6,7 @@ use core::{
     time::Duration,
 };
 
-pub use self::timer::{
-    tick as timer_tick, Callback as TimerCallback, CallbackArg, Timer, Type as TimerType,
-};
+pub use self::timer::{tick as timer_tick, Timer};
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
 #[repr(transparent)]

+ 2 - 2
h2o/kernel/src/cpu/time/chip.rs

@@ -1,7 +1,7 @@
-use archop::Azy;
-
 use core::sync::atomic::Ordering::Release;
 
+use archop::Azy;
+
 use super::Instant;
 use crate::{cpu::arch::tsc::TSC_CLOCK, dev::hpet::HPET_CLOCK};
 

+ 134 - 60
h2o/kernel/src/cpu/time/timer.rs

@@ -1,13 +1,15 @@
-use alloc::collections::LinkedList;
+use alloc::{collections::LinkedList, sync::Weak};
 use core::{
     cell::UnsafeCell,
-    ptr::NonNull,
     sync::atomic::{AtomicBool, Ordering::*},
     time::Duration,
 };
 
+use spin::RwLock;
+use sv_call::ipc::SIG_TIMER;
+
 use super::Instant;
-use crate::sched::{ipc::Arsc, task, PREEMPT};
+use crate::sched::{ipc::Arsc, task, Event, PREEMPT, SCHED};
 
 #[thread_local]
 static TIMER_QUEUE: TimerQueue = TimerQueue::new();
@@ -68,59 +70,65 @@ impl TimerQueue {
     }
 }
 
-pub type CallbackArg = NonNull<task::Blocked>;
+#[derive(Debug)]
+pub enum Callback {
+    Task(task::Blocked),
+    Event(Weak<dyn Event>),
+}
 
-type CallbackFn = fn(Arsc<Timer>, Instant, CallbackArg);
+impl From<task::Blocked> for Callback {
+    fn from(task: task::Blocked) -> Self {
+        Self::Task(task)
+    }
+}
 
-#[derive(Debug)]
-pub struct Callback {
-    func: CallbackFn,
-    arg: CallbackArg,
-    fired: AtomicBool,
+impl From<Weak<dyn Event>> for Callback {
+    fn from(event: Weak<dyn Event>) -> Self {
+        Self::Event(event)
+    }
 }
 
 impl Callback {
-    pub fn new(func: CallbackFn, arg: CallbackArg) -> Self {
-        Callback {
-            fired: AtomicBool::new(false),
-            func,
-            arg,
+    fn call(self, timer: &Timer) {
+        timer.fired.store(true, Release);
+        match self {
+            Callback::Task(task) => SCHED.unblock(task, true),
+            Callback::Event(event) => {
+                if let Some(event) = event.upgrade() {
+                    event.notify(0, SIG_TIMER)
+                }
+            }
         }
     }
 
-    pub fn call(&self, timer: Arsc<Timer>, cur_time: Instant) {
-        (self.func)(timer, cur_time, self.arg);
-        self.fired.store(true, Release);
+    fn cancel(self, preempt: bool) {
+        match self {
+            Callback::Task(task) => SCHED.unblock(task, preempt),
+            Callback::Event(event) => {
+                if let Some(event) = event.upgrade() {
+                    event.cancel()
+                }
+            }
+        }
     }
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum Type {
-    Oneshot,
-    // Periodic,
-}
-
 #[derive(Debug)]
 pub struct Timer {
-    ty: Type,
-    callback: Callback,
-    duration: Duration,
+    callback: RwLock<Option<Callback>>,
     deadline: Instant,
-    cancel: AtomicBool,
+    fired: AtomicBool,
 }
 
 impl Timer {
-    pub fn activate(
-        ty: Type,
+    pub fn activate<C: Into<Callback>>(
         duration: Duration,
-        callback: Callback,
+        callback: C,
     ) -> sv_call::Result<Arsc<Self>> {
         let ret = Arsc::try_new(Timer {
-            ty,
-            callback,
-            duration,
+            callback: RwLock::new(Some(callback.into())),
             deadline: Instant::now() + duration,
-            cancel: AtomicBool::new(false),
+            fired: AtomicBool::new(false),
         })?;
         if duration < Duration::MAX {
             TIMER_QUEUE.push(Arsc::clone(&ret));
@@ -128,32 +136,25 @@ impl Timer {
         Ok(ret)
     }
 
-    #[inline]
-    pub fn ty(&self) -> Type {
-        self.ty
-    }
-
-    #[inline]
-    pub fn duration(&self) -> Duration {
-        self.duration
-    }
-
-    pub fn cancel(self: &Arsc<Self>) -> bool {
-        let ret = self.cancel.swap(true, AcqRel);
+    pub fn cancel(self: &Arsc<Self>, preempt: bool) -> bool {
         TIMER_QUEUE.pop(self);
-        ret
+        match PREEMPT.scope(|| self.callback.write().take()) {
+            Some(callback) => {
+                callback.cancel(preempt);
+                true
+            }
+            None => false,
+        }
     }
 
-    pub fn is_canceled(&self) -> bool {
-        self.cancel.load(Acquire)
+    pub fn fire(&self) {
+        if let Some(callback) = PREEMPT.scope(|| self.callback.write().take()) {
+            callback.call(self);
+        }
     }
 
     pub fn is_fired(&self) -> bool {
-        self.callback.fired.load(Acquire)
-    }
-
-    pub fn callback_arg(&self) -> CallbackArg {
-        self.callback.arg
+        self.fired.load(Acquire)
     }
 }
 
@@ -163,17 +164,90 @@ pub unsafe fn tick() {
         let mut cur = queue.cursor_front_mut();
         loop {
             match cur.current() {
-                Some(t) if t.is_canceled() => {
+                Some(timer) if timer.callback.try_read().map_or(false, |r| r.is_none()) => {
                     cur.remove_current();
                 }
-                Some(t) if t.deadline <= now => {
+                Some(timer) if timer.deadline <= now => {
                     let timer = cur.remove_current().unwrap();
-                    if !timer.cancel() {
-                        timer.callback.call(Arsc::clone(&timer), now);
-                    }
+                    timer.fire();
                 }
                 _ => break,
             }
         }
     })
 }
+
+mod syscall {
+    use alloc::sync::{Arc, Weak};
+
+    use spin::Mutex;
+    use sv_call::*;
+
+    use super::Timer;
+    use crate::{
+        cpu::time,
+        sched::{task::hdl::DefaultFeature, Arsc, Event, EventData, SCHED},
+    };
+
+    #[derive(Debug, Default)]
+    struct TimerEvent {
+        event_data: EventData,
+        timer: Mutex<Option<Arsc<Timer>>>,
+    }
+
+    unsafe impl Send for TimerEvent {}
+    unsafe impl Sync for TimerEvent {}
+
+    impl Event for TimerEvent {
+        fn event_data(&self) -> &EventData {
+            &self.event_data
+        }
+    }
+
+    impl Drop for TimerEvent {
+        fn drop(&mut self) {
+            match self.timer.get_mut().take() {
+                Some(timer) => {
+                    timer.cancel(false);
+                }
+                None => self.cancel(),
+            }
+        }
+    }
+
+    unsafe impl DefaultFeature for TimerEvent {
+        fn default_features() -> sv_call::Feature {
+            Feature::SEND | Feature::SYNC | Feature::WAIT | Feature::WRITE
+        }
+    }
+
+    #[syscall]
+    fn timer_new() -> Result<Handle> {
+        let event = Arc::new(TimerEvent::default());
+        let e = Arc::downgrade(&event);
+        SCHED.with_current(|cur| cur.space().handles().insert_raw(event, Some(e)))
+    }
+
+    #[syscall]
+    fn timer_set(handle: Handle, duration_us: u64) -> Result {
+        SCHED.with_current(|cur| {
+            let event = cur.space().handles().get::<TimerEvent>(handle)?;
+
+            if !event.features().contains(Feature::WRITE) {
+                return Err(EPERM);
+            }
+
+            let mut timer = event.timer.lock();
+            if let Some(timer) = timer.take() {
+                timer.cancel(false);
+            }
+            if duration_us > 0 {
+                *timer = Some(Timer::activate(
+                    time::from_us(duration_us),
+                    Weak::clone(event.event()),
+                )?);
+            }
+            Ok(())
+        })
+    }
+}

+ 3 - 14
h2o/kernel/src/sched/imp.rs

@@ -2,12 +2,11 @@ pub mod deque;
 pub mod epoch;
 pub mod waiter;
 
-use alloc::{boxed::Box, vec::Vec};
+use alloc::vec::Vec;
 use core::{
     assert_matches::assert_matches,
     cell::UnsafeCell,
     hint, mem,
-    ptr::NonNull,
     sync::atomic::{AtomicU64, Ordering::*},
     time::Duration,
 };
@@ -19,7 +18,7 @@ use deque::{Injector, Steal, Worker};
 
 use super::{ipc::Arsc, task};
 use crate::cpu::{
-    time::{CallbackArg, Instant, Timer, TimerCallback, TimerType},
+    time::{Instant, Timer},
     Lazy,
 };
 
@@ -150,12 +149,7 @@ impl Scheduler {
 
         self.schedule_impl(Instant::now(), pree, None, |task| {
             let blocked = task::Ready::block(task, block_desc);
-            let blocked = unsafe { NonNull::new_unchecked(Box::into_raw(box blocked)) };
-            let timer = Timer::activate(
-                TimerType::Oneshot,
-                duration,
-                TimerCallback::new(block_callback, blocked),
-            )?;
+            let timer = Timer::activate(duration, blocked)?;
             if let Some(wq) = wq {
                 wq.push(Arsc::clone(&timer));
             }
@@ -408,11 +402,6 @@ fn select_cpu(
     Some(ret)
 }
 
-fn block_callback(arg: CallbackArg) {
-    let blocked = unsafe { Box::from_raw(arg.as_ptr()) };
-    SCHED.unblock(Box::into_inner(blocked), true);
-}
-
 /// # Safety
 ///
 /// This function must be called only in task-migrate IPI handlers.

+ 3 - 1
h2o/kernel/src/sched/imp/waiter.rs

@@ -41,6 +41,8 @@ impl Blocker {
         let status = self.status.lock();
         if timeout.is_zero() || status.1 != 0 {
             Ok(())
+        } else if self.event.strong_count() == 0 {
+            Err(sv_call::EPIPE)
         } else {
             self.wo.wait((status, pree), timeout, "Blocker::wait")
         }
@@ -83,6 +85,6 @@ impl Waiter for Blocker {
 
 unsafe impl DefaultFeature for Blocker {
     fn default_features() -> sv_call::Feature {
-        Feature::SEND | Feature::WAIT
+        Feature::SEND
     }
 }

+ 1 - 1
h2o/kernel/src/sched/ipc.rs

@@ -10,7 +10,7 @@ use core::{
 };
 
 use spin::Mutex;
-pub use sv_call::ipc::{SIG_GENERIC, SIG_READ, SIG_WRITE};
+pub use sv_call::ipc::{SIG_GENERIC, SIG_READ, SIG_WRITE, SIG_TIMER};
 
 pub use self::{
     arsc::Arsc,

+ 45 - 56
h2o/kernel/src/sched/ipc/channel.rs

@@ -6,7 +6,7 @@ use alloc::{
 };
 use core::{
     mem,
-    sync::atomic::{AtomicUsize, Ordering::SeqCst},
+    sync::atomic::{AtomicU64, AtomicUsize, Ordering::SeqCst},
 };
 
 use bytes::Bytes;
@@ -95,8 +95,9 @@ pub struct Channel {
 
 impl Channel {
     pub fn new() -> (Self, Self) {
-        // TODO: Find a better way to acquire an unique id.
-        let peer_id = unsafe { archop::msr::rdtsc() };
+        static PEER_ID: AtomicU64 = AtomicU64::new(0);
+        let peer_id = PEER_ID.fetch_add(1, SeqCst);
+
         let q1 = Arc::new(ChannelSide::default());
         let q2 = Arc::new(ChannelSide::default());
         let c1 = Channel {
@@ -128,35 +129,34 @@ impl Channel {
     ///
     /// Returns error if the peer is closed or if the channel is full.
     pub fn send(&self, msg: &mut Packet) -> sv_call::Result {
-        match self.peer.upgrade() {
-            None => Err(sv_call::EPIPE),
-            Some(peer) => {
-                let called = PREEMPT.scope(|| {
-                    let mut callers = peer.callers.lock();
-                    let called = callers.get_mut(&msg.id);
-                    if let Some(caller) = called {
-                        let _old = caller.cell.replace(mem::take(msg));
-                        caller.event.notify(0, SIG_READ);
-                        debug_assert!(_old.is_none());
-                        true
-                    } else {
-                        false
-                    }
-                });
-                if called {
-                    Ok(())
-                } else if peer.msgs.len() >= MAX_QUEUE_SIZE {
-                    Err(sv_call::ENOSPC)
-                } else {
-                    peer.msgs.push(mem::take(msg));
-                    peer.event.notify(0, SIG_READ);
-                    Ok(())
-                }
+        let peer = self.peer.upgrade().ok_or(sv_call::EPIPE)?;
+        let called = PREEMPT.scope(|| {
+            let mut callers = peer.callers.lock();
+            let called = callers.get_mut(&msg.id);
+            if let Some(caller) = called {
+                let _old = caller.cell.replace(mem::take(msg));
+                caller.event.notify(0, SIG_READ);
+                debug_assert!(_old.is_none());
+                true
+            } else {
+                false
             }
+        });
+        if called {
+            Ok(())
+        } else if peer.msgs.len() >= MAX_QUEUE_SIZE {
+            Err(sv_call::ENOSPC)
+        } else {
+            peer.msgs.push(mem::take(msg));
+            peer.event.notify(0, SIG_READ);
+            Ok(())
         }
     }
 
-    fn get_packet(
+    /// # Safety
+    ///
+    /// `head` must contains a valid packet.
+    unsafe fn get_packet(
         head: &mut Option<Packet>,
         buffer_cap: &mut usize,
         handle_cap: &mut usize,
@@ -182,15 +182,12 @@ impl Channel {
         buffer_cap: &mut usize,
         handle_cap: &mut usize,
     ) -> sv_call::Result<Packet> {
-        if self.peer.strong_count() == 0 {
-            return Err(sv_call::EPIPE);
-        }
         let _pree = PREEMPT.lock();
         let mut head = self.head.lock();
         if head.is_none() {
             *head = Some(self.me.msgs.pop().ok_or(sv_call::ENOENT)?);
         }
-        Self::get_packet(&mut head, buffer_cap, handle_cap)
+        unsafe { Self::get_packet(&mut head, buffer_cap, handle_cap) }
     }
 
     #[inline]
@@ -206,31 +203,26 @@ impl Channel {
     }
 
     pub fn call_send(&self, msg: &mut Packet) -> sv_call::Result<usize> {
-        match self.peer.upgrade() {
-            None => Err(sv_call::EPIPE),
-            Some(peer) => {
-                if peer.msgs.len() >= MAX_QUEUE_SIZE {
-                    Err(sv_call::ENOSPC)
-                } else {
-                    let id = Self::next_msg_id(&self.me.msg_id);
-                    msg.id = id;
-                    self.me
-                        .callers
-                        .lock()
-                        .try_insert(id, Caller::default())
-                        .map_err(|_| sv_call::EEXIST)?;
-                    peer.msgs.push(mem::take(msg));
-                    peer.event.notify(0, SIG_READ);
-                    Ok(id)
-                }
-            }
+        let peer = self.peer.upgrade().ok_or(sv_call::EPIPE)?;
+        if peer.msgs.len() >= MAX_QUEUE_SIZE {
+            Err(sv_call::ENOSPC)
+        } else {
+            let id = Self::next_msg_id(&self.me.msg_id);
+            msg.id = id;
+            PREEMPT.scope(|| {
+                { self.me.callers.lock() }
+                    .try_insert(id, Caller::default())
+                    .map_or(Err(sv_call::EEXIST), |_| Ok(()))
+            })?;
+            peer.msgs.push(mem::take(msg));
+            peer.event.notify(0, SIG_READ);
+            Ok(id)
         }
     }
 
     fn call_event(&self, id: usize) -> sv_call::Result<Arc<BasicEvent>> {
         PREEMPT.scope(|| {
-            let callers = self.me.callers.lock();
-            callers
+            { self.me.callers.lock() }
                 .get(&id)
                 .map_or(Err(sv_call::ENOENT), |ent| Ok(Arc::clone(&ent.event)))
         })
@@ -242,9 +234,6 @@ impl Channel {
         buffer_cap: &mut usize,
         handle_cap: &mut usize,
     ) -> sv_call::Result<Packet> {
-        if self.peer.strong_count() == 0 {
-            return Err(sv_call::EPIPE);
-        }
         let _pree = PREEMPT.lock();
         let mut callers = self.me.callers.lock();
         let mut caller = match callers.entry(id) {
@@ -255,7 +244,7 @@ impl Channel {
             let packet = caller.get_mut().cell.take().ok_or(sv_call::ENOENT)?;
             caller.get_mut().head = Some(packet);
         }
-        Self::get_packet(&mut caller.get_mut().head, buffer_cap, handle_cap)
+        unsafe { Self::get_packet(&mut caller.get_mut().head, buffer_cap, handle_cap) }
             .inspect(|_| drop(caller.remove()))
     }
 }

+ 1 - 1
h2o/kernel/src/sched/task/hdl.rs

@@ -1,6 +1,6 @@
 mod node;
 
-use alloc::sync::{Weak, Arc};
+use alloc::sync::{Arc, Weak};
 use core::{any::Any, pin::Pin, ptr::NonNull};
 
 use archop::Azy;

+ 1 - 1
h2o/kernel/src/sched/task/hdl/node.rs

@@ -1,4 +1,4 @@
-use alloc::sync::{Weak, Arc};
+use alloc::sync::{Arc, Weak};
 use core::{
     any::Any,
     fmt,

+ 1 - 4
h2o/kernel/src/sched/wait.rs

@@ -1,6 +1,5 @@
 mod futex;
 
-use alloc::boxed::Box;
 use core::time::Duration;
 
 use crossbeam_queue::SegQueue;
@@ -48,9 +47,7 @@ impl WaitObject {
         let mut cnt = 0;
         while cnt < num {
             match self.wait_queue.pop() {
-                Some(timer) if !timer.cancel() => {
-                    let blocked = unsafe { Box::from_raw(timer.callback_arg().as_ptr()) };
-                    SCHED.unblock(Box::into_inner(blocked), preempt);
+                Some(timer) if timer.cancel(preempt) => {
                     cnt += 1;
                 }
                 Some(_) => {}

+ 19 - 0
h2o/kernel/syscall/time.json

@@ -8,5 +8,24 @@
                 "ty": "*mut ()"
             }
         ]
+    },
+    {
+        "name": "sv_timer_new",
+        "returns": "Handle",
+        "args": []
+    },
+    {
+        "name": "sv_timer_set",
+        "returns": "()",
+        "args": [
+            {
+                "name": "handle",
+                "ty": "Handle"
+            },
+            {
+                "name": "duration_us",
+                "ty": "u64"
+            }
+        ]
     }
 ]

+ 4 - 3
h2o/libs/syscall/src/ipc.rs

@@ -17,6 +17,7 @@ pub const MAX_BUFFER_SIZE: usize = crate::mem::PAGE_SIZE;
 pub const CUSTOM_MSG_ID_START: usize = 0;
 pub const CUSTOM_MSG_ID_END: usize = 12;
 
-pub const SIG_GENERIC: usize = 0b001;
-pub const SIG_READ: usize = 0b010;
-pub const SIG_WRITE: usize = 0b100;
+pub const SIG_GENERIC: usize = 0b0001;
+pub const SIG_READ: usize = 0b0010;
+pub const SIG_WRITE: usize = 0b0100;
+pub const SIG_TIMER: usize = 0b1000;

+ 2 - 0
h2o/tinit/src/test.rs

@@ -3,9 +3,11 @@ use solvent::prelude::Virt;
 mod ipc;
 mod mem;
 mod task;
+mod time;
 
 pub unsafe fn test_syscall(virt: &Virt) {
     let stack = task::test(virt);
     ipc::test(virt, stack);
     mem::test(virt);
+    time::test();
 }

+ 18 - 0
h2o/tinit/src/test/time.rs

@@ -0,0 +1,18 @@
+use solvent::prelude::Instant;
+use sv_call::{ipc::SIG_TIMER, *};
+
+pub unsafe fn test() {
+    let timer = sv_timer_new().into_res().expect("Failed to create timer");
+    let waiter = sv_obj_await(timer, true, SIG_TIMER)
+        .into_res()
+        .expect("Failed to set wait for timer");
+    sv_timer_set(timer, 10000)
+        .into_res()
+        .expect("Failed to set timer");
+    let time = Instant::now();
+    sv_obj_awend(waiter, u64::MAX)
+        .into_res()
+        .expect("Failed to wait for timer");
+    log::debug!("Waiting for 10ms, actual passed {:?}", time.elapsed());
+    sv_obj_drop(timer).into_res().expect("Failed to drop timer");
+}

+ 2 - 1
h2o/tinit/src/tmain.rs

@@ -80,7 +80,7 @@ fn serve_load(load_rpc: Channel, bootfs: Directory, bootfs_phys: &Phys) -> Error
                     None => return GetObjectResponse::Error { not_found_index: i },
                 }
             }
-            rpc::load::GetObjectResponse::Success(objs)
+            GetObjectResponse::Success(objs)
         });
         match res {
             Ok(()) => hint::spin_loop(),
@@ -194,6 +194,7 @@ extern "C" fn tmain(init_chan: sv_call::Handle) {
     };
 
     me.send(exe_args).expect("Failed to send executable args");
+    drop(me);
 
     let task = Task::exec(
         Some("PROGMGR"),

+ 2 - 2
src/lib/h2o_rs/src/task.rs

@@ -177,6 +177,6 @@ pub unsafe fn exit(retval: usize) -> ! {
     unreachable!("The task failed to exit");
 }
 
-pub fn sleep(ms: u32) -> Result {
-    unsafe { sv_call::sv_task_sleep(ms).into_res() }
+pub fn sleep(duration: Duration) -> Result {
+    unsafe { sv_call::sv_task_sleep(duration.as_millis().try_into()?).into_res() }
 }

+ 43 - 1
src/lib/h2o_rs/src/time.rs

@@ -1,6 +1,9 @@
 use core::{ops::*, time::Duration};
 
-use crate::error::{Error, Result};
+use crate::{
+    error::{Error, Result},
+    obj::Object,
+};
 
 #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
 #[repr(transparent)]
@@ -105,3 +108,42 @@ impl core::fmt::Display for Instant {
         write!(f, "{:.6}", s)
     }
 }
+
+#[repr(transparent)]
+pub struct Timer(sv_call::Handle);
+crate::impl_obj!(Timer);
+crate::impl_obj!(@CLONE, Timer);
+crate::impl_obj!(@DROP, Timer);
+
+impl Timer {
+    pub fn try_new() -> Result<Self> {
+        let handle = unsafe { sv_call::sv_timer_new() }.into_res()?;
+        // SAFETY: The handle is freshly allocated.
+        Ok(unsafe { Self::from_raw(handle) })
+    }
+
+    pub fn new() -> Self {
+        Self::try_new().expect("Failed to create a timer object")
+    }
+
+    /// If the timer is already set, then it will be canceled first (sending the
+    /// cancellation event).
+    ///
+    /// If `duration` is zero ([`Duration::ZERO`]), then the timer will not be
+    /// triggered.
+    pub fn set(&self, duration: Duration) -> Result {
+        // SAFETY: We don't move the ownership of the handle.
+        unsafe { sv_call::sv_timer_set(unsafe { self.raw() }, try_into_us(duration)?) }.into_res()
+    }
+
+    /// Shorthand for `set(Duration::ZERO)`.
+    pub fn reset(&self) -> Result {
+        self.set(Duration::ZERO)
+    }
+}
+
+impl Default for Timer {
+    fn default() -> Self {
+        Self::new()
+    }
+}

+ 0 - 1
src/lib/libc/ldso/src/lib.rs

@@ -14,7 +14,6 @@
 
 extern crate alloc;
 
-#[cfg(target_arch = "x86_64")]
 #[cfg_attr(target_arch = "x86_64", path = "arch/x86_64.rs")]
 mod arch;
 mod dso;