---
title: Sketches of Schema-based Validation
tags: schema,ytt
---
## Use Cases
### Required Input
```python=
def validate_dex_namespace():
  data.values.dex.namespace or assert.fail("Dex namespace should be provided")
end
```
... for strings, the schema will automatically include an equivalent validation:
```yaml=
#@data/values-schema
---
dex:
  #!@schema/validate min_len=1
  namespace: ""
```
... and specifically, error messages are crafted so that they read a lot like, "`<field>` is required."
Arrays have a similar mechanism, but the author must explicitly require it.
```yaml=
#@data/values-schema
---
dex:
  oauth2:
    #@schema/validate min_len=1
    responseTypes:
    - ""
```
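To make the intended behavior concrete, here is a minimal Python sketch of a `min_len` check that covers both strings and arrays. The function name `check_min_len` and the exact wording of the messages are assumptions made for illustration; they are not part of ytt.

```python
from typing import Optional, Sequence, Union


def check_min_len(field: str, value: Union[str, Sequence], min_len: int) -> Optional[str]:
    """Return a violation message, or None when the value is long enough.

    Hypothetical helper: the name and message wording are illustrative only.
    """
    if len(value) >= min_len:
        return None
    if min_len == 1:
        # The common "must not be empty" case reads as a required-field error.
        return f"{field} is required"
    return f"{field} must have a length of at least {min_len}"


print(check_min_len("dex.namespace", "", 1))
print(check_min_len("dex.oauth2.responseTypes", ["code"], 1))
```

The same function handles the empty string and the empty array, which is why one `min_len=1` rule can express "required" for both types.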
Maps and non-string scalars have no natural "empty" value that signals the user has not supplied one. They'll use the null+not_null idiom:
```yaml=
#@data/values-schema
---
dex:
  #@schema/nullable
  #@schema/validate not_null=True
  someMap:
```
Which may be common enough to warrant some syntactic sugar:
```yaml=
#@data/values-schema
---
dex:
  #@schema/unset
  someMap:
```
(see discussion at [vmware-tanzu/carvel-ytt#556](https://github.com/vmware-tanzu/carvel-ytt/issues/556#issuecomment-1007761200))
_(Note: this may be _more_ than sugar if "required" is captured as explicit metadata.)_
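
As a rough illustration of how the sugar could expand, the following Python sketch models a schema node and an `unset` helper that sets both "nullable" and "not null". The `SchemaNode` class, the `unset` function, and the message text are hypothetical; they only mirror the idiom described above.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional

# A rule returns a violation message, or None if the value is acceptable.
Rule = Callable[[Any], Optional[str]]


def not_null(value: Any) -> Optional[str]:
    return "value is required" if value is None else None


@dataclass
class SchemaNode:
    """Hypothetical stand-in for a schema node; not ytt's real data model."""
    default: Any
    nullable: bool = False
    rules: List[Rule] = field(default_factory=list)

    def effective_default(self) -> Any:
        # A nullable node defaults to null instead of its declared value.
        return None if self.nullable else self.default

    def violations(self, value: Any) -> List[str]:
        return [msg for msg in (rule(value) for rule in self.rules) if msg is not None]


def unset(default: Any) -> SchemaNode:
    """Sugar: nullable (so the default is null) plus not_null (so null is rejected)."""
    return SchemaNode(default=default, nullable=True, rules=[not_null])


some_map = unset({})
print(some_map.violations(some_map.effective_default()))  # untouched default is rejected
print(some_map.violations({"key": "value"}))              # a supplied value passes
```

If "required" were captured as explicit metadata instead, `unset` would set a flag on the node rather than composing two existing features.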
... strings, being the most common type, can be assumed to be "required" ...
```yaml=
#@data/values-schema
---
dex:
  namespace: ""
```
In case the author wishes to allow an empty string value:
```yaml=
#@data/values-schema
---
dex:
  #@schema/validation min_len=0
  namespace: ""
```
### Simple Enum
It's quite common for string values to be constrained to a finite set: an enumeration.
```python
data.values.dex.config.connector in ("oidc", "ldap") or \
assert.fail("Dex connector should be oidc or ldap")
```
```yaml
#@data/values-schema
---
dex:
  config:
    #@schema/validate enum=["oidc", "ldap"]
    connector: ""
```
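For comparison, here is a small Python sketch of what an `enum` rule might check. The function name `check_enum` and the message format are assumptions for illustration only.

```python
from typing import Optional, Sequence


def check_enum(field: str, value: str, allowed: Sequence[str]) -> Optional[str]:
    """Return a violation message when value is not one of the allowed strings.

    Hypothetical helper; the message wording is illustrative.
    """
    if value in allowed:
        return None
    options = ", ".join(repr(a) for a in allowed)
    return f"{field} must be one of {options} (got {value!r})"


print(check_enum("dex.config.connector", "oidc", ["oidc", "ldap"]))
print(check_enum("dex.config.connector", "", ["oidc", "ldap"]))
```

Note that the empty-string default fails the rule, so the field is effectively required without a separate `min_len` check.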
### Union Structures
Sometimes, authors design data values where two (or more) sibling keys are intended to be mutually exclusive. That is, it is invalid for more than one of the siblings to be non-null.
For example:
```yaml=
#@data/values
---
dex:
  config:
    oidc:
      CLIENT_ID: null #! required if oidc enabled
      CLIENT_SECRET: null #! required if oidc enabled
      issuer: null #! <OIDC_IDP_URL> is required if oidc enabled
    ldap:
      host: null #! <LDAP_HOST> is required if ldap enabled
      bindDN: null
      bindPW: null
```
Only one authorization+authentication scheme can be active at a time.
```yaml=
#@data/values-schema
---
dex:
  #@overlay/match by=overlay.one_of(["oidc", "ldap"])
  #@schema/validate
  config:
    #@schema/nullable
    oidc:
      CLIENT_ID: null #! required if oidc enabled
      CLIENT_SECRET: null #! required if oidc enabled
      issuer: null #! <OIDC_IDP_URL> is required if oidc enabled
    #@schema/nullable
    ldap:
      host: null #! <LDAP_HOST> is required if ldap enabled
      bindDN: null
      bindPW: null
```
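The mutual-exclusion rule itself can be sketched in Python as follows. The helper name `check_mutually_exclusive` is hypothetical, and the sketch only rejects the "more than one non-null sibling" case described above; whether at least one sibling must be set is a separate decision.

```python
from typing import Any, Mapping, Optional, Sequence


def check_mutually_exclusive(config: Mapping[str, Any], keys: Sequence[str]) -> Optional[str]:
    """Return a violation message when more than one of the sibling keys is non-null.

    Hypothetical helper for illustration; not an existing ytt function.
    """
    present = [key for key in keys if config.get(key) is not None]
    if len(present) > 1:
        return "only one of {} may be set, but found: {}".format(
            ", ".join(keys), ", ".join(present)
        )
    return None


print(check_mutually_exclusive({"oidc": {"issuer": "https://idp"}, "ldap": None}, ["oidc", "ldap"]))
print(check_mutually_exclusive({"oidc": {}, "ldap": {"host": "ldap.local"}}, ["oidc", "ldap"]))
```

An empty map counts as "set" here because the check is for null, not for emptiness.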
### Conditionally Validated
```python=
if data.values.dex.config.connector == "oidc":
  validate_oidc_config()
end
if data.values.dex.config.connector == "ldap":
  validate_ldap_config()
end
def validate_oidc_config():
  data.values.dex.config.oidc.CLIENT_ID or assert.fail("Dex oidc CLIENT_ID should be provided")
  data.values.dex.config.oidc.CLIENT_SECRET or assert.fail("Dex oidc CLIENT_SECRET should be provided")
  data.values.dex.config.oidc.issuer or assert.fail("Dex oidc issuer should be provided")
  data.values.dex.config.oidc.clientID == "$OIDC_CLIENT_ID" or assert.fail("Dex oidc clientID should be $OIDC_CLIENT_ID. Do not change it")
  data.values.dex.config.oidc.clientSecret == "$OIDC_CLIENT_SECRET" or assert.fail("Dex oidc clientSecret should be $OIDC_CLIENT_SECRET. Do not change it")
end
def validate_ldap_config():
  data.values.dex.config.ldap.host or assert.fail("Dex ldap <LDAP_HOST> should be provided")
  data.values.dex.config.ldap.insecureSkipVerify in (True, False)
  if data.values.dex.config.ldap.userSearch :
    data.values.dex.config.ldap.userSearch.baseDN or assert.fail("Dex ldap userSearch enabled. baseDN should be provided")
  end
  if data.values.dex.config.ldap.groupSearch :
    data.values.dex.config.ldap.groupSearch.baseDN or assert.fail("Dex ldap groupSearch enabled. baseDN should be provided")
  end
end
```
becomes
```yaml=
#@ def assert_connector_configured(config):
#@   if config.connector == "oidc":
#@     assert.not_null(config.oidc)
#@   elif config.connector == "ldap":
#@     assert.not_null(config.ldap)
#@   end
#@ end
#@ def min_len(value):
#@
#@ end
#@data/values-schema
---
dex:
  #@schema/validate lambda config: assert_connector_configured(config)
  config:
    #@schema/validate enum=["oidc", "ldap"]
    connector: ""
    #@schema/validate regex=`\d\d\d-\d\d\d-\d\d\d\d`
    phone_number: ""
    #@schema/validate min=0
    dependent: -1
    #@schema/nullable
    #@schema/validation not_null=True
    randomSeed: 0
    #@schema/nullable
    oidc:
      CLIENT_ID: ""
      CLIENT_SECRET: ""
      issuer: ""
      #@schema/validate lambda l,r: assert.equals(l,r, "Dex oidc clientID must be {}. Do not change it", r)
      clientID: "$OIDC_CLIENT_ID"
      #@schema/validate lambda l,r: assert.equals(l,r, "Dex oidc clientSecret must be {}. Do not change it", r)
      clientSecret: "$OIDC_CLIENT_SECRET"
    #@schema/nullable
    ldap:
      host: ""
      insecureSkipVerify: false
      #@schema/nullable
      userSearch:
        baseDN: ""
      #@schema/nullable
      groupSearch:
        baseDN: ""
```
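The parent-level rule in this sketch can be expressed in plain Python to show what it needs to see. The function `check_connector_configured` is a hypothetical analogue of `assert_connector_configured`; it returns messages instead of failing so that it could be combined with other rules.

```python
from typing import Any, List, Mapping


def check_connector_configured(config: Mapping[str, Any]) -> List[str]:
    """Check that the section named by `connector` is actually configured.

    Hypothetical analogue of assert_connector_configured; messages are illustrative.
    """
    violations: List[str] = []
    connector = config.get("connector")
    if connector not in ("oidc", "ldap"):
        violations.append("connector must be 'oidc' or 'ldap'")
    elif config.get(connector) is None:
        violations.append(f"{connector} must be configured when connector is {connector!r}")
    return violations


print(check_connector_configured({"connector": "ldap", "oidc": {"issuer": "x"}, "ldap": None}))
print(check_connector_configured({"connector": "oidc", "oidc": {"issuer": "x"}, "ldap": None}))
```

The rule is attached to `config` rather than to `oidc` or `ldap` because it needs to read a sibling (`connector`), which the children cannot see on their own.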
### Required Minimum Elements in Array
```python=
def validate_static_client() :
  if data.values.dex.config.staticClients and len(data.values.dex.config.staticClients) > 0:
    for client in data.values.dex.config.staticClients :
      getattr(client, "id") or assert.fail("Dex staticClients should have id")
      getattr(client, "redirectURIs") or assert.fail("Dex staticClients should have redirectURIs")
      getattr(client, "name") or assert.fail("Dex staticClients should have name")
      getattr(client, "secret") or assert.fail("Dex staticClients should have secret")
    end
  end
end
```
becomes
```yaml=
#@data/values-schema
---
dex:
  config:
    staticClients:
    - id: ""
      #@schema/validation min_len=1
      redirectURIs:
      - ""
      name: ""
      secret: ""
```
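To show what the schema above is expected to enforce per array item, here is a Python sketch that checks every static client. The name `check_static_clients` and the path-style messages are assumptions for illustration.

```python
from typing import Any, List, Mapping, Sequence

REQUIRED_CLIENT_FIELDS = ("id", "redirectURIs", "name", "secret")


def check_static_clients(clients: Sequence[Mapping[str, Any]]) -> List[str]:
    """Report each missing or empty required field, identified by item index.

    Hypothetical helper; the message format is illustrative.
    """
    violations: List[str] = []
    for index, client in enumerate(clients):
        for name in REQUIRED_CLIENT_FIELDS:
            # Empty strings and empty lists count as "not provided".
            if not client.get(name):
                violations.append(f"staticClients[{index}].{name} is required")
    return violations


print(check_static_clients([]))
print(check_static_clients([{"id": "a", "redirectURIs": [], "name": "n", "secret": "s"}]))
```

An empty `staticClients` list is valid, matching the original Starlark, which only validates clients that are present.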
## Considerations
- [Order of Validations / Reported Violations](#Order-of-Validations-/-Reported-Violations)
- [Merging of Schema Nodes](#Merging-of-Schema-Nodes)
### Order of Validations / Reported Violations
Consider this narrative:
> I want to be able to affect the order in which violations are reported \
> So that the user's attention goes to the violations that have the most impact on how the invocation runs.
This "requirement" is implied through expressions like this:
```python=
def validate_gangway():
  validate_funcs = [validate_infrastructure_provider,
                    validate_gangway_namespace,
                    validate_gangway_config,
                    validate_gangway_image,
                    validate_gangway_certificate,
                    validate_gangway_deployment,
                    validate_gangway_service,
                    validate_gangway_secret,
                    validate_dex_cert]
  for validate_func in validate_funcs:
    validate_func()
  end
end
```
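One way to think about this requirement is a runner that preserves the author's ordering while collecting every violation, rather than stopping at the first failure as `assert.fail` does. The following Python sketch assumes validators that return lists of messages; `run_in_order`, `check_namespace`, and `check_image` are hypothetical names.

```python
from typing import Any, Callable, List, Mapping, Sequence

# A validator inspects the values and returns zero or more violation messages.
Validator = Callable[[Mapping[str, Any]], List[str]]


def run_in_order(values: Mapping[str, Any], validators: Sequence[Validator]) -> List[str]:
    """Run validators in the given order and report violations in that same order."""
    violations: List[str] = []
    for validate in validators:
        violations.extend(validate(values))
    return violations


def check_namespace(values: Mapping[str, Any]) -> List[str]:
    return [] if values.get("namespace") else ["namespace is required"]


def check_image(values: Mapping[str, Any]) -> List[str]:
    return [] if values.get("image") else ["image is required"]


print(run_in_order({}, [check_namespace, check_image]))
print(run_in_order({}, [check_image, check_namespace]))
```

In a schema-based design the ordering would have to come from somewhere else, for example document order of the annotated nodes; that choice is not settled here.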
## Complete Example: Dex
```yaml=
#@data/values
---
dex:
  app: dex
  namespace: tanzu-system-auth
  organization: vmware
  commonname: tkg-dex
  config:
    frontend:
      theme: tkg
    web:
      https: 0.0.0.0:5556
      tlsCert: /etc/dex/tls/tls.crt
      tlsKey: /etc/dex/tls/tls.key
    expiry:
      signingKeys: 10m
      idTokens: 5m
    logger:
      level: debug
      format: json
    staticClients: []
    connector: null #! dex config should be either oidc or ldap
    issuerPort: "30167" #! required only for MGMT_CLUSTER_VIP if provider is vsphere. Default is "30167"
    oidc:
      CLIENT_ID: null #! required if oidc enabled
      CLIENT_SECRET: null #! required if oidc enabled
      issuer: null #! <OIDC_IDP_URL> is required if oidc enabled
      clientID: $OIDC_CLIENT_ID #! do not change this
      clientSecret: $OIDC_CLIENT_SECRET #! do not change this
      basicAuthUnsupported: null
      hostedDomains: []
      scopes: []
      insecureEnableGroups: null
      insecureSkipEmailVerified: true
      getUserInfo: null
      userIDKey: null
      userNameKey: null
    ldap:
      host: null #! <LDAP_HOST> is required if ldap enabled
      insecureNoSSL: null
      startTLS: null
      rootCA: null
      rootCAData: null
      bindDN: null
      bindPW: null
      usernamePrompt: LDAP Username
      insecureSkipVerify: False
      userSearch:
        baseDN: null #! required if ldap userSearch enabled
        filter: "(objectClass=posixAccount)"
        username: uid
        idAttr: uid
        emailAttr: mail
        nameAttr: givenName
      groupSearch:
        baseDN: null #! required if ldap groupSearch enabled
        filter: "(objectClass=posixGroup)"
        userAttr: uid
        groupAttr: memberUid
        nameAttr: cn
    oauth2:
      skipApprovalScreen: true
      responseTypes:
      - "code"
      - "token"
      - "id_token"
    storage:
      type: kubernetes
      config:
        inCluster: true
    enablePasswordDB: false
  service:
    name: dexsvc
    type: null
  deployment:
    replicas: 1
  certificate:
    duration: 2160h
    renewBefore: 360h
  image:
    name: dex
    tag: v2.22.0_vmware.2
    repository: registry.tkg.vmware.run
    pullPolicy: IfNotPresent
dns:
  aws:
    dnsNames:
    - tkg-dex.com
    DEX_SVC_LB_HOSTNAME: example.com #! <DEX_SVC_LB_HOSTNAME> is required for AWS
  vsphere:
    dnsNames:
    - tkg-dex
    ipAddresses: [] #! at least one MGMT_CLUSTER_VIP is required for vsphere provider
  azure:
    dnsNames:
    - tkg-dex.com
    DEX_SVC_LB_HOSTNAME: dex.example.com #! <DEX_SVC_LB_HOSTNAME> is required for azure
```
with
```python=
load("@ytt:data", "data")
load("globals.star", "globals")
load("@ytt:assert", "assert")
load("/globals.star", "validate_infrastructure_provider")
SERVICE_TYPE_NODEPORT = "NodePort"
SERVICE_TYPE_LOADBALANCER = "LoadBalancer"
def validate_dex():
  validate_funcs = [validate_infrastructure_provider,
                    validate_dex_namespace,
                    validate_dex_config,
                    validate_dex_image,
                    validate_dex_certificate,
                    validate_dex_deployment,
                    validate_dex_service,
                    validate_static_client]
  for validate_func in validate_funcs:
    validate_func()
  end
end
def validate_dex_namespace():
  data.values.dex.namespace or assert.fail("Dex namespace should be provided")
end
def validate_dex_config():
  globals.infrastructure_provider in ("aws", "vsphere", "azure") or assert.fail("Dex supports provider aws, vsphere or azure")
  if globals.infrastructure_provider == "vsphere":
    data.values.dns.vsphere.ipAddresses or assert.fail("Dex MGMT_CLUSTER_IP should be provided for vsphere provider")
    data.values.dex.config.issuerPort or assert.fail("Dex config issuerPort should be provided for vsphere provider")
  end
  if globals.infrastructure_provider == "aws":
    data.values.dns.aws.DEX_SVC_LB_HOSTNAME or assert.fail("Dex oidc issuer DEX_SVC_LB_HOSTNAME should be provided for aws provider")
  end
  if globals.infrastructure_provider == "azure":
    data.values.dns.azure.DEX_SVC_LB_HOSTNAME or assert.fail("Dex DEX_SVC_LB_HOSTNAME should be provided for azure provider")
  end
  data.values.dex.config.connector in ("oidc", "ldap") or assert.fail("Dex connector should be oidc or ldap")
  if data.values.dex.config.connector == "oidc":
    validate_oidc_config()
  end
  if data.values.dex.config.connector == "ldap":
    validate_ldap_config()
  end
  data.values.dex.config.oauth2 or assert.fail("Dex oauth2 should be provided")
  data.values.dex.config.storage or assert.fail("Dex storage should be provided")
end
def validate_oidc_config():
  data.values.dex.config.oidc.CLIENT_ID or assert.fail("Dex oidc CLIENT_ID should be provided")
  data.values.dex.config.oidc.CLIENT_SECRET or assert.fail("Dex oidc CLIENT_SECRET should be provided")
  data.values.dex.config.oidc.issuer or assert.fail("Dex oidc issuer should be provided")
  data.values.dex.config.oidc.clientID == "$OIDC_CLIENT_ID" or assert.fail("Dex oidc clientID should be $OIDC_CLIENT_ID. Do not change it")
  data.values.dex.config.oidc.clientSecret == "$OIDC_CLIENT_SECRET" or assert.fail("Dex oidc clientSecret should be $OIDC_CLIENT_SECRET. Do not change it")
end
def validate_ldap_config():
  data.values.dex.config.ldap.host or assert.fail("Dex ldap <LDAP_HOST> should be provided")
  data.values.dex.config.ldap.insecureSkipVerify in (True, False)
  if data.values.dex.config.ldap.userSearch :
    data.values.dex.config.ldap.userSearch.baseDN or assert.fail("Dex ldap userSearch enabled. baseDN should be provided")
  end
  if data.values.dex.config.ldap.groupSearch :
    data.values.dex.config.ldap.groupSearch.baseDN or assert.fail("Dex ldap groupSearch enabled. baseDN should be provided")
  end
end
def validate_dex_image():
  data.values.dex.image.name or assert.fail("Dex image name should be provided")
  data.values.dex.image.tag or assert.fail("Dex image tag should be provided")
  data.values.dex.image.repository or assert.fail("Dex image repository should be provided")
  data.values.dex.image.pullPolicy or assert.fail("Dex image pullPolicy should be provided")
end
def validate_dex_certificate():
  data.values.dex.certificate.duration or assert.fail("Dex certificate duration should be provided")
  data.values.dex.certificate.renewBefore or assert.fail("Dex certificate renewBefore should be provided")
end
def validate_dex_deployment():
  data.values.dex.deployment.replicas or assert.fail("Dex deployment replicas should be provided")
end
def validate_dex_service():
  if data.values.dex.service.type:
    data.values.dex.service.type in ("LoadBalancer", "NodePort") or assert.fail("Dex service type should be LoadBalancer or NodePort")
  end
  if globals.infrastructure_provider == "aws":
    data.values.dns.aws.DEX_SVC_LB_HOSTNAME or assert.fail("Dex aws dnsname DEX_SVC_LB_HOSTNAME should be provided")
  end
  if globals.infrastructure_provider == "vsphere":
    data.values.dns.vsphere.ipAddresses[0] or assert.fail("Dex vsphere dns at least one ipaddress should be provided")
  end
  if globals.infrastructure_provider == "azure":
    data.values.dns.azure.DEX_SVC_LB_HOSTNAME or assert.fail("Dex azure dnsname DEX_SVC_LB_HOSTNAME should be provided")
  end
end
def get_service_type():
  if globals.infrastructure_provider == "vsphere":
    return SERVICE_TYPE_NODEPORT
  else:
    return SERVICE_TYPE_LOADBALANCER
  end
end
def get_dex_service_type():
  if hasattr(data.values.dex, "service") and hasattr(data.values.dex.service, "type") and data.values.dex.service.type != None:
    return data.values.dex.service.type
  else:
    return get_service_type()
  end
end
def is_service_type_LB():
  return get_dex_service_type() == SERVICE_TYPE_LOADBALANCER
end
def is_service_NodePort():
  return get_dex_service_type() == SERVICE_TYPE_NODEPORT
end
def get_dex_service_annotations():
  if globals.infrastructure_provider == "aws":
    return {"service.beta.kubernetes.io/aws-load-balancer-backend-protocol": "ssl"}
  else:
    return {}
  end
end
def validate_static_client() :
  if data.values.dex.config.staticClients and len(data.values.dex.config.staticClients) > 0:
    for client in data.values.dex.config.staticClients :
      getattr(client, "id") or assert.fail("Dex staticClients should have id")
      getattr(client, "redirectURIs") or assert.fail("Dex staticClients should have redirectURIs")
      getattr(client, "name") or assert.fail("Dex staticClients should have name")
      getattr(client, "secret") or assert.fail("Dex staticClients should have secret")
    end
  end
end
#export
values = data.values
# validate dex
validate_dex()
```
becomes
```yaml=
#@ def assert_connector_configured(config):
#@   if config.connector == "oidc":
#@     assert.not_null(config.oidc)
#@   elif config.connector == "ldap":
#@     assert.not_null(config.ldap)
#@   end
#@ end
#@data/values-schema
---
dex:
  app: dex
  namespace: tanzu-system-auth
  organization: vmware
  commonname: tkg-dex
  #@schema/validation lambda config: assert_connector_configured(config)
  config:
    frontend:
      theme: tkg
    web:
      https: 0.0.0.0:5556
      tlsCert: /etc/dex/tls/tls.crt
      tlsKey: /etc/dex/tls/tls.key
    expiry:
      signingKeys: 10m
      idTokens: 5m
    logger:
      level: debug
      format: json
    staticClients: []
    #@schema/validation enum=["oidc", "ldap"]
    connector: ""
    issuerPort: "30167" #! required only for MGMT_CLUSTER_VIP if provider is vsphere. Default is "30167"
    #@schema/nullable
    oidc:
      CLIENT_ID: ""
      CLIENT_SECRET: ""
      issuer: ""
      #@schema/validation lambda id: assert.equals(id, "$OIDC_CLIENT_ID", "Dex oidc clientId must be $OIDC_CLIENT_ID. Do not change it.")
      clientID: "$OIDC_CLIENT_ID"
      #@schema/validation lambda id: assert.equals(id, "$OIDC_CLIENT_SECRET", "Dex oidc clientSecret must be $OIDC_CLIENT_SECRET. Do not change it.")
      clientSecret: "$OIDC_CLIENT_SECRET"
      basicAuthUnsupported: true
      hostedDomains: [""]
      scopes: [""]
      insecureEnableGroups: false
      insecureSkipEmailVerified: true
      getUserInfo: ""
      userIDKey: ""
      userNameKey: ""
    #@schema/nullable
    ldap:
      host: ""
      insecureNoSSL: false
      startTLS: true
      rootCA: ""
      rootCAData: ""
      bindDN: ""
      bindPW: ""
      usernamePrompt: "LDAP Username"
      insecureSkipVerify: false
      #@schema/nullable
      userSearch:
        baseDN: ""
        filter: "(objectClass=posixAccount)"
        username: uid
        idAttr: uid
        emailAttr: mail
        nameAttr: givenName
      #@schema/nullable
      groupSearch:
        baseDN: ""
        filter: "(objectClass=posixGroup)"
        userAttr: uid
        groupAttr: memberUid
        nameAttr: cn
    oauth2:
      skipApprovalScreen: true
      #@schema/default ["code", "token", "id_token"]
      responseTypes: [""]
    storage:
      type: kubernetes
      config:
        inCluster: true
    enablePasswordDB: false
  service:
    name: dexsvc
    type: null
  deployment:
    replicas: 1
  certificate:
    duration: 2160h
    renewBefore: 360h
  image:
    name: dex
    tag: v2.22.0_vmware.2
    repository: registry.tkg.vmware.run
    pullPolicy: IfNotPresent
dns:
  aws:
    dnsNames:
    - tkg-dex.com
    DEX_SVC_LB_HOSTNAME: example.com #! <DEX_SVC_LB_HOSTNAME> is required for AWS
  vsphere:
    dnsNames:
    - tkg-dex
    ipAddresses: [] #! at least one MGMT_CLUSTER_VIP is required for vsphere provider
  azure:
    dnsNames:
    - tkg-dex.com
    DEX_SVC_LB_HOSTNAME: dex.example.com #! <DEX_SVC_LB_HOSTNAME> is required for azure
```
### Merging of Schema Nodes
Today, when merging YAML, overlays retain the meta (annotations, etc) from the "left"; they do not replace or merge any meta from the "right".
It's reasonable for a proficient configuration consumer to need to relax or replace a constraint in the field.
How might this be done?
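
To make the problem concrete, here is a toy Python model of the current behavior described above, where a merge keeps only the left node's meta. The `Node` class and `merge_keep_left_meta` function are simplifications invented for this illustration; they are not ytt's overlay implementation, and the meta keys are placeholders.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Node:
    """Toy YAML node: a value plus its meta (annotations). Hypothetical model."""
    value: Any
    meta: Dict[str, Any] = field(default_factory=dict)


def merge_keep_left_meta(left: Node, right: Node) -> Node:
    """Take the value from the right, but retain meta from the left only."""
    return Node(value=right.value, meta=dict(left.meta))


author_node = Node(value="", meta={"validate": "min_len=1"})
consumer_node = Node(value="", meta={"validate": "min_len=0"})

merged = merge_keep_left_meta(author_node, consumer_node)
print(merged.meta)  # the consumer's relaxed constraint is dropped
```

Under this model the consumer's attempt to relax `min_len` has no effect, which is exactly the gap the question above points at.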
## Specs
### assert.not_null()
```yaml=
#@overlay/match by=overlay.one_of(["oidc", "ldap"])
#@overlay/assert assert.not_null
```
## Sources
- https://github.com/atozprasad/cna/blob/b902127b8ffddf8536fe6c4faabc1860b6710e2c/tkg-service/vSphere7u2/tkc/setup/tkg-extensions-v1.3.1/authentication/gangway/values.star
## Design Goals
- Authors should be able to articulate as many constraints as they like in their schema.
## Design Principles
- **Child nodes know not their parents' identities**. This avoids ad hoc dependencies, which are a notorious source of accidental complexity.
## Alternative Ideas
### Leveraging `@overlay/match` for assertion on children
Thought of something like...
```yaml=
#@overlay/match by=overlay.one_of(["oidc", "ldap"])
#@overlay/assert assert.not_null
```
(that in finished form would not awkwardly use such a mixture of annotations)
Rejected it largely because we want to be able to stack validations without limitation. That is, if we can keep to the design that `@schema/validate` contains _all_ validations, and that it is a variadic list of functions, then users can articulate any number of constraints.
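
The "stack any number of validations" idea can be sketched in Python with a variadic function that runs every rule and collects every violation. The names `validate`, `min_len`, and `one_of` are hypothetical stand-ins chosen for this illustration.

```python
from typing import Any, Callable, List, Optional

# A rule returns a violation message, or None when the value passes.
Rule = Callable[[Any], Optional[str]]


def validate(value: Any, *rules: Rule) -> List[str]:
    """Apply every rule to the value; no rule limits how many others can be stacked."""
    return [msg for msg in (rule(value) for rule in rules) if msg is not None]


def min_len(n: int) -> Rule:
    def rule(value: Any) -> Optional[str]:
        return None if len(value) >= n else f"length must be at least {n}"
    return rule


def one_of(*allowed: Any) -> Rule:
    def rule(value: Any) -> Optional[str]:
        return None if value in allowed else f"must be one of {list(allowed)}"
    return rule


print(validate("", min_len(1), one_of("oidc", "ldap")))
print(validate("ldap", min_len(1), one_of("oidc", "ldap")))
```

Because each rule is just a function of the value, adding a new kind of constraint does not require a new annotation.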