← GDG /

#260 gdtest_skill_config

#260 gdtest_skill_config OK CONFIG
Tests enriched auto-generated skill with config overrides
Auto-generated skill enriched with gotchas, best_practices, decision_table, and extra_body from great-docs.yml. Tests that config overrides are injected into the generated skill.md body and rendered correctly on the Skills page with styled tables and code blocks.
View Site → Build Log ๐Ÿงช Test Coverage

Build Mode

● Has great-docs.yml

This package ships a pre-supplied config. The great-docs init step is skipped and great-docs build uses the spec-defined configuration directly. Tests specific config options and their rendered output.

Dimensions

S3
S3Enriched skill (config)skill

Source Files

๐Ÿ“ gdtest_skill_config/
๐Ÿ“„ __init__.py
"""A streaming toolkit."""

__version__ = "0.2.0"
__all__ = ["Stream", "Sink", "batch"]


class Stream:
    """
    A lazy stream reader.

    Parameters
    ----------
    source
        A file path or file-like object.
    chunk_size
        Read chunk size in bytes.
    timeout
        Read timeout in seconds.
    """

    def __init__(
        self,
        source: str,
        chunk_size: int = 1024,
        timeout: float | None = None,
    ):
        self.source = source
        self.chunk_size = chunk_size
        self.timeout = timeout

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def read(self, n: int = -1) -> bytes:
        """
        Read up to n bytes.

        Parameters
        ----------
        n
            Number of bytes. -1 for all remaining.

        Returns
        -------
        bytes
            The data read.
        """
        return b""

    def close(self) -> None:
        """Close the stream and release resources."""
        pass

    def map(self, fn) -> "Stream":
        """
        Apply a function to each element.

        Parameters
        ----------
        fn
            Callable to apply.

        Returns
        -------
        Stream
            A new transformed stream.
        """
        return self

    def filter(self, predicate) -> "Stream":
        """
        Filter elements by a predicate.

        Parameters
        ----------
        predicate
            Callable returning bool.

        Returns
        -------
        Stream
            A new filtered stream.
        """
        return self


class Sink:
    """
    A buffered output sink.

    Parameters
    ----------
    dest
        Destination file path or file-like object.
    buffer_size
        Write buffer size in bytes.
    """

    def __init__(self, dest: str, buffer_size: int = 4096):
        self.dest = dest
        self.buffer_size = buffer_size

    def write(self, data: bytes) -> int:
        """
        Write data to the sink.

        Parameters
        ----------
        data
            Bytes to write.

        Returns
        -------
        int
            Number of bytes written.
        """
        return len(data)

    def flush(self) -> None:
        """Flush the write buffer."""
        pass


def batch(iterable, size: int = 10):
    """
    Yield items from an iterable in fixed-size batches.

    Parameters
    ----------
    iterable
        Input iterable.
    size
        Batch size.

    Yields
    ------
    list
        A batch of items.
    """
    batch_items = []
    for item in iterable:
        batch_items.append(item)
        if len(batch_items) == size:
            yield batch_items
            batch_items = []
    if batch_items:
        yield batch_items
๐Ÿ“„ README.md
# gdtest-skill-config

A streaming toolkit with enriched skill configuration.

## Installation

```bash
pip install gdtest-skill-config
```
๐Ÿ“„ great-docs.yml
skill:
  gotchas:
    - "Always close streams with `stream.close()` or use a context manager."
    - "The `batch()` generator is lazy โ€” it won't pull data until iterated."
    - Default chunk size is 1024 bytes; increase for large payloads.
  best_practices:
    - "Use `with Stream(source) as s:` for automatic cleanup."
    - "Prefer `batch()` over manual iteration for memory efficiency."
    - "Set `timeout` explicitly in production to avoid hangs."
  decision_table:
    - if: Need to read data incrementally
      then: stream = Stream(source)
    - if: Need to process in fixed-size chunks
      then: "for chunk in stream.batch(size=4096): ..."
    - if: Need to transform each element
      then: stream.map(fn) | stream.filter(pred)
    - if: Need buffered output
      then: "sink = Sink(dest, buffer_size=8192)"
  extra_body: |-
    ## Performance notes

    Throughput scales linearly up to ~10 concurrent streams. Beyond that, consider using `asyncio` or a thread pool.

    ## Compatibility

    Works with file-like objects, sockets, and any object implementing the `Readable` protocol.