Skip to content

Bases: ModelFactoryMixin

A DataSource represents a dataset to be used by a Synthesizer as training data.

Parameters:

Name Type Description Default
connector Connector

Connector from which the datasource is created

required
datatype Optional[Union[DataSourceType, str]]

(optional) DataSource type

TABULAR
name Optional[str]

(optional) DataSource name

None
project Optional[Project]

(optional) Project name for this datasource

None
wait_for_metadata bool

If True, wait until the metadata is fully calculated

True
client Client

(optional) Client to connect to the backend

None
**config

Datasource specific configuration

{}

Attributes:

Name Type Description
uid UID

UID of the datasource instance

datatype DataSourceType

Data source type

status Status

Status of the datasource

metadata Metadata

Metadata associated to the datasource

Source code in ydata/sdk/datasources/datasource.py
class DataSource(ModelFactoryMixin):
    """A [`DataSource`][ydata.sdk.datasources.DataSource] represents a dataset
    to be used by a Synthesizer as training data.

    Arguments:
        connector (Connector): Connector from which the datasource is created
        datatype (Optional[Union[DataSourceType, str]]): (optional) DataSource type
        name (Optional[str]): (optional) DataSource name
        project (Optional[Project]): (optional) Project name for this datasource
        wait_for_metadata (bool): If `True`, wait until the metadata is fully calculated
        client (Client): (optional) Client to connect to the backend
        **config: Datasource specific configuration

    Attributes:
        uid (UID): UID of the datasource instance
        datatype (DataSourceType): Data source type
        status (Status): Status of the datasource
        metadata (Metadata): Metadata associated to the datasource
    """

    def __init__(
        self, connector: Connector, datatype: Optional[Union[DataSourceType, str]] = DataSourceType.TABULAR,
        name: Optional[str] = None, project: Optional[Project] = None, wait_for_metadata: bool = True,
        client: Optional[Client] = None, **config
    ):
        datasource_type = CONNECTOR_TO_DATASOURCE.get(connector.type)
        self._init_common(client=client)
        # Record the project before any backend call: the `status` property
        # (polled by `_wait_for_metadata`) reads `self._project`, and reading
        # it while unset would be swallowed and reported as an unknown status.
        self._project = project
        # Forward `project` so the datasource is created in the requested
        # project, matching what the `create` classmethod does.
        self._model: Optional[mDataSource] = self._create_model(
            connector=connector, datasource_type=datasource_type, datatype=datatype,
            config=config, name=name, project=project, client=self._client)

        if wait_for_metadata:
            self._model = DataSource._wait_for_metadata(self)._model

    @init_client
    def _init_common(self, client: Optional[Client] = None):
        """Store the backend client and create the instance logger."""
        self._client = client
        self._logger = create_logger(__name__, level=LOG_LEVEL)

    @property
    def uid(self) -> UID:
        """UID of the underlying datasource model."""
        return self._model.uid

    @property
    def datatype(self) -> DataSourceType:
        """Data type of the underlying datasource model."""
        return self._model.datatype

    @property
    def project(self) -> Project:
        """Project set on this datasource, falling back to the client's project."""
        return self._project or self._client.project

    @property
    def status(self) -> Status:
        """Current status of the datasource.

        Every access re-fetches the datasource through `get` and replaces the
        cached model. Any failure while refreshing is deliberately swallowed
        and reported as `Status.unknown()`.
        """
        try:
            self._model = self.get(uid=self._model.uid,
                                   project=self.project, client=self._client)._model
            return self._model.status
        except Exception:  # noqa: PIE786
            return Status.unknown()

    @property
    def metadata(self) -> Optional[Metadata]:
        """Metadata held by the cached model (no refresh is performed)."""
        return self._model.metadata

    @staticmethod
    @init_client
    def list(project: Optional[Project] = None, client: Optional[Client] = None) -> DataSourceList:
        """List the  [`DataSource`][ydata.sdk.datasources.DataSource]
        instances.

        Arguments:
            project (Optional[Project]): (optional) Project name from where to list the datasources
            client (Client): (optional) Client to connect to the backend

        Returns:
            List of datasources
        """
        def __process_data(data: list) -> list:
            # Drop the `metadata` entry from every item (in place) before
            # building the list object.
            to_del = ['metadata']
            for e in data:
                for k in to_del:
                    e.pop(k, None)
            return data

        response = client.get('/datasource', project=project)
        data: list = response.json()
        data = __process_data(data)

        return DataSourceList(data)

    @staticmethod
    @init_client
    def get(uid: UID, project: Optional[Project] = None, client: Optional[Client] = None) -> "DataSource":
        """Get an existing [`DataSource`][ydata.sdk.datasources.DataSource].

        Arguments:
            uid (UID): DataSource identifier
            project (Optional[Project]): (optional) Project name from where to get the datasource
            client (Client): (optional) Client to connect to the backend

        Returns:
            DataSource
        """
        response = client.get(f'/datasource/{uid}', project=project)
        # The payload is indexed by string keys below, i.e. it is a mapping.
        data: Dict = response.json()
        datasource_type = CONNECTOR_TO_DATASOURCE.get(
            ConnectorType(data['connector']['type']))
        model = DataSource._model_from_api(data, datasource_type)
        datasource = DataSource._init_from_model_data(model)
        datasource._project = project
        return datasource

    @classmethod
    def create(
        cls, connector: Connector, datatype: Optional[Union[DataSourceType, str]] = DataSourceType.TABULAR,
        name: Optional[str] = None, project: Optional[Project] = None, wait_for_metadata: bool = True,
        client: Optional[Client] = None, **config
    ) -> "DataSource":
        """Create a new [`DataSource`][ydata.sdk.datasources.DataSource].

        Arguments:
            connector (Connector): Connector from which the datasource is created
            datatype (Optional[Union[DataSourceType, str]]): (optional) DataSource type
            name (Optional[str]): (optional) DataSource name
            project (Optional[Project]): (optional) Project name for this datasource
            wait_for_metadata (bool): If `True`, wait until the metadata is fully calculated
            client (Client): (optional) Client to connect to the backend
            **config: Datasource specific configuration

        Returns:
            DataSource
        """
        datasource_type = CONNECTOR_TO_DATASOURCE.get(connector.type)
        return cls._create(
            connector=connector, datasource_type=datasource_type, datatype=datatype, config=config, name=name,
            project=project, wait_for_metadata=wait_for_metadata, client=client)

    @classmethod
    def _create(
        cls, connector: Connector, datasource_type: Type[mDataSource],
        datatype: Optional[Union[DataSourceType, str]] = DataSourceType.TABULAR, config: Optional[Dict] = None,
        name: Optional[str] = None, project: Optional[Project] = None, wait_for_metadata: bool = True,
        client: Optional[Client] = None
    ) -> "DataSource":
        """Create the backend model, wrap it in a `DataSource` and optionally
        wait for its metadata.

        Arguments:
            connector (Connector): Connector from which the datasource is created
            datasource_type (Type[mDataSource]): Model class used to build the payload and parse the response
            datatype (Optional[Union[DataSourceType, str]]): (optional) DataSource type
            config (Optional[Dict]): (optional) Datasource specific configuration
            name (Optional[str]): (optional) DataSource name
            project (Optional[Project]): (optional) Project name for this datasource
            wait_for_metadata (bool): If `True`, wait until the metadata is fully calculated
            client (Client): (optional) Client to connect to the backend

        Returns:
            DataSource
        """
        model = DataSource._create_model(
            connector, datasource_type, datatype, config, name, project, client)
        datasource = DataSource._init_from_model_data(model)
        # Set the project before polling so status refreshes made while
        # waiting target the project the datasource was created in.
        datasource._project = project

        if wait_for_metadata:
            datasource._model = DataSource._wait_for_metadata(datasource)._model
            # The polled instance is only used for its model; keep the
            # project on the instance that is returned.
            datasource._project = project

        return datasource

    @classmethod
    @init_client
    def _create_model(
        cls, connector: Connector, datasource_type: Type[mDataSource],
        datatype: Optional[Union[DataSourceType, str]] = DataSourceType.TABULAR, config: Optional[Dict] = None,
        name: Optional[str] = None, project: Optional[Project] = None, client: Optional[Client] = None
    ) -> mDataSource:
        """POST a new datasource to the backend and return the parsed model.

        Arguments:
            connector (Connector): Connector from which the datasource is created
            datasource_type (Type[mDataSource]): Model class used to build the payload and parse the response
            datatype (Optional[Union[DataSourceType, str]]): (optional) DataSource type
            config (Optional[Dict]): (optional) Datasource specific configuration; `None` is treated as empty
            name (Optional[str]): (optional) DataSource name; a random UUID string is used when omitted
            project (Optional[Project]): (optional) Project name for this datasource
            client (Client): (optional) Client to connect to the backend

        Returns:
            Model built from the backend response
        """
        _name = name if name is not None else str(uuid4())
        _config = config if config is not None else {}
        payload = {
            "name": _name,
            "connector": {
                "uid": connector.uid,
                "type": ConnectorType(connector.type).value
            },
            "dataType": DataSourceType(datatype).value
        }
        if connector.type != ConnectorType.FILE:
            # Use the None-guarded `_config`: unpacking the raw `config`
            # raises TypeError when the caller left it as `None`.
            _config = datasource_type(**_config).to_payload()
        payload.update(_config)
        response = client.post('/datasource/', project=project, json=payload)
        data: Dict = response.json()
        return DataSource._model_from_api(data, datasource_type)

    @staticmethod
    def _wait_for_metadata(datasource):
        """Poll the backend until the datasource reaches a final state.

        Polling stops once the state is `AVAILABLE`, `FAILED` or
        `UNAVAILABLE`; between polls the function sleeps for `BACKOFF`.

        Arguments:
            datasource (DataSource): Datasource to poll

        Returns:
            The most recently fetched `DataSource` instance
        """
        logger = create_logger(__name__, level=LOG_LEVEL)
        final_states = (State.AVAILABLE, State.FAILED, State.UNAVAILABLE)
        while True:
            # `status` issues a backend request on every access, so read it
            # once per iteration and reuse it for both the check and the log.
            status = datasource.status
            if State(status.state) in final_states:
                return datasource
            logger.info(f'Calculating metadata [{status}]')
            # Carry the explicit project across re-fetches; `get` stores the
            # value it receives on the instance it returns.
            datasource = DataSource.get(
                uid=datasource.uid, project=getattr(datasource, '_project', None),
                client=datasource._client)
            sleep(BACKOFF)

    @staticmethod
    def _model_from_api(data: Dict, datasource_type: Type[mDataSource]) -> mDataSource:
        """Build a `datasource_type` model from a backend response.

        The `dataType` key of `data` is renamed to `datatype` in place before
        the dictionary is passed through `filter_dict` and into the model
        constructor.
        """
        data['datatype'] = data.pop('dataType', None)
        data = filter_dict(datasource_type, data)
        model = datasource_type(**data)
        return model

    def __repr__(self):
        """Delegate the representation to the underlying model."""
        return self._model.__repr__()

