Zephyrnet-logo

Meerdere HPO-taken parallel uitvoeren op Amazon SageMaker

Datum:

De mogelijkheid om machine learning (ML) -modellen snel te herhalen en te trainen is de sleutel tot het verkrijgen van bedrijfswaarde uit ML-workloads. Omdat ML-modellen vaak veel instelbare parameters hebben (bekend als hyperparameters) die van invloed kunnen zijn op het vermogen van het model om effectief te leren, gebruiken datawetenschappers vaak een techniek die bekend staat als hyperparameteroptimalisatie (HPO) om het best presterende model te bereiken tegen een bepaalde vooraf gedefinieerde metriek. Afhankelijk van het aantal hyperparameters en de grootte van de zoekruimte, kan het vinden van het beste model duizenden of zelfs tienduizenden trainingsruns vereisen. Real-world problemen die vaak uitgebreide HPO vereisen, zijn onder meer beeldsegmentatie voor het modelleren van voertuigverkeer voor autonoom rijden, het ontwikkelen van algoritmische handelsstrategieën op basis van historische financiële gegevens of het bouwen van fraudedetectiemodellen op basis van transactiegegevens. Amazon Sage Maker biedt een ingebouwde HPO-algoritme dat verwijdert het ongedifferentieerde zware werk dat nodig is om uw eigen HPO-algoritme te bouwen. Dit bericht laat zien hoe u uw HPO-taken kunt batchen om het aantal taken dat u parallel kunt uitvoeren te maximaliseren, waardoor de totale tijd die nodig is om de gewenste parameterruimte effectief te dekken en de best presterende modellen te verkrijgen, wordt verminderd.

Laten we, voordat we ingaan op de batchaanpak op Amazon SageMaker, kort de state-of-the-art bekijken [1]. Er is een groot aantal HPO-algoritmen, variërend van random of grid search, Bayesian search en handmatige afstemming, waarbij onderzoekers hun domeinkennis gebruiken om parameters af te stemmen tot populatiegebaseerde training die is geïnspireerd op genetische algoritmen. Voor deep learning-modellen kan zelfs het trainen van een enkele trainingsrun tijdrovend zijn. In dat geval wordt het belangrijk om een ​​agressieve strategie voor vroegtijdig stoppen te hebben, waardoor proeven worden beëindigd in zoekruimten die waarschijnlijk geen goede resultaten zullen opleveren. Verschillende strategieën, zoals opeenvolgende halvering of asynchrone opeenvolgende halvering, gebruiken meerarmige bandieten om een ​​afweging te maken tussen verkenning (het uitproberen van verschillende parametercombinaties) en exploitatie (waardoor een trainingsrun convergeert). Tot slot, om ontwikkelaars te helpen deze benaderingen snel te herhalen, zijn er een aantal tools, zoals SageMaker HPO, Ray, HyperOpt en meer. In dit bericht zie je ook hoe je een van de meest populaire HPO-tools, Ray Tune, naar SageMaker kunt brengen.

Gebruiksvoorbeeld: het voorspellen van wanbetalingen op kredietkaarten

Om dit aan de hand van een concreet voorbeeld aan te tonen, stelt u zich voor dat u een ML-ingenieur bent die voor een bank werkt en dat u de waarschijnlijkheid wilt voorspellen dat een klant in gebreke blijft met zijn creditcardbetalingen. Om een ​​model te trainen, gebruikt u historische gegevens die beschikbaar zijn in de UCI-opslagplaats. Alle code die in dit bericht is ontwikkeld, is beschikbaar op GitHub. De notebook behandelt de gegevensvoorverwerking die nodig is om de onbewerkte gegevens voor te bereiden op training. Omdat het aantal defaults vrij klein is (zoals weergegeven in de volgende grafiek), splitsen we de dataset op in train en test, en upsamplen we de trainingsdata naar 50/50 default versus niet-defaultleningen.

Vervolgens uploaden we de datasets naar Amazon eenvoudige opslagservice (Amazon S3). Zie de volgende code:

#Upload Training and test data into S3
train_s3 = sagemaker_session.upload_data(path='./train_full_split/', key_prefix=prefix + '/train')
print(train_s3)
test_s3 = sagemaker_session.upload_data(path='./test_full_split/', key_prefix=prefix + '/test')
print(test_s3)

Hoewel SageMaker veel ingebouwde algoritmen biedt, zoals XGBoost, laten we in dit bericht zien hoe HPO kan worden toegepast op een aangepast PyTorch-model met behulp van de SageMaker PyTorch-trainingscontainer met behulp van de scriptmodus. U kunt dit vervolgens aanpassen aan uw eigen aangepaste deep learning-code. Bovendien zullen we demonstreren hoe u aangepaste metrics naar SageMaker HPO kunt brengen.

