Validation workflow completed.
🚨 ALERT: Critical validation failures found!
Failed steps: 2
--- Validation Summary Report ---
Total validation steps: 3
Passed steps: 1
Failed steps: 2
Highest severity: critical
--- End of Report ---
Validation with Final Actions
Execute actions after validation completes, such as sending alerts or generating summary reports.
import pointblank as pb
def send_alert():
"""Check validation summary and send alert if critical failures found"""
summary = pb.get_validation_summary()
if summary and summary.get("highest_severity") == "critical":
print(f"🚨 ALERT: Critical validation failures found!")
print(f" Failed steps: {summary['n_failing_steps']}")
elif summary and summary.get("highest_severity") == "error":
print(f"⚠️ WARNING: Error-level validation failures detected.")
else:
print("✅ All validation checks passed successfully!")
def generate_summary_report():
"""Generate a summary report of validation results"""
summary = pb.get_validation_summary()
if summary:
print("\n--- Validation Summary Report ---")
print(f"Total validation steps: {summary['n_steps']}")
print(f"Passed steps: {summary['n_passing_steps']}")
print(f"Failed steps: {summary['n_failing_steps']}")
print(f"Highest severity: {summary['highest_severity']}")
print("--- End of Report ---")
validation = (
pb.Validate(
data=pb.load_dataset(dataset="game_revenue", tbl_type="polars"),
label="Validation with final actions",
thresholds=pb.Thresholds(warning=0.05, error=0.10, critical=0.15),
final_actions=pb.FinalActions(
"Validation workflow completed.", # String message
send_alert, # Alert function
generate_summary_report # Report function
)
)
.col_vals_regex(columns="player_id", pattern=r"[A-Z]{12}[0-9]{3}")
.col_vals_gt(columns="item_revenue", value=0.05)
.col_vals_gt(columns="session_duration", value=15)
.interrogate()
)
validationPreview of Input Table
PolarsRows2,000Columns11 |
|||||||||||