create(connector, datatype=DataSourceType.TABULAR, name=None, project=None, wait_for_metadata=True, client=None, **config) classmethod

Create a new DataSource.

Parameters:

Name Type Description Default
connector Connector

Connector from which the datasource is created

required
datatype Optional[Union[DataSourceType, str]]

(optional) DataSource type

TABULAR
name Optional[str]

(optional) DataSource name

None
project Optional[Project]

(optional) Project name for this datasource

None
wait_for_metadata bool

If True, wait until the metadata is fully calculated

True
client Client

(optional) Client to connect to the backend

None
**config

Datasource specific configuration

{}

Returns:

Type Description
DataSource

DataSource

Source code in ydata/sdk/datasources/datasource.py
@classmethod
def create(
    cls, connector: Connector, datatype: Optional[Union[DataSourceType, str]] = DataSourceType.TABULAR,
    name: Optional[str] = None, project: Optional[Project] = None, wait_for_metadata: bool = True,
    client: Optional[Client] = None, **config
) -> "DataSource":
    """Create a new [`DataSource`][ydata.sdk.datasources.DataSource].

    The model class is looked up in `CONNECTOR_TO_DATASOURCE` from the
    connector type, then the work is delegated to `_create`.

    Arguments:
        connector (Connector): Connector from which the datasource is created
        datatype (Optional[Union[DataSourceType, str]]): (optional) DataSource type
        name (Optional[str]): (optional) DataSource name
        project (Optional[Project]): (optional) Project name for this datasource
        wait_for_metadata (bool): If `True`, wait until the metadata is fully calculated
        client (Client): (optional) Client to connect to the backend
        **config: Datasource specific configuration

    Returns:
        DataSource
    """
    model_cls = CONNECTOR_TO_DATASOURCE.get(connector.type)
    creation_kwargs = {
        'connector': connector,
        'datasource_type': model_cls,
        'datatype': datatype,
        'config': config,
        'name': name,
        'project': project,
        'wait_for_metadata': wait_for_metadata,
        'client': client,
    }
    return cls._create(**creation_kwargs)

