pijplijn

class pyelt.pipeline.Pipe(source_system, pipeline, config={})

Een pipeline bevat 1 of meerdere pipes. Een pipe is een verbinding met 1 bronsysteem, maar bevat wel alle lagen in de datavault.

Bijvoorbeeld, we maken een pipe aan met de naam ‘timeff’, met als bronsysteem een oracle database en ‘sor_timeff’ als de naam van het sor schema:

timeff_config = {
    'source_connection': 'oracle://SID:pwd@server/db',
    'default_schema': 'MTDX',
    'sor_schema': 'sor_timeff'
}
pipe = pipeline.get_or_create_pipe('timeff', timeff_config)
sor = pipe.sor
print(sor.name)
>> sor_timeff
print(sor.source_db.default_schema.name)
>> MTDX

Note

Indien bij het aanmaken van de pipe het sor schema nog niet bestaat, dan wordt deze eerst aangemaakt.

__init__(source_system, pipeline, config={})

Stelt de begin status van pipe in.

Parameters:
  • source_system – de naam van huidige gebruikte source_system; bijvoorbeeld “timeff”
  • pipeline – de huidige pipeline volgens de gebruikte config
  • config – de dict met de configuratie voor deze pipe met daarin oa de bronsysteem config

Note

Maak een pipe altijd aan via de pipeline.get_or_create_pipe

create_db_functions()

DDL functie. Maakt nieuwe database functies aan.

create_dv_from_domain()

Voert ddl uit van de dv laag. Maakt eventuele nieuwe tabellen aan (hubs, sats en links) gebaseerd op de gedefiniëerde domeinen.

create_sor_from_mappings()

Voert ddl uit van de sor-laag. Maakt eventuele nieuwe sor-tabellen aan aan de hand van de gedefiniëerde mappings zoals bijvoorbeeld in “timeff_mappings_old.py”.

register_db_function(func: pyelt.datalayers.database.DbFunction) → None

Registreert de database functie.

Parameters:func – de functie die geregistreert wordt
register_db_functions(module, schema: pyelt.datalayers.database.Schema = None) → None

Registreert de module met database functies. Dat is een module met een of meerder classes die over erven van DbFunction.

Tijdens de ddl worden de functies aangemaakt op de database.

Parameters:
  • module – de module met database functies. ; bijvoorbeeld: “timeff_functies”
  • schema – Het schema in Postgresql waarop de hier geregistreerde functies van toepassing zijn; bijvoorbeeld: ‘dv’ of ‘sor_timeff’
register_domain(module)

Registreert de module met het domein.

Parameters:module – de module met daarin de domein classes; bijvoorbeeld: “domain_huisartsen”

Je kunt meerdere modules registreren door de functie vaker aan te roepen:

pipe.register_domain(domain_huisartsen)
pipe.register_domain(domain_ziekenhuizen)

Tijdens de dll zullen alle tabellen uit beide domeinen aangemaakt worden.

register_extra_sql(extra_sql: typing.List[str]) → None

Registreert additionele uit te voeren (eenmalige) sql code die op de database wordt uitgevoerd

Parameters:extra_sql – een lijst met strings van geldige sql code
run(parts=['sor', 'refs', 'hubs', 'links', 'views', 'viewlinks'])
Parameters:parts – lijst van keywords. De eventuele aanwezigheid van een keyword in deze lijst bepaald of het log bestand dat hoort bij dit keyword gemaakt wordt en of de bijhorende DDL uitgevoerd wordt.
run_extra_sql()

Runt de aanwezige extra sql code.

runid
Returns:self.pipeline.runid
validate()

Voert achtereenvolgens uit: :class:’validate_domains’ en validate_mappings. Geneereert de validatie-boodschap.

Returns:validation_msg. leeg indien validatie goed is
validate_domains()

Valideert of er een domein is gedefiniëerd en via een aantal andere functies of de eventueel gebruikte entiteiten, hybridsats of links wel geldig zijn.

zie: Pipeline.validate_domains

Returns:validation_msg
validate_mappings_after_ddl()

Valideert of de gebruikte mappings correcte domein classes gebruikt (entities, hubs, sats en links).

zie: Pipeline.validate_mappings

Returns:validation_msg
validate_mappings_before_ddl()

Valideert of de gebruikte mappings correcte domein classes gebruikt (entities, hubs, sats en links).

zie: Pipeline.validate_mappings

Returns:validation_msg
class pyelt.pipeline.Pipeline

De pipeline omvat alle lagen en alle bronnen. De pipeline bevat de databaseverbinding naar de datavault-datawarehouse, die bestaat uit de verschillende lagen:

  • sor(s)
  • rdv
  • dv
  • datamart(s)

Voorbeeld:

pipeline = Pipeline()
datawarehouse = pipeline.dwh
rdv = datawarehouse.rdv
dv = datawarehouse.dv

Note

Een pipeline is een singleton implementatie; er is altijd maar 1 pipeline object voor alle processen. Elke nieuwe pipeline, die wordt aangemaakt, zal dezelfde waardes bevatten.

static __new__(*args, **kwargs)

returns cls._instance

Definieert wat de instances van de parameters pyelt_config, dwh, runid, pipes, logger en sql_logger zijn. :param config: de globale pyelt configuratie (dict) met daarin oa. dwh connectie string

create_new_runid() → float

Maakt een nieuw runid aan. De runid wordt met 0,01 verhoogd voor iedere volgende run op dezelfde dag. De eerste run op een dag krijgt het eerste gehele getal volgend op de maximale aanwezige runid in de tabel.

Runid wordt bewaard in de database in het sys-schema

Returns:self.runid
end_run()

Beëindigt de run en maakt een update van de database tabel sys.runs met een finish_date en eventuele exceptions.

get_or_create_pipe(source_system, config={}) → pyelt.pipeline.Pipe

Maakt een nieuwe pipe aan, indien deze nog niet bestaat, of retourneert een reeds bestaande pipe op basis van unieke naam van bron systeem

Parameters:
  • source_system – naam van het bronsysteem; bv ‘timeff’
  • config – dict met de gebruikte configuratie
Returns:

Pipe

run(parts=['sor', 'refs', 'hubs', 'links', 'views', 'viewlinks']) → None

Deze functie start de run van de pipeline. Er wordt per pipe ddl uitgevoerd, een logbestand aangemaakt, een sql_logbestand en de etl wordt uitgevoerd..

Parameters:parts – lijst van te runnen onderdelen, indien leeg gelaten worden alle onderdelen gerund.

Voorbeeld:

pipeline.run(['sor', 'refs'])

Nu wordt alleen de sor laag gerunt en de referentie tabel in de dv gevuld.

validate_domains() → bool

Valideert of de geregistreerde domeinen van alle pipes geldig zijn.

Wordt uitgevoerd voorafgaande aan de ddl.

Returns:Boolean
validate_mappings_after_ddl() → bool

Valideert of de mappings van alle pipes geldig zijn.

Wordt uitgevoerd voorafgaande aan de etl, maar nadat de ddl heeft gerund.

Returns:Boolean
validate_mappings_before_ddl() → bool

Valideert of de mappings van alle pipes geldig zijn.

Wordt uitgevoerd voorafgaande aan de ddl

Returns:Boolean