Skip to content

Create Pipe

eventbridgepipes_create_pipe R Documentation

Create a pipe

Description

Create a pipe. Amazon EventBridge Pipes connect event sources to targets and reduces the need for specialized knowledge and integration code.

Usage

eventbridgepipes_create_pipe(Name, Description, DesiredState, Source,
  SourceParameters, Enrichment, EnrichmentParameters, Target,
  TargetParameters, RoleArn, Tags, LogConfiguration, KmsKeyIdentifier)

Arguments

Name

[required] The name of the pipe.

Description

A description of the pipe.

DesiredState

The state the pipe should be in.

Source

[required] The ARN of the source resource.

SourceParameters

The parameters required to set up a source for your pipe.

Enrichment

The ARN of the enrichment resource.

EnrichmentParameters

The parameters required to set up enrichment on your pipe.

Target

[required] The ARN of the target resource.

TargetParameters

The parameters required to set up a target for your pipe.

For more information about pipe target parameters, including how to use dynamic path parameters, see Target parameters in the Amazon EventBridge User Guide.

RoleArn

[required] The ARN of the role that allows the pipe to send data to the target.

Tags

The list of key-value pairs to associate with the pipe.

LogConfiguration

The logging configuration settings for the pipe.

KmsKeyIdentifier

The identifier of the KMS customer managed key for EventBridge to use, if you choose to use a customer managed key to encrypt pipe data. The identifier can be the key Amazon Resource Name (ARN), KeyId, key alias, or key alias ARN.

If you do not specify a customer managed key identifier, EventBridge uses an Amazon Web Services owned key to encrypt pipe data.

For more information, see Managing keys in the Key Management Service Developer Guide.

Value

A list with the following syntax:

list(
  Arn = "string",
  Name = "string",
  DesiredState = "RUNNING"|"STOPPED",
  CurrentState = "RUNNING"|"STOPPED"|"CREATING"|"UPDATING"|"DELETING"|"STARTING"|"STOPPING"|"CREATE_FAILED"|"UPDATE_FAILED"|"START_FAILED"|"STOP_FAILED"|"DELETE_FAILED"|"CREATE_ROLLBACK_FAILED"|"DELETE_ROLLBACK_FAILED"|"UPDATE_ROLLBACK_FAILED",
  CreationTime = as.POSIXct(
    "2015-01-01"
  ),
  LastModifiedTime = as.POSIXct(
    "2015-01-01"
  )
)

Request syntax

