Validation workflow completed.
🚨 ALERT: Critical validation failures found!
Failed steps: 2
--- Validation Summary Report ---
Total validation steps: 3
Passed steps: 1
Failed steps: 2
Highest severity: critical
--- End of Report ---
Validation with Final Actions
Execute actions after validation completes, such as sending alerts or generating summary reports.
import pointblank as pb
def send_alert():
    """Check the validation summary and print an alert for severe failures.

    Reads the summary via ``pb.get_validation_summary()`` and prints one of
    three messages depending on its ``"highest_severity"`` entry:

    - ``"critical"``: an alert plus the number of failing steps,
    - ``"error"``: a warning,
    - anything else: a success message.
    """
    summary = pb.get_validation_summary()

    if summary and summary.get("highest_severity") == "critical":
        print("🚨 ALERT: Critical validation failures found!")
        print(f" Failed steps: {summary['n_failing_steps']}")
    elif summary and summary.get("highest_severity") == "error":
        print("⚠️ WARNING: Error-level validation failures detected.")
    else:
        # NOTE: this branch is also taken when no summary is available
        # (falsy value) or when the highest severity is neither "critical"
        # nor "error".
        print("✅ All validation checks passed successfully!")
def generate_summary_report():
    """Print a short summary report of the validation results.

    Reads the summary via ``pb.get_validation_summary()``. When the summary
    is truthy, prints the total, passing and failing step counts and the
    highest severity between a header and a footer line. When it is falsy,
    nothing is printed.
    """
    summary = pb.get_validation_summary()

    if summary:
        print("\n--- Validation Summary Report ---")
        print(f"Total validation steps: {summary['n_steps']}")
        print(f"Passed steps: {summary['n_passing_steps']}")
        print(f"Failed steps: {summary['n_failing_steps']}")
        print(f"Highest severity: {summary['highest_severity']}")
        print("--- End of Report ---")
# Build and run a validation plan whose final actions are a string message
# followed by the two reporting callables defined above.
validation = (
    pb.Validate(
        data=pb.load_dataset(dataset="game_revenue", tbl_type="polars"),
        label="Validation with final actions",
        thresholds=pb.Thresholds(warning=0.05, error=0.10, critical=0.15),
        final_actions=pb.FinalActions(
            "Validation workflow completed.",  # String message
            send_alert,  # Alert function
            generate_summary_report,  # Report function
        ),
    )
    .col_vals_regex(columns="player_id", pattern=r"[A-Z]{12}[0-9]{3}")
    .col_vals_gt(columns="item_revenue", value=0.05)
    .col_vals_gt(columns="session_duration", value=15)
    .interrogate()
)

# Bare expression: displays the validation report in a notebook/REPL context.
validation
Preview of Input Table
Polars, Rows: 2,000, Columns: 11 |
|||||||||||