sgnts.base

AdapterConfig dataclass

Config to hold parameters used for the audioadapter in _TSTransSink.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| overlap | tuple[int, int] | The overlap before and after the data segment to process, in offsets | (0, 0) |
| stride | int | The stride to produce, in offsets | 0 |
| pad_zeros_startup | bool | When overlap is provided, whether to pad zeros in front of the first buffer, or wait until there is enough data | False |
| skip_gaps | bool | Produce a whole gap buffer if there are any gaps in the copied data segment | False |
| backend | type[ArrayBackend] | The ArrayBackend wrapper | NumpyBackend |

Source code in sgnts/base/__init__.py
@dataclass
class AdapterConfig:
    """Config to hold parameters used for the audioadapter in _TSTransSink.

    Args:
        overlap:
            tuple[int, int], the overlap before and after the data segment to process,
            in offsets
        stride:
            int, the stride to produce, in offsets
        pad_zeros_startup:
            bool, when overlap is provided, whether to pad zeros in front of the
            first buffer, or wait until there is enough data.
        skip_gaps:
            bool, produce a whole gap buffer if there are any gaps in the copied data
            segment
        backend:
            type[ArrayBackend], the ArrayBackend wrapper
    """

    overlap: tuple[int, int] = (0, 0)
    stride: int = 0
    pad_zeros_startup: bool = False
    skip_gaps: bool = False
    backend: type[ArrayBackend] = NumpyBackend

    def valid_buffer(self, buf, data: Optional[Union[int, Array]] = 0):
        """
        Return a new buffer corresponding to the non-overlapping part of a
        buffer "buf", as defined by this class's overlap properties. As a special
        case, if the buffer has shape zero (a heartbeat buffer), a new heartbeat
        buffer is returned with the offsets shifted by overlap[0].
        Otherwise, for the buffer to be valid, its shape must match what is
        expected from the adapter's overlap and stride.
        """

        if buf.shape == (0,):
            new_slice = TSSlice(
                buf.slice[0] + self.overlap[0], buf.slice[0] + self.overlap[0]
            )
            return buf.new(new_slice, data=None)
        else:
            expected_shape = (
                Offset.tosamples(self.overlap[0], buf.sample_rate)
                + Offset.tosamples(self.overlap[1], buf.sample_rate)
                + Offset.sample_stride(buf.sample_rate),
            )
            assert buf.shape == expected_shape
            new_slice = TSSlice(
                buf.slice[0] + self.overlap[0], buf.slice[1] - self.overlap[1]
            )
            return buf.new(new_slice, data)

valid_buffer(buf, data=0)

Return a new buffer corresponding to the non-overlapping part of a buffer "buf", as defined by this class's overlap properties. As a special case, if the buffer has shape zero (a heartbeat buffer), a new heartbeat buffer is returned with the offsets shifted by overlap[0]. Otherwise, for the buffer to be valid, its shape must match what is expected from the adapter's overlap and stride.
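
The trimming rule can be illustrated without sgnts. The sketch below is a stand-alone model, not the library's implementation: `trim_overlap` is a hypothetical helper, and slices are plain `(start, stop)` tuples of offsets rather than `TSSlice` objects. It also skips the shape assertion that the real method performs.

```python
def trim_overlap(slice_, overlap, is_heartbeat=False):
    """Return the (start, stop) offsets left after removing the overlap.

    Hypothetical helper mirroring the slice arithmetic in valid_buffer;
    it does not check the buffer shape as the real method does.
    """
    start, stop = slice_
    if is_heartbeat:
        # A heartbeat buffer stays empty; only its start is shifted.
        shifted = start + overlap[0]
        return (shifted, shifted)
    # Drop overlap[0] offsets from the front and overlap[1] from the back.
    return (start + overlap[0], stop - overlap[1])


print(trim_overlap((100, 164), (8, 8)))
print(trim_overlap((100, 100), (8, 8), is_heartbeat=True))
```

In this model the regular buffer shrinks on both sides, while the heartbeat buffer stays zero-length and only moves forward by `overlap[0]`.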

Source code in sgnts/base/__init__.py
def valid_buffer(self, buf, data: Optional[Union[int, Array]] = 0):
    """
    Return a new buffer corresponding to the non-overlapping part of a
    buffer "buf", as defined by this class's overlap properties. As a special
    case, if the buffer has shape zero (a heartbeat buffer), a new heartbeat
    buffer is returned with the offsets shifted by overlap[0].
    Otherwise, for the buffer to be valid, its shape must match what is
    expected from the adapter's overlap and stride.
    """

    if buf.shape == (0,):
        new_slice = TSSlice(
            buf.slice[0] + self.overlap[0], buf.slice[0] + self.overlap[0]
        )
        return buf.new(new_slice, data=None)
    else:
        expected_shape = (
            Offset.tosamples(self.overlap[0], buf.sample_rate)
            + Offset.tosamples(self.overlap[1], buf.sample_rate)
            + Offset.sample_stride(buf.sample_rate),
        )
        assert buf.shape == expected_shape
        new_slice = TSSlice(
            buf.slice[0] + self.overlap[0], buf.slice[1] - self.overlap[1]
        )
        return buf.new(new_slice, data)

_TSTransSink dataclass

Base class for TSTransforms and TSSinks.

This class produces aligned frames in preparedframes. If adapter_config is provided, it also triggers the audioadapter to queue data and makes padded or strided frames in preparedframes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_age | int | The max age before timeout, in nanoseconds | 100 * SECONDS |
| adapter_config | Optional[AdapterConfig] | Holds parameters used for audioadapter behavior | None |

Source code in sgnts/base/__init__.py
@dataclass
class _TSTransSink:  # (HasValueProtocol):
    # class _TSTransSink(ElementLike)
    """Base class for TSTransforms and TSSinks.

    This will produce aligned frames in preparedframes. If
    adapter_config is provided, will trigger the audioadapter to queue
    data, and make padded or strided frames in preparedframes.

    Args:
        max_age:
            int, the max age before timeout, in nanoseconds
        adapter_config:
            AdapterConfig, holds parameters used for audioadapter behavior

    """

    max_age: int = 100 * Time.SECONDS
    adapter_config: Optional[AdapterConfig] = None

    def __post_init__(self):

        self._is_aligned = False
        self.inbufs = {p: Audioadapter() for p in self.sink_pads}
        self.preparedframes = {p: None for p in self.sink_pads}
        self.at_EOS = False
        self._last_ts = {p: None for p in self.sink_pads}
        self._last_offset = {p: None for p in self.sink_pads}
        self.metadata = {p: None for p in self.sink_pads}
        self.audioadapters = None
        if self.adapter_config is not None:
            self.overlap = self.adapter_config.overlap
            self.stride = self.adapter_config.stride
            self.pad_zeros_startup = self.adapter_config.pad_zeros_startup
            self.skip_gaps = self.adapter_config.skip_gaps

            # we need audioadapters
            self.audioadapters = {
                p: Audioadapter(backend=self.adapter_config.backend)
                for p in self.sink_pads
            }
            self.pad_zeros_offset = 0
            if self.pad_zeros_startup is True:
                # at startup, pad zeros in front of the first buffer to
                # serve as history
                self.pad_zeros_offset = self.overlap[0]
            self.preparedoutoffsets = {p: None for p in self.sink_pads}

    def pull(self, pad: SinkPad, frame: TSFrame) -> None:
        """Pull data from the input pads (source pads of upstream elements) and queue
        data to perform alignment once frames from all pads are pulled.

        Args:
            pad:
                SinkPad, The sink pad that is pulling the frame
            frame:
                Frame, The frame that is pulled to sink pad
        """

        self.at_EOS |= frame.EOS

        # extend and check the buffers
        for buf in frame:
            self.inbufs[pad].push(buf)
        self.metadata[pad] = frame.metadata
        if self.timeout(pad):
            raise ValueError("pad %s has timed out" % pad.name)

    def __adapter(self, pad: SinkPad, frame: TSFrame) -> list[SeriesBuffer]:
        """Use the audioadapter to handle streaming scenarios.

        This will pad with overlap before and after the target output
        data, and produce fixed-stride frames.

        The self.preparedframes are padded with the requested overlap padding. This
        method also produces a self.preparedoutoffsets, that infers the metadata
        information for the output buffer, with the data initialized as None.
        Downstream transforms can directly use the frames from self.preparedframes for
        computation, and then use the offset and noffset information in
        self.preparedoutoffsets to construct the output frame.

        If stride is not provided, the audioadapter will push out as many samples as it
        can. If stride is provided, the audioadapter will wait until there are enough
        samples to produce prepared frames.

        Args:
            pad:
                SinkPad, the sink pad on which to prepare adapted frames
            frame:
                TSFrame, the aligned frame

        Returns:
            list[SeriesBuffers], a list of SeriesBuffers that are adapted according to
            the adapter_config

        Examples:
            upsampling:
                kernel length = 17
                need to pad 8 samples before and after
                overlap_samples = (8, 8)
                stride_samples = 16
                                                for output
                preparedframes:     ________................________
                                                stride
                                    pad         samples=16  pad
                                    samples=8               samples=8


            correlation:
                filter length = 16
                need to pad filter_length - 1 samples
                overlap_samples = (15, 0)
                stride_samples = 8
                                                    for output
                preparedframes:     ----------------........
                                                    stride_samples=8
                                    pad
                                    samples=15

        """
        a = self.audioadapters[pad]
        buf0 = frame[0]
        sample_rate = buf0.sample_rate
        overlap_samples = tuple(Offset.tosamples(o, sample_rate) for o in self.overlap)
        stride_samples = Offset.tosamples(self.stride, sample_rate)
        pad_zeros_samples = Offset.tosamples(self.pad_zeros_offset, sample_rate)

        # push all buffers in the frame into the audioadapter
        for buf in frame:
            a.push(buf)

        # Check whether we have enough samples to produce a frame
        min_samples = sum(overlap_samples) + (stride_samples or 1) - pad_zeros_samples

        # figure out the offset for preparedframes and preparedoutoffsets
        offset = a.offset - self.pad_zeros_offset
        outoffset = offset + self.overlap[0]
        preparedbufs = []
        if a.size < min_samples:
            # not enough samples to produce output yet
            # make a heartbeat buffer
            shape = buf0.shape[:-1] + (0,)
            preparedbufs.append(
                SeriesBuffer(
                    offset=offset, sample_rate=sample_rate, data=None, shape=shape
                )
            )
            # prepare output frames, one buffer per frame
            self.preparedoutoffsets[pad] = [{"offset": outoffset, "noffset": 0}]

        else:
            # We have enough samples, find out how many samples to copy
            # out of the audioadapter
            # copy all of the samples in the audioadapter
            if self.stride == 0:
                # provide all the data
                num_copy_samples = a.size
            else:
                num_copy_samples = min_samples

            outoffsets = []

            segment_has_gap, segment_has_nongap = a.segment_gaps_info(
                (
                    a.offset,
                    a.offset + Offset.fromsamples(num_copy_samples, a.sample_rate),
                )
            )

            if not segment_has_nongap or (self.skip_gaps and segment_has_gap):
                # produce a gap buffer if
                # 1. the whole segment is a gap or
                # 2. there are gaps in the segment and we are skipping gaps
                data = None
            else:
                # copy out samples from head of audioadapter
                data = a.copy_samples(num_copy_samples)
                if self.pad_zeros_offset > 0 and self.adapter_config is not None:
                    # pad zeros in front of buffer
                    data = self.adapter_config.backend.pad(data, (pad_zeros_samples, 0))

            # flush out samples from head of audioadapter
            num_flush_samples = num_copy_samples - sum(overlap_samples)
            if num_flush_samples > 0:
                a.flush_samples(num_flush_samples)

            shape = buf0.shape[:-1] + (num_copy_samples + pad_zeros_samples,)

            # update next zeros padding
            self.pad_zeros_offset = -min(
                0, Offset.fromsamples(num_flush_samples, sample_rate)
            )
            pbuf = SeriesBuffer(
                offset=offset, sample_rate=sample_rate, data=data, shape=shape
            )
            preparedbufs.append(pbuf)
            outnoffset = pbuf.noffset - sum(self.overlap)
            outoffsets.append({"offset": outoffset, "noffset": outnoffset})

            self.preparedoutoffsets[pad] = outoffsets

        return preparedbufs

    def internal(self) -> None:
        """Align buffers from all the sink pads.

        If AdapterConfig is provided, perform the requested
        overlap/stride streaming of frames.

        """
        # align if possible
        self._align()

        # put in heartbeat buffer if not aligned
        if not self._is_aligned:
            # I tried to fix this properly see the notes above the definition
            # of _TSTransSink
            for sink_pad in self.sink_pads:  # type: ignore
                self.preparedframes[sink_pad] = TSFrame(
                    EOS=self.at_EOS,
                    buffers=[
                        SeriesBuffer(
                            offset=self.earliest,
                            sample_rate=self.inbufs[sink_pad].sample_rate,
                            data=None,
                            shape=self.inbufs[sink_pad].buffers[0].shape[:-1] + (0,),
                        ),
                    ],
                    metadata=self.metadata[sink_pad],
                )
        # Else pack all the buffers
        else:
            min_latest = self.min_latest
            earliest = self.earliest
            # I tried to fix this properly see the notes above the definition
            # of _TSTransSink
            for sink_pad in self.sink_pads:  # type: ignore
                out = self.inbufs[sink_pad].get_sliced_buffers(
                    (earliest, min_latest), pad_start=True
                )
                if min_latest > self.inbufs[sink_pad].offset:
                    self.inbufs[sink_pad].flush_samples_by_end_offset(min_latest)
                assert len(out) > 0
                if self.adapter_config is not None:
                    out = self.__adapter(sink_pad, out)
                self.preparedframes[sink_pad] = TSFrame(
                    EOS=self.at_EOS,
                    buffers=out,
                    metadata=self.metadata[sink_pad],
                )

    def _align(self) -> None:
        """Align the buffers in self.inbufs."""

        def slice_from_pad(inbufs):
            if len(inbufs) > 0:
                return TSSlice(inbufs.offset, inbufs.end_offset)
            else:
                return TSSlice(-1, -1)

        def __can_align(self=self):
            return TSSlices(
                [slice_from_pad(self.inbufs[p]) for p in self.inbufs]
            ).intersection()

        if not self._is_aligned and __can_align():
            self._is_aligned = True

    def timeout(self, pad: SinkPad) -> bool:
        """Whether pad has timed-out due to oldest buffer exceeding max age.

        Args:
            pad:
                SinkPad, the sink pad to check for timeout

        Returns:
            bool, whether pad has timed-out

        """
        return self.inbufs[pad].end_offset - self.inbufs[pad].offset > Offset.fromns(
            self.max_age
        )

    def latest_by_pad(self, pad: SinkPad) -> int:
        """The latest offset among the queued up buffers in this pad.

        Args:
            pad:
                SinkPad, the requested sink pad

        Returns:
            int, the latest offset in the pad's buffer queue

        """
        return self.inbufs[pad].end_offset if self.inbufs[pad] else -1

    def earliest_by_pad(self, pad) -> int:
        """The earliest offset among the queued up buffers in this pad.

        Args:
            pad:
                SinkPad, the requested sink pad

        Returns:
            int, the earliest offset in the pad's buffer queue

        """
        return self.inbufs[pad].offset if self.inbufs[pad] else -1

    @property
    def latest(self):
        """The latest offset among all the buffers from all the pads."""
        return max(self.latest_by_pad(n) for n in self.inbufs)

    @property
    def earliest(self):
        """The earliest offset among all the buffers from all the pads."""
        return min(self.earliest_by_pad(n) for n in self.inbufs)

    @property
    def min_latest(self):
        """The earliest offset among each pad's latest offset."""
        return min(self.latest_by_pad(n) for n in self.inbufs)

earliest property

The earliest offset among all the buffers from all the pads.

latest property

The latest offset among all the buffers from all the pads.

min_latest property

The earliest offset among each pad's latest offset.
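
The three properties differ only in how they combine per-pad offsets. The following is a minimal stand-alone sketch, assuming each pad's queue can be summarized as an `(offset, end_offset)` pair; the `window` helper and the pad names are hypothetical and not part of sgnts.

```python
EMPTY = -1  # value the per-pad helpers return for a pad with no queued buffers


def window(pads):
    """Combine per-pad offsets; pads maps a pad name to (offset, end_offset) or None."""
    earliest_by_pad = {p: (s[0] if s else EMPTY) for p, s in pads.items()}
    latest_by_pad = {p: (s[1] if s else EMPTY) for p, s in pads.items()}
    return {
        "earliest": min(earliest_by_pad.values()),   # smallest start over all pads
        "latest": max(latest_by_pad.values()),       # largest end over all pads
        "min_latest": min(latest_by_pad.values()),   # smallest end over all pads
    }


result = window({"H1": (0, 4096), "L1": (1024, 3072)})
print(result)
```

In the source shown on this page, `internal()` slices every pad to the span `(earliest, min_latest)`, which in this example would be `(0, 3072)`.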

__adapter(pad, frame)

Use the audioadapter to handle streaming scenarios.

This will pad with overlap before and after the target output data, and produce fixed-stride frames.

The frames in self.preparedframes are padded with the requested overlap. This method also fills self.preparedoutoffsets, which holds the inferred metadata (offset and noffset) for the output buffer, with the data left as None. Downstream transforms can use the frames from self.preparedframes directly for computation, and then use the offset and noffset information in self.preparedoutoffsets to construct the output frame.

If stride is not provided, the audioadapter will push out as many samples as it can. If stride is provided, the audioadapter will wait until there are enough samples to produce prepared frames.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pad | SinkPad | The sink pad on which to prepare adapted frames | required |
| frame | TSFrame | The aligned frame | required |

Returns:

| Type | Description |
| --- | --- |
| list[SeriesBuffer] | A list of SeriesBuffers that are adapted according to the adapter_config |

Examples:

upsampling:
    kernel length = 17
    need to pad 8 samples before and after
    overlap_samples = (8, 8)
    stride_samples = 16
                                    for output
    preparedframes:     ________................________
                                    stride
                        pad         samples=16  pad
                        samples=8               samples=8

correlation:
    filter length = 16
    need to pad filter_length - 1 samples
    overlap_samples = (15, 0)
    stride_samples = 8
                                        for output
    preparedframes:     ----------------........
                                        stride_samples=8
                        pad
                        samples=15
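
The sample bookkeeping behind the examples above can be traced with a small stand-alone model. This is a sketch under stated assumptions, not the method itself: `plan_frame` is a hypothetical helper that works purely in samples, ignores gaps and offsets, and only reproduces the arithmetic visible in the source on this page.

```python
def plan_frame(size, overlap_samples, stride_samples, pad_zeros_samples=0):
    """Model the sample counts __adapter works with, all in samples.

    Returns None when only a heartbeat buffer would be produced, otherwise
    a dict with the samples copied, flushed, and the prepared frame length.
    """
    # Samples needed before a frame can be produced; startup zero padding
    # counts toward the leading overlap.
    min_samples = sum(overlap_samples) + (stride_samples or 1) - pad_zeros_samples
    if size < min_samples:
        return None
    # With no stride, everything queued is copied out.
    num_copy = size if stride_samples == 0 else min_samples
    # The overlap stays queued as history for the next frame.
    num_flush = num_copy - sum(overlap_samples)
    return {
        "copy": num_copy,
        "flush": max(num_flush, 0),
        "frame_length": num_copy + pad_zeros_samples,
    }


print(plan_frame(40, (8, 8), 16))                      # upsampling example
print(plan_frame(8, (15, 0), 8, pad_zeros_samples=15))  # correlation at startup
```

In the upsampling case the model copies 32 samples and flushes 16, keeping 16 samples of overlap queued. In the correlation case with startup zero padding, 8 real samples are enough to emit a 23-sample frame, and nothing is flushed yet.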

Source code in sgnts/base/__init__.py
def __adapter(self, pad: SinkPad, frame: TSFrame) -> list[SeriesBuffer]:
    """Use the audioadapter to handle streaming scenarios.

    This will pad with overlap before and after the target output
    data, and produce fixed-stride frames.

    The self.preparedframes are padded with the requested overlap padding. This
    method also produces a self.preparedoutoffsets, that infers the metadata
    information for the output buffer, with the data initialized as None.
    Downstream transforms can directly use the frames from self.preparedframes for
    computation, and then use the offset and noffset information in
    self.preparedoutoffsets to construct the output frame.

    If stride is not provided, the audioadapter will push out as many samples as it
    can. If stride is provided, the audioadapter will wait until there are enough
    samples to produce prepared frames.

    Args:
        pad:
            SinkPad, the sink pad on which to prepare adapted frames
        frame:
            TSFrame, the aligned frame

    Returns:
        list[SeriesBuffers], a list of SeriesBuffers that are adapted according to
        the adapter_config

    Examples:
        upsampling:
            kernel length = 17
            need to pad 8 samples before and after
            overlap_samples = (8, 8)
            stride_samples = 16
                                            for output
            preparedframes:     ________................________
                                            stride
                                pad         samples=16  pad
                                samples=8               samples=8


        correlation:
            filter length = 16
            need to pad filter_length - 1 samples
            overlap_samples = (15, 0)
            stride_samples = 8
                                                for output
            preparedframes:     ----------------........
                                                stride_samples=8
                                pad
                                samples=15

    """
    a = self.audioadapters[pad]
    buf0 = frame[0]
    sample_rate = buf0.sample_rate
    overlap_samples = tuple(Offset.tosamples(o, sample_rate) for o in self.overlap)
    stride_samples = Offset.tosamples(self.stride, sample_rate)
    pad_zeros_samples = Offset.tosamples(self.pad_zeros_offset, sample_rate)

    # push all buffers in the frame into the audioadapter
    for buf in frame:
        a.push(buf)

    # Check whether we have enough samples to produce a frame
    min_samples = sum(overlap_samples) + (stride_samples or 1) - pad_zeros_samples

    # figure out the offset for preparedframes and preparedoutoffsets
    offset = a.offset - self.pad_zeros_offset
    outoffset = offset + self.overlap[0]
    preparedbufs = []
    if a.size < min_samples:
        # not enough samples to produce output yet
        # make a heartbeat buffer
        shape = buf0.shape[:-1] + (0,)
        preparedbufs.append(
            SeriesBuffer(
                offset=offset, sample_rate=sample_rate, data=None, shape=shape
            )
        )
        # prepare output frames, one buffer per frame
        self.preparedoutoffsets[pad] = [{"offset": outoffset, "noffset": 0}]

    else:
        # We have enough samples, find out how many samples to copy
        # out of the audioadapter
        # copy all of the samples in the audioadapter
        if self.stride == 0:
            # provide all the data
            num_copy_samples = a.size
        else:
            num_copy_samples = min_samples

        outoffsets = []

        segment_has_gap, segment_has_nongap = a.segment_gaps_info(
            (
                a.offset,
                a.offset + Offset.fromsamples(num_copy_samples, a.sample_rate),
            )
        )

        if not segment_has_nongap or (self.skip_gaps and segment_has_gap):
            # produce a gap buffer if
            # 1. the whole segment is a gap or
            # 2. there are gaps in the segment and we are skipping gaps
            data = None
        else:
            # copy out samples from head of audioadapter
            data = a.copy_samples(num_copy_samples)
            if self.pad_zeros_offset > 0 and self.adapter_config is not None:
                # pad zeros in front of buffer
                data = self.adapter_config.backend.pad(data, (pad_zeros_samples, 0))

        # flush out samples from head of audioadapter
        num_flush_samples = num_copy_samples - sum(overlap_samples)
        if num_flush_samples > 0:
            a.flush_samples(num_flush_samples)

        shape = buf0.shape[:-1] + (num_copy_samples + pad_zeros_samples,)

        # update next zeros padding
        self.pad_zeros_offset = -min(
            0, Offset.fromsamples(num_flush_samples, sample_rate)
        )
        pbuf = SeriesBuffer(
            offset=offset, sample_rate=sample_rate, data=data, shape=shape
        )
        preparedbufs.append(pbuf)
        outnoffset = pbuf.noffset - sum(self.overlap)
        outoffsets.append({"offset": outoffset, "noffset": outnoffset})

        self.preparedoutoffsets[pad] = outoffsets

    return preparedbufs

earliest_by_pad(pad)

The earliest offset among the queued up buffers in this pad.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pad | SinkPad | The requested sink pad | required |

Returns:

| Type | Description |
| --- | --- |
| int | The earliest offset in the pad's buffer queue |

Source code in sgnts/base/__init__.py
def earliest_by_pad(self, pad) -> int:
    """The earliest offset among the queued up buffers in this pad.

    Args:
        pad:
            SinkPad, the requested sink pad

    Returns:
        int, the earliest offset in the pad's buffer queue

    """
    return self.inbufs[pad].offset if self.inbufs[pad] else -1

internal()

Align buffers from all the sink pads.

If AdapterConfig is provided, perform the requested overlap/stride streaming of frames.

Source code in sgnts/base/__init__.py
def internal(self) -> None:
    """Align buffers from all the sink pads.

    If AdapterConfig is provided, perform the requested
    overlap/stride streaming of frames.

    """
    # align if possible
    self._align()

    # put in heartbeat buffer if not aligned
    if not self._is_aligned:
        # I tried to fix this properly see the notes above the definition
        # of _TSTransSink
        for sink_pad in self.sink_pads:  # type: ignore
            self.preparedframes[sink_pad] = TSFrame(
                EOS=self.at_EOS,
                buffers=[
                    SeriesBuffer(
                        offset=self.earliest,
                        sample_rate=self.inbufs[sink_pad].sample_rate,
                        data=None,
                        shape=self.inbufs[sink_pad].buffers[0].shape[:-1] + (0,),
                    ),
                ],
                metadata=self.metadata[sink_pad],
            )
    # Else pack all the buffers
    else:
        min_latest = self.min_latest
        earliest = self.earliest
        # I tried to fix this properly see the notes above the definition
        # of _TSTransSink
        for sink_pad in self.sink_pads:  # type: ignore
            out = self.inbufs[sink_pad].get_sliced_buffers(
                (earliest, min_latest), pad_start=True
            )
            if min_latest > self.inbufs[sink_pad].offset:
                self.inbufs[sink_pad].flush_samples_by_end_offset(min_latest)
            assert len(out) > 0
            if self.adapter_config is not None:
                out = self.__adapter(sink_pad, out)
            self.preparedframes[sink_pad] = TSFrame(
                EOS=self.at_EOS,
                buffers=out,
                metadata=self.metadata[sink_pad],
            )

latest_by_pad(pad)

The latest offset among the queued up buffers in this pad.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pad | SinkPad | The requested sink pad | required |

Returns:

| Type | Description |
| --- | --- |
| int | The latest offset in the pad's buffer queue |

Source code in sgnts/base/__init__.py
def latest_by_pad(self, pad: SinkPad) -> int:
    """The latest offset among the queued up buffers in this pad.

    Args:
        pad:
            SinkPad, the requested sink pad

    Returns:
        int, the latest offset in the pad's buffer queue

    """
    return self.inbufs[pad].end_offset if self.inbufs[pad] else -1

pull(pad, frame)

Pull data from the input pads (source pads of upstream elements) and queue data to perform alignment once frames from all pads are pulled.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pad | SinkPad | The sink pad that is pulling the frame | required |
| frame | TSFrame | The frame that is pulled to the sink pad | required |

Source code in sgnts/base/__init__.py
def pull(self, pad: SinkPad, frame: TSFrame) -> None:
    """Pull data from the input pads (source pads of upstream elements) and queue
    data to perform alignment once frames from all pads are pulled.

    Args:
        pad:
            SinkPad, The sink pad that is pulling the frame
        frame:
            Frame, The frame that is pulled to sink pad
    """

    self.at_EOS |= frame.EOS

    # extend and check the buffers
    for buf in frame:
        self.inbufs[pad].push(buf)
    self.metadata[pad] = frame.metadata
    if self.timeout(pad):
        raise ValueError("pad %s has timed out" % pad.name)

timeout(pad)

Whether the pad has timed out because the span of its queued buffers exceeds max_age.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pad | SinkPad | The sink pad to check for timeout | required |

Returns:

| Type | Description |
| --- | --- |
| bool | Whether the pad has timed out |
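
The timeout test compares the span of queued data against max_age converted to offsets. The sketch below models that comparison on its own. The conversion is an assumption: `OFFSETS_PER_SECOND` and `ns_to_offsets` are hypothetical stand-ins for whatever `Offset.fromns` actually does, which is not shown on this page.

```python
OFFSETS_PER_SECOND = 16384  # assumed offset rate, for illustration only
NS_PER_SECOND = 1_000_000_000


def ns_to_offsets(ns):
    """Hypothetical stand-in for Offset.fromns."""
    return ns * OFFSETS_PER_SECOND // NS_PER_SECOND


def has_timed_out(offset, end_offset, max_age_ns):
    """True when the queued span is strictly longer than max_age."""
    return end_offset - offset > ns_to_offsets(max_age_ns)


max_age = 100 * NS_PER_SECOND  # the default of 100 seconds, in nanoseconds
print(has_timed_out(0, 1_638_400, max_age))
print(has_timed_out(0, 1_638_401, max_age))
```

Because the comparison is strict, a queue spanning exactly max_age does not time out in this model; only a longer span does, at which point `pull` raises a `ValueError`.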

Source code in sgnts/base/__init__.py
def timeout(self, pad: SinkPad) -> bool:
    """Whether pad has timed-out due to oldest buffer exceeding max age.

    Args:
        pad:
            SinkPad, the sink pad to check for timeout

    Returns:
        bool, whether pad has timed-out

    """
    return self.inbufs[pad].end_offset - self.inbufs[pad].offset > Offset.fromns(
        self.max_age
    )

TSTransform dataclass

Bases: TransformElement, _TSTransSink

A time-series transform element.

Source code in sgnts/base/__init__.py
@dataclass
class TSTransform(TransformElement, _TSTransSink):
    """A time-series transform element."""

    # FIXME mypy complains that this takes a TSFrame instead of a Frame.  Not
    # sure what the right fix is.
    # FIXME, I also cannot get type hints to work
    # pull: Callable[[SourcePad, TSFrame], None] = _TSTransSink.pull  # type: ignore
    pull = _TSTransSink.pull  # type: ignore

    def __post_init__(self):
        TransformElement.__post_init__(self)
        _TSTransSink.__post_init__(self)

    def internal(self):
        _TSTransSink.internal(self)

    def new(self, pad: SourcePad) -> TSFrame:
        """The transform function must be provided by the subclass.

        It should take the source pad as an argument and return a new
        TSFrame.

        Args:
            pad:
                SourcePad, The source pad that is producing the transformed frame

        Returns:
            TSFrame, The transformed frame

        """
        raise NotImplementedError

new(pad)

The transform function must be provided by the subclass.

It should take the source pad as an argument and return a new TSFrame.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, The source pad that is producing the transformed frame

required

Returns:

Type Description
TSFrame

TSFrame, The transformed frame

Source code in sgnts/base/__init__.py
def new(self, pad: SourcePad) -> TSFrame:
    """The transform function must be provided by the subclass.

    It should take the source pad as an argument and return a new
    TSFrame.

    Args:
        pad:
            SourcePad, The source pad that is producing the transformed frame

    Returns:
        TSFrame, The transformed frame

    """
    raise NotImplementedError
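
The subclass contract described above can be sketched in isolation. The classes below are hypothetical stand-ins and do not import sgnts: TransformSketch plays the role of TSTransform, pads are plain strings, and the return value is a list rather than a TSFrame.

```python
from dataclasses import dataclass


@dataclass
class TransformSketch:
    """Hypothetical stand-in for TSTransform; not the sgnts class."""

    def new(self, pad: str) -> list[float]:
        # The base class only defines the contract.
        raise NotImplementedError


@dataclass
class Doubler(TransformSketch):
    """Hypothetical subclass that supplies the transform."""

    samples: tuple[float, ...] = (1.0, 2.0, 3.0)

    def new(self, pad: str) -> list[float]:
        # A real element would build a TSFrame from its aligned input here.
        return [2 * x for x in self.samples]


print(Doubler().new("out"))
```

Calling new on the base class raises NotImplementedError, which is how a missing override surfaces at run time.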

TSSink dataclass

Bases: SinkElement, _TSTransSink

A time-series sink element.

Source code in sgnts/base/__init__.py
@dataclass
class TSSink(SinkElement, _TSTransSink):
    """A time-series sink element."""

    # FIXME mypy complains that this takes a TSFrame instead of a Frame.  Not
    # sure what the right fix is.
    # FIXME, I also cannot get type hints to work
    # pull: Callable[[SourcePad, TSFrame], None] = _TSTransSink.pull  # type: ignore
    pull = _TSTransSink.pull  # type: ignore

    def __post_init__(self):
        SinkElement.__post_init__(self)
        _TSTransSink.__post_init__(self)

    def internal(self):
        _TSTransSink.internal(self)

TSSource dataclass

Bases: SourceElement, SignalEOS

A time-series source that generates data in fixed-size buffers.

Parameters:

Name Type Description Default
t0 float | None

float, start time of first buffer, in seconds

None
end float | None

float, end time of the last buffer, in seconds

None
duration float | None

float, alternative to end: the duration of time to cover, in seconds

None
Source code in sgnts/base/__init__.py
@dataclass
class TSSource(SourceElement, SignalEOS):
    """A time-series source that generates data in fixed-size buffers.

    Args:
        t0:
            float, start time of first buffer, in seconds
        end:
            float, end time of the last buffer, in seconds
        duration:
            float, alternative to end: the duration of time to cover,
            in seconds

    """

    t0: float | None = None
    end: float | None = None
    duration: float | None = None

    def __post_init__(self):
        super().__post_init__()
        self.t0 = self.t0 or 0
        # FIXME should we be more careful about this?
        # FIXME should this not be different by pad?
        self.offset = {
            p: Offset.fromsec(self.t0 - Offset.offset_ref_t0 / Time.SECONDS)
            for p in self.source_pads
        }
        if self.end and self.duration:
            raise RuntimeError("may specify either end or duration, not both")
        if self.end and self.end < self.t0:
            raise RuntimeError("end is before t0")
        if self.duration:
            self.end = self.t0 + self.duration
        # FIXME should this be different by pad?
        if self.end is not None and not isinf(self.end):
            self.end_offset = Offset.fromsec(
                self.end - Offset.offset_ref_t0 / Time.SECONDS
            )
        else:
            self.end_offset = float("+inf")
        self.__new_buffer_dict = {}

    def num_samples(self, rate: int) -> int:
        """The number of samples in the sample stride at the requested rate.

        Args:
            rate:
                int, the sample rate

        Returns:
            int, the number of samples

        """
        return Offset.sample_stride(rate)

    def set_pad_buffer_params(
        self,
        pad: SourcePad,
        sample_shape: tuple[int, ...],
        rate: int,
    ) -> None:
        """Set variables on the pad that are needed to construct SeriesBuffers.

        These should remain constant throughout the duration of the
        pipeline.

        Args:
            pad:
                SourcePad, the pad to set up buffers on
            sample_shape:
                tuple[int, ...], the shape of a single sample of the
                data, or put another way, the shape of the data except
                for the last (time) dimension,
                i.e. sample_shape=data.shape[:-1]
            rate:
                int, the sample rate of the data the pad will produce

        """
        self.__new_buffer_dict[pad] = {
            "sample_rate": rate,
            "shape": sample_shape + (self.num_samples(rate),),
        }

    def prepare_frame(
        self,
        pad: SourcePad,
        data: Optional[Union[int, Array]] = None,
        EOS: Optional[bool] = None,
        metadata: Optional[dict] = None,
    ) -> TSFrame:
        """Prepare the next TSFrame that the source pad will produce.

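        The pad's offset is advanced by the length of the produced buffer:
        normally one stride (Offset.SAMPLE_STRIDE_AT_MAX_RATE), or less if the
        final buffer is truncated at the end offset.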

        Args:
            pad:
                SourcePad, the source pad to produce the TSFrame
            data:
                Optional[Union[int, Array]], the data in the buffers
            EOS:
                Optional[bool], whether the TSFrame is at EOS
            metadata:
                Optional[dict], the metadata in the TSFrame

        Returns:
            TSFrame, the TSFrame prepared on the source pad

        """
        buf = SeriesBuffer(
            offset=self.offset[pad], data=data, **self.__new_buffer_dict[pad]
        )
        if buf.end_offset > self.end_offset:
            # slice the buffer if the last buffer is not a full stride
            buf = buf.sub_buffer(TSSlice(buf.offset, self.end_offset))

        EOS = (
            (buf.end_offset == self.end_offset or self.signaled_eos())
            if EOS is None
            else (EOS or (buf.end_offset == self.end_offset) or self.signaled_eos())
        )
        if metadata is None:
            metadata = {}

        self.offset[pad] += Offset.fromsamples(buf.samples, buf.sample_rate)

        return TSFrame(
            buffers=[buf],
            EOS=EOS,
            metadata=metadata,
        )
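
The way __post_init__ reconciles t0, end and duration can be followed in a library-free sketch. The function resolve_span is hypothetical; it mirrors the checks in the listing above and works in seconds instead of converting to offsets.

```python
from math import inf
from typing import Optional


def resolve_span(
    t0: Optional[float] = None,
    end: Optional[float] = None,
    duration: Optional[float] = None,
) -> tuple[float, float]:
    """Return (start, end) in seconds, following the same rules as the listing."""
    t0 = t0 or 0
    if end and duration:
        raise RuntimeError("may specify either end or duration, not both")
    if end and end < t0:
        raise RuntimeError("end is before t0")
    if duration:
        end = t0 + duration
    # No end at all means the source is unbounded.
    return t0, (inf if end is None else end)


print(resolve_span(duration=4.0))
print(resolve_span(t0=10.0, end=12.0))
print(resolve_span())
```

As in the listing, the checks rely on truthiness, so end=0 or duration=0 is treated the same as leaving the value unset.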

num_samples(rate)

The number of samples in the sample stride at the requested rate.

Parameters:

Name Type Description Default
rate int

int, the sample rate

required

Returns:

Type Description
int

int, the number of samples

Source code in sgnts/base/__init__.py
def num_samples(self, rate: int) -> int:
    """The number of samples in the sample stride at the requested rate.

    Args:
        rate:
            int, the sample rate

    Returns:
        int, the number of samples

    """
    return Offset.sample_stride(rate)
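
As a rough illustration of what a per-rate sample stride means, the sketch below assumes that a stride covers a fixed time span defined at a maximum rate, and that lower rates divide that maximum evenly. MAX_RATE, SAMPLE_STRIDE_AT_MAX_RATE (given an illustrative value here) and this sample_stride function are assumptions made for the example; the actual constants and rounding in Offset.sample_stride may differ.

```python
# Illustrative values only; not taken from the sgnts source.
MAX_RATE = 16384
SAMPLE_STRIDE_AT_MAX_RATE = 16384


def sample_stride(rate: int) -> int:
    """Number of samples at `rate` covering the same time span as one stride."""
    if rate <= 0 or MAX_RATE % rate:
        raise ValueError(f"rate {rate} must evenly divide {MAX_RATE}")
    return SAMPLE_STRIDE_AT_MAX_RATE * rate // MAX_RATE


for rate in (16384, 2048, 256):
    print(rate, sample_stride(rate))
```

Under these assumptions every pad advances by the same time span per buffer, whatever its sample rate.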

prepare_frame(pad, data=None, EOS=None, metadata=None)

Prepare the next TSFrame that the source pad will produce.

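The pad's offset is advanced by the length of the produced buffer: normally one stride (Offset.SAMPLE_STRIDE_AT_MAX_RATE), or less if the final buffer is truncated at the end offset.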

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the source pad to produce the TSFrame

required
data Optional[Union[int, Array]]

Optional[Union[int, Array]], the data in the buffers

None
EOS Optional[bool]

Optional[bool], whether the TSFrame is at EOS

None
metadata Optional[dict]

Optional[dict], the metadata in the TSFrame

None

Returns:

Type Description
TSFrame

TSFrame, the TSFrame prepared on the source pad

Source code in sgnts/base/__init__.py
def prepare_frame(
    self,
    pad: SourcePad,
    data: Optional[Union[int, Array]] = None,
    EOS: Optional[bool] = None,
    metadata: Optional[dict] = None,
) -> TSFrame:
    """Prepare the next TSFrame that the source pad will produce.

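    The pad's offset is advanced by the length of the produced buffer:
    normally one stride (Offset.SAMPLE_STRIDE_AT_MAX_RATE), or less if the
    final buffer is truncated at the end offset.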

    Args:
        pad:
            SourcePad, the source pad to produce the TSFrame
        data:
            Optional[Union[int, Array]], the data in the buffers
        EOS:
            Optional[bool], whether the TSFrame is at EOS
        metadata:
            Optional[dict], the metadata in the TSFrame

    Returns:
        TSFrame, the TSFrame prepared on the source pad

    """
    buf = SeriesBuffer(
        offset=self.offset[pad], data=data, **self.__new_buffer_dict[pad]
    )
    if buf.end_offset > self.end_offset:
        # slice the buffer if the last buffer is not a full stride
        buf = buf.sub_buffer(TSSlice(buf.offset, self.end_offset))

    EOS = (
        (buf.end_offset == self.end_offset or self.signaled_eos())
        if EOS is None
        else (EOS or (buf.end_offset == self.end_offset) or self.signaled_eos())
    )
    if metadata is None:
        metadata = {}

    self.offset[pad] += Offset.fromsamples(buf.samples, buf.sample_rate)

    return TSFrame(
        buffers=[buf],
        EOS=EOS,
        metadata=metadata,
    )
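
The truncation and EOS logic in prepare_frame can be traced with plain integers. This is a sketch under stated assumptions: next_buffer and walk are hypothetical helpers, offsets are bare ints, and the signaled flag stands in for self.signaled_eos().

```python
from typing import Optional


def next_buffer(
    offset: int,
    stride: int,
    end_offset: float,
    eos: Optional[bool] = None,
    signaled: bool = False,
) -> tuple[int, float, bool]:
    """Return (start, stop, is_eos) for the next buffer."""
    # Truncate the last buffer if a full stride would overshoot the end.
    stop = min(offset + stride, end_offset)
    at_end = stop == end_offset
    # Both branches of the original expression reduce to this.
    is_eos = bool(eos) or at_end or signaled
    return offset, stop, is_eos


def walk(stride: int, end_offset: int) -> list[tuple[int, float, bool]]:
    """Produce buffers from offset 0 until EOS is reached."""
    out = []
    offset = 0
    while True:
        start, stop, is_eos = next_buffer(offset, stride, end_offset)
        out.append((start, stop, is_eos))
        offset = stop  # advance by the length actually produced
        if is_eos:
            return out


print(walk(stride=4, end_offset=10))
```

With a stride of 4 and an end offset of 10, the third buffer is cut to two offsets and carries the EOS flag. An explicit eos=False does not suppress EOS once the end offset is reached.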

set_pad_buffer_params(pad, sample_shape, rate)

Set variables on the pad that are needed to construct SeriesBuffers.

These should remain constant throughout the duration of the pipeline.

Parameters:

Name Type Description Default
pad SourcePad

SourcePad, the pad to set up buffers on

required
sample_shape tuple[int, ...]

tuple[int, ...], the shape of a single sample of the data, or put another way, the shape of the data except for the last (time) dimension, i.e. sample_shape=data.shape[:-1]

required
rate int

int, the sample rate of the data the pad will produce

required
Source code in sgnts/base/__init__.py
def set_pad_buffer_params(
    self,
    pad: SourcePad,
    sample_shape: tuple[int, ...],
    rate: int,
) -> None:
    """Set variables on the pad that are needed to construct SeriesBuffers.

    These should remain constant throughout the duration of the
    pipeline.

    Args:
        pad:
            SourcePad, the pad to set up buffers on
        sample_shape:
            tuple[int, ...], the shape of a single sample of the
            data, or put another way, the shape of the data except
            for the last (time) dimension,
            i.e. sample_shape=data.shape[:-1]
        rate:
            int, the sample rate of the data the pad will produce

    """
    self.__new_buffer_dict[pad] = {
        "sample_rate": rate,
        "shape": sample_shape + (self.num_samples(rate),),
    }
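
The shape stored for each pad is the per-sample shape with the time dimension appended last. A minimal sketch of that composition, using a hypothetical buffer_shape helper and plain tuples:

```python
def buffer_shape(sample_shape: tuple[int, ...], num_samples: int) -> tuple[int, ...]:
    """Append the time dimension to the per-sample shape."""
    return sample_shape + (num_samples,)


# A scalar time series: each sample has shape ().
print(buffer_shape((), 2048))
# Two channels of three values per sample.
print(buffer_shape((2, 3), 2048))
```

Going the other way, data.shape[:-1] recovers sample_shape from an existing array, as the parameter description notes.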