Skip to main content

freya_query/
query.rs

1use core::fmt;
2use std::{
3    cell::{
4        Ref,
5        RefCell,
6    },
7    collections::HashMap,
8    future::Future,
9    hash::Hash,
10    mem,
11    rc::Rc,
12    time::{
13        Duration,
14        Instant,
15    },
16};
17
18use async_io::Timer;
19use freya_core::{
20    integration::FxHashSet,
21    lifecycle::context::{
22        consume_context,
23        provide_context_for_scope_id,
24        try_consume_context,
25    },
26    prelude::*,
27    scope_id::ScopeId,
28};
29use futures_util::stream::{
30    FuturesUnordered,
31    StreamExt,
32};
33
/// Describes an asynchronous query whose result can be cached.
///
/// Implementors choose the success type ([`QueryCapability::Ok`]), the failure type
/// ([`QueryCapability::Err`]) and the [`QueryCapability::Keys`] the query is run with.
pub trait QueryCapability: 'static + Clone + PartialEq + Hash + Eq {
    /// Value produced when the query succeeds.
    type Ok;
    /// Value produced when the query fails.
    type Err;
    /// Input handed to [`QueryCapability::run`] and [`QueryCapability::matches`].
    type Keys: Hash + PartialEq + Clone;

    /// Runs the query with the given keys.
    fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;

    /// Tells whether this query should be invalidated for the given [`QueryCapability::Keys`].
    ///
    /// The default implementation matches every set of keys.
    fn matches(&self, _keys: &Self::Keys) -> bool {
        true
    }
}
50
51pub enum QueryStateData<Q: QueryCapability> {
52    /// Has not loaded yet.
53    Pending,
54    /// Is loading and may not have a previous settled value.
55    Loading { res: Option<Result<Q::Ok, Q::Err>> },
56    /// Is not loading and has a settled value.
57    Settled {
58        res: Result<Q::Ok, Q::Err>,
59        settlement_instant: Instant,
60    },
61}
62
63impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
64    type Error = ();
65
66    fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
67        match value {
68            QueryStateData::Loading { res: Some(res) } => Ok(res),
69            QueryStateData::Settled { res, .. } => Ok(res),
70            _ => Err(()),
71        }
72    }
73}
74
75impl<Q> fmt::Debug for QueryStateData<Q>
76where
77    Q: QueryCapability,
78    Q::Ok: fmt::Debug,
79    Q::Err: fmt::Debug,
80{
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        match self {
83            Self::Pending => f.write_str("Pending"),
84            Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
85            Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
86        }
87    }
88}
89
90impl<Q: QueryCapability> QueryStateData<Q> {
91    /// Check if the state is [QueryStateData::Settled] and [Result::Ok].
92    pub fn is_ok(&self) -> bool {
93        matches!(self, QueryStateData::Settled { res: Ok(_), .. })
94    }
95
96    /// Check if the state is [QueryStateData::Settled] and [Result::Err].
97    pub fn is_err(&self) -> bool {
98        matches!(self, QueryStateData::Settled { res: Err(_), .. })
99    }
100
101    /// Check if the state is [QueryStateData::Loading].
102    pub fn is_loading(&self) -> bool {
103        matches!(self, QueryStateData::Loading { .. })
104    }
105
106    /// Check if the state is [QueryStateData::Pending].
107    pub fn is_pending(&self) -> bool {
108        matches!(self, QueryStateData::Pending)
109    }
110
111    /// Check if the state is stale or not, where stale means outdated.
112    pub fn is_stale(&self, query: &Query<Q>) -> bool {
113        match self {
114            QueryStateData::Pending => true,
115            QueryStateData::Loading { .. } => true,
116            QueryStateData::Settled {
117                settlement_instant, ..
118            } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
119        }
120    }
121
122    /// Get the value as an [Option].
123    pub fn ok(&self) -> Option<&Q::Ok> {
124        match self {
125            Self::Settled { res: Ok(res), .. } => Some(res),
126            Self::Loading { res: Some(Ok(res)) } => Some(res),
127            _ => None,
128        }
129    }
130
131    /// Get the value as an [Result] if possible, otherwise it will panic.
132    pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
133        match self {
134            Self::Loading { res: Some(v) } => v,
135            Self::Settled { res, .. } => res,
136            _ => unreachable!(),
137        }
138    }
139
140    fn into_loading(self) -> QueryStateData<Q> {
141        match self {
142            QueryStateData::Pending => QueryStateData::Loading { res: None },
143            QueryStateData::Loading { res } => QueryStateData::Loading { res },
144            QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
145        }
146    }
147}
148
149pub struct QueriesStorage<Q: QueryCapability> {
150    storage: State<HashMap<Query<Q>, QueryData<Q>>>,
151}
152
153impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
154
155impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
156    fn clone(&self) -> Self {
157        *self
158    }
159}
160
161pub struct QueryData<Q: QueryCapability> {
162    state: Rc<RefCell<QueryStateData<Q>>>,
163    reactive_contexts: Rc<RefCell<FxHashSet<ReactiveContext>>>,
164
165    interval_task: Rc<RefCell<Option<(Duration, TaskHandle)>>>,
166    clean_task: Rc<RefCell<Option<TaskHandle>>>,
167}
168
169impl<Q: QueryCapability> Clone for QueryData<Q> {
170    fn clone(&self) -> Self {
171        Self {
172            state: self.state.clone(),
173            reactive_contexts: self.reactive_contexts.clone(),
174
175            interval_task: self.interval_task.clone(),
176            clean_task: self.clean_task.clone(),
177        }
178    }
179}
180
181impl<Q: QueryCapability> QueriesStorage<Q> {
182    fn new_in_root() -> Self {
183        Self {
184            storage: State::create_global(HashMap::default()),
185        }
186    }
187
188    fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
189        let query_clone = query.clone();
190        let mut storage = self.storage.write_unchecked();
191
192        let query_data = storage.entry(query).or_insert_with(|| QueryData {
193            state: Rc::new(RefCell::new(QueryStateData::Pending)),
194            reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
195            interval_task: Rc::default(),
196            clean_task: Rc::default(),
197        });
198        let query_data_clone = query_data.clone();
199
200        // Cancel clean task
201        if let Some(clean_task) = query_data.clean_task.take() {
202            clean_task.cancel();
203        }
204
205        // Start an interval task if necessary
206        // If multiple queries subscribers use different intervals the interval task
207        // will run using the shortest interval
208        let interval = query_clone.interval_time;
209        let interval_enabled = query_clone.interval_time != Duration::MAX;
210        let interval_task = &mut *query_data.interval_task.borrow_mut();
211
212        let create_interval_task = match interval_task {
213            None if interval_enabled => true,
214            Some((current_interval, current_interval_task)) if interval_enabled => {
215                let new_interval_is_shorter = *current_interval > interval;
216                if new_interval_is_shorter {
217                    current_interval_task.cancel();
218                    *interval_task = None;
219                }
220                new_interval_is_shorter
221            }
222            _ => false,
223        };
224        if create_interval_task {
225            let task = spawn_forever(async move {
226                loop {
227                    // Wait as long as the stale time is configured
228                    Timer::after(interval).await;
229
230                    // Run the query
231                    QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
232                }
233            });
234            *interval_task = Some((interval, task));
235        }
236
237        query_data.clone()
238    }
239
240    fn update_tasks(&mut self, query: Query<Q>) {
241        let storage_clone = self.storage;
242        let mut storage = self.storage.write_unchecked();
243
244        let query_data = storage.get_mut(&query).unwrap();
245
246        // Cancel interval task
247        if let Some((_, interval_task)) = query_data.interval_task.take() {
248            interval_task.cancel();
249        }
250
251        // Spawn clean up task if there no more reactive contexts
252        if query_data.reactive_contexts.borrow().len() == 1 {
253            *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
254                // Wait as long as the stale time is configured
255                Timer::after(query.clean_time).await;
256
257                // Finally clear the query
258                let mut storage = storage_clone.write_unchecked();
259                storage.remove(&query);
260            }));
261        }
262    }
263
264    pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
265        let query: Query<Q> = get_query.into();
266
267        let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
268            Some(storage) => storage,
269            None => {
270                provide_context_for_scope_id(
271                    QueriesStorage::<Q>::new_in_root(),
272                    Some(ScopeId::ROOT),
273                );
274                try_consume_context::<QueriesStorage<Q>>().unwrap()
275            }
276        };
277
278        let mut map = storage.storage.write();
279        let query_data = map
280            .entry(query.clone())
281            .or_insert_with(|| QueryData {
282                state: Rc::new(RefCell::new(QueryStateData::Pending)),
283                reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
284                interval_task: Rc::default(),
285                clean_task: Rc::default(),
286            })
287            .clone();
288
289        // Run the query if the value is stale
290        if query_data.state.borrow().is_stale(&query) {
291            // Set to Loading
292            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
293                .into_loading();
294            *query_data.state.borrow_mut() = res;
295            for reactive_context in query_data.reactive_contexts.borrow().iter() {
296                reactive_context.notify();
297            }
298
299            // Run
300            let res = query.query.run(&query.keys).await;
301
302            // Set to Settled
303            *query_data.state.borrow_mut() = QueryStateData::Settled {
304                res,
305                settlement_instant: Instant::now(),
306            };
307            for reactive_context in query_data.reactive_contexts.borrow().iter() {
308                reactive_context.notify();
309            }
310        }
311
312        // Spawn clean up task if there no more reactive contexts
313        if query_data.reactive_contexts.borrow().is_empty() {
314            *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
315                // Wait as long as the stale time is configured
316                Timer::after(query.clean_time).await;
317
318                // Finally clear the query
319                let mut storage = storage.storage.write_unchecked();
320                storage.remove(&query);
321            }));
322        }
323
324        QueryReader {
325            state: query_data.state,
326        }
327    }
328
329    /// Acquires query storage from context and invalidates all queries
330    ///
331    /// Panics if query storage is not in context
332    pub async fn invalidate_all() {
333        let storage = consume_context::<QueriesStorage<Q>>();
334
335        storage.inner_invalidate_all().await;
336    }
337
338    /// Non-panicking version of [`QueriesStorage::invalidate_all()`]
339    pub async fn try_invalidate_all() {
340        let Some(storage) = try_consume_context::<QueriesStorage<Q>>() else {
341            return;
342        };
343
344        storage.inner_invalidate_all().await;
345    }
346
347    async fn inner_invalidate_all(self) {
348        // Get all the queries
349        let matching_queries = self.storage.read().clone().into_iter().collect::<Vec<_>>();
350        let matching_queries = matching_queries
351            .iter()
352            .map(|(q, d)| (q, d))
353            .collect::<Vec<_>>();
354
355        // Invalidate the queries
356        Self::run_queries(&matching_queries).await
357    }
358
359    /// Acquires query storage from context and invalidates matching queries
360    ///
361    /// Panics if query storage is not in context
362    pub async fn invalidate_matching(matching_keys: Q::Keys) {
363        let storage = consume_context::<QueriesStorage<Q>>();
364
365        storage.inner_invalidate_matching(matching_keys).await;
366    }
367
368    /// Non-panicking version of [`QueriesStorage::invalidate_matching()`]
369    pub async fn try_invalidate_matching(matching_keys: Q::Keys) {
370        let Some(storage) = try_consume_context::<QueriesStorage<Q>>() else {
371            return;
372        };
373
374        storage.inner_invalidate_matching(matching_keys).await;
375    }
376
377    async fn inner_invalidate_matching(self, matching_keys: Q::Keys) {
378        // Get those queries that match
379        let mut matching_queries = Vec::new();
380        for (query, data) in self.storage.read().iter() {
381            if query.query.matches(&matching_keys) {
382                matching_queries.push((query.clone(), data.clone()));
383            }
384        }
385        let matching_queries = matching_queries
386            .iter()
387            .map(|(q, d)| (q, d))
388            .collect::<Vec<_>>();
389
390        // Invalidate the queries
391        Self::run_queries(&matching_queries).await
392    }
393
394    async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
395        let tasks = FuturesUnordered::new();
396
397        for (query, query_data) in queries {
398            // Set to Loading
399            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
400                .into_loading();
401            *query_data.state.borrow_mut() = res;
402            for reactive_context in query_data.reactive_contexts.borrow().iter() {
403                reactive_context.notify();
404            }
405
406            tasks.push(Box::pin(async move {
407                // Run
408                let res = query.query.run(&query.keys).await;
409
410                // Set to settled
411                *query_data.state.borrow_mut() = QueryStateData::Settled {
412                    res,
413                    settlement_instant: Instant::now(),
414                };
415                for reactive_context in query_data.reactive_contexts.borrow().iter() {
416                    reactive_context.notify();
417                }
418            }));
419        }
420
421        tasks.count().await;
422    }
423}
424
425pub struct GetQuery<Q: QueryCapability> {
426    query: Q,
427    keys: Q::Keys,
428
429    stale_time: Duration,
430    clean_time: Duration,
431}
432
433impl<Q: QueryCapability> GetQuery<Q> {
434    pub fn new(keys: Q::Keys, query: Q) -> Self {
435        Self {
436            query,
437            keys,
438            stale_time: Duration::ZERO,
439            clean_time: Duration::ZERO,
440        }
441    }
442    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query.
443    ///
444    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately.
445    pub fn stale_time(self, stale_time: Duration) -> Self {
446        Self { stale_time, ..self }
447    }
448
449    /// For how long the data is kept cached after there are no more query subscribers.
450    ///
451    /// Defaults to [Duration::ZERO], meaning it clears automatically.
452    pub fn clean_time(self, clean_time: Duration) -> Self {
453        Self { clean_time, ..self }
454    }
455}
456
457impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
458    fn from(value: GetQuery<Q>) -> Self {
459        Query {
460            query: value.query,
461            keys: value.keys,
462
463            enabled: true,
464
465            stale_time: value.stale_time,
466            clean_time: value.clean_time,
467            interval_time: Duration::MAX,
468        }
469    }
470}
471#[derive(PartialEq, Clone)]
472pub struct Query<Q: QueryCapability> {
473    query: Q,
474    keys: Q::Keys,
475
476    enabled: bool,
477
478    stale_time: Duration,
479    clean_time: Duration,
480    interval_time: Duration,
481}
482
483impl<Q: QueryCapability> Eq for Query<Q> {}
484impl<Q: QueryCapability> Hash for Query<Q> {
485    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
486        self.query.hash(state);
487        self.keys.hash(state);
488
489        self.enabled.hash(state);
490
491        self.stale_time.hash(state);
492        self.clean_time.hash(state);
493
494        // Intentionally left out as intervals can vary from one query subscriber to another
495        // self.interval_time.hash(state);
496    }
497}
498
499impl<Q: QueryCapability> Query<Q> {
500    pub fn new(keys: Q::Keys, query: Q) -> Self {
501        Self {
502            query,
503            keys,
504            enabled: true,
505            stale_time: Duration::ZERO,
506            clean_time: Duration::from_secs(5 * 60),
507            interval_time: Duration::MAX,
508        }
509    }
510
511    /// Enable or disable this query so that it doesnt automatically run.
512    ///
513    /// Defaults to `true`.
514    pub fn enable(self, enabled: bool) -> Self {
515        Self { enabled, ..self }
516    }
517
518    /// For how long is the data considered stale. If a query subscriber is mounted and the data is stale, it will re run the query
519    /// otherwise it return the cached data.
520    ///
521    /// Defaults to [Duration::ZERO], meaning it is marked stale immediately after it has been used.
522    pub fn stale_time(self, stale_time: Duration) -> Self {
523        Self { stale_time, ..self }
524    }
525
526    /// For how long the data is kept cached after there are no more query subscribers.
527    ///
528    /// Defaults to `5min`, meaning it clears automatically after 5 minutes of no subscribers to it.
529    pub fn clean_time(self, clean_time: Duration) -> Self {
530        Self { clean_time, ..self }
531    }
532
533    /// Every how often the query reruns.
534    ///
535    /// Defaults to [Duration::MAX], meaning it never re runs automatically.
536    ///
537    /// **Note**: If multiple subscribers of the same query use different intervals, only the shortest one will be used.
538    pub fn interval_time(self, interval_time: Duration) -> Self {
539        Self {
540            interval_time,
541            ..self
542        }
543    }
544}
545
546pub struct QueryReader<Q: QueryCapability> {
547    state: Rc<RefCell<QueryStateData<Q>>>,
548}
549
550impl<Q: QueryCapability> QueryReader<Q> {
551    pub fn state(&'_ self) -> Ref<'_, QueryStateData<Q>> {
552        self.state.borrow()
553    }
554
555    /// Get the result of the query.
556    ///
557    /// **This method will panic if the query is not settled.**
558    pub fn as_settled(&'_ self) -> Ref<'_, Result<Q::Ok, Q::Err>> {
559        Ref::map(self.state.borrow(), |state| match state {
560            QueryStateData::Settled { res, .. } => res,
561            _ => panic!("Query is not settled."),
562        })
563    }
564}
565
566pub struct UseQuery<Q: QueryCapability> {
567    query: State<Query<Q>>,
568}
569
570impl<Q: QueryCapability> Clone for UseQuery<Q> {
571    fn clone(&self) -> Self {
572        *self
573    }
574}
575
576impl<Q: QueryCapability> Copy for UseQuery<Q> {}
577
578impl<Q: QueryCapability> UseQuery<Q> {
579    /// Read the [Query] state.
580    ///
581    /// This **will** automatically subscribe.
582    /// If you want a **non-subscribing** method have a look at [UseQuery::peek].
583    pub fn read(&self) -> QueryReader<Q> {
584        let storage = consume_context::<QueriesStorage<Q>>();
585        let map = storage.storage.peek();
586        let query_data = map.get(&self.query.peek()).cloned().unwrap();
587
588        // Subscribe if possible
589        if let Some(mut reactive_context) = ReactiveContext::try_current() {
590            reactive_context.subscribe(&query_data.reactive_contexts);
591        }
592
593        QueryReader {
594            state: query_data.state,
595        }
596    }
597
598    /// Read the [Query] state.
599    ///
600    /// This **will not** automatically subscribe.
601    /// If you want a **subscribing** method have a look at [UseQuery::read].
602    pub fn peek(&self) -> QueryReader<Q> {
603        let storage = consume_context::<QueriesStorage<Q>>();
604        let map = storage.storage.peek();
605        let query_data = map.get(&self.query.peek()).cloned().unwrap();
606
607        QueryReader {
608            state: query_data.state,
609        }
610    }
611
612    /// Invalidate this query and await its result.
613    ///
614    /// For a `sync` version use [UseQuery::invalidate].
615    pub async fn invalidate_async(&self) -> QueryReader<Q> {
616        let storage = consume_context::<QueriesStorage<Q>>();
617
618        let query = self.query.peek().clone();
619        let map = storage.storage.peek();
620        let query_data = map.get(&query).cloned().unwrap();
621
622        // Run the query
623        QueriesStorage::run_queries(&[(&query, &query_data)]).await;
624
625        QueryReader {
626            state: query_data.state.clone(),
627        }
628    }
629
630    /// Invalidate this query in the background.
631    ///
632    /// For an `async` version use [UseQuery::invalidate_async].
633    pub fn invalidate(&self) {
634        let storage = consume_context::<QueriesStorage<Q>>();
635
636        let query = self.query.peek().clone();
637        let map = storage.storage.peek();
638        let query_data = map.get(&query).cloned().unwrap();
639
640        // Run the query
641        spawn_forever(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
642    }
643}
644
645/// Queries are used to get data asynchronously (e.g external resources such as HTTP APIs), which can later be cached or refreshed.
646///
647/// Important concepts:
648///
649/// ### Stale time
650/// This is how long will a value that is cached, considered to be recent enough.
651/// So in other words, if a value is stale it means that its outdated and therefore it should be refreshed.
652///
653/// By default the stale time is `0ms`, so if a value is cached and a new query subscriber
654/// is interested in this value, it will get refreshed automatically.
655///
656/// See [Query::stale_time].
657///
658/// ### Clean time
659/// This is how long will a value kept cached after there are no more subscribers of that query.
660///
661/// Imagine there is `Subscriber 1` of a query, the data is requested and cached.
662/// But after some seconds the `Subscriber 1` is unmounted, but the data is not cleared as the default clean time is `5min`.
663/// A few seconds later the `Subscriber 1` gets mounted again, it requests the data again but this time it is returned directly from the cache.
664///
665/// See [Query::clean_time].
666///
667/// ### Interval time
668/// This is how often do you want a query to be refreshed in the background automatically.
669/// By default it never refreshes automatically.
670///
671/// See [Query::interval_time].
672pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
673    let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
674        Some(storage) => storage,
675        None => {
676            provide_context_for_scope_id(QueriesStorage::<Q>::new_in_root(), Some(ScopeId::ROOT));
677            try_consume_context::<QueriesStorage<Q>>().unwrap()
678        }
679    };
680
681    let mut make_query = |query: &Query<Q>, mut prev_query: Option<Query<Q>>| {
682        let query_data = storage.insert_or_get_query(query.clone());
683
684        // Update the query tasks if there has been a change in the query
685        if let Some(prev_query) = prev_query.take() {
686            storage.update_tasks(prev_query);
687        }
688
689        // Immediately run the query if enabled and the value is stale
690        if query.enabled && query_data.state.borrow().is_stale(query) {
691            let query = query.clone();
692            spawn_forever(async move {
693                QueriesStorage::run_queries(&[(&query, &query_data)]).await;
694            });
695        }
696    };
697
698    let mut current_query = use_hook(|| {
699        make_query(&query, None);
700        State::create(query.clone())
701    });
702
703    if *current_query.read() != query {
704        let prev = mem::replace(&mut *current_query.write(), query.clone());
705        make_query(&query, Some(prev));
706    }
707
708    // Update the query tasks when the scope is dropped
709    use_drop({
710        move || {
711            storage.update_tasks(current_query.peek().clone());
712        }
713    });
714
715    let query = UseQuery {
716        query: current_query,
717    };
718
719    // Used to consider this use_query call as a subscriber without rerunning the component
720    use_side_effect(move || {
721        let _ = query.read();
722    });
723
724    query
725}