File size: 1,640 Bytes
690c7ba
 
 
b68da16
 
690c7ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from copy import deepcopy
from typing import Any, Dict

from aiflows.base_flows import SequentialFlow
from aiflows.utils import logging

logging.set_verbosity_debug()
log = logging.get_logger(__name__)

class InteractiveCodeGenFlow(SequentialFlow):
    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"]

    def __init__(
            self,
            memory_files: Dict[str, Any],
            **kwargs
    ):
        super().__init__(**kwargs)
        self.memory_files = memory_files

    @classmethod
    def instantiate_from_config(cls, config):
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Set up memory file ~~~
        memory_files = flow_config["memory_files"]
        kwargs.update({"memory_files": memory_files})

        # ~~~ Set up subflows ~~~
        kwargs.update({"subflows": cls._set_up_subflows(flow_config)})

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        # ~~~ sets the input_data in the flow_state dict ~~~
        self._state_update_dict(update_data=input_data)

        # ~~~ set the memory file to the flow state ~~~
        self._state_update_dict(update_data={"memory_files": self.memory_files})

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=max_rounds)

        output = self._get_output_from_state()

        return output