pijplijn¶
-
class
pyelt.pipeline.Pipe(source_system, pipeline, config={})¶ Een pipeline bevat 1 of meerdere pipes. Een pipe is een verbinding met 1 bronsysteem, maar bevat wel alle lagen in de datavault.
Bijvoorbeeld, we maken een pipe aan met de naam ‘timeff’, met als bronsysteem een oracle database en ‘sor_timeff’ als de naam van het sor schema:
timeff_config = { 'source_connection': 'oracle://SID:pwd@server/db', 'default_schema': 'MTDX', 'sor_schema': 'sor_timeff' } pipe = pipeline.get_or_create_pipe('timeff', timeff_config) sor = pipe.sor print(sor.name) >> sor_timeff print(sor.source_db.default_schema.name) >> MTDX
Note
Indien bij het aanmaken van de pipe het sor schema nog niet bestaat, dan wordt deze eerst aangemaakt.
-
__init__(source_system, pipeline, config={})¶ Stelt de begin status van pipe in.
Parameters: - source_system – de naam van huidige gebruikte source_system; bijvoorbeeld “timeff”
- pipeline – de huidige pipeline volgens de gebruikte config
- config – de dict met de configuratie voor deze pipe met daarin oa de bronsysteem config
Note
Maak een pipe altijd aan via de pipeline.get_or_create_pipe
-
create_db_functions()¶ DDL functie. Maakt nieuwe database functies aan.
-
create_dv_from_domain()¶ Voert ddl uit van de dv laag. Maakt eventuele nieuwe tabellen aan (hubs, sats en links) gebaseerd op de gedefiniëerde domeinen.
-
create_sor_from_mappings()¶ Voert ddl uit van de sor-laag. Maakt eventuele nieuwe sor-tabellen aan aan de hand van de gedefiniëerde mappings zoals bijvoorbeeld in “timeff_mappings_old.py”.
-
register_db_function(func: pyelt.datalayers.database.DbFunction) → None¶ Registreert de database functie.
Parameters: func – de functie die geregistreert wordt
-
register_db_functions(module, schema: pyelt.datalayers.database.Schema = None) → None¶ Registreert de module met database functies. Dat is een module met een of meerder classes die over erven van DbFunction.
Tijdens de ddl worden de functies aangemaakt op de database.
Parameters: - module – de module met database functies. ; bijvoorbeeld: “timeff_functies”
- schema – Het schema in Postgresql waarop de hier geregistreerde functies van toepassing zijn; bijvoorbeeld: ‘dv’ of ‘sor_timeff’
-
register_domain(module)¶ Registreert de module met het domein.
Parameters: module – de module met daarin de domein classes; bijvoorbeeld: “domain_huisartsen” Je kunt meerdere modules registreren door de functie vaker aan te roepen:
pipe.register_domain(domain_huisartsen) pipe.register_domain(domain_ziekenhuizen)
Tijdens de dll zullen alle tabellen uit beide domeinen aangemaakt worden.
-
register_extra_sql(extra_sql: typing.List[str]) → None¶ Registreert additionele uit te voeren (eenmalige) sql code die op de database wordt uitgevoerd
Parameters: extra_sql – een lijst met strings van geldige sql code
-
run(parts=['sor', 'refs', 'hubs', 'links', 'views', 'viewlinks'])¶ Parameters: parts – lijst van keywords. De eventuele aanwezigheid van een keyword in deze lijst bepaald of het log bestand dat hoort bij dit keyword gemaakt wordt en of de bijhorende DDL uitgevoerd wordt.
-
run_extra_sql()¶ Runt de aanwezige extra sql code.
-
runid¶ Returns: self.pipeline.runid
-
validate()¶ Voert achtereenvolgens uit: :class:’validate_domains’ en validate_mappings. Geneereert de validatie-boodschap.
Returns: validation_msg. leeg indien validatie goed is
-
validate_domains()¶ Valideert of er een domein is gedefiniëerd en via een aantal andere functies of de eventueel gebruikte entiteiten, hybridsats of links wel geldig zijn.
zie:
Pipeline.validate_domainsReturns: validation_msg
-
validate_mappings_after_ddl()¶ Valideert of de gebruikte mappings correcte domein classes gebruikt (entities, hubs, sats en links).
zie:
Pipeline.validate_mappingsReturns: validation_msg
-
validate_mappings_before_ddl()¶ Valideert of de gebruikte mappings correcte domein classes gebruikt (entities, hubs, sats en links).
zie:
Pipeline.validate_mappingsReturns: validation_msg
-
-
class
pyelt.pipeline.Pipeline¶ De pipeline omvat alle lagen en alle bronnen. De pipeline bevat de databaseverbinding naar de datavault-datawarehouse, die bestaat uit de verschillende lagen:
- sor(s)
- rdv
- dv
- datamart(s)
Voorbeeld:
pipeline = Pipeline() datawarehouse = pipeline.dwh rdv = datawarehouse.rdv dv = datawarehouse.dv
Note
Een pipeline is een singleton implementatie; er is altijd maar 1 pipeline object voor alle processen. Elke nieuwe pipeline, die wordt aangemaakt, zal dezelfde waardes bevatten.
-
static
__new__(*args, **kwargs)¶ returns cls._instance
Definieert wat de instances van de parameters pyelt_config, dwh, runid, pipes, logger en sql_logger zijn. :param config: de globale pyelt configuratie (dict) met daarin oa. dwh connectie string
-
create_new_runid() → float¶ Maakt een nieuw runid aan. De runid wordt met 0,01 verhoogd voor iedere volgende run op dezelfde dag. De eerste run op een dag krijgt het eerste gehele getal volgend op de maximale aanwezige runid in de tabel.
Runid wordt bewaard in de database in het sys-schema
Returns: self.runid
-
end_run()¶ Beëindigt de run en maakt een update van de database tabel sys.runs met een finish_date en eventuele exceptions.
-
get_or_create_pipe(source_system, config={}) → pyelt.pipeline.Pipe¶ Maakt een nieuwe pipe aan, indien deze nog niet bestaat, of retourneert een reeds bestaande pipe op basis van unieke naam van bron systeem
Parameters: - source_system – naam van het bronsysteem; bv ‘timeff’
- config – dict met de gebruikte configuratie
Returns: Pipe
-
run(parts=['sor', 'refs', 'hubs', 'links', 'views', 'viewlinks']) → None¶ Deze functie start de run van de pipeline. Er wordt per pipe ddl uitgevoerd, een logbestand aangemaakt, een sql_logbestand en de etl wordt uitgevoerd..
Parameters: parts – lijst van te runnen onderdelen, indien leeg gelaten worden alle onderdelen gerund. Voorbeeld:
pipeline.run(['sor', 'refs'])
Nu wordt alleen de sor laag gerunt en de referentie tabel in de dv gevuld.
-
validate_domains() → bool¶ Valideert of de geregistreerde domeinen van alle pipes geldig zijn.
Wordt uitgevoerd voorafgaande aan de ddl.
Returns: Boolean
-
validate_mappings_after_ddl() → bool¶ Valideert of de mappings van alle pipes geldig zijn.
Wordt uitgevoerd voorafgaande aan de etl, maar nadat de ddl heeft gerund.
Returns: Boolean
-
validate_mappings_before_ddl() → bool¶ Valideert of de mappings van alle pipes geldig zijn.
Wordt uitgevoerd voorafgaande aan de ddl
Returns: Boolean