get(uid, project=None, client=None) staticmethod

Get an existing DataSource.

Parameters:

Name Type Description Default
uid UID

DataSource identifier

required
project Optional[Project]

(optional) Project name from where to get the datasource

None
client Client

(optional) Client to connect to the backend

None

Returns:

Type Description
DataSource

DataSource

Source code in ydata/sdk/datasources/datasource.py
@staticmethod
@init_client
def get(uid: UID, project: Optional[Project] = None, client: Optional[Client] = None) -> "DataSource":
    """Get an existing [`DataSource`][ydata.sdk.datasources.DataSource].

    Arguments:
        uid (UID): DataSource identifier
        project (Optional[Project]): (optional) Project name from where to get the datasource
        client (Client): (optional) Client to connect to the backend

    Returns:
        DataSource
    """
    payload: dict = client.get(f'/datasource/{uid}', project=project).json()
    # The connector type reported by the backend selects the model class.
    connector_type = ConnectorType(payload['connector']['type'])
    model_cls = CONNECTOR_TO_DATASOURCE.get(connector_type)
    instance = DataSource._init_from_model_data(
        DataSource._model_from_api(payload, model_cls))
    instance._project = project
    return instance

list(project=None, client=None) staticmethod

List the DataSource instances.

Parameters:

Name Type Description Default
project Optional[Project]

