-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcodecarbon_tutorial.py
98 lines (83 loc) · 2.77 KB
/
codecarbon_tutorial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from codecarbon import EmissionsTracker
from decimal import Decimal
import boto3
import csv
import os
# Module-wide CodeCarbon tracker, created once at import time and labelled
# with the 'emissions_demo' project name.
emissions_tracker = EmissionsTracker(project_name='emissions_demo')

# Module-wide CloudWatch client bound to the eu-west-1 region.
cloudwatch_client = boto3.client('cloudwatch', region_name='eu-west-1')
def handler():
    '''
    Runs the business logic under CodeCarbon tracking and publishes the result

    Steps, in order: start the tracker, run my_business_logic, stop the
    tracker, read the measurements back with get_emissions_data_from_file,
    delete the file with delete_emissions_file, then upload both values with
    upload_emissions and upload_energy_consumption.

    Returns nothing. Any exception raised by the business logic or by one of
    the helper steps propagates to the caller; the tracker is stopped before
    an exception from the business logic propagates.
    '''
    # Track the emissions of our business logic. The try/finally makes sure
    # the tracker is stopped even when the business logic raises, so it is
    # never left running.
    emissions_tracker.start()
    try:
        my_business_logic()
    finally:
        emissions_tracker.stop()

    # Extract the data we want to keep from the file
    timestamp, emissions_in_kg, energy_consumption_in_kwh = get_emissions_data_from_file()

    # Remove file created by CodeCarbon (done before the uploads, so the file
    # is gone even if an upload fails afterwards)
    delete_emissions_file()

    # Upload metrics for visibility
    upload_emissions(emissions_in_kg, timestamp)
    upload_energy_consumption(energy_consumption_in_kwh, timestamp)
def my_business_logic():
    '''
    Placeholder for the real workload

    Builds the list ['Hello', 'World'] and prints it in reverse order.
    '''
    greeting = ['Hello', 'World']
    print(greeting[::-1])
def get_emissions_data_from_file():
    '''
    Gets the emissions data from the file produced by CodeCarbon

    Reads 'emissions.csv' from the current working directory. The first row
    is treated as the header. Columns are located by header name
    ('timestamp', 'emissions', 'energy_consumed'); when a name is missing
    from the header the original fixed positions (0, 4 and 5) are used
    instead. Blank rows are skipped and the last remaining row is the one
    that is returned.

    Returns:
        A (timestamp, emissions_in_kg, energy_consumption_in_kwh) tuple of
        strings. The timestamp is passed through unchanged; the other two
        values are formatted with exactly ten decimal places.

    Raises:
        FileNotFoundError: if 'emissions.csv' does not exist.
        ValueError: if the file holds no data row after the header.
        IndexError: if the chosen row is too short for a required column.
        decimal.InvalidOperation: if a measurement is not a valid number.
    '''
    with open('emissions.csv', newline='') as f:
        reader = csv.reader(f)
        # The first row is always the header; tolerate a completely empty file.
        header = next(reader, None) or []
        data_rows = [row for row in reader if row]

    if not data_rows:
        raise ValueError('emissions.csv contains no emissions data rows')

    # Resolve columns by name so a different column order in the file does
    # not silently pick the wrong field; fall back to the historical
    # positions when the header does not carry the expected names.
    positions = {name.strip(): index for index, name in enumerate(header)}
    timestamp_index = positions.get('timestamp', 0)
    emissions_index = positions.get('emissions', 4)
    energy_index = positions.get('energy_consumed', 5)

    # Use the last row in the file.
    row = data_rows[-1]
    timestamp = row[timestamp_index]
    emissions_in_kg = format(Decimal(row[emissions_index]), '.10f')
    energy_consumption_in_kwh = format(Decimal(row[energy_index]), '.10f')

    print('timestamp: ' + str(timestamp))
    print('emissions_in_kg: ' + str(emissions_in_kg))
    print('energy_consumption_in_kwh: ' + str(energy_consumption_in_kwh))
    return timestamp, emissions_in_kg, energy_consumption_in_kwh
def delete_emissions_file():
    '''
    Deletes 'emissions.csv' from the current working directory

    Raises FileNotFoundError when the file does not exist.
    '''
    emissions_path = "emissions.csv"
    os.unlink(emissions_path)
def upload_emissions(emissions_in_kg, timestamp):
    '''
    Publishes the emissions value as the 'emissions_in_kg' custom metric

    Returns whatever upload_custom_metric_to_cloudwatch returns.
    '''
    response = upload_custom_metric_to_cloudwatch(
        metric_name='emissions_in_kg',
        value=emissions_in_kg,
        timestamp=timestamp,
    )
    return response
def upload_energy_consumption(energy_consumption_in_kwh, timestamp):
    '''
    Publishes the energy value as the 'energy_consumption_in_kwh' custom metric

    Returns whatever upload_custom_metric_to_cloudwatch returns.
    '''
    response = upload_custom_metric_to_cloudwatch(
        metric_name='energy_consumption_in_kwh',
        value=energy_consumption_in_kwh,
        timestamp=timestamp,
    )
    return response
def upload_custom_metric_to_cloudwatch(metric_name, value, timestamp):
    '''
    Sends a single data point to CloudWatch in the 'EmissionsDemo' namespace

    The value is converted with Decimal(value) and sent with the unit 'None';
    the timestamp is passed through as given. Returns the result of the
    put_metric_data call on the module-level CloudWatch client.
    '''
    datum = {
        'MetricName': metric_name,
        'Value': Decimal(value),
        'Unit': 'None',
        'Timestamp': timestamp,
    }
    return cloudwatch_client.put_metric_data(
        Namespace='EmissionsDemo',
        MetricData=[datum],
    )
# Invoke the handler at module top level: this runs whenever the file is
# executed, and also on import, since the call is not behind a __main__ guard.
handler()