
StaticPool

Bases: Thread

Creates a pool that submits jobs and schedules them according to their estimated memory costs. Jobs are supplied when the instance is created, and the pool ends once all jobs have been processed.

Attributes:

| Name | Type | Description |
|---|---|---|
| init_memory | int or float | Memory (in GB) available when the pool is created. It can be overridden with override_max_value |
| memory | int or float | Memory (in GB) currently available, i.e. not virtually allocated to running jobs |
| output | str or None | Path of the file the output is written to. If None, the output is written to the console (default is None) |
| pool | list | Jobs waiting to be launched |
| launched | list | Jobs that have been launched |
| idle_time | int | Time in seconds between two checks of the current resource allocation (default is 1) |
| refresh_time | int | Time in seconds between two status emissions by the pool (default is 5) |
| is_over | bool | If True, the pool is terminated |
| count | int | Number of jobs submitted |
| start_date | str | Date at which the pool was created, formatted as %d/%m/%y-%H:%M:%S |
| end_date | str | Date at which the pool was terminated, in the same format (empty string until then) |

Methods

| Method | Description |
|---|---|
| allocate(calc) | Virtually allocates memory equal to the calculation cost - Not meant to be called by the user - |
| release(calc) | Virtually releases memory equal to the calculation cost - Not meant to be called by the user - |
| end(now=False) | Ends the pool and waits for it to be dead |
| run() | The running function of the pool, called by the start method - Not meant to be called by the user - |
| display_status() | Displays the current status of the pool: running jobs, memory allocation, calculation times and jobs waiting in the queue |
| emit_status() | Executes display_status every refresh_time seconds, with a timestamp |
| check_status() | A thread that checks the current status and health of the pool - Not meant to be called by the user - |
| review(awaiting=False) | Displays a summary of all calculations performed by the pool. It should only be called once the pool is dead (i.e. after end() has been called) |

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| target | function | The function containing the jobs to be performed | required |
| args | list of tuples | Arguments passed to target, one tuple per job. For instance, if n jobs of a function taking k arguments have to be executed, give a list of n tuples, each containing k arguments | required |
| cost | int, optional | Estimated memory cost of each job, in gigabytes ("Go" in the status tables) | 20 |
| idle_time | int, optional | Time in seconds between two checks of the current resource allocation | 1 |
| refresh_time | int, optional | Time in seconds between two status emissions by the pool | 5 |
| override_max_value | int or None, optional | Maximum virtual memory (in GB) available to the pool. If None, the memory currently available on the system is used | None |
| output | str or None, optional | Path of the file the output is written to. If None, the output is written to the console | None |

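Each tuple in args describes one job. The sketch below assumes that each Calculation calls target with its tuple unpacked as positional arguments. That unpacking is not shown on this page. Under this assumption, the three jobs would be equivalent to three calls of a hypothetical `simulate` function:

```python
def simulate(size, repeat):
    """Hypothetical job: stands in for a memory-heavy computation."""
    return size * repeat

# Three jobs of a two-argument target: a list of 3 tuples, each holding 2 arguments.
args = [(10, 1), (20, 2), (30, 3)]

# Assumed behavior: each tuple is unpacked into one call of the target.
results = [simulate(*arg) for arg in args]
print(results)  # [10, 40, 90]
```

Under the same assumption, a job taking a single argument still needs a one-element tuple such as `(x,)`.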
Source code in PoolFlow\static_pool.py
def __init__(self, target, args, cost=20, idle_time=1, refresh_time=5, override_max_value=None,
             output=None):
    """
    Parameters
    ----------
    target : function
        The function containing the jobs to be performed
    args : list of tuples
        The arguments passed to target, one tuple per job. For instance, if n jobs of a function taking k
        arguments have to be executed, a list of n tuples, each containing k arguments, should be given
    cost : int, optional
        The estimated memory cost of each job, in gigabytes (default is 20)
    idle_time : int, optional
        The time in seconds between two checks of the current resource allocation (default is 1)
    refresh_time : int, optional
        The time in seconds between two status emissions by the pool (default is 5)
    override_max_value : int or None, optional
        The maximum virtual memory, in gigabytes, available to the pool. If None, the memory currently available
        on the system is used (default is None)
    output : str or None, optional
        The path of the file the output is written to. If None, the output is written to the console
        (default is None)
    """
    super(StaticPool, self).__init__()
    self.status = None
    # Memory budget in GB: currently available system memory, unless override_max_value is given
    if not override_max_value:
        self.init_memory = round(mem().available / 1E9, 0)
    else:
        self.init_memory = override_max_value
    self.output = output
    self.memory = self.init_memory
    self.cost = cost
    self.args = args
    self.threads = []
    self.idle_time = idle_time
    self.refresh_time = refresh_time
    self.len = len(args)
    self.where = 0
    self.target = target
    self.is_over = False
    self.start_date = datetime.datetime.now().strftime("%d/%m/%y-%H:%M:%S")
    self.end_date = ""
    self.pool = []
    self.launched = []
    self.count = 0
    # Wrap each argument tuple in a Calculation, numbered from 1 in submission order
    for arg in self.args:
        self.count += 1
        self.pool.append(Calculation(target, arg, self.cost, self.count))
    if self.output:
        self.csl = Console(file=open(self.output, 'w'))
    else:
        self.csl = Console()

runnings property

Returns the list of running jobs - Not meant to be called by the user -

deads property

Returns a list of dead jobs - Not meant to be called by the user -
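The implementation of these two properties is not shown. Below is a plausible sketch. It assumes they filter `launched` on the `isrunning` and `isdead` flags that `check_status()` maintains. `Job` and `PoolSketch` are hypothetical names:

```python
from dataclasses import dataclass


@dataclass
class Job:
    """Hypothetical stand-in for a Calculation, keeping only its state flags."""
    id: int
    isrunning: bool = False
    isdead: bool = False


class PoolSketch:
    def __init__(self, launched):
        self.launched = launched

    @property
    def runnings(self):
        # Jobs that have been started and have not finished yet.
        return [job for job in self.launched if job.isrunning]

    @property
    def deads(self):
        # Jobs whose thread has finished.
        return [job for job in self.launched if job.isdead]


pool = PoolSketch([Job(1, isrunning=True), Job(2, isdead=True), Job(3, isdead=True)])
print([j.id for j in pool.runnings])  # [1]
print([j.id for j in pool.deads])     # [2, 3]
```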

allocate(calc)

Virtually allocates memory equal to the calculation cost - Not meant to be called by the user -

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| calc | Calculation | The calculation whose cost is allocated | required |

Source code in PoolFlow\static_pool.py
def allocate(self, calc):
    """
    Virtually allocate an amount of memory based on the calculation cost
     - Not meant to be called by the user -

    Parameters
    ----------
    calc : a Calculation object
    """
    self.memory -= calc.cost

release(calc)

Virtually releases memory equal to the calculation cost - Not meant to be called by the user -

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| calc | Calculation | The calculation whose cost is released | required |

Source code in PoolFlow\static_pool.py
def release(self, calc):
    """
    Virtually release an amount of memory based on the calculation cost
     - Not meant to be called by the user -

    Parameters
    ----------
    calc : a Calculation object
    """
    self.memory += calc.cost
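`allocate()` and `release()` update `self.memory` from two different threads: `run()` allocates and `check_status()` releases. An attribute `+=` is not atomic across threads. The sketch below is a lock-protected alternative using a hypothetical `MemoryLedger` class. It also merges the availability check and the reservation into a single step:

```python
import threading


class MemoryLedger:
    """Hypothetical thread-safe variant of the allocate/release bookkeeping (values in GB)."""

    def __init__(self, total):
        self.total = total
        self.available = total
        self._lock = threading.Lock()

    def try_allocate(self, cost):
        # Check and reserve under one lock so two threads cannot both claim the last slot.
        with self._lock:
            if self.available >= cost:
                self.available -= cost
                return True
            return False

    def release(self, cost):
        with self._lock:
            self.available += cost


ledger = MemoryLedger(total=64)
print(ledger.try_allocate(20), ledger.try_allocate(20), ledger.try_allocate(20))  # True True True
print(ledger.try_allocate(20))  # False: only 4 GB left
ledger.release(20)
print(ledger.available)  # 24
```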

run()

The running function of the pool. It is called by the start method - Not meant to be called by the user -

Source code in PoolFlow\static_pool.py
def run(self):
    """
    The running function of the pool. It is called by the start method
     - Not meant to be called by the user -
    """
    self.status = self.check_status()
    while not self.is_over:
        # Iterate over a copy: launched jobs are removed from self.pool inside the loop.
        for calc in list(self.pool):
            if self.memory >= calc.cost and calc.ispreprocessed:
                # Enough virtual memory left: reserve it and start the job.
                self.allocate(calc)
                self.launched.append(calc)
                self.pool.remove(calc)
                calc.start()
            elif not calc.ispreprocessed:
                calc.launch_pre_processing()
        # Post-process jobs that have finished and have not been post-processed yet.
        for calc in self.launched:
            if not calc.isrunning and calc.isdead and not calc.ispostprocessed:
                calc.launch_post_processing()
        time.sleep(self.idle_time)
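The scheduling loop removes jobs from `self.pool` while walking through it. In Python, removing the current element of a list during a `for` loop shifts the next element into the freed slot, so that element is skipped until the next pass. Iterating over a copy avoids this. The standalone illustration below uses hypothetical job names and integer costs:

```python
def admit_in_place(queue, memory, cost):
    """Problematic pattern: removes items from the list being iterated."""
    launched = []
    for job in queue:
        if memory >= cost:
            memory -= cost
            launched.append(job)
            queue.remove(job)
    return launched


def admit_on_copy(queue, memory, cost):
    """Safe pattern: iterate over a snapshot, mutate the original list."""
    launched = []
    for job in list(queue):
        if memory >= cost:
            memory -= cost
            launched.append(job)
            queue.remove(job)
    return launched


print(admit_in_place(["a", "b", "c", "d"], memory=80, cost=20))  # ['a', 'c']
print(admit_on_copy(["a", "b", "c", "d"], memory=80, cost=20))   # ['a', 'b', 'c', 'd']
```

A strict `>` comparison would also hold back the fourth job here, even though exactly 20 GB remain for it.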

display_status()

Displays the current status of the pool: running jobs, memory allocation, calculation times and jobs waiting in the queue

Source code in PoolFlow\static_pool.py
def display_status(self):
    """
    Displays the current status of the pool: running jobs, memory allocation, calculation times and jobs waiting
    in the queue
    """
    if self.output:
        with open(self.output, 'w') as out:
            out.write('')
    self.csl.print(f'[blue]Pool started - {self.start_date}')
    self.csl.print(f'[bold red] Threading status - {datetime.datetime.now().strftime("%d/%m/%y-%H:%M:%S")}')
    self.csl.print(f'Threads running: {len(self.runnings)}')
    self.csl.print(f'Threads in queue: {len(self.pool)}')
    self.csl.print(f'Threads dead: {len(self.deads)}')
    self.csl.print('[bold blue] Alive threads in pool')
    table = Table()
    table.add_column('Thread Id')
    table.add_column('Cost (Go)')
    table.add_column('Target')
    table.add_column('Parameters')
    table.add_column('Memory Status')
    table.add_column('Start Date')
    for calc in self.runnings:
        table.add_row(str(calc.id), str(calc.cost), str(calc.target.__name__), str(calc.args),
                      MemoryStatus(calc.counted), calc.start_date)
    self.csl.print(table)
    self.csl.print('[bold pink] Memory status')
    self.csl.print(f'Total memory: {self.init_memory} Go\t Available Memory: {self.memory} Go')

emit_status()

Executes the display_status method every refresh_time seconds, with a timestamp

Source code in PoolFlow\static_pool.py
@threaded
def emit_status(self):
    """
    Executes the display_status method every refresh_time seconds, with a timestamp
    """
    while self.is_alive():
        self.display_status()
        sys.stdout.flush()
        time.sleep(self.refresh_time)
    self.display_status()
    self.csl.print(f'[blue]End of pool - {datetime.datetime.now().strftime("%d/%m/%y-%H:%M:%S")}')
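The `threaded` decorator used by `emit_status()`, `check_status()` and `review()` is not defined on this page. The sketch below shows a common way to write such a decorator. It assumes the decorator starts the decorated function in a new thread and returns the `Thread` object. That assumption fits `run()`, which stores the result of `check_status()` in `self.status`, and `review(awaiting=True)`, which calls `self.status.join()`.

```python
import functools
import threading
import time


def threaded(func):
    """Hypothetical sketch: run func in a new thread and return the started Thread."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return wrapper


ticks = []


@threaded
def emit(n, delay):
    # Stand-in for emit_status(): periodic work, then stop.
    for i in range(n):
        ticks.append(i)
        time.sleep(delay)


handle = emit(3, 0.01)  # returns immediately with the running Thread
handle.join()           # wait for it, as review(awaiting=True) does with self.status
print(ticks)  # [0, 1, 2]
```

Returning the `Thread` rather than the function's result is what allows callers to `join()` it. The decorated function's own return value is discarded in this sketch.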

check_status()

A thread that checks the current status and health of the pool - Not meant to be called by the user -

Source code in PoolFlow\static_pool.py
@threaded
def check_status(self):
    """
    A thread that checks the current status and health of the pool
    - Not meant to be called by the user -
    """
    while not self.is_over:
        for calc in self.launched:
            if not calc.thread.is_alive() and not calc.counted and calc.isrunning:
                self.release(calc)
                calc.end_date = datetime.datetime.now().strftime("%d/%m/%y-%H:%M:%S")
                calc.isrunning = False
                calc.isdead = True
                calc.counted = True
            elif self.pool == [] and self.runnings == [] and len(self.deads) == self.count:
                self.is_over = True
            else:
                pass
        time.sleep(self.idle_time)

    self.end_date = datetime.datetime.now().strftime("%d/%m/%y-%H:%M:%S")
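`check_status()` releases the memory of each finished job exactly once, with the `counted` flag guarding against a double release. It flags the pool as over when nothing is queued, nothing is running and every job has been counted as dead. Below is a reduced, self-contained sketch of that loop. The names are hypothetical, and plain `threading.Thread` objects stand in for Calculation instances:

```python
import threading
import time


def watch(jobs, ledger, idle_time=0.01):
    """Hypothetical sketch of check_status(): release memory once per finished job,
    and stop when every job has been accounted for."""
    counted = set()
    while len(counted) < len(jobs):
        for name, (thread, cost) in jobs.items():
            if not thread.is_alive() and name not in counted:
                ledger["available"] += cost  # release exactly once
                counted.add(name)
        time.sleep(idle_time)
    return counted


# Three 20 GB jobs simulated with short sleeps.
jobs = {
    f"job{i}": (threading.Thread(target=time.sleep, args=(0.02 * i,)), 20)
    for i in range(3)
}
ledger = {"available": 0}  # all 60 GB were allocated when the jobs were launched
for thread, _ in jobs.values():
    thread.start()
done = watch(jobs, ledger)
print(sorted(done), ledger["available"])  # ['job0', 'job1', 'job2'] 60
```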

review(awaiting=False)

Displays a summary of all calculations performed by the pool. It should only be called once the pool is dead (i.e. after end() has been called), unless awaiting is True.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| awaiting | bool | If True, waits for the pool to end before displaying the summary | False |

Source code in PoolFlow\static_pool.py
@threaded
def review(self, awaiting=False):
    """
    Displays a summary of all calculations performed by the pool. It should only be called once the pool is
    dead (i.e. after end() has been called), unless awaiting is True.

    Parameters
    ----------
    awaiting : bool
        If True, waits for the pool to end before displaying the summary (default is False)
    """
    if awaiting:
        self.status.join()
        time.sleep(3 * self.idle_time)

    self.csl.print('[bold red]Pool Review', justify="center")
    self.csl.print(f'[blue]Started - {self.start_date}', justify='center')
    self.csl.print(f'[blue]Ended - {self.end_date}', justify='center')
    table = Table()
    table.add_column('Thread Id')
    table.add_column('Thread Status')
    table.add_column('Memory Status')
    table.add_column('Cost (Go)')
    table.add_column('Target')
    table.add_column('Parameters')
    table.add_column('Start Date')
    table.add_column('End Date')
    table.add_column('Running Time (h)')
    for calc in (self.pool + self.launched):
        table.add_row(str(calc.id), ThreadStatus(calc.thread.is_alive()), MemoryStatus(calc.counted),
                      str(calc.cost), str(calc.target.__name__), str(calc.args),
                      calc.start_date, calc.end_date, RunningTime(calc.start_date, calc.end_date))
    self.csl.print(table)
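`review()` relies on a `RunningTime` helper that is not shown on this page. `start_date` and `end_date` are stored as strings in the `%d/%m/%y-%H:%M:%S` format, so such a helper has to parse them back into `datetime` objects before subtracting them. Below is a hypothetical stand-in that returns hours as a string. Returning an empty string for an unfinished job is an assumption:

```python
from datetime import datetime

DATE_FORMAT = "%d/%m/%y-%H:%M:%S"  # format used for start_date / end_date in StaticPool


def running_time_hours(start_date, end_date):
    """Hypothetical stand-in for RunningTime: elapsed hours between two date strings."""
    if not start_date or not end_date:
        return ""  # job not started or not finished yet
    start = datetime.strptime(start_date, DATE_FORMAT)
    end = datetime.strptime(end_date, DATE_FORMAT)
    return f"{(end - start).total_seconds() / 3600:.2f}"


print(running_time_hours("01/02/24-10:00:00", "01/02/24-11:30:00"))  # 1.50
print(running_time_hours("01/02/24-23:45:00", "02/02/24-00:15:00"))  # 0.50 (crosses midnight)
```

Parsing is required because the day-first strings do not sort or subtract chronologically as plain text.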

end(now=False)

Ends the pool and waits for it to be dead.

Parameters:

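| Name | Type | Description | Default |
|---|---|---|---|
| now | bool, optional | If True, does not wait for queued and running jobs to finish before stopping the pool; this is refused while jobs are still queued | False |
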
Source code in PoolFlow\static_pool.py
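def end(self, now=False):
    """
    Ends the pool and waits for it to be dead.

    Parameters
    ----------
    now : bool, optional
        If True, does not wait for queued and running jobs to finish before stopping the pool. This is refused
        while jobs are still queued. Running threads are still joined, since Python threads cannot be killed
        (default is False)
    """
    if now and self.pool:
        # Jobs are still queued: refuse to stop the pool immediately.
        self.csl.print('[red]Cannot end pool - Threads are still awaiting. Force with empty method')
        return
    if not now:
        # Wait until no job is queued or running.
        while self.pool or self.runnings:
            time.sleep(self.idle_time)
    self.is_over = True
    # Wait for every launched job, then for the pool thread itself.
    for calc in self.launched:
        calc.thread.join()
    self.join()
    time.sleep(self.idle_time)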
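Below is a minimal, self-contained sketch of the shutdown sequence of `end(now=False)`: wait for the queue and running jobs to drain, raise the stop flag, join every worker, then join the pool thread. `MiniPool` is hypothetical, jobs are simulated with `time.sleep`, and a `threading.Event` plays the role of the `is_over` boolean:

```python
import threading
import time


class MiniPool(threading.Thread):
    """Hypothetical sketch of the end() sequence: drain, raise the flag, join workers, join the pool."""

    def __init__(self, durations, idle_time=0.01):
        super().__init__()
        self.idle_time = idle_time
        self.queue = list(durations)      # job durations, in seconds
        self.workers = []
        self.is_over = threading.Event()  # plays the role of the is_over boolean

    def run(self):
        while not self.is_over.is_set():
            while self.queue:
                # Register and start the worker before removing the job from the queue,
                # so end() never sees a job that is neither queued nor tracked.
                worker = threading.Thread(target=time.sleep, args=(self.queue[0],))
                self.workers.append(worker)
                worker.start()
                self.queue.pop(0)
            time.sleep(self.idle_time)

    def end(self):
        # 1. Wait until nothing is queued and no worker is still alive.
        while self.queue or any(w.is_alive() for w in self.workers):
            time.sleep(self.idle_time)
        # 2. Stop the scheduling loop, then wait for every thread.
        self.is_over.set()
        for worker in self.workers:
            worker.join()
        self.join()


pool = MiniPool([0.02, 0.01, 0.03])
pool.start()
pool.end()
print(pool.is_alive(), len(pool.workers))  # False 3
```

Joining the pool thread last, from the caller's thread, mirrors the `self.join()` call at the end of `end()`. A thread cannot join itself, so `end()` has to be called from outside the pool thread.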