Wanneer u te maken heeft met tabelgegevens, is het handig om uw gegevensset in kleinere bestanden te splitsen om lange laadtijden van gegevens te voorkomen, waardoor uw rekenbronnen kunnen uithongeren en dit kan leiden tot inefficiënt CPU / GPU-gebruik. We maken een aangepaste Dataset-klasse om onze gegevens op te halen en verpakken deze in de DataLoader-klasse om de dataset te herhalen. We zetten de batchgrootte op 1, omdat elke batch uit 10,000 rijen bestaat, en deze laden met Panda's.

Ons model is een eenvoudig feed-forward neuraal netwerk, zoals weergegeven in het volgende codefragment:

class Net(nn.Module): def __init__(self, inp_dimension): super().__init__() self.fc1 = nn.Linear(inp_dimension, 500) self.drop = nn.Dropout(0.3) self.bn1 = nn.BatchNorm1d(500) self.bn2=nn.BatchNorm1d(250) self.fc2 = nn.Linear(500, 250) self.fc3 = nn.Linear(250, 100) self.bn3=nn.BatchNorm1d(100) self.fc4 = nn.Linear(100,2) def forward(self, x): x = x.squeeze() x = F.relu(self.fc1(x.float())) x = self.drop(x) x = self.bn1(x) x = F.relu(self.fc2(x)) x = self.drop(x) x = self.bn2(x) x = F.relu(self.fc3(x)) x = self.drop(x) x = self.bn3(x) x = self.fc4(x) # last layer converts it to binary classification probability return F.log_softmax(x, dim=1)

Zoals te zien is in de bovenstaande afbeelding, is de dataset zeer onevenwichtig en als zodanig is modelnauwkeurigheid niet de meest bruikbare evaluatiestatistiek, omdat een basismodel dat voorspelt dat alle klanten hun betalingen niet in gebreke zullen stellen, een hoge nauwkeurigheid zal hebben. Een meer bruikbare statistiek is de AUC, het gebied onder de ROC-curve (Receiver Operator Characteristic) dat tot doel heeft het aantal valse positieven te minimaliseren en het aantal echte positieven te maximaliseren. Een vals positief (model dat onjuist voorspelt dat een goede klant in gebreke blijft) kan ertoe leiden dat de bank inkomsten misloopt door creditcards aan klanten te weigeren. Om ervoor te zorgen dat uw HPO-algoritme kan optimaliseren op een aangepaste metriek zoals de AUC of F1-score, moet u die metrische gegevens in STDOUT loggen, zoals weergegeven in de volgende code:

def test(model, test_loader, device, loss_optim): model.eval() test_loss = 0 correct = 0 fulloutputs = [] fulltargets = [] fullpreds = [] with torch.no_grad(): for i, (data, target) in enumerate(test_loader): data, target = data.to(device), target.to(device) output = model(data) target = target.squeeze() test_loss += loss_optim(output, target).item() # sum up batch loss pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability correct += pred.eq(target.view_as(pred)).sum().item() fulloutputs.append(output.cpu().numpy()[:, 1]) fulltargets.append(target.cpu().numpy()) fullpreds.append(pred.squeeze().cpu()) i+=1 test_loss /= i logger.info("Test set Average loss: {:.4f}, Test Accuracy: {:.0f}%;n".format( test_loss, 100. * correct / (len(target)*i) )) fulloutputs = [item for sublist in fulloutputs for item in sublist] fulltargets = [item for sublist in fulltargets for item in sublist] fullpreds = [item for sublist in fullpreds for item in sublist] logger.info('Test set F1-score: {:.4f}, Test set AUC: {:.4f} n'.format( f1_score(fulltargets, fullpreds), roc_auc_score(fulltargets, fulloutputs)))

Nu zijn we klaar om onze SageMaker-schatter te definiëren en de parameters voor de HPO-taak te definiëren:

