---
name: gdtest-skill-rich
description: >
  Build, run, and monitor data pipelines with gdtest-skill-rich.
  Supports sync and async execution, schema validation, and
  pluggable stages for ETL workflows.
license: Apache-2.0
compatibility: Requires Python >=3.10.
metadata:
  author: gdg-test-suite
  version: "1.0"
  tags:
    - data-pipeline
    - etl
    - streaming
---

# gdtest-skill-rich

A data-pipeline toolkit for building, running, and monitoring ETL workflows, with sync and async execution, schema validation, and pluggable stages.

## Quick start

```python
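from gdtest_skill_rich import Pipeline, Stage, Source, Sink

def clean_fn(data):
    # Placeholder stage function: drop empty rows.
    return [row for row in data if row]

def transform_fn(data):
    # Placeholder stage function: replace with your own transformation.
    return data
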
src = Source("data/input.json")
snk = Sink("data/output.parquet", format="parquet")

pipe = (
    Pipeline(name="etl-demo", schema="v2")
    .add(Stage("clean", fn=clean_fn))
    .add(Stage("transform", fn=transform_fn))
)
pipe.run()
```

## Core concepts

### Pipeline

A `Pipeline` is an ordered chain of `Stage` objects. Pipelines are
**immutable once running**: modifying a pipeline after `.run()` has been
called raises `RuntimeError`.
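The immutability rule can be illustrated with a small stand-in class. `SketchPipeline` below is hypothetical and is not the library's `Pipeline`: it takes plain callables instead of `Stage` objects and passes the input to `run()` directly. It only mimics the behaviour described above, assuming a simple "started" flag is enough to model it.

```python
from typing import Any, Callable


class SketchPipeline:
    """Hypothetical stand-in, not the gdtest_skill_rich Pipeline."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._stages: list[Callable[[Any], Any]] = []
        self._started = False

    def add(self, fn: Callable[[Any], Any]) -> "SketchPipeline":
        # Refuse modifications once run() has been called.
        if self._started:
            raise RuntimeError(f"pipeline {self.name!r} is already running")
        self._stages.append(fn)
        return self  # returning self allows chained .add() calls

    def run(self, data: Any) -> Any:
        self._started = True
        for fn in self._stages:  # stages run in insertion order
            data = fn(data)
        return data


pipe = SketchPipeline("demo").add(lambda xs: [x + 1 for x in xs]).add(sum)
result = pipe.run([1, 2, 3])

try:
    pipe.add(len)  # modification after run()
    add_after_run_failed = False
except RuntimeError:
    add_after_run_failed = True
```

Here `result` is `9` and the late `add()` call is rejected, which is the contract the real `Pipeline` is documented to enforce.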

### Stage

A stage wraps a callable `fn(data) -> data` under a name, as in
`Stage("clean", fn=clean_fn)`. Stages execute in insertion order, each
receiving the previous stage's output.
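The `fn(data) -> data` contract means stage functions compose like ordinary functions. The sketch below uses no library code at all; `strip_names`, `drop_inactive`, and `run_in_order` are hypothetical names, and the rows are made-up sample data.

```python
from functools import reduce


def strip_names(rows):
    # Return new rows with whitespace trimmed from the "name" field.
    return [{**row, "name": row["name"].strip()} for row in rows]


def drop_inactive(rows):
    # Keep only rows flagged as active.
    return [row for row in rows if row["active"]]


def run_in_order(fns, data):
    # Feed the output of each function into the next, in list order.
    return reduce(lambda acc, fn: fn(acc), fns, data)


rows = [
    {"name": " ada ", "active": True},
    {"name": "bob", "active": False},
]
cleaned = run_in_order([strip_names, drop_inactive], rows)
```

Because each function takes and returns the same kind of value, stages written this way can be reordered or reused without changes to their signatures.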

### Source & Sink

Sources read data; sinks write it. Both accept a `uri`
(file path, URL, or database DSN) and a `format` string, which
defaults to `json`.

#### Supported formats

| Format | Source | Sink | Notes |
|--------|--------|------|-------|
| `json` | Yes | Yes | Default format |
| `csv` | Yes | Yes | Header row required |
| `parquet` | Yes | Yes | Requires `pyarrow` |
| `sqlite` | Yes | No | Read-only |
| `postgres` | Yes | Yes | Requires `psycopg2` |

## Decision table

| If you need to… | Then use |
|-----------------|----------|
| Run a simple one-shot ETL | `run_pipeline(source, *stages, sink=sink)` |
| Build a reusable pipeline | `Pipeline().add(stage).add(stage)` |
| Run without blocking | `await pipeline.run_async()` |
| Validate input data | `validate_schema(data, schema="v2")` |
| Read from a database | `Source("postgres://...", format="postgres")` |
| Write to Parquet | `Sink("out.parquet", format="parquet")` |

## Configuration example

```yaml
# great-docs.yml
skill:
  gotchas:
    - "Pipeline.run() blocks until all stages complete."
  best_practices:
    - "Pin to a schema version for reproducibility."
```

## Error handling

```python
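# Assumes PipelineError is exported at the package top level.
from gdtest_skill_rich import PipelineError

try: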
    pipe.run()
except PipelineError as e:
    print(f"Stage {e.stage} failed: {e}")
```
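The `e.stage` attribute used above suggests the error records which stage failed. A minimal stand-in shows how such an exception could be raised and handled; `SketchPipelineError` and `run_stages` are hypothetical and only imitate that shape, not the library's actual `PipelineError`.

```python
class SketchPipelineError(Exception):
    """Hypothetical stand-in carrying the name of the failing stage."""

    def __init__(self, stage, message):
        super().__init__(message)
        self.stage = stage


def run_stages(stages, data):
    # stages is a list of (name, fn) pairs, run in order.
    for name, fn in stages:
        try:
            data = fn(data)
        except Exception as exc:
            # Wrap the original error and keep it as the cause.
            raise SketchPipelineError(name, str(exc)) from exc
    return data


stages = [
    ("clean", lambda rows: [r for r in rows if r]),
    ("transform", lambda rows: [int(r) for r in rows]),
]

try:
    run_stages(stages, ["1", "", "x"])
    message = None
except SketchPipelineError as e:
    message = f"Stage {e.stage} failed: {e}"
    cause = e.__cause__
```

Chaining with `from exc` keeps the original exception available as `__cause__`, so the handler can report the stage name without losing the underlying error.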

## Advanced: custom stages

```python
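from gdtest_skill_rich import Stage

class MyStage(Stage):
    """Keep only rows whose "active" field is truthy."""

    def __init__(self):
        # Register the bound method as this stage's callable.
        super().__init__("my-stage", fn=self._process)

    def _process(self, data):
        return [row for row in data if row["active"]]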
```
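The same subclassing pattern extends to stages that take parameters. The sketch below is runnable without the library because it defines its own stand-in `Stage` base class with the `Stage(name, fn=...)` shape used in this document; `FilterStage` is a hypothetical example, and calling `stage.fn(...)` directly is only a way to exercise it here, not necessarily how the library invokes stages.

```python
class Stage:
    """Stand-in base class with the Stage(name, fn=...) shape used above."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn


class FilterStage(Stage):
    """Keep rows where the given key is truthy."""

    def __init__(self, key):
        self.key = key
        super().__init__(f"filter-{key}", fn=self._process)

    def _process(self, data):
        # .get() treats a missing key as falsy instead of raising KeyError.
        return [row for row in data if row.get(self.key)]


stage = FilterStage("active")
rows = [{"id": 1, "active": True}, {"id": 2, "active": False}, {"id": 3}]
kept = stage.fn(rows)
```

Using `row.get(self.key)` is a design choice: rows that lack the key are dropped rather than aborting the run, which may or may not suit a given dataset.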

## Capabilities and boundaries

**What agents can configure:**

- Create and run pipelines
- Add custom stages
- Read from files, URLs, and databases
- Write to files and databases
- Validate schemas
- Run async pipelines

**Requires human setup:**

- Database credentials and access
- Installing optional dependencies (`pyarrow`, `psycopg2`)
- Deploying to production infrastructure

## Resources

- [llms.txt](llms.txt) — Indexed API reference for LLMs
- [llms-full.txt](llms-full.txt) — Full documentation for LLMs
