You can also get data directly from a generator using `next(generator_obj)`.

Let's say our objective is to process the parking violation 2018 dataset:

1. Keep only the violations issued by police (denoted by P in the data), to vehicles with the make FORD in NJ.
2. Concat house number, street name, and registration state fields into a single address field.
3. Write the results into a csv file with the header `vehicle_make,issuing_agency,address`.

```python
import csv

input_file_name = "./parking-violations-issued-fiscal-year-2018.csv"
output_file_name = "./nj_ford_trasnportation_issued_pv_2018.csv"

# 1. extract
read_file_object = open(input_file_name, "r")
extractor = csv.reader(read_file_object)  # csv reader produces a generator

# 2. keep only required fields
# field index => field
# 2 => registration state, 7 => vehicle make, 8 => issuing agency,
# 23 => house number, 24 => street name
col_filtered_stream = (
    [row[2], row[7], row[8], row[23], row[24]] for row in extractor
)

# 3. keep only violations issued by police, to vehicles with the make FORD in NJ
value_filtered_stream = filter(
    lambda x: all([x[0] == "NJ", x[1] == "FORD", x[2] == "P"]),
    col_filtered_stream,
)

# 4. concat house number, street name, registration state into a single address field
final_stream = (
    [stream[1], stream[2], " ".join([stream[3], stream[4], stream[0]])]
    for stream in value_filtered_stream
)
final_stream  # this is a generator object and has not yet started generating data

# 5. load the results into a csv file
write_file_object = open(output_file_name, "w")
loader = csv.writer(
    write_file_object, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
)
loader.writerow(["vehicle_make", "issuing_agency", "address"])  # header
loader.writerows(final_stream)  # loader asks for data from final_stream
```

In the above example, we create generators with the `()` comprehension format. We also chain multiple generators to form a data pipeline; this is called chaining.

The above example is simple, and the logic can be easily described using lambda functions. Let's see an example where we use a more complex function. This function will calculate the number of days between the violation issue date and the vehicle expiration date.

```python
from datetime import datetime

def get_days_until_expiration(row):
    # indices 4 and 12 are the issue date and vehicle expiration date fields
    issue_date = datetime.strptime(row[4], "%m/%d/%Y")
    vehicle_expiration_date = datetime.strptime(row[12].split(".")[0], "%Y%m%d")
    date_diff = (vehicle_expiration_date - issue_date).days
    return date_diff

output_file_name = "./pv_2018_w_days_until_violation.csv"
final_stream = (row + [get_days_until_expiration(row)] for row in extractor)
```

You can also create your own generator, as shown below. Let's make batched calls to the external service:

```python
import time

def service_call(rows):
    time.sleep(15)  # simulating an external service call
    return rows

def batched_service_transforms(rows, batch_size=10000):
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield from service_call(batch)
            batch = []
    yield from service_call(batch)

output_file_name = "./pv_2018_ext_call_enriched.csv"
final_stream = batched_service_transforms(extractor)

next(final_stream)  # you will notice a 15s wait, simulating the external service call
[next(final_stream) for _ in range(9999)]  # you will notice almost no wait, since this data is held in process memory
# we have iterated through the first batch of 10,000
# the next call will invoke the service_call function, thus sleeping for 15s
next(final_stream)  # you will notice a 15s wait
```

Note that `yield from items` is shorthand for

```python
for i in items:
    yield i
```

When pulling data from a database, we can fetch it in batches as well:

```python
import psycopg2

def generate_from_db(username, password, host, port, dbname, batch_size=10000):
    conn_url = f"postgresql://{username}:{password}@{host}:{port}/{dbname}"
    conn = psycopg2.connect(conn_url)
    cur = conn.cursor()
    cur.execute("select * from parking_violation_2018")  # this will get the data ready on the db side
    while True:
        rows = cur.fetchmany(batch_size)  # this will fetch data in batches from the ready data in db
        if not rows:
            break
        yield from rows

next(generate_from_db("username", "password", "host", 5432, "database"))
```

Opening and closing a db connection is expensive, hence we keep the connection open for the lifetime of the generator.
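Since chained generator pipelines are lazy end to end, nothing actually runs until the loader starts pulling rows. Below is a minimal, self-contained sketch of the same filter-transform-load chain; the sample rows and the in-memory `io.StringIO` buffer are made up for illustration, standing in for the real dataset and output file.

```python
import csv
import io

# made-up rows with the same column layout as the filtered stream:
# registration state, vehicle make, issuing agency, house number, street name
sample = [
    ["NJ", "FORD", "P", "12", "Main St"],
    ["NY", "FORD", "P", "34", "Broad St"],
    ["NJ", "FORD", "X", "56", "High St"],
    ["NJ", "FORD", "P", "78", "Oak Ave"],
]

extractor = iter(sample)  # stands in for csv.reader's row generator
value_filtered_stream = filter(
    lambda x: all([x[0] == "NJ", x[1] == "FORD", x[2] == "P"]), extractor
)
final_stream = (
    [row[1], row[2], " ".join([row[3], row[4], row[0]])]
    for row in value_filtered_stream
)

out = io.StringIO()  # stands in for the output file object
loader = csv.writer(out)
loader.writerow(["vehicle_make", "issuing_agency", "address"])
loader.writerows(final_stream)  # only now does the pipeline actually execute
print(out.getvalue())
```

Only the two NJ/FORD/P rows survive the filter, and each is reshaped into the three output fields before being written.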
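The same keep-the-connection-open, fetch-in-batches pattern works with any DB-API cursor. Here is a runnable sketch that swaps psycopg2 for the standard library's `sqlite3`, purely so the example can run without a Postgres server; the `users` table, its columns, and the temp-file path are made up for illustration.

```python
import os
import sqlite3
import tempfile

def generate_from_sqlite(db_path, batch_size=2):
    # one connection for the whole stream: opening and closing a
    # connection per batch would be expensive
    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    cur.execute("SELECT id, name FROM users ORDER BY id")  # made-up table
    while True:
        rows = cur.fetchmany(batch_size)  # pull one batch at a time
        if not rows:
            break
        yield from rows
    conn.close()

# build a throwaway example database
db_path = os.path.join(tempfile.gettempdir(), "demo_users.db")
conn = sqlite3.connect(db_path)
conn.execute("DROP TABLE IF EXISTS users")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "ada"), (2, "bob"), (3, "cam"), (4, "dee"), (5, "eve")],
)
conn.commit()
conn.close()

rows = list(generate_from_sqlite(db_path))
print(rows)
```

With `batch_size=2`, the five rows arrive in batches of 2, 2, and 1, but the consumer sees one flat stream thanks to `yield from`.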