(optional) Project name from where to list the datasources

None
client Client

(optional) Client to connect to the backend

None

Returns:

Type Description
DataSourceList

List of datasources

Source code in ydata/sdk/datasources/datasource.py
@staticmethod
@init_client
def list(project: Optional[Project] = None, client: Optional[Client] = None) -> DataSourceList:
    """List the  [`DataSource`][ydata.sdk.datasources.DataSource]
    instances.

    Arguments:
        project (Optional[Project]): (optional) Project name from where to list the datasources
        client (Client): (optional) Client to connect to the backend

    Returns:
        List of datasources
    """
    dropped_keys = ('metadata',)

    def _strip(entries):
        # Remove the dropped keys from each entry in place; missing keys are ignored.
        for entry in entries:
            for key in dropped_keys:
                entry.pop(key, None)
        return entries

    entries = client.get('/datasource', project=project).json()
    return DataSourceList(_strip(entries))

Status

Bases: BaseModel

DataSourceType

Bases: StringEnum

MULTITABLE = 'multiTable' class-attribute instance-attribute

The DataSource is a multi table RDBMS.

TABULAR = 'tabular' class-attribute instance-attribute

The DataSource is tabular (i.e. it does not have a temporal dimension).

TIMESERIES = 'timeseries' class-attribute instance-attribute

The DataSource has a temporal dimension.