Utilities

Calculation(target, args, cost, id, pre_processing=None, post_processing=None)

An object that launches a job, tracks its status, and manages its pre- and post-processing. Calculation instances are created and managed directly by the pool; they are not meant to be instantiated by users.

Attributes:

Name Type Description
target function

The function containing the job

args tuple

The arguments of the function to be performed

cost int

The cost of each job, in gigabytes

id int

The id of the job. Directly managed by the pool.

pre_processing tuple or None

The pre-processing function and its argument to be executed in the form of (function, (tuple,with,arguments)). (default is None)

post_processing tuple or None

The post-processing function and its argument to be executed in the form of (function, (tuple,with,arguments)). (default is None)

counted bool

An internal flag for pools to manage if Calculation object has been recorded as done

start_date str

The date and time at which the job was launched, stored as a string formatted as %d/%m/%y-%H:%M:%S ('/' until the job is started)

end_date datetime object

The date at which the job ended

is_done bool

An internal flag for pools to manage if Calculation object has been done

isdead bool

An internal flag for pools to manage if job is dead

ispostprocessed bool

An internal flag for pools to manage if post-processing has been executed

ispreprocessed bool

An internal flag for pools to manage if pre-processing has been executed

isrunning bool

An internal flag for pools to manage if job is running

running_time datetime

The time the job has been running

thread Thread

The thread containing the job

Parameters:

Name Type Description Default
target function

The function containing the job

required
args tuple

The arguments of the function to be performed

required
cost int

The cost of each job, in gigabytes

required
id int

The id of the job. Directly managed by the pool.

required
pre_processing tuple or None, optional

The pre-processing function and its argument to be executed in the form of (function, (tuple,with,arguments)). (default is None)

None
post_processing tuple or None, optional

The post-processing function and its argument to be executed in the form of (function, (tuple,with,arguments)). (default is None)

None
Source code in PoolFlow\utilities\pool_utilities.py
def __init__(self, target, args, cost, id, pre_processing=None, post_processing=None):
    """
    Parameters
    ----------
    target : function
        The function containing the job
    args : tuple
        The arguments of the function to be performed
    cost : int
        The cost of each job, in gigabytes
    id : int
        The id of the job. Directly managed by the pool.
    pre_processing : tuple or None, optional
        The pre-processing function and its argument to be executed in the form of
        (function, (tuple,with,arguments)). (default is None)
    post_processing : tuple or None, optional
        The post-processing function and its argument to be executed in the form of
        (function, (tuple,with,arguments)). (default is None)
    """
    self.target = target
    self.args = args
    self.id = id
    self.cost = cost
    self.is_done = False
    self.thread = (Thread(target=self.target, args=self.args))
    self.counted = False
    self.start_date = '/'
    self.end_date = '/'
    self.running_time = '/'
    self.isrunning = False
    self.isdead = False
    self.pre_processing = pre_processing
    self.ispreprocessed = False
    self.post_processing = post_processing
    self.ispostprocessed = False
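
As a minimal sketch of the (function, (tuple,with,arguments)) form expected by pre_processing and post_processing, the snippet below builds such a tuple and runs it in a thread, the same way launch_pre_processing does further down. The prepare function, its arguments, and the log list are hypothetical and exist only for illustration; nothing here imports PoolFlow.

```python
from threading import Thread


def prepare(workdir, label, log):
    """Hypothetical pre-processing step: record what would be prepared."""
    log.append(f"preparing {label} in {workdir}")


log = []

# Same shape as the pre_processing / post_processing parameters:
# (function, (tuple, with, arguments))
pre_processing = (prepare, ("/tmp/job_0", "job_0", log))

# Unpack the tuple and run the function in its own thread, then wait for it.
func, func_args = pre_processing
th = Thread(target=func, args=func_args)
th.start()
th.join()

print(log)
```

Note that a function taking a single argument still needs a one-element tuple, for example (prepare_one, ("job_0",)), because Thread expects args to be a sequence.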

start()

Launches the calculation

Source code in PoolFlow\utilities\pool_utilities.py
def start(self):
    """
    Launches the calculation
    """
    self.thread.start()
    self.start_date = datetime.datetime.now().strftime("%d/%m/%y-%H:%M:%S")
    self.isrunning = True
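
start() stores start_date as a string in the format %d/%m/%y-%H:%M:%S. The sketch below shows one way to turn two such stamps back into a duration. It assumes end_date uses the same format, which the source shown here does not confirm, and elapsed is a hypothetical helper, not part of PoolFlow.

```python
import datetime

# Format used by start() when it records start_date.
DATE_FORMAT = "%d/%m/%y-%H:%M:%S"


def elapsed(start_date, end_date):
    """Return the time between two stamps written in DATE_FORMAT."""
    start = datetime.datetime.strptime(start_date, DATE_FORMAT)
    end = datetime.datetime.strptime(end_date, DATE_FORMAT)
    return end - start


# A stamp produced the same way start() produces it.
now_stamp = datetime.datetime.now().strftime(DATE_FORMAT)

duration = elapsed("01/02/24-10:00:00", "01/02/24-10:01:30")
print(now_stamp, duration.total_seconds())
```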

launch_pre_processing()

Launches the pre-processing and flags it as performed

Source code in PoolFlow\utilities\pool_utilities.py
@threaded
def launch_pre_processing(self):
    """
    Launches the pre-processing and flags it as performed
    """
    if self.pre_processing:
        th = Thread(target=self.pre_processing[0], args=self.pre_processing[1])
        th.start()
        th.join()
    else:
        pass
    self.ispreprocessed = True

launch_post_processing()

Launches the post-processing and flags it as performed

Source code in PoolFlow\utilities\pool_utilities.py
@threaded
def launch_post_processing(self):
    """
    Launches the post-processing and flags it as performed
    """
    if self.post_processing:
        th = Thread(target=self.post_processing[0], args=self.post_processing[1])
        th.start()
        th.join()
    else:
        pass
    self.ispostprocessed = True

LimitedProcess(command, limit, ishard=False)

An object to use inside job functions to limit the virtual memory available to a command. Only available on Unix platforms.

Attributes:

Name Type Description
command str

The command to execute as it would be written in a command line

limit int

The memory limit to use, in gigabytes

ishard bool

If True, the limit is strict: the job cannot exceed it, even if that makes the job crash

Parameters:

Name Type Description Default
command str

The command to execute as it would be written in a command line

required
limit int

The memory limit to use, in gigabytes

required
ishard bool, optional

If True, the limit is strict: the job cannot exceed it, even if that makes the job crash

False
Source code in PoolFlow\utilities\pool_utilities.py
def __init__(self, command, limit, ishard=False):
    """
    Parameters
    ----------
    command : str
        The command to execute as it would be written in a command line
    limit : int
        The memory limit to use, in gigabytes
    ishard : bool, optional
        If True, the limit is strict: the job cannot exceed it, even if that makes the job crash
    """
    self.command = command
    self.limit = int(limit * 1E3 * 1024 * 1024)  # bytes; resource.setrlimit expects integers
    self.ishard = ishard
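
The constructor converts limit to bytes with the factor 1E3 * 1024 * 1024, which is neither a decimal gigabyte (1000 ** 3) nor a binary gibibyte (1024 ** 3). The sketch below only reproduces that arithmetic for comparison; source_limit_bytes is a hypothetical helper name, not part of PoolFlow.

```python
def source_limit_bytes(limit):
    """Same arithmetic as LimitedProcess.__init__, cast to int."""
    return int(limit * 1e3 * 1024 * 1024)


GB = 1000 ** 3   # decimal gigabyte
GIB = 1024 ** 3  # binary gibibyte

limit = 2
as_source = source_limit_bytes(limit)

print(as_source)    # value produced by the constructor's factor
print(limit * GB)   # 2 decimal gigabytes
print(limit * GIB)  # 2 gibibytes
```

For a limit of 2, the constructor's factor gives a value between the decimal and binary interpretations, so the effective cap is slightly above 2 GB and slightly below 2 GiB.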

launch()

Launches the command with the given limits

Source code in PoolFlow\utilities\pool_utilities.py
def launch(self):
    """
    Launches the command with the given limits
    """
    if os.name == 'posix':
        subprocess.run(shlex.split(self.command), preexec_fn=self.limit_virtual_memory)
    else:
        subprocess.run(shlex.split(self.command))
        warnings.warn(f'Platform {os.name} does not support resources limitations')

limit_virtual_memory()

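The limiter function: applies the virtual-memory limit to the calling process. launch() passes it as preexec_fn, so it runs in the child process before the command starts.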

Source code in PoolFlow\utilities\pool_utilities.py
def limit_virtual_memory(self):
    """
    The limiter function
    """
    if os.name == 'posix':
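        # setrlimit takes a (soft, hard) pair: the soft limit is the one enforced,
        # the hard limit is the ceiling up to which the process may raise it.
        if self.ishard:
            resource.setrlimit(resource.RLIMIT_AS, (self.limit, resource.RLIM_INFINITY))
        else:
            resource.setrlimit(resource.RLIMIT_AS, (self.limit, self.limit))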
    else:
        warnings.warn(f'Platform {os.name} does not support resources limitations')
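
The mechanism used by launch() and limit_virtual_memory() can be tried in isolation. The following is a sketch under stated assumptions, not PoolFlow's implementation: run_with_memory_cap and cap_address_space are hypothetical names, the cap only lowers the soft limit and leaves the hard limit untouched, and the child simply reports the limit it sees.

```python
import os
import subprocess
import sys


def run_with_memory_cap(argv, limit_bytes):
    """Run argv in a child whose address space is capped (POSIX only)."""
    if os.name != "posix":
        # The resource module is unavailable here, so run without a cap.
        return subprocess.run(argv, capture_output=True, text=True)

    import resource

    def cap_address_space():
        # Executed in the child, after fork() and before exec().
        _soft, hard = resource.getrlimit(resource.RLIMIT_AS)
        if hard == resource.RLIM_INFINITY:
            soft = limit_bytes
        else:
            # Never ask for more than the existing hard limit allows.
            soft = min(limit_bytes, hard)
        resource.setrlimit(resource.RLIMIT_AS, (soft, hard))

    return subprocess.run(
        argv, preexec_fn=cap_address_space, capture_output=True, text=True
    )


# The child prints the soft RLIMIT_AS value it inherited.
probe = "import resource; print(resource.getrlimit(resource.RLIMIT_AS)[0])"
result = run_with_memory_cap([sys.executable, "-c", probe], 4 * 1024 ** 3)
print(result.stdout.strip())
```

Leaving the hard limit unchanged avoids the ValueError that setrlimit raises when an unprivileged process tries to raise its hard limit above the current one.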

UnlimitedProcess(command)

An object to use inside job functions to run a command with no limit on its virtual memory usage. Only available on Unix platforms.

Attributes:

Name Type Description
command str

The command to execute as it would be written in a command line

Source code in PoolFlow\utilities\pool_utilities.py
def __init__(self, command):
    self.command = command

launch()

Launches the command with no virtual-memory limit

Source code in PoolFlow\utilities\pool_utilities.py
def launch(self):
    """
    Launches the command with no virtual-memory limit
    """
    if os.name == 'posix':
        subprocess.run(shlex.split(self.command), preexec_fn=self.limit_virtual_memory)
    else:
        subprocess.run(shlex.split(self.command))
        warnings.warn(f'Platform {os.name} does not support resources limitations')

limit_virtual_memory() staticmethod

The limiter function: sets both the soft and hard RLIMIT_AS values to -1 (unlimited) for the calling process

Source code in PoolFlow\utilities\pool_utilities.py
@staticmethod
def limit_virtual_memory():
    """
    The limiter function: sets both RLIMIT_AS values to -1 (unlimited)
    """
    if os.name == 'posix':
        resource.setrlimit(resource.RLIMIT_AS, (-1, -1))
    else:
        warnings.warn(f'Platform {os.name} does not support resources limitations')

TCPHandler

Bases: socketserver.StreamRequestHandler

The request handler class for our server.

It is instantiated once per connection to the server, and must override the handle() method to implement communication to the client.

SubmitToServer(data, HOST='localhost', PORT=7455)

Submit a calculation to the local pool server

Examples:

>>> from PoolFlow.utilities import SubmitToServer as smbv
>>> smbv(('C:\\Users\\Myself\\Documents\\File.py', '1'))  # Submit file with a cost of 1 GB
Source code in PoolFlow\utilities\pool_utilities.py
def SubmitToServer(data, HOST='localhost', PORT=7455):
    """
    Submit a calculation to the local pool server

    Examples
    --------
    >>> from PoolFlow.utilities import SubmitToServer as smbv
    >>> smbv(('C:\\Users\\Myself\\Documents\\File.py', '1'))  # Submit file with a cost of 1 GB
    """
    data = '!@!'.join(data)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((HOST, PORT))
        sock.sendall(bytes(data + "\n", "utf-8"))
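
SubmitToServer joins the fields of data with the separator !@!, appends a newline, and sends the result as UTF-8 over TCP. The sketch below shows that wire format end to end against a throwaway local server. The handler is an assumption about what a receiving side could look like; the actual TCPHandler body is not shown on this page, and EchoFieldsHandler, submit, received, and the file path are hypothetical names for illustration.

```python
import socket
import socketserver
import threading

SEPARATOR = "!@!"
received = []
done = threading.Event()


class EchoFieldsHandler(socketserver.StreamRequestHandler):
    """Hypothetical receiver: read one line and split it back into fields."""

    def handle(self):
        line = self.rfile.readline().decode("utf-8").rstrip("\n")
        received.append(tuple(line.split(SEPARATOR)))
        done.set()


def submit(data, host, port):
    # Same framing as SubmitToServer: joined fields plus a trailing newline.
    payload = SEPARATOR.join(data) + "\n"
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(payload.encode("utf-8"))


# Port 0 asks the OS for any free port, so this does not clash with a real pool.
with socketserver.TCPServer(("127.0.0.1", 0), EchoFieldsHandler) as server:
    host, port = server.server_address
    threading.Thread(target=server.serve_forever, daemon=True).start()
    submit(("/home/me/job.py", "1"), host, port)
    done.wait(timeout=5)
    server.shutdown()

print(received)
```

Because the fields are joined as strings, every element of data must itself be a string, which is why the cost is passed as '1' rather than 1 in the documented example.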

RawExternalCmd(file)

Runs a Python file in a subprocess. Used by DynamicPool for ServerPool instances.

Source code in PoolFlow\utilities\pool_utilities.py
def RawExternalCmd(file):
    """
    Used by DynamicPool for ServerPool instance purposes
    """
    return subprocess.run(['python', file])