Zephyrnet-logo

Als u functies kunt schrijven, kunt u Dask . gebruiken

Datum:

Als u functies kunt schrijven, kunt u Dask . gebruiken

Dit artikel is het tweede artikel van een doorlopende serie over het gebruik van Dask in de praktijk. Elk artikel in deze serie is eenvoudig genoeg voor beginners, maar biedt nuttige tips voor het echte werk. Het eerste artikel in de serie gaat over het gebruik van LocalCluster.


By Hugo Shi, oprichter van Saturn Cloud

Ik heb met veel datawetenschappers gepraat die van hebben gehoord Dashboard, het Python-framework voor gedistribueerd computergebruik, maar weet niet waar te beginnen. Ze weten dat Dask waarschijnlijk veel van hun workflows kan versnellen door ze parallel over een cluster van machines te laten lopen, maar de taak om een ​​geheel nieuwe methodologie te leren kan ontmoedigend lijken. Ik ben hier om je te vertellen dat je waarde uit Dask kunt halen zonder dat je het hele raamwerk hoeft te leren. Indien u besteedt u tijd aan wachten tot notebookcellen worden uitgevoerd, is de kans groot dat Dask u tijd kan besparen. Zelfs als je alleen weet hoe je Python-functies moet schrijven, kun je hiervan profiteren zonder iets anders te leren! Deze blogpost is een "hoe Dask te gebruiken zonder het hele ding te leren" tutorial.

Dask, dataframes, tassen, arrays, planners, werkers, grafieken, RAPIDS, oh nee!

 
 
Er zijn veel gecompliceerde inhoudsstukken over Dask, die overweldigend kunnen zijn. Dit komt omdat Dask een cluster van werkmachines kan gebruiken om veel coole dingen te doen! Maar vergeet dat allemaal voor nu. Dit artikel richt zich op eenvoudige technieken die u tijd kunnen besparen, zonder dat u veel hoeft te veranderen aan uw manier van werken.
 

Voor lussen en functies

 
 
Vrijwel elke datawetenschapper heeft zoiets als dit gedaan, waarbij je een set dataframes hebt opgeslagen in afzonderlijke bestanden en je een for-lus gebruikt om ze allemaal te lezen, wat logica te doen en ze vervolgens te combineren:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

Na verloop van tijd krijg je meer bestanden, of de genius_algorithm wordt ingewikkelder en duurt langer. En je blijft wachten. En wachten.


 

Stap 1 is om uw code in een functie in te kapselen. U wilt de dingen inkapselen die in de for-lus gaan. Dit maakt het gemakkelijker om te begrijpen wat de code doet (een bestand converteren naar iets nuttigs via magie). Wat nog belangrijker is, het maakt het gemakkelijker om die code op andere manieren te gebruiken dan voor lussen.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Stap 2 is om het te vergelijken met Dask. Nu, in plaats van een for-lus te gebruiken, waarbij elke iteratie na de vorige plaatsvindt, zal Dask ze parallel uitvoeren op een cluster. Dit zou ons veel sneller resultaten moeten opleveren, en is slechts drie regels langer dan de for-loop-code!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Hoe het werkt:

  • De uitgestelde decorateur transformeert uw functie. Als je het nu noemt, wordt het niet geëvalueerd. In plaats daarvan krijg je een delayed object, dat Dask later kan uitvoeren.
  • Client().compute stuurt al die vertraagde objecten naar het Dask-cluster, waar ze parallel worden geëvalueerd! Dat is alles, u wint!
  • Een . instantiëren Client automatisch voorzieningen a LocalCluster. Dit betekent dat de parallelle werkers van Dask allemaal processen zijn op dezelfde machine als degene die Dask aanroept. Dit levert een beknopt voorbeeld op. Voor echt werk raad ik aan om te creëren lokale clusters in de terminal.

Praktische onderwerpen

 
 
Het bovenstaande stopt waar de meeste Dask-tutorials stoppen. Ik heb deze aanpak gebruikt met mijn eigen werk, en met tal van klanten, en er doen zich altijd een paar praktische problemen voor. Deze volgende tips zullen je helpen om van dat schoolvoorbeeld hierboven naar meer bruikbare methoden in de praktijk te gaan door twee onderwerpen te behandelen die constant naar voren komen: grote objecten en foutafhandeling.
 