svc$create_pipe(
  Name = "string",
  Description = "string",
  DesiredState = "RUNNING"|"STOPPED",
  Source = "string",
  SourceParameters = list(
    FilterCriteria = list(
      Filters = list(
        list(
          Pattern = "string"
        )
      )
    ),
    KinesisStreamParameters = list(
      BatchSize = 123,
      DeadLetterConfig = list(
        Arn = "string"
      ),
      OnPartialBatchItemFailure = "AUTOMATIC_BISECT",
      MaximumBatchingWindowInSeconds = 123,
      MaximumRecordAgeInSeconds = 123,
      MaximumRetryAttempts = 123,
      ParallelizationFactor = 123,
      StartingPosition = "TRIM_HORIZON"|"LATEST"|"AT_TIMESTAMP",
      StartingPositionTimestamp = as.POSIXct(
        "2015-01-01"
      )
    ),
    DynamoDBStreamParameters = list(
      BatchSize = 123,
      DeadLetterConfig = list(
        Arn = "string"
      ),
      OnPartialBatchItemFailure = "AUTOMATIC_BISECT",
      MaximumBatchingWindowInSeconds = 123,
      MaximumRecordAgeInSeconds = 123,
      MaximumRetryAttempts = 123,
      ParallelizationFactor = 123,
      StartingPosition = "TRIM_HORIZON"|"LATEST"
    ),
    SqsQueueParameters = list(
      BatchSize = 123,
      MaximumBatchingWindowInSeconds = 123
    ),
    ActiveMQBrokerParameters = list(
      Credentials = list(
        BasicAuth = "string"
      ),
      QueueName = "string",
      BatchSize = 123,
      MaximumBatchingWindowInSeconds = 123
    ),
    RabbitMQBrokerParameters = list(
      Credentials = list(
        BasicAuth = "string"
      ),
      QueueName = "string",
      VirtualHost = "string",
      BatchSize = 123,
      MaximumBatchingWindowInSeconds = 123
    ),
    ManagedStreamingKafkaParameters = list(
      TopicName = "string",
      StartingPosition = "TRIM_HORIZON"|"LATEST",
      BatchSize = 123,
      MaximumBatchingWindowInSeconds = 123,
      ConsumerGroupID = "string",
      Credentials = list(
        SaslScram512Auth = "string",
        ClientCertificateTlsAuth = "string"
      )
    ),
    SelfManagedKafkaParameters = list(
      TopicName = "string",
      StartingPosition = "TRIM_HORIZON"|"LATEST",
      AdditionalBootstrapServers = list(
        "string"
      ),
      BatchSize = 123,
      MaximumBatchingWindowInSeconds = 123,
      ConsumerGroupID = "string",
      Credentials = list(
        BasicAuth = "string",
        SaslScram512Auth = "string",
        SaslScram256Auth = "string",
        ClientCertificateTlsAuth = "string"
      ),
      ServerRootCaCertificate = "string",
      Vpc = list(
        Subnets = list(
          "string"
        ),
        SecurityGroup = list(
          "string"
        )
      )
    )
  ),
  Enrichment = "string",
  EnrichmentParameters = list(
    InputTemplate = "string",
    HttpParameters = list(
      PathParameterValues = list(
        "string"
      ),
      HeaderParameters = list(
        "string"
      ),
      QueryStringParameters = list(
        "string"
      )
    )
  ),
  Target = "string",
  TargetParameters = list(
    InputTemplate = "string",
    LambdaFunctionParameters = list(
      InvocationType = "REQUEST_RESPONSE"|"FIRE_AND_FORGET"
    ),
    StepFunctionStateMachineParameters = list(
      InvocationType = "REQUEST_RESPONSE"|"FIRE_AND_FORGET"
    ),
    KinesisStreamParameters = list(
      PartitionKey = "string"
    ),
    EcsTaskParameters = list(
      TaskDefinitionArn = "string",
      TaskCount = 123,
      LaunchType = "EC2"|"FARGATE"|"EXTERNAL",
      NetworkConfiguration = list(
        awsvpcConfiguration = list(
          Subnets = list(
            "string"
          ),
          SecurityGroups = list(
            "string"
          ),
          AssignPublicIp = "ENABLED"|"DISABLED"
        )
      ),
      PlatformVersion = "string",
      Group = "string",
      CapacityProviderStrategy = list(
        list(
          capacityProvider = "string",
          weight = 123,
          base = 123
        )
      ),
      EnableECSManagedTags = TRUE|FALSE,
      EnableExecuteCommand = TRUE|FALSE,
      PlacementConstraints = list(
        list(
          type = "distinctInstance"|"memberOf",
          expression = "string"
        )
      ),
      PlacementStrategy = list(
        list(
          type = "random"|"spread"|"binpack",
          field = "string"
        )
      ),
      PropagateTags = "TASK_DEFINITION",
      ReferenceId = "string",
      Overrides = list(
        ContainerOverrides = list(
          list(
            Command = list(
              "string"
            ),
            Cpu = 123,
            Environment = list(
              list(
                name = "string",
                value = "string"
              )
            ),
            EnvironmentFiles = list(
              list(
                type = "s3",
                value = "string"
              )
            ),
            Memory = 123,
            MemoryReservation = 123,
            Name = "string",
            ResourceRequirements = list(
              list(
                type = "GPU"|"InferenceAccelerator",
                value = "string"
              )
            )
          )
        ),
        Cpu = "string",
        EphemeralStorage = list(
          sizeInGiB = 123
        ),
        ExecutionRoleArn = "string",
        InferenceAcceleratorOverrides = list(
          list(
            deviceName = "string",
            deviceType = "string"
          )
        ),
        Memory = "string",
        TaskRoleArn = "string"
      ),
      Tags = list(
        list(
          Key = "string",
          Value = "string"
        )
      )
    ),
    BatchJobParameters = list(
      JobDefinition = "string",
      JobName = "string",
      ArrayProperties = list(
        Size = 123
      ),
      RetryStrategy = list(
        Attempts = 123
      ),
      ContainerOverrides = list(
        Command = list(
          "string"
        ),
        Environment = list(
          list(
            Name = "string",
            Value = "string"
          )
        ),
        InstanceType = "string",
        ResourceRequirements = list(
          list(
            Type = "GPU"|"MEMORY"|"VCPU",
            Value = "string"
          )
        )
      ),
      DependsOn = list(
        list(
          JobId = "string",
          Type = "N_TO_N"|"SEQUENTIAL"
        )
      ),
      Parameters = list(
        "string"
      )
    ),
    SqsQueueParameters = list(
      MessageGroupId = "string",
      MessageDeduplicationId = "string"
    ),
    HttpParameters = list(
      PathParameterValues = list(
        "string"
      ),
      HeaderParameters = list(
        "string"
      ),
      QueryStringParameters = list(
        "string"
      )
    ),
    RedshiftDataParameters = list(
      SecretManagerArn = "string",
      Database = "string",
      DbUser = "string",
      StatementName = "string",
      WithEvent = TRUE|FALSE,
      Sqls = list(
        "string"
      )
    ),
    SageMakerPipelineParameters = list(
      PipelineParameterList = list(
        list(
          Name = "string",
          Value = "string"
        )
      )
    ),
    EventBridgeEventBusParameters = list(
      EndpointId = "string",
      DetailType = "string",
      Source = "string",
      Resources = list(
        "string"
      ),
      Time = "string"
    ),
    CloudWatchLogsParameters = list(
      LogStreamName = "string",
      Timestamp = "string"
    ),
    TimestreamParameters = list(
      TimeValue = "string",
      EpochTimeUnit = "MILLISECONDS"|"SECONDS"|"MICROSECONDS"|"NANOSECONDS",
      TimeFieldType = "EPOCH"|"TIMESTAMP_FORMAT",
      TimestampFormat = "string",
      VersionValue = "string",
      DimensionMappings = list(
        list(
          DimensionValue = "string",
          DimensionValueType = "VARCHAR",
          DimensionName = "string"
        )
      ),
      SingleMeasureMappings = list(
        list(
          MeasureValue = "string",
          MeasureValueType = "DOUBLE"|"BIGINT"|"VARCHAR"|"BOOLEAN"|"TIMESTAMP",
          MeasureName = "string"
        )
      ),
      MultiMeasureMappings = list(
        list(
          MultiMeasureName = "string",
          MultiMeasureAttributeMappings = list(
            list(
              MeasureValue = "string",
              MeasureValueType = "DOUBLE"|"BIGINT"|"VARCHAR"|"BOOLEAN"|"TIMESTAMP",
              MultiMeasureAttributeName = "string"
            )
          )
        )
      )
    )
  ),
  RoleArn = "string",
  Tags = list(
    "string"
  ),
  LogConfiguration = list(
    S3LogDestination = list(
      BucketName = "string",
      BucketOwner = "string",
      OutputFormat = "json"|"plain"|"w3c",
      Prefix = "string"
    ),
    FirehoseLogDestination = list(
      DeliveryStreamArn = "string"
    ),
    CloudwatchLogsLogDestination = list(
      LogGroupArn = "string"
    ),
    Level = "OFF"|"ERROR"|"INFO"|"TRACE",
    IncludeExecutionData = list(
      "ALL"
    )
  ),
  KmsKeyIdentifier = "string"
)