1use core::fmt;
2use std::{
3 cell::{
4 Ref,
5 RefCell,
6 },
7 collections::HashMap,
8 future::Future,
9 hash::Hash,
10 mem,
11 rc::Rc,
12 time::{
13 Duration,
14 Instant,
15 },
16};
17
18use async_io::Timer;
19use freya_core::{
20 integration::FxHashSet,
21 lifecycle::context::{
22 consume_context,
23 provide_context_for_scope_id,
24 try_consume_context,
25 },
26 prelude::*,
27 scope_id::ScopeId,
28};
29use futures_util::stream::{
30 FuturesUnordered,
31 StreamExt,
32};
33
/// A unit of asynchronous work whose result can be cached.
///
/// The implementor itself is hashed and compared as part of the cache key
/// (alongside its [`QueryCapability::Keys`]), hence the `Clone`, `Eq` and
/// `Hash` requirements.
pub trait QueryCapability: 'static + Clone + PartialEq + Eq + Hash {
    /// Value produced by a successful run.
    type Ok;
    /// Value produced by a failed run.
    type Err;
    /// Input handed to [`QueryCapability::run`].
    type Keys: Hash + PartialEq + Clone;

    /// Runs the query for the given `keys`.
    fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;

    /// Tells whether this query is targeted by an invalidation carrying `keys`.
    ///
    /// The default implementation accepts every key.
    fn matches(&self, _keys: &Self::Keys) -> bool {
        true
    }
}
50
51pub enum QueryStateData<Q: QueryCapability> {
52 Pending,
54 Loading { res: Option<Result<Q::Ok, Q::Err>> },
56 Settled {
58 res: Result<Q::Ok, Q::Err>,
59 settlement_instant: Instant,
60 },
61}
62
63impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
64 type Error = ();
65
66 fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
67 match value {
68 QueryStateData::Loading { res: Some(res) } => Ok(res),
69 QueryStateData::Settled { res, .. } => Ok(res),
70 _ => Err(()),
71 }
72 }
73}
74
75impl<Q> fmt::Debug for QueryStateData<Q>
76where
77 Q: QueryCapability,
78 Q::Ok: fmt::Debug,
79 Q::Err: fmt::Debug,
80{
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 Self::Pending => f.write_str("Pending"),
84 Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
85 Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
86 }
87 }
88}
89
90impl<Q: QueryCapability> QueryStateData<Q> {
91 pub fn is_ok(&self) -> bool {
93 matches!(self, QueryStateData::Settled { res: Ok(_), .. })
94 }
95
96 pub fn is_err(&self) -> bool {
98 matches!(self, QueryStateData::Settled { res: Err(_), .. })
99 }
100
101 pub fn is_loading(&self) -> bool {
103 matches!(self, QueryStateData::Loading { .. })
104 }
105
106 pub fn is_pending(&self) -> bool {
108 matches!(self, QueryStateData::Pending)
109 }
110
111 pub fn is_stale(&self, query: &Query<Q>) -> bool {
113 match self {
114 QueryStateData::Pending => true,
115 QueryStateData::Loading { .. } => true,
116 QueryStateData::Settled {
117 settlement_instant, ..
118 } => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
119 }
120 }
121
122 pub fn ok(&self) -> Option<&Q::Ok> {
124 match self {
125 Self::Settled { res: Ok(res), .. } => Some(res),
126 Self::Loading { res: Some(Ok(res)) } => Some(res),
127 _ => None,
128 }
129 }
130
131 pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
133 match self {
134 Self::Loading { res: Some(v) } => v,
135 Self::Settled { res, .. } => res,
136 _ => unreachable!(),
137 }
138 }
139
140 fn into_loading(self) -> QueryStateData<Q> {
141 match self {
142 QueryStateData::Pending => QueryStateData::Loading { res: None },
143 QueryStateData::Loading { res } => QueryStateData::Loading { res },
144 QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
145 }
146 }
147}
148
149pub struct QueriesStorage<Q: QueryCapability> {
150 storage: State<HashMap<Query<Q>, QueryData<Q>>>,
151}
152
153impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
154
155impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
156 fn clone(&self) -> Self {
157 *self
158 }
159}
160
161pub struct QueryData<Q: QueryCapability> {
162 state: Rc<RefCell<QueryStateData<Q>>>,
163 reactive_contexts: Rc<RefCell<FxHashSet<ReactiveContext>>>,
164
165 interval_task: Rc<RefCell<Option<(Duration, TaskHandle)>>>,
166 clean_task: Rc<RefCell<Option<TaskHandle>>>,
167}
168
169impl<Q: QueryCapability> Clone for QueryData<Q> {
170 fn clone(&self) -> Self {
171 Self {
172 state: self.state.clone(),
173 reactive_contexts: self.reactive_contexts.clone(),
174
175 interval_task: self.interval_task.clone(),
176 clean_task: self.clean_task.clone(),
177 }
178 }
179}
180
181impl<Q: QueryCapability> QueriesStorage<Q> {
182 fn new_in_root() -> Self {
183 Self {
184 storage: State::create_global(HashMap::default()),
185 }
186 }
187
188 fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
189 let query_clone = query.clone();
190 let mut storage = self.storage.write_unchecked();
191
192 let query_data = storage.entry(query).or_insert_with(|| QueryData {
193 state: Rc::new(RefCell::new(QueryStateData::Pending)),
194 reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
195 interval_task: Rc::default(),
196 clean_task: Rc::default(),
197 });
198 let query_data_clone = query_data.clone();
199
200 if let Some(clean_task) = query_data.clean_task.take() {
202 clean_task.cancel();
203 }
204
205 let interval = query_clone.interval_time;
209 let interval_enabled = query_clone.interval_time != Duration::MAX;
210 let interval_task = &mut *query_data.interval_task.borrow_mut();
211
212 let create_interval_task = match interval_task {
213 None if interval_enabled => true,
214 Some((current_interval, current_interval_task)) if interval_enabled => {
215 let new_interval_is_shorter = *current_interval > interval;
216 if new_interval_is_shorter {
217 current_interval_task.cancel();
218 *interval_task = None;
219 }
220 new_interval_is_shorter
221 }
222 _ => false,
223 };
224 if create_interval_task {
225 let task = spawn_forever(async move {
226 loop {
227 Timer::after(interval).await;
229
230 QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
232 }
233 });
234 *interval_task = Some((interval, task));
235 }
236
237 query_data.clone()
238 }
239
240 fn update_tasks(&mut self, query: Query<Q>) {
241 let storage_clone = self.storage;
242 let mut storage = self.storage.write_unchecked();
243
244 let query_data = storage.get_mut(&query).unwrap();
245
246 if let Some((_, interval_task)) = query_data.interval_task.take() {
248 interval_task.cancel();
249 }
250
251 if query_data.reactive_contexts.borrow().len() == 1 {
253 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
254 Timer::after(query.clean_time).await;
256
257 let mut storage = storage_clone.write_unchecked();
259 storage.remove(&query);
260 }));
261 }
262 }
263
264 pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
265 let query: Query<Q> = get_query.into();
266
267 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
268 Some(storage) => storage,
269 None => {
270 provide_context_for_scope_id(
271 QueriesStorage::<Q>::new_in_root(),
272 Some(ScopeId::ROOT),
273 );
274 try_consume_context::<QueriesStorage<Q>>().unwrap()
275 }
276 };
277
278 let mut map = storage.storage.write();
279 let query_data = map
280 .entry(query.clone())
281 .or_insert_with(|| QueryData {
282 state: Rc::new(RefCell::new(QueryStateData::Pending)),
283 reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
284 interval_task: Rc::default(),
285 clean_task: Rc::default(),
286 })
287 .clone();
288
289 if query_data.state.borrow().is_stale(&query) {
291 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
293 .into_loading();
294 *query_data.state.borrow_mut() = res;
295 for reactive_context in query_data.reactive_contexts.borrow().iter() {
296 reactive_context.notify();
297 }
298
299 let res = query.query.run(&query.keys).await;
301
302 *query_data.state.borrow_mut() = QueryStateData::Settled {
304 res,
305 settlement_instant: Instant::now(),
306 };
307 for reactive_context in query_data.reactive_contexts.borrow().iter() {
308 reactive_context.notify();
309 }
310 }
311
312 if query_data.reactive_contexts.borrow().is_empty() {
314 *query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
315 Timer::after(query.clean_time).await;
317
318 let mut storage = storage.storage.write_unchecked();
320 storage.remove(&query);
321 }));
322 }
323
324 QueryReader {
325 state: query_data.state,
326 }
327 }
328
329 pub async fn invalidate_all() {
333 let storage = consume_context::<QueriesStorage<Q>>();
334
335 storage.inner_invalidate_all().await;
336 }
337
338 pub async fn try_invalidate_all() {
340 let Some(storage) = try_consume_context::<QueriesStorage<Q>>() else {
341 return;
342 };
343
344 storage.inner_invalidate_all().await;
345 }
346
347 async fn inner_invalidate_all(self) {
348 let matching_queries = self.storage.read().clone().into_iter().collect::<Vec<_>>();
350 let matching_queries = matching_queries
351 .iter()
352 .map(|(q, d)| (q, d))
353 .collect::<Vec<_>>();
354
355 Self::run_queries(&matching_queries).await
357 }
358
359 pub async fn invalidate_matching(matching_keys: Q::Keys) {
363 let storage = consume_context::<QueriesStorage<Q>>();
364
365 storage.inner_invalidate_matching(matching_keys).await;
366 }
367
368 pub async fn try_invalidate_matching(matching_keys: Q::Keys) {
370 let Some(storage) = try_consume_context::<QueriesStorage<Q>>() else {
371 return;
372 };
373
374 storage.inner_invalidate_matching(matching_keys).await;
375 }
376
377 async fn inner_invalidate_matching(self, matching_keys: Q::Keys) {
378 let mut matching_queries = Vec::new();
380 for (query, data) in self.storage.read().iter() {
381 if query.query.matches(&matching_keys) {
382 matching_queries.push((query.clone(), data.clone()));
383 }
384 }
385 let matching_queries = matching_queries
386 .iter()
387 .map(|(q, d)| (q, d))
388 .collect::<Vec<_>>();
389
390 Self::run_queries(&matching_queries).await
392 }
393
394 async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
395 let tasks = FuturesUnordered::new();
396
397 for (query, query_data) in queries {
398 let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
400 .into_loading();
401 *query_data.state.borrow_mut() = res;
402 for reactive_context in query_data.reactive_contexts.borrow().iter() {
403 reactive_context.notify();
404 }
405
406 tasks.push(Box::pin(async move {
407 let res = query.query.run(&query.keys).await;
409
410 *query_data.state.borrow_mut() = QueryStateData::Settled {
412 res,
413 settlement_instant: Instant::now(),
414 };
415 for reactive_context in query_data.reactive_contexts.borrow().iter() {
416 reactive_context.notify();
417 }
418 }));
419 }
420
421 tasks.count().await;
422 }
423}
424
425pub struct GetQuery<Q: QueryCapability> {
426 query: Q,
427 keys: Q::Keys,
428
429 stale_time: Duration,
430 clean_time: Duration,
431}
432
433impl<Q: QueryCapability> GetQuery<Q> {
434 pub fn new(keys: Q::Keys, query: Q) -> Self {
435 Self {
436 query,
437 keys,
438 stale_time: Duration::ZERO,
439 clean_time: Duration::ZERO,
440 }
441 }
442 pub fn stale_time(self, stale_time: Duration) -> Self {
446 Self { stale_time, ..self }
447 }
448
449 pub fn clean_time(self, clean_time: Duration) -> Self {
453 Self { clean_time, ..self }
454 }
455}
456
457impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
458 fn from(value: GetQuery<Q>) -> Self {
459 Query {
460 query: value.query,
461 keys: value.keys,
462
463 enabled: true,
464
465 stale_time: value.stale_time,
466 clean_time: value.clean_time,
467 interval_time: Duration::MAX,
468 }
469 }
470}
471#[derive(PartialEq, Clone)]
472pub struct Query<Q: QueryCapability> {
473 query: Q,
474 keys: Q::Keys,
475
476 enabled: bool,
477
478 stale_time: Duration,
479 clean_time: Duration,
480 interval_time: Duration,
481}
482
483impl<Q: QueryCapability> Eq for Query<Q> {}
484impl<Q: QueryCapability> Hash for Query<Q> {
485 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
486 self.query.hash(state);
487 self.keys.hash(state);
488
489 self.enabled.hash(state);
490
491 self.stale_time.hash(state);
492 self.clean_time.hash(state);
493
494 }
497}
498
499impl<Q: QueryCapability> Query<Q> {
500 pub fn new(keys: Q::Keys, query: Q) -> Self {
501 Self {
502 query,
503 keys,
504 enabled: true,
505 stale_time: Duration::ZERO,
506 clean_time: Duration::from_secs(5 * 60),
507 interval_time: Duration::MAX,
508 }
509 }
510
511 pub fn enable(self, enabled: bool) -> Self {
515 Self { enabled, ..self }
516 }
517
518 pub fn stale_time(self, stale_time: Duration) -> Self {
523 Self { stale_time, ..self }
524 }
525
526 pub fn clean_time(self, clean_time: Duration) -> Self {
530 Self { clean_time, ..self }
531 }
532
533 pub fn interval_time(self, interval_time: Duration) -> Self {
539 Self {
540 interval_time,
541 ..self
542 }
543 }
544}
545
546pub struct QueryReader<Q: QueryCapability> {
547 state: Rc<RefCell<QueryStateData<Q>>>,
548}
549
550impl<Q: QueryCapability> QueryReader<Q> {
551 pub fn state(&'_ self) -> Ref<'_, QueryStateData<Q>> {
552 self.state.borrow()
553 }
554
555 pub fn as_settled(&'_ self) -> Ref<'_, Result<Q::Ok, Q::Err>> {
559 Ref::map(self.state.borrow(), |state| match state {
560 QueryStateData::Settled { res, .. } => res,
561 _ => panic!("Query is not settled."),
562 })
563 }
564}
565
566pub struct UseQuery<Q: QueryCapability> {
567 query: State<Query<Q>>,
568}
569
570impl<Q: QueryCapability> Clone for UseQuery<Q> {
571 fn clone(&self) -> Self {
572 *self
573 }
574}
575
576impl<Q: QueryCapability> Copy for UseQuery<Q> {}
577
578impl<Q: QueryCapability> UseQuery<Q> {
579 pub fn read(&self) -> QueryReader<Q> {
584 let storage = consume_context::<QueriesStorage<Q>>();
585 let map = storage.storage.peek();
586 let query_data = map.get(&self.query.peek()).cloned().unwrap();
587
588 if let Some(mut reactive_context) = ReactiveContext::try_current() {
590 reactive_context.subscribe(&query_data.reactive_contexts);
591 }
592
593 QueryReader {
594 state: query_data.state,
595 }
596 }
597
598 pub fn peek(&self) -> QueryReader<Q> {
603 let storage = consume_context::<QueriesStorage<Q>>();
604 let map = storage.storage.peek();
605 let query_data = map.get(&self.query.peek()).cloned().unwrap();
606
607 QueryReader {
608 state: query_data.state,
609 }
610 }
611
612 pub async fn invalidate_async(&self) -> QueryReader<Q> {
616 let storage = consume_context::<QueriesStorage<Q>>();
617
618 let query = self.query.peek().clone();
619 let map = storage.storage.peek();
620 let query_data = map.get(&query).cloned().unwrap();
621
622 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
624
625 QueryReader {
626 state: query_data.state.clone(),
627 }
628 }
629
630 pub fn invalidate(&self) {
634 let storage = consume_context::<QueriesStorage<Q>>();
635
636 let query = self.query.peek().clone();
637 let map = storage.storage.peek();
638 let query_data = map.get(&query).cloned().unwrap();
639
640 spawn_forever(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
642 }
643}
644
645pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
673 let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
674 Some(storage) => storage,
675 None => {
676 provide_context_for_scope_id(QueriesStorage::<Q>::new_in_root(), Some(ScopeId::ROOT));
677 try_consume_context::<QueriesStorage<Q>>().unwrap()
678 }
679 };
680
681 let mut make_query = |query: &Query<Q>, mut prev_query: Option<Query<Q>>| {
682 let query_data = storage.insert_or_get_query(query.clone());
683
684 if let Some(prev_query) = prev_query.take() {
686 storage.update_tasks(prev_query);
687 }
688
689 if query.enabled && query_data.state.borrow().is_stale(query) {
691 let query = query.clone();
692 spawn_forever(async move {
693 QueriesStorage::run_queries(&[(&query, &query_data)]).await;
694 });
695 }
696 };
697
698 let mut current_query = use_hook(|| {
699 make_query(&query, None);
700 State::create(query.clone())
701 });
702
703 if *current_query.read() != query {
704 let prev = mem::replace(&mut *current_query.write(), query.clone());
705 make_query(&query, Some(prev));
706 }
707
708 use_drop({
710 move || {
711 storage.update_tasks(current_query.peek().clone());
712 }
713 });
714
715 let query = UseQuery {
716 query: current_query,
717 };
718
719 use_side_effect(move || {
721 let _ = query.read();
722 });
723
724 query
725}