Grote objecten

 
Om functies op een gedistribueerd cluster te berekenen, moeten de objecten waarop de functies worden aangeroepen, naar de werknemers worden verzonden. Dit kan leiden tot prestatieproblemen, aangezien deze op uw computer geserialiseerd (gebeitst) moeten worden en via het netwerk moeten worden verzonden. Stel je voor dat je processen aan het doen bent op gigabytes aan gegevens - je wilt dat niet elke keer dat een functie erop wordt uitgevoerd, moeten overdragen. Als u per ongeluk grote objecten verstuurt, ziet u mogelijk een bericht van Dask als volgt:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Er zijn twee manieren om dit te voorkomen: je kunt kleinere objecten naar werknemers sturen zodat de last niet zo erg is, of je kunt proberen elk object slechts één keer naar een werknemer te sturen, zodat je niet steeds overboekingen hoeft te doen .
Oplossing 1: stuur kleine objecten indien mogelijk
Dit voorbeeld is goed, omdat we een bestandspad (kleine tekenreeks) verzenden in plaats van het dataframe.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Hieronder is wat niet Te doen. Zowel omdat je CSV-lezing (duur en traag) in de lus zou doen, wat niet parallel is, maar ook omdat we nu dataframes verzenden (die groot kunnen zijn).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Vaak kan code worden herschreven om te wijzigen waar gegevens worden beheerd, zowel bij de klant als bij de werknemers. Afhankelijk van uw situatie kan er een enorme tijdwinst worden behaald door na te denken over welke functies als invoer dienen en hoe gegevensoverdrachten kunnen worden geminimaliseerd.
Oplossing 2: verzend objecten slechts één keer
Als je een groot object moet verzenden, stuur het dan niet meerdere keren. Als ik bijvoorbeeld een groot modelobject moet verzenden om te kunnen berekenen, zal het eenvoudigweg toevoegen van de parameter het model meerdere keren serialiseren (eenmaal per bestand)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Ik kan Dask vertellen dat niet te doen, door het in een vertraagd object te wikkelen.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Afhandelingsfout

 
Naarmate uw rekentaken groeien, wilt u vaak door een storing heen kunnen. In dit geval bevat misschien 5% van mijn CSV's slechte gegevens die ik niet aankan. Ik wil graag 95% van de CSV's met succes verwerken, maar houd de fouten bij zodat ik mijn methoden kan aanpassen en het opnieuw kan proberen.

Deze lus doet dit.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Aangezien deze functie op het eerste gezicht vrij ingewikkeld is, laten we hem even opsplitsen.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Wij bellen compute on results, maar aangezien we niet passeren sync=True, krijgen we onmiddellijk futures terug, die de berekening vertegenwoordigen, die nog niet is voltooid. We maken ook een toewijzing van de toekomst zelf, naar het _n_th invoerargument dat het heeft gegenereerd. Ten slotte vullen we voorlopig een lijst met resultaten gevuld met Geen.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Vervolgens wachten we op resultaten en verwerken we ze zodra ze binnenkomen. Als we op futures wachten, worden ze opgedeeld in futures die done, en degenen die zijn not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Als de toekomst is finished, dan printen we dat het ons gelukt is, en slaan we het resultaat op.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

Anders slaan we de uitzondering op en drukken we de stacktracering af.

 queue = result.not_done

Ten slotte hebben we de wachtrij ingesteld op die futures die nog niet zijn voltooid.
 

Conclusie

 
 
Dask kan u zeker tijd besparen. Als u tijd besteedt aan het wachten tot de code wordt uitgevoerd, moet u deze eenvoudige tips gebruiken om uw werk te parallelliseren. Er zijn ook veel geavanceerde dingen die je met Dask kunt doen, maar dit is een goed startpunt.

 
Bio: Hugo Shi is oprichter van Saturn Cloud, de go-to-cloud-werkruimte om Python te schalen, samen te werken, taken te implementeren en meer.

ORIGINELE. Met toestemming opnieuw gepost.

Zie ook:


PlatoAi. Web3 opnieuw uitgevonden. Gegevensintelligentie versterkt.
Klik hier om toegang te krijgen.

Bron: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

spot_img

Laatste intelligentie

spot_img

Chat met ons

Hallo daar! Hoe kan ik u helpen?