estimator = PyTorch(entry_point="train_cpu.py", role=role, framework_version='1.6.0', py_version='py36', source_dir='./code', output_path = f's3://{bucket}/{prefix}/output', instance_count=1, sagemaker_session=sagemaker_session, instance_type='ml.m5.xlarge', hyperparameters={ 'epochs': 10, # run more epochs for HPO. 'backend': 'gloo' #gloo for cpu, nccl for gpu } ) #specify the hyper-parameter ranges hyperparameter_ranges = {'lr': ContinuousParameter(0.001, 0.1), 'momentum': CategoricalParameter(list(np.arange(0, 10)/10))} inputs ={'training': train_s3, 'testing':test_s3} #specify the custom HPO metric
objective_metric_name = 'test AUC'
objective_type = 'Maximize'
metric_definitions = [{'Name': 'test AUC', 'Regex': 'Test set AUC: ([0-9\.]+)'}] #note that the regex must match your test function above estimator.fit({'training': train_s3, 'testing':test_s3}, wait=False)

We passeren de paden naar de training- en testgegevens in Amazon S3.

Nu de installatie is voltooid, gaan we nu over op het uitvoeren van meerdere HPO-taken.

Parallelle HPO-banen

Om meerdere afstemmingsopdrachten voor hyperparameters parallel uit te voeren, moeten we eerst de afstemmingsstrategie bepalen. SageMaker biedt momenteel een willekeurige en Bayesiaanse optimalisatiestrategie. Voor een willekeurige strategie zijn verschillende HPO-banen volledig onafhankelijk van elkaar, terwijl Bayesiaanse optimalisatie het HPO-probleem behandelt als een regressieprobleem en intelligente inschattingen maakt over de volgende set parameters die moeten worden gekozen op basis van de eerdere reeks proeven.

Laten we eerst wat terminologie bekijken:

  • Trials - Een proef komt overeen met een enkele trainingstaak met een reeks vaste waarden voor de hyperparameters
  • max_banen - Het totale aantal trainingsproeven dat voor die bepaalde HPO-baan moet worden uitgevoerd
  • max_parallel_jobs - Het maximum aantal gelijktijdige lopende proeven per HPO-taak

Stel dat u in totaal 10,000 tests wilt uitvoeren. Om de totale HPO-tijd te minimaliseren, wilt u zoveel mogelijk tests parallel uitvoeren. Dit wordt beperkt door de beschikbaarheid van een bepaald Amazon Elastic Compute-cloud (Amazon EC2) -instantie type in uw regio en account. Neem contact op met uw AWS-accountvertegenwoordigers als u deze limieten wilt wijzigen of verhogen.

Stel voor dit voorbeeld dat u 20 ml.m5.xlarge-exemplaren beschikbaar heeft. Dit betekent dat u tegelijkertijd 20 proefversies van elk één exemplaar kunt uitvoeren. Momenteel, zonder enige limiet te verhogen, limieten van SageMaker max_jobs naar 500 en max_parallel_jobs tot 10. Dit betekent dat u in totaal 10,000 / 500 = 20 HPO-taken moet uitvoeren. Omdat u 20 proefversies kunt uitvoeren en max_parallel_jobs 10 is, kunt u het aantal gelijktijdige HPO-taken maximaliseren door 20/10 = 2 HPO-taken parallel uit te voeren. Dus een benadering om uw code in batches te plaatsen, is om altijd twee taken uit te voeren, totdat u uw totale vereiste taken van 20 bereikt.

In het volgende codefragment laten we twee manieren zien waarop u het aantal actieve taken kunt opvragen om dit te bereiken. De eerste benadering maakt gebruik van boto3, wat de AWS SDK is voor Python om actieve HPO-taken te ondervragen, en kan in uw notebook worden uitgevoerd en wordt geïllustreerd in het volgende diagram. Deze benadering kan voornamelijk worden gebruikt door datawetenschappers. Telkens wanneer het aantal lopende HPO-taken onder een vast aantal komt, aangegeven door de blauwe pijlen in het gestippelde vak aan de linkerkant, zal de polling-code nieuwe taken starten (weergegeven in oranje pijlen). De tweede benadering gebruikt Amazon Simple Queue-service (Amazon SQS) en AWS Lambda om SageMaker HPO-taken in de wachtrij te plaatsen en te ondervragen, zodat u een operationele pijplijn kunt bouwen voor herhaalbaarheid.

Klinkt ingewikkeld? Geen probleem, met het volgende codefragment kunt u de optimale strategie bepalen om uw totale HPO-tijd te minimaliseren door zoveel mogelijk HPO-taken parallel uit te voeren als is toegestaan. Nadat u het type exemplaar dat u wilt gebruiken en uw respectieve accountlimieten voor dat exemplaar hebt bepaald, vervangt u max_parallel_across_jobs met uw waarde.

def bayesian_batching_cold_start(total_requested_trials, max_parallel_across_jobs=20, max_parallel_per_job=10, max_candidates_per_job = 500): '''Given a total number of requested trials, generates the strategy for Bayesian HPO The strategy is a list (batch_strat) where every element is the number of jobs to run in parallel. The sum of all elements in the list is the total number of HPO jobs needed to reach total_requested_trials. For example if batch_strat = [2, 2, 2, 1], means you will run a total of 7 HPO jobs starting with 2 --> 2 ---> 2 ---> 1. total_requested_trials = number of trails user wants to run. max_parallel_across_jobs = max number of training jobs across all trials Sagemaker runs in parallel. Limited by instance availability max_parallel_per_job = max number of parallel jobs to run per HPO job max_candidates_per_job = total number of training jobs per HPO job''' batch_strat = [] tot_jobs_left = total_requested_trials max_parallel_hpo_jobs = max_parallel_across_jobs//max_parallel_per_job if total_requested_trials < max_parallel_hpo_jobs*max_candidates_per_job: batch_strat.append(total_requested_trials//max_candidates_per_job) else: while tot_jobs_left > max_parallel_hpo_jobs*max_candidates_per_job: batch_strat.append(max_parallel_hpo_jobs) tot_jobs_left -=max_parallel_hpo_jobs*max_candidates_per_job batch_strat.append(math.ceil((tot_jobs_left)/max_candidates_per_job)) return math.ceil(total_requested_trials/max_candidates_per_job), max_parallel_hpo_jobs, batch_strat bayesian_batching_cold_start(10000)
(20, 2, [2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

Overweeg de volgende code voor het starten van een bepaalde reeks taken nadat u hebt bepaald hoe u uw taken moet uitvoeren. De helperfunctie _parallel_hpo_no_polling voert de groep parallelle HPO-taken uit die in de voorgaande afbeelding wordt aangegeven door het gestippelde vak. Het is belangrijk om de wait parameter False bij het aanroepen van de tuner, omdat hierdoor de API-aanroep wordt vrijgegeven om de lus te laten draaien. De orkestratiecode poll_and_run peilt naar het aantal taken dat op een bepaald moment wordt uitgevoerd. Als het aantal taken onder het door de gebruiker gespecificeerde maximale aantal proefversies komt dat ze parallel willen uitvoeren (max_parallel_across_jobs), start de functie automatisch nieuwe opdrachten. Nu denk je misschien: "Maar het kan dagen duren voordat deze taken zijn uitgevoerd, wat als ik mijn laptop wil uitschakelen of als ik mijn sessie verlies?" Geen probleem, de code gaat verder waar hij was gebleven en voert het resterende aantal jobs uit door te tellen hoeveel HPO-jobs er nog over zijn, voorafgegaan door het job_name_prefix dat u opgeeft.

Ten slotte get_best_job functie aggregeert de outputs in een Pandas DataFrame in oplopende volgorde van de objectieve metriek voor visualisatie.

# helper function to launch a desired number of "n_parallel" HPO jobs simultaneously
def _parallel_hpo_no_polling(job_name_prefix, n_parallel, inputs, max_candidates_per_job, max_parallel_per_job): """kicks off N_parallel Bayesian HPO jobs in parallel job_name_prefix: user specified prefix for job names n_parallel: Number of HPO jobs to start in parallel inputs: training and test data s3 paths max_candidates_per_job: number of training jobs to run in each HPO job in total max_parallel_per_job: number of training jobs to run in parallel in each job """ # kick off n_parallel jobs simultaneously and returns all the job names tuning_job_names = [] for i in range(n_parallel): timestamp_suffix = strftime("%d-%H-%M-%S", gmtime()) try: tuner = HyperparameterTuner(estimator, objective_metric_name, hyperparameter_ranges, metric_definitions, max_jobs=max_candidates_per_job, max_parallel_jobs=max_parallel_per_job, objective_type=objective_type ) # fit the tuner to the inputs and include it as part of experiments tuner.fit(inputs, job_name = f'{job_name_prefix}-{timestamp_suffix}', wait=False ) # set wait=False, so you can launch other jobs in parallel. tuning_job_names.append(tuner.latest_tuning_job.name) sleep(1) #this is required otherwise you will get an error for using the same tuning job name print(tuning_job_names) except Exception as e: sleep(5) return tuning_job_names #orchestration and polling logicdef poll_and_run(job_name_prefix, inputs, max_total_candidates, max_parallel_across_jobs, max_candidates_per_job, max_parallel_per_job): """Polls for number of running HPO jobs. If less than max_parallel , starts a new one. job_name_prefix: the name prefix to give all your training jobs max_total_candidates: how many total trails to run across all HPO jobs max_candidates_per_job: how many total trails to run for 1 HPO job max_parallel_per_job: how many trials to run in parallel for a given HPO job (fixed to 10 without limit increases). max_parallel_across_jobs: how many concurrent trials to run in parallel across all HPO jobs """ #get how many jobs to run in total and concurrently max_num, max_parallel, _ = bayesian_batching_cold_start(max_total_candidates, max_parallel_across_jobs=max_parallel_across_jobs, max_parallel_per_job=max_parallel_per_job, max_candidates_per_job = max_candidates_per_job ) # continuously polls for running jobs -- if they are less than the required number, then launches a new one. all_jobs = sm.list_hyper_parameter_tuning_jobs(SortBy='CreationTime', SortOrder='Descending', NameContains=job_name_prefix, MaxResults = 100)['HyperParameterTuningJobSummaries'] all_jobs = [i['HyperParameterTuningJobName'] for i in all_jobs] if len(all_jobs)==0: print(f"Starting a set of HPO jobs with the prefix {job_name_prefix} ...") num_left = max_num #kick off the first set of jobs all_jobs += _parallel_hpo_no_polling(job_name_prefix, min(max_parallel, num_left), inputs, max_candidates_per_job, max_parallel_per_job) else: print("Continuing where you left off...") response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus'] for i in all_jobs] print(f"Already completed jobs = {response_list.count('Completed')}") num_left = max_num - response_list.count("Completed") print(f"Number of jobs left to complete = {max(num_left, 0)}") while num_left >0: response_list = [sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=i)['HyperParameterTuningJobStatus'] for i in all_jobs] running_jobs = response_list.count("InProgress") # look for the jobs that are running. print(f"number of completed jobs = {response_list.count('Completed')}") sleep(10) if running_jobs < max_parallel and len(all_jobs) < max_num: all_jobs += _parallel_hpo_no_polling(job_name_prefix, min(max_parallel-running_jobs, num_left), inputs, max_candidates_per_job, max_parallel_per_job) num_left = max_num - response_list.count("Completed") return all_jobs
# Aggregate the results from all the HPO jobs based on the custom metric specified
def get_best_job(all_jobs_list): """Get the best job from the list of all the jobs completed. Objective is to maximize a particular value such as AUC or F1 score""" df = pd.DataFrame() for job in all_jobs_list: tuner = sagemaker.HyperparameterTuningJobAnalytics(job) full_df = tuner.dataframe() tuning_job_result = sm.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=job) is_maximize = (tuning_job_result['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['Type'] == 'Maximize') if len(full_df) > 0: df = pd.concat([df, full_df[full_df['FinalObjectiveValue'] < float('inf')]]) if len(df) > 0: df = df.sort_values('FinalObjectiveValue', ascending=is_maximize) print("Number of training jobs with valid objective: %d" % len(df)) print({"lowest":min(df['FinalObjectiveValue']),"highest": max(df['FinalObjectiveValue'])}) pd.set_option('display.max_colwidth', -1) # Don't truncate TrainingJobName return df else: print("No training jobs have reported valid results yet.")

Nu kunnen we dit testen door in totaal 260 proeven uit te voeren, en vragen dat de code te allen tijde 20 proeven parallel uitvoert:

alljobs = poll_and_run('newtrials', inputs, max_total_candidates=260, max_parallel_across_jobs = 20, max_candidates_per_job=4, max_parallel_per_job=2)

Nadat de taken zijn voltooid, kunnen we alle outputs bekijken (zie de volgende schermafbeelding).

Nadat de taken zijn voltooid, kunnen we alle outputs bekijken (zie de volgende schermafbeelding).

Met de bovenstaande code kunt u HPO-taken parallel uitvoeren tot de toegestane limiet van 100 gelijktijdige HPO-taken.

Parallel HPO-banen met warme start

Stel nu dat u een warme startopdracht wilt uitvoeren, waarbij het resultaat van een eerdere opdracht wordt gebruikt als invoer voor de volgende opdracht. Warme start is vooral handig als u al een set hyperparameters hebt bepaald die een goed model opleveren, maar nu over nieuwe gegevens beschikken. Een andere use case voor warme start is wanneer een enkele HPO-taak lang kan duren, met name voor deep learning-workloads. In dat geval wilt u misschien de uitgangen van de vorige taak gebruiken om de volgende te starten. In ons geval kan dat gebeuren wanneer u een batch nieuwe maandelijkse of driemaandelijkse standaardgegevens ontvangt. Zie voor meer informatie over SageMaker HPO met warme start Voer een Warm Start Hyperparameter Tuning Job uit.

Het cruciale verschil tussen warme en koude start is de van nature opeenvolgende aard van warme start. Nogmaals, stel dat we 10,000 banen met een warme start willen lanceren. Deze keer starten we slechts één HPO-taak met de maximaal toegestane max_jobs parameter, wacht tot deze is voltooid en start de volgende taak met deze taak als ouder. We herhalen het proces totdat het totale gewenste aantal banen is bereikt. We kunnen dit bereiken met de volgende code:

def large_scale_hpo_warmstart(job_name_prefix, inputs, max_total_trials, max_parallel_per_job, max_trials_per_hpo_job=250): """Kicks off sequential HPO jobs with warmstart. job_name_prefix: user defined prefix to name your HPO jobs. HPO will add a timestamp inputs: locations of train and test datasets in S3 provided as a dict max_total_trials: total number of trials you want to run max_trials_per_hpo_job: Fixed at 250 unless you want fewer. max_parallel_per_job: max trails to run in parallel per HPO job""" if max_trials_per_hpo_job >250: raise ValueError('Please select a value less than or equal to 250 for max_trials_per_hpo_job') base_hpo_job_name = job_name_prefix timestamp_suffix = strftime("%d-%H-%M-%S", gmtime()) tuning_job_name = lambda i : f"{base_hpo_job_name}-{timestamp_suffix}-{i}" current_jobs_completed = 0 job_names_list = [] while current_jobs_completed < max_total_trials: jobs_to_launch = min(max_total_trials - current_jobs_completed, max_trials_per_hpo_job) hpo_job_config = dict( estimator=estimator, objective_metric_name=objective_metric_name, metric_definitions=metric_definitions, hyperparameter_ranges=hyperparameter_ranges, max_jobs=jobs_to_launch, strategy="Bayesian", objective_type=objective_type, max_parallel_jobs=max_parallel_per_job, ) if current_jobs_completed > 0: parent_tuning_job_name = tuning_job_name(current_jobs_completed) warm_start_config = WarmStartConfig( WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM, parents={parent_tuning_job_name} ) hpo_job_config.update(dict( base_tuning_job_name=parent_tuning_job_name, warm_start_config=warm_start_config )) tuner = HyperparameterTuner(**hpo_job_config) tuner.fit( inputs, job_name=tuning_job_name(current_jobs_completed + jobs_to_launch), logs=True, ) tuner.wait() job_names_list.append(tuner.latest_tuning_job.name) current_jobs_completed += jobs_to_launch return job_names_list

Nadat de taken zijn uitgevoerd, gebruikt u opnieuw de get_best_job functie om de bevindingen samen te voegen.

Andere HPO-tools gebruiken met SageMaker

SageMaker biedt de flexibiliteit om andere HPO-tools, zoals degene die eerder zijn besproken, te gebruiken om uw HPO-taken uit te voeren door het ongedifferentieerde zware werk van het beheer van de onderliggende infrastructuur weg te nemen. Een populaire open-source HPO-tool is bijvoorbeeld Ray Tune [2], een Python-bibliotheek voor grootschalige HPO die de meeste populaire frameworks ondersteunt, zoals XGBoost, MXNet, PyTorch en TensorFlow. Ray integreert met populaire zoekalgoritmen zoals Bayesian, HyperOpt en SigOpt, gecombineerd met geavanceerde planners zoals Hyperband of ASHA.

Om Ray met PyTorch te gebruiken, moet je eerst ray [tune] opnemen en naar je requirements.txt-bestand in je codemap met je trainingsscript opnemen. Geef de codemap als volgt op in de SageMaker PyTorch-schatter:

from sagemaker.pytorch import PyTorch estimator = PyTorch(entry_point="train_ray_cpu.py", #put requirements.txt file to install ray role=role, source_dir='./code', framework_version='1.6.0', py_version='py3', output_path = f's3://{bucket}/{prefix}/output', instance_count=1, instance_type='ml.m5.xlarge', sagemaker_session=sagemaker_session, hyperparameters={ 'epochs': 7, 'backend': 'gloo' # gloo for CPU and nccl for GPU }, disable_profiler=True) inputs ={'training': train_s3, 'testing':test_s3} estimator.fit(inputs, wait=True)

Uw trainingsscript moet worden aangepast om uw aangepaste statistieken uit te voeren naar de Ray-rapportgenerator, zoals weergegeven in de volgende code. Hierdoor kan uw trainingstaak communiceren met Ray. Hier gebruiken we de ASHA-planner om vroegtijdig stoppen te implementeren:

# additional imports
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler # modify test function to output to ray tune report.
def test(model, test_loader, device): # same as test function above with 1 line of code added to enable communication # with tune. tune.report(loss=test_loss, accuracy=correct / (len(target)*i), f1score=f1score, roc=roc)

U moet uw model ook regelmatig controleren:

for epoch in range(1, args.epochs + 1): model.train() for batch_idx, (data, target) in enumerate(train_loader, 1): data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target.squeeze()) loss.backward() optimizer.step() if batch_idx % args.log_interval == 0: logger.info("Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}".format( epoch, batch_idx * len(data), len(train_loader.sampler), 100. * batch_idx / len(train_loader), loss.item())) # have your test function publish metrics to tune.report test(model, test_loader, device) # checkpoint your model with tune.checkpoint_dir(epoch) as checkpoint_dir: # modified to store checkpoint after every epoch. path = os.path.join(checkpoint_dir, "checkpoint") torch.save((model.state_dict(), optimizer.state_dict()), path)

Ten slotte moet u het trainingsscript in een aangepaste hoofdfunctie verpakken die de hyperparameters instelt, zoals de leersnelheid, de grootte van de eerste en tweede verborgen lagen en eventuele aanvullende hyperparameters die u wilt herhalen. U moet ook een planner gebruiken, zoals de ASHA-planner die we hier gebruiken, voor GPU-training met één en meerdere knooppunten. We gebruiken het standaard afstemmingsalgoritme Variant generatie, die zowel random (getoond in de volgende code) als grid search ondersteunt, afhankelijk van de gebruikte configuratieparameter.

def main(args): config = { "l1": tune.sample_from(lambda _: 2**np.random.randint(2, 9)), "l2": tune.sample_from(lambda _: 2**np.random.randint(2, 9)), "lr": tune.loguniform(1e-4, 1e-1) } scheduler = ASHAScheduler( metric="loss", mode="min", max_t=args.epochs, grace_period=1, reduction_factor=2) reporter = CLIReporter( metric_columns=["loss","training_iteration", "roc"]) # run the HPO job by calling train print("Starting training ....") result = tune.run( partial(train_ray, args=args), resources_per_trial={"cpu": args.num_cpus, "gpu": args.num_gpus}, config=config, num_samples=args.num_samples, scheduler=scheduler, progress_reporter=reporter)

De uitvoer van de taak ziet eruit als de volgende schermafbeelding.

De uitvoer van de taak ziet eruit als de volgende schermafbeelding.

Ray Tune beëindigt automatisch slecht presterende taken, terwijl de beter presterende taken langer doorgaan, waardoor uw totale HPO-tijden worden geoptimaliseerd. In dit geval liep de best presterende taak alle 7 volledige tijdperken, terwijl andere hyperparameterkeuzes vroegtijdig werden gestopt. Zie voor meer informatie over hoe vroeg stoppen werkt met SageMaker HPO hier.

HPO-banen in de wachtrij plaatsen met Amazon SQS

Wanneer meerdere datawetenschappers tegelijkertijd HPO-jobs creëren in hetzelfde account, kan de limiet van 100 gelijktijdige HPO-jobs per account worden bereikt. In dit geval kunnen we gebruiken Amazon SQS om een ​​HPO-takenwachtrij te maken. Elk HPO-taakverzoek wordt weergegeven als een bericht en verzonden naar een SQS-wachtrij. Elk bericht bevat hyperparameters en instelbare hyperparameterbereiken in de berichttekst. Er wordt ook een Lambda-functie gemaakt. De functie controleert eerst het aantal HPO-opdrachten in uitvoering. Als de limiet van 100 gelijktijdige HPO-taken niet wordt bereikt, haalt het berichten op uit de SQS-wachtrij en worden HPO-taken gemaakt zoals bepaald in het bericht. De functie wordt geactiveerd door Amazon EventBridge gebeurtenissen met regelmatige tussenpozen (bijvoorbeeld elke 10 minuten). De eenvoudige architectuur wordt als volgt weergegeven.

De eenvoudige architectuur wordt als volgt weergegeven.

Om deze architectuur te bouwen, maken we eerst een SQS-wachtrij en noteren we de URL. In de Lambda-functie gebruiken we de volgende code om het aantal HPO-taken in uitvoering te retourneren:

sm_client = boto3.client('sagemaker') def check_hpo_jobs(): response = sm_client.list_hyper_parameter_tuning_jobs( MaxResults=HPO_LIMIT, StatusEquals='InProgress') return len(list(response["HyperParameterTuningJobSummaries"]))

Als het aantal HPO-taken in uitvoering groter is dan of gelijk is aan de limiet van 100 gelijktijdige HPO-taken (zie voor huidige limieten Amazon SageMaker-eindpunten en quota), retourneert de Lambda-functie 200 status en wordt afgesloten. Als de limiet niet wordt bereikt, berekent de functie het aantal HPO-jobs dat beschikbaar is om te maken en haalt hetzelfde aantal berichten op uit de SQS-wachtrij. Vervolgens extraheert de Lambda-functie hyperparameterbereiken en andere gegevensvelden voor het maken van HPO-taken. Als de HPO-taak met succes is gemaakt, wordt het bijbehorende bericht verwijderd uit de SQS-wachtrij. Zie de volgende code:

def lambda_handler(event, context): # first: check HPO jobs in progress hpo_in_progress = check_hpo_jobs() if hpo_in_progress >= HPO_LIMIT: return { 'statusCode': 200, 'body': json.dumps('HPO concurrent jobs limit reached') } else: hpo_capacity = HPO_LIMIT - hpo_in_progress container = image_uris.retrieve("xgboost", region, "0.90-2") train_input = TrainingInput(f"s3://{bucket}/{key_prefix}/train/train.csv", content_type="text/csv") validation_input = TrainingInput(f"s3://{bucket}/{key_prefix}/validation/validation.csv", content_type="text/csv") while hpo_capacity > 0: sqs_response = sqs.receive_message(QueueUrl = queue_url) if 'Messages' in sqs_response.keys(): msgs = sqs_response['Messages'] for msg in msgs: try: hp_in_msg = json.loads(msg['Body'])['hyperparameter_ranges'] create_hpo(container,train_input,validation_input,hp_in_msg) response = sqs.delete_message(QueueUrl=queue_url,ReceiptHandle=msg['ReceiptHandle']) hpo_capacity = hpo_capacity-1 if hpo_capacity == 0: break except : return ("error occurred for message {}".format(msg['Body'])) else: return {'statusCode': 200, 'body': json.dumps('Queue is empty')} return {'statusCode': 200, 'body': json.dumps('Lambda completes')}

Nadat uw Lambda-functie is gemaakt, kunt u triggers toevoegen met de volgende stappen:

  1. Kies uw functie op de Lambda-console.
  2. Op de Configuratie pagina, kies Trigger toevoegen.
  3. kies EventBridge (CloudWatch-evenementen).
  4. Kies Maak een nieuwe regel.
  5. Voer een naam in voor uw regel.
  6. kies Plan expressie.
  7. Stel het tarief in op 10 minuten.
  8. Kies Toevoegen.

Deze regel activeert onze Lambda-functie elke 10 minuten.

Wanneer dit is voltooid, kunt u het testen door berichten naar de SQS-wachtrij te sturen met uw HPO-taakconfiguratie in de berichttekst. De code en het notitieblok voor deze architectuur staan ​​op onze GitHub repo. Zie de volgende code:

response = sqs.send_message( QueueUrl=queue_url, DelaySeconds=1, MessageBody=( '{"hyperparameter_ranges":{ "<hyperparamter1>":<range>, "hyperparamter2":<range>} }' ) )

Conclusies

ML-ingenieurs moeten vaak door een grote hyperparameterruimte zoeken om het best presterende model voor hun toepassing te vinden. Voor complexe deep learning-modellen, waar individuele trainingstaken behoorlijk tijdrovend kunnen zijn, kan dit een omslachtig proces zijn dat vaak weken of maanden aan ontwikkelaarstijd in beslag kan nemen.

In dit bericht hebben we besproken hoe u het aantal afstemmingsopdrachten dat u parallel met SageMaker kunt starten, kunt maximaliseren, waardoor de totale tijd die nodig is om HPO uit te voeren met door de gebruiker gespecificeerde objectieve maatstaven op maat, wordt verminderd. We hebben eerst een op Jupyter-notebook gebaseerde benadering besproken die door individuele datawetenschappers kan worden gebruikt voor onderzoeks- en experimenteerworkflows. We hebben ook laten zien hoe je een SQS-wachtrij kunt gebruiken om teams van datawetenschappers in staat te stellen meer opdrachten in te dienen. SageMaker is een zeer flexibel platform waarmee u uw eigen HPO-tool kunt meenemen, die we hebben geïllustreerd met behulp van de populaire open-source tool Ray Tune.

Zie voor meer informatie over het brengen van andere algoritmen, zoals genetische algoritmen naar SageMaker HPO Breng uw eigen algoritme voor hyperparameteroptimalisatie op Amazon SageMaker.

Referenties

[1] Hyperparameteroptimalisatie: een overzicht van algoritmen en toepassingen, Yu, T. en Zhu, H., https://arxiv.org/pdf/2003.05689.pdf.

[2] Tune: een onderzoeksplatform voor gedistribueerde modelselectie en training, https://arxiv.org/abs/1807.05118.


Over de auteurs

Iaroslav Sjtsjerbatyi is Machine Learning Engineer bij Amazon Web Services. Zijn werk is gericht op verbeteringen aan het Amazon SageMaker-platform en het helpen van klanten om de functies optimaal te gebruiken. In zijn vrije tijd houdt hij ervan om recent onderzoek in ML in te halen en buitensporten te beoefenen zoals schaatsen of wandelen.

Enrico Sartorello is Sr. Software Development Engineer bij Amazon Web Services. Hij helpt klanten bij het adopteren van machine learning-oplossingen die passen bij hun behoeften door nieuwe functionaliteiten voor Amazon SageMaker te ontwikkelen. In zijn vrije tijd volgt hij met passie zijn voetbalteam en werkt hij graag aan zijn kookkunsten.

Toeshar Saxena is Principal Product Manager bij Amazon, met de missie om de bestandsopslagactiviteiten van AWS te laten groeien. Voordat hij bij Amazon kwam, leidde hij business units voor telecominfrastructuur bij twee bedrijven en speelde hij een centrale rol bij de lancering van de glasvezelbreedbandservice van Verizon. Hij begon zijn carrière als onderzoeker bij GE R&D en BBN, waar hij werkte in computervisie, internetnetwerken en videostreaming.

Stefan Natu is Senior Machine Learning Specialist bij Amazon Web Services. Hij richt zich op het helpen van financiële dienstverleners bij het bouwen van end-to-end machine learning-oplossingen op AWS. In zijn vrije tijd leest hij graag blogs over machine learning, speelt hij gitaar en verkent hij de eetcultuur in New York City.

Qingwei Li is Machine Learning Specialist bij Amazon Web Services. Hij promoveerde in Operations Research nadat hij de onderzoeksbeursrekening van zijn adviseur had verbroken en de beloofde Nobelprijs niet kon leveren. Momenteel helpt hij klanten in de financiële dienstverlening en de verzekeringssector bij het bouwen van machine learning-oplossingen op AWS. In zijn vrije tijd houdt hij van lezen en lesgeven.

Bron: https://aws.amazon.com/blogs/machine-learning/running-multiple-hpo-jobs-in-parallel-on-amazon-sagemaker/

spot_img

Laatste intelligentie

spot_img