SuperTinyKernel™ RTOS 1.06.x
Lightweight, high-performance, deterministic, bare-metal C++ RTOS for resource-constrained embedded systems. MIT Open Source License.
Loading...
Searching...
No Matches
stk_sync_pipe.h
Go to the documentation of this file.
1/*
2 * SuperTinyKernel(TM) RTOS: Lightweight High-Performance Deterministic C++ RTOS for Embedded Systems.
3 *
4 * Source: https://github.com/SuperTinyKernel-RTOS
5 *
6 * Copyright (c) 2022-2026 Neutron Code Limited <stk@neutroncode.com>. All Rights Reserved.
7 * License: MIT License, see LICENSE for a full text.
8 */
9
10#ifndef STK_SYNC_PIPE_H_
11#define STK_SYNC_PIPE_H_
12
13#include <type_traits> // for std::is_scalar
14
15#include "stk_sync_cv.h"
16
22
23namespace stk {
24namespace sync {
25
71class Pipe : public ITraceable
72{
73public:
76 static const size_t CAPACITY_MAX = 0xFFFEU;
77
83 explicit Pipe(uint8_t *buf, size_t capacity, size_t element_size);
84
91 STK_VIRT_DTOR ~Pipe() = default;
92
103 bool Write(const void *data, Timeout timeout_ticks = WAIT_INFINITE);
104
112 bool TryWrite(const void *data) { return Write(data, NO_WAIT); }
113
135 size_t WriteBulk(const void *src, size_t count, Timeout timeout_ticks = WAIT_INFINITE);
136
145 size_t TryWriteBulk(const void *src, size_t count) { return WriteBulk(src, count, NO_WAIT); }
146
157 bool Read(void *data, Timeout timeout_ticks = WAIT_INFINITE);
158
166 bool TryRead(void *data) { return Read(data, NO_WAIT); }
167
190 size_t ReadBulk(void *dst, size_t count, Timeout timeout_ticks = WAIT_INFINITE);
191
199 size_t TryReadBulk(void *dst, size_t count) { return ReadBulk(dst, count, NO_WAIT); }
200
222 size_t ReadBulkTriggered(void *dst, size_t trigger, size_t max_count,
223 Timeout timeout_ticks = WAIT_INFINITE);
224
234 size_t TryReadBulkTriggered(void *dst, size_t max_count)
235 {
236 return ReadBulkTriggered(dst, 1U, max_count, NO_WAIT);
237 }
238
246 void Reset();
247
252 size_t GetCapacity() const { return m_capacity; }
253
258 size_t GetElementSize() const { return m_element_size; }
259
265 size_t GetCount() const { return m_count; }
266
271 size_t GetSpace() const { return (m_capacity - m_count); }
272
277 uint8_t *GetBuffer() { return m_buffer; }
278
284 bool IsEmpty() const { return (m_count == 0U); }
285
291 bool IsFull() const { return (m_count == m_capacity); }
292
301 bool IsStorageValid() const { return (m_buffer != nullptr); }
302
303private:
305
306 // Get a byte pointer to the raw storage for slot index \a idx.
307 uint8_t *Slot(size_t idx) const { return m_buffer + (idx * m_element_size); }
308
309 // Advance a ring-buffer index by one with wrap-around.
310 size_t Next(size_t idx) const { return (idx + 1U) % m_capacity; }
311
312 // Copy Min(count, m_count) elements into dst_bytes and update tail/count.
313 // Caller MUST hold the critical section. Returns the number of elements copied.
314 size_t DrainLocked(uint8_t *dst_bytes, size_t count);
315
316 uint8_t *m_buffer;
317 const size_t m_capacity;
318 const size_t m_element_size;
319 size_t m_count;
320 size_t m_head;
321 size_t m_tail;
324};
325
326// ---------------------------------------------------------------------------
327// Pipe: Constructor
328// ---------------------------------------------------------------------------
329
330inline Pipe::Pipe(uint8_t *buf, size_t capacity, size_t element_size)
331: m_buffer(buf),
332 m_capacity(capacity),
333 m_element_size(element_size),
334 m_count(0U),
335 m_head(0U),
336 m_tail(0U)
337{
338 STK_ASSERT(buf != nullptr);
339 STK_ASSERT(capacity >= 1U);
340 STK_ASSERT(capacity <= CAPACITY_MAX);
341 STK_ASSERT(element_size >= 1U);
342}
343
344// ---------------------------------------------------------------------------
345// Pipe: Write
346// ---------------------------------------------------------------------------
347
348inline bool Pipe::Write(const void *data, Timeout timeout_ticks)
349{
350 STK_ASSERT(data != nullptr);
352
354 bool success = true;
355
356 while (m_count == m_capacity)
357 {
358 if (!m_cv_not_full.Wait(cs_, timeout_ticks))
359 {
360 success = false;
361 break;
362 }
363 }
364
365 if (success)
366 {
368 m_head = Next(m_head);
369 m_count++;
370
371 // notify consumer that data is ready
372 m_cv_not_empty.NotifyOne_CS();
373 }
374
375 return success;
376}
377
378// ---------------------------------------------------------------------------
379// Pipe: WriteBulk
380// ---------------------------------------------------------------------------
381
382inline size_t Pipe::WriteBulk(const void *src, size_t count, Timeout timeout_ticks)
383{
384 size_t written = 0U;
385
386 if ((src != nullptr) && (count != 0U))
387 {
388 const uint8_t *const src_bytes = static_cast<const uint8_t *>(src);
389 const bool timed_wait = (timeout_ticks != WAIT_INFINITE) && (timeout_ticks != NO_WAIT);
390
391 // capture an absolute deadline once, before entering the wait loop,
392 // preventing the timeout from resetting on intermediate partial writes
393 const Timeout deadline = (timed_wait ?
394 static_cast<Timeout>(GetTicks() + timeout_ticks) : timeout_ticks);
395
397
398 while (written < count)
399 {
400 bool is_timeout = false;
401
402 while (m_count == m_capacity)
403 {
404 Timeout remaining = deadline;
405 if (timed_wait)
406 {
407 const Timeout now = static_cast<Timeout>(GetTicks());
408 remaining = (now >= deadline ? NO_WAIT : (deadline - now));
409 }
410
411 if (!m_cv_not_full.Wait(cs_, remaining))
412 {
413 is_timeout = true;
414 break; // break inner condition variable loop
415 }
416 }
417
418 // if a timeout occurred, drop out of the chunk processing loop
419 if (is_timeout)
420 {
421 break;
422 }
423
424 const size_t available = m_capacity - m_count;
425 const size_t to_write = ((count - written) < available) ? (count - written) : available;
426 const size_t first_part = m_capacity - m_head;
427
428 if (to_write <= first_part)
429 {
430 STK_MEMCPY(Slot(m_head), src_bytes + (written * m_element_size), to_write * m_element_size);
431 }
432 else
433 {
434 STK_MEMCPY(Slot(m_head), src_bytes + (written * m_element_size), first_part * m_element_size);
435 STK_MEMCPY(Slot(0U), src_bytes + ((written + first_part) * m_element_size), (to_write - first_part) * m_element_size);
436 }
437
438 written += to_write;
439 m_head = (m_head + to_write) % m_capacity;
440 m_count += to_write;
441
442 // notify consumers that data is ready
443 m_cv_not_empty.NotifyAll_CS();
444 }
445 }
446
447 return written;
448}
449
450// ---------------------------------------------------------------------------
451// Pipe: Read
452// ---------------------------------------------------------------------------
453
454inline bool Pipe::Read(void *data, Timeout timeout_ticks)
455{
456 STK_ASSERT(data != nullptr);
457
459 bool success = true;
460
461 while (m_count == 0U)
462 {
463 if (!m_cv_not_empty.Wait(cs_, timeout_ticks))
464 {
465 success = false;
466 break;
467 }
468 }
469
470 if (success)
471 {
473 m_tail = Next(m_tail);
474 m_count--;
475
476 // notify producer that space is now available
477 m_cv_not_full.NotifyOne_CS();
478 }
479
480 return success;
481}
482
483// ---------------------------------------------------------------------------
484// Pipe: DrainLocked (private helper, caller must hold the critical section)
485// ---------------------------------------------------------------------------
486
487inline size_t Pipe::DrainLocked(uint8_t *const dst_bytes, const size_t count)
488{
489 const size_t to_read = Min(count, m_count);
490 const size_t first_part = m_capacity - m_tail;
491
492 if (to_read <= first_part)
493 {
494 STK_MEMCPY(dst_bytes, Slot(m_tail), to_read * m_element_size);
495 }
496 else
497 {
498 STK_MEMCPY(dst_bytes, Slot(m_tail), first_part * m_element_size);
499 STK_MEMCPY(dst_bytes + (first_part * m_element_size), Slot(0U), (to_read - first_part) * m_element_size);
500 }
501
502 m_tail = (m_tail + to_read) % m_capacity;
503 m_count -= to_read;
504
505 m_cv_not_full.NotifyAll_CS();
506 return to_read;
507}
508
509// ---------------------------------------------------------------------------
510// Pipe: ReadBulk
511// ---------------------------------------------------------------------------
512
513inline size_t Pipe::ReadBulk(void *dst, size_t count, Timeout timeout_ticks)
514{
515 size_t read_count = 0U;
516
517 if ((dst != nullptr) && (count != 0U))
518 {
519 uint8_t *const dst_bytes = static_cast<uint8_t *>(dst);
520 const bool timed_wait = (timeout_ticks != WAIT_INFINITE) && (timeout_ticks != NO_WAIT);
521
522 // capture an absolute deadline once, before entering the wait loop,
523 // preventing the timeout from resetting on intermediate partial reads
524 const Timeout deadline = (timed_wait ?
525 static_cast<Timeout>(GetTicks() + timeout_ticks) : timeout_ticks);
526
528
529 while (read_count < count)
530 {
531 bool is_timeout = false;
532
533 while (m_count == 0U)
534 {
535 Timeout remaining = deadline;
536 if (timed_wait)
537 {
538 const Timeout now = static_cast<Timeout>(GetTicks());
539 remaining = (now >= deadline ? NO_WAIT : (deadline - now));
540 }
541
542 if (!m_cv_not_empty.Wait(cs_, remaining))
543 {
544 is_timeout = true;
545 break; // break inner condition variable loop
546 }
547 }
548
549 // if a timeout occurred, drop out of the chunk processing loop
550 if (is_timeout)
551 {
552 break;
553 }
554
555 // drain data using the state tracker's relative byte offsets
556 read_count += DrainLocked(dst_bytes + (read_count * m_element_size), count - read_count);
557 }
558 }
559
560 return read_count;
561}
562
563// ---------------------------------------------------------------------------
564// Pipe: ReadBulkTriggered
565// ---------------------------------------------------------------------------
566
567inline size_t Pipe::ReadBulkTriggered(void *dst, size_t trigger, size_t max_count, Timeout timeout_ticks)
568{
569 size_t read_count = 0U;
570
571 if ((dst != nullptr) && (max_count != 0U))
572 {
573 // trigger must be in [1, max_count]
574 if (trigger == 0U) { trigger = 1U; }
575 if (trigger > max_count) { trigger = max_count; }
576
577 uint8_t *const dst_bytes = static_cast<uint8_t *>(dst);
578 const bool timed_wait = (timeout_ticks != WAIT_INFINITE) && (timeout_ticks != NO_WAIT);
579
580 // capture an absolute deadline once, before entering the wait loop,
581 // preventing the timeout from resetting on intermediate spurious wakeups
582 const Timeout deadline = (timed_wait ?
583 static_cast<Timeout>(GetTicks() + timeout_ticks) : timeout_ticks);
584
586
587 while (m_count < trigger)
588 {
589 Timeout remaining = deadline;
590 if (timed_wait)
591 {
592 const Timeout now = static_cast<Timeout>(GetTicks());
593 remaining = (now >= deadline ? NO_WAIT : (deadline - now));
594 }
595
596 if (!m_cv_not_empty.Wait(cs_, remaining))
597 {
598 break; // break the waiting loop on timeout
599 }
600 }
601
602 // whether we broke out via satisfying the trigger or hitting a timeout,
603 // we drain whatever is currently available up to max_count
604 read_count = DrainLocked(dst_bytes, max_count);
605 }
606
607 return read_count;
608}
609
610// ---------------------------------------------------------------------------
611// Pipe: Reset
612// ---------------------------------------------------------------------------
613
614inline void Pipe::Reset()
615{
616 const ScopedCriticalSection cs_;
617
618 m_count = 0U;
619 m_head = 0U;
620 m_tail = 0U;
621
622 // wake all blocked producers: the pipe is now entirely empty, every slot is free
623 // note: we do not release readers here
624 m_cv_not_full.NotifyAll_CS();
625}
626
627// ---------------------------------------------------------------------------
628
670template <typename T, size_t N>
671class PipeT
672{
673public:
676 explicit PipeT() : m_buffer(), m_head(0U), m_tail(0U), m_count(0U), m_cv_not_empty(), m_cv_not_full()
677 {}
678
689 bool Write(const T &data, Timeout timeout_ticks = WAIT_INFINITE)
690 {
692 bool success = true;
693
694 while (m_count == N)
695 {
696 if (!m_cv_not_full.Wait(cs_, timeout_ticks))
697 {
698 success = false;
699 break;
700 }
701 }
702
703 if (success)
704 {
705 m_buffer[m_head] = data;
706 m_head = (m_head + 1U) % N;
707 m_count += 1U;
708
709 // notify consumer that data is ready
710 m_cv_not_empty.NotifyOne_CS();
711 }
712
713 return success;
714 }
715
723 bool TryWrite(const T &data) { return Write(data, NO_WAIT); }
724
745 size_t WriteBulk(const T *src, size_t count, Timeout timeout_ticks = WAIT_INFINITE)
746 {
747 size_t written = 0U;
748
749 if ((src != nullptr) && (count != 0U))
750 {
751 const bool timed_wait = (timeout_ticks != WAIT_INFINITE) && (timeout_ticks != NO_WAIT);
752
753 // capture an absolute deadline once, before entering the wait loop,
754 // preventing the timeout from resetting on intermediate partial writes
755 const Timeout deadline = (timed_wait ?
756 static_cast<Timeout>(GetTicks() + timeout_ticks) : timeout_ticks);
757
759
760 while (written < count)
761 {
762 bool is_timeout = false;
763
764 while (m_count == N)
765 {
766 Timeout remaining = deadline;
767 if (timed_wait)
768 {
769 const Timeout now = static_cast<Timeout>(GetTicks());
770 remaining = (now >= deadline ? NO_WAIT : (deadline - now));
771 }
772
773 if (!m_cv_not_full.Wait(cs_, remaining))
774 {
775 is_timeout = true;
776 break; // break inner condition variable loop
777 }
778 }
779
780 // if timeout, drop out of the chunk processing loop
781 if (is_timeout)
782 {
783 break;
784 }
785
786 // calculate how many we can copy in this contiguous stretch
787 const size_t available = N - m_count;
788 const size_t to_write = ((count - written) < available) ? (count - written) : available;
789
790 // copy from source
791 // note: if value type is not scalar or queue is small we copy with a for loop,
792 // otherwise using faster memcpy version for large scalar arrays
793 if (!std::is_scalar<T>::value || (N < 8U))
794 {
795 for (size_t i = 0U; i < to_write; ++i)
796 {
797 m_buffer[m_head] = src[written++];
798 m_head = (m_head + 1U) % N;
799 m_count += 1U;
800 }
801 }
802 else
803 {
804 const size_t first_part = N - m_head;
805
806 if (to_write <= first_part)
807 {
808 STK_MEMCPY(&m_buffer[m_head], &src[written], to_write * sizeof(T));
809 }
810 else
811 {
812 STK_MEMCPY(&m_buffer[m_head], &src[written], first_part * sizeof(T));
813 STK_MEMCPY(&m_buffer[0U], &src[written + first_part], (to_write - first_part) * sizeof(T));
814 }
815
816 written += to_write;
817 m_head = (m_head + to_write) % N;
818 m_count += to_write;
819 }
820
821 // notify consumers that data is ready
822 m_cv_not_empty.NotifyAll_CS();
823 }
824 }
825
826 return written;
827 }
828
837 size_t TryWriteBulk(const T *src, size_t count) { return WriteBulk(src, count, NO_WAIT); }
838
849 bool Read(T &data, Timeout timeout_ticks = WAIT_INFINITE)
850 {
852 bool success = true;
853
854 while (m_count == 0U)
855 {
856 if (!m_cv_not_empty.Wait(cs_, timeout_ticks))
857 {
858 success = false;
859 break;
860 }
861 }
862
863 if (success)
864 {
865 data = m_buffer[m_tail];
866 m_tail = (m_tail + 1U) % N;
867 m_count -= 1U;
868
869 // notify producer that space is available
870 m_cv_not_full.NotifyOne_CS();
871 }
872
873 return success;
874 }
875
883 bool TryRead(T &data) { return Read(data, NO_WAIT); }
884
906 size_t ReadBulk(T *dst, size_t count, Timeout timeout_ticks = WAIT_INFINITE)
907 {
908 size_t read_count = 0U;
909
910 if ((dst != nullptr) && (count != 0U))
911 {
912 const bool timed_wait = (timeout_ticks != WAIT_INFINITE) && (timeout_ticks != NO_WAIT);
913
914 // capture an absolute deadline once, before entering the wait loop,
915 // this prevents the timeout from being silently restarted on each
916 // spurious wakeup (e.g. a partial Set() that does not satisfy WAIT_ALL)
917 const Timeout deadline = (timed_wait ?
918 static_cast<Timeout>(GetTicks() + timeout_ticks) : timeout_ticks);
919
921
922 while (read_count < count)
923 {
924 bool is_timeout = false;
925
926 // wait until there is at least 1 element available
927 while (m_count == 0U)
928 {
929 Timeout remaining = deadline;
930 if (timed_wait)
931 {
932 const Timeout now = static_cast<Timeout>(GetTicks());
933 remaining = (now >= deadline ? NO_WAIT : (deadline - now));
934 }
935
936 if (!m_cv_not_empty.Wait(cs_, remaining))
937 {
938 is_timeout = true;
939 break; // break inner condition variable loop
940 }
941 }
942
943 // if a timeout, drop out of the chunk processing loop
944 if (is_timeout)
945 {
946 break;
947 }
948
949 // determine how many we can pull in this stretch
950 const size_t to_read = (count - read_count) < m_count ? (count - read_count) : m_count;
951
952 // note: if value type is not scalar or queue is small we copy with a for loop,
953 // otherwise using faster memcpy version for large scalar arrays
954 if (!std::is_scalar<T>::value || (N < 8U))
955 {
956 for (size_t i = 0U; i < to_read; ++i)
957 {
958 dst[read_count++] = m_buffer[m_tail];
959 m_tail = (m_tail + 1U) % N;
960 m_count -= 1U;
961 }
962 }
963 else
964 {
965 const size_t first_part = N - m_tail;
966
967 if (to_read <= first_part)
968 {
969 STK_MEMCPY(&dst[read_count], &m_buffer[m_tail], to_read * sizeof(T));
970 }
971 else
972 {
973 STK_MEMCPY(&dst[read_count], &m_buffer[m_tail], first_part * sizeof(T));
974 STK_MEMCPY(&dst[read_count + first_part], &m_buffer[0U], (to_read - first_part) * sizeof(T));
975 }
976
977 read_count += to_read;
978 m_tail = (m_tail + to_read) % N;
979 m_count -= to_read;
980 }
981
982 // notify producers that space is now available
983 m_cv_not_full.NotifyAll_CS();
984 }
985 }
986
987 return read_count;
988 }
989
997 size_t TryReadBulk(T *dst, size_t count) { return ReadBulk(dst, count, NO_WAIT); }
998
1006 void Reset()
1007 {
1008 const ScopedCriticalSection cs_;
1009
1010 m_count = 0U;
1011 m_head = 0U;
1012 m_tail = 0U;
1013
1014 // wake all blocked producers: the pipe is now entirely empty, every slot is free
1015 // note: we do not release readers here
1016 m_cv_not_full.NotifyAll_CS();
1017 }
1018
1023 size_t GetCapacity() const { return N; }
1024
1030 size_t GetCount() const { return m_count; }
1031
1036 size_t GetSpace() const { return (N - m_count); }
1037
1043 bool IsEmpty() const { return (m_count == 0U); }
1044
1050 bool IsFull() const { return (m_count == N); }
1051
1052private:
1054
1056 size_t m_head;
1057 size_t m_tail;
1058 size_t m_count;
1061};
1062
1063} // namespace sync
1064} // namespace stk
1065
1066#endif /* STK_SYNC_PIPE_H_ */
static __stk_forceinline void STK_MEMCPY(void *const dest, const void *const src, const size_t size)
A wrapper around the built-in memcpy; redefine it with your own implementation if required.
Definition stk_arch.h:534
#define STK_NONCOPYABLE_CLASS(TYPE)
Disables copy construction and assignment for a class.
Definition stk_defs.h:601
#define STK_ASSERT(e)
Runtime assertion. Halts execution if the expression e evaluates to false.
Definition stk_defs.h:409
#define STK_VIRT_DTOR
Makes destructors virtual and compliant to strict rules if STK_STRICT_COMPLIANCY=0.
Definition stk_defs.h:159
Implementation of synchronization primitive: stk::sync::ConditionVariable.
Namespace of STK package.
constexpr Timeout NO_WAIT
Timeout value: return immediately if the synchronization object is not yet signaled (non-blocking polling).
Definition stk_common.h:189
int32_t Timeout
Timeout time (ticks).
Definition stk_common.h:125
static Ticks GetTicks()
Get number of ticks elapsed since kernel start.
Definition stk_helper.h:319
constexpr Timeout WAIT_INFINITE
Timeout value: block indefinitely until the synchronization object is signaled.
Definition stk_common.h:183
static constexpr T Min(T a, T b)
Compile-time minimum of two values.
Definition stk_defs.h:619
Synchronization primitives for task coordination and resource protection.
Traceable object.
Definition stk_common.h:393
RAII-style low-level synchronization primitive for atomic code execution. Used as a building block for other synchronization primitives.
Definition stk_sync_cs.h:54
Condition Variable primitive for signaling between tasks based on specific predicates.
Definition stk_sync_cv.h:68
size_t WriteBulk(const void *src, size_t count, Timeout timeout_ticks=WAIT_INFINITE)
Write multiple elements to the pipe.
bool TryRead(void *data)
Attempt to read a single element from the pipe without blocking.
const size_t m_capacity
maximum number of elements stored in the pipe
~Pipe()=default
Destructor.
size_t TryReadBulkTriggered(void *dst, size_t max_count)
Non-blocking variant of ReadBulkTriggered.
void Reset()
Discard all elements and reset the pipe to the empty state.
bool Read(void *data, Timeout timeout_ticks=WAIT_INFINITE)
Read a single element from the pipe.
size_t ReadBulk(void *dst, size_t count, Timeout timeout_ticks=WAIT_INFINITE)
Read multiple elements from the pipe.
uint8_t * Slot(size_t idx) const
ConditionVariable m_cv_not_full
signaled by Read()/Reset() when the pipe is no longer full
bool Write(const void *data, Timeout timeout_ticks=WAIT_INFINITE)
Write a single element to the pipe.
size_t GetElementSize() const
Get the size of each element in bytes.
size_t m_head
write index (next slot to be written by Write())
size_t TryWriteBulk(const void *src, size_t count)
Attempt to write multiple elements to the pipe without blocking.
bool IsEmpty() const
Check if the pipe is currently empty.
size_t GetCapacity() const
Get the maximum number of elements the pipe can hold.
size_t Next(size_t idx) const
bool TryWrite(const void *data)
Attempt to write a single element to the pipe without blocking.
size_t GetCount() const
Get the current number of elements in the pipe.
ConditionVariable m_cv_not_empty
signaled by Write() when the pipe transitions from empty
bool IsFull() const
Check if the pipe is currently full.
size_t TryReadBulk(void *dst, size_t count)
Attempt to read multiple elements from the pipe without blocking.
const size_t m_element_size
size of each element in bytes
static const size_t CAPACITY_MAX
Max capacity supported (number of elements).
Pipe(uint8_t *buf, size_t capacity, size_t element_size)
Constructor.
size_t m_tail
read index (next slot to be read by Read())
uint8_t * m_buffer
flat byte ring-buffer: capacity slots of element_size bytes each
size_t DrainLocked(uint8_t *dst_bytes, size_t count)
size_t GetSpace() const
Get the number of free slots currently available.
size_t ReadBulkTriggered(void *dst, size_t trigger, size_t max_count, Timeout timeout_ticks=WAIT_INFINITE)
Read at least trigger elements, then drain up to max_count without blocking.
bool IsStorageValid() const
Verify that the backing storage is valid and the pipe is ready for use.
uint8_t * GetBuffer()
Get a pointer to the raw backing buffer.
size_t m_count
current number of elements stored in the pipe
size_t TryReadBulk(T *dst, size_t count)
Attempt to read multiple elements from the pipe without blocking.
size_t GetCount() const
Get the current number of elements in the pipe.
bool IsEmpty() const
Check if the pipe is currently empty.
size_t ReadBulk(T *dst, size_t count, Timeout timeout_ticks=WAIT_INFINITE)
Read multiple elements from the pipe.
bool Write(const T &data, Timeout timeout_ticks=WAIT_INFINITE)
Write data to the pipe.
bool TryRead(T &data)
Attempt to read data from the pipe without blocking.
PipeT()
Constructor.
bool Read(T &data, Timeout timeout_ticks=WAIT_INFINITE)
Read data from the pipe.
bool TryWrite(const T &data)
Attempt to write data to the pipe without blocking.
size_t TryWriteBulk(const T *src, size_t count)
Attempt to write multiple elements to the pipe without blocking.
size_t WriteBulk(const T *src, size_t count, Timeout timeout_ticks=WAIT_INFINITE)
Write multiple elements to the pipe.
void Reset()
Discard all elements and reset the pipe to the empty state.
bool IsFull() const
Check if the pipe is currently full.
size_t GetSpace() const
Get the number of free slots currently available.
size_t GetCapacity() const
Get the maximum number of elements the pipe can hold.