# -*- test-case-name: twisted.test.test_htb -*-
#
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.
"""Hierarchical Token Bucket traffic shaping.
Patterned after U{Martin Devera's Hierarchical Token Bucket traffic
shaper for the Linux kernel}.
@seealso: U{HTB Linux queuing discipline manual - user guide
}
@seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control
HOWTO}
@author: U{Kevin Turner}
"""
from __future__ import nested_scopes
__version__ = '$Revision: 1.5 $'[11:-2]
from twisted.python.components import Interface, backwardsCompatImplements
# TODO: Investigate whether we should be using os.times()[-1] instead of
# time.time. time.time, it has been pointed out, can go backwards. Is
# the same true of os.times?
from time import time
from zope.interface import implements
from twisted.protocols import pcp
class Bucket:
"""Token bucket, or something like it.
I can hold up to a certain number of tokens, and I drain over time.
@cvar maxburst: Size of the bucket, in bytes. If None, the bucket is
never full.
@type maxburst: int
@cvar rate: Rate the bucket drains, in bytes per second. If None,
the bucket drains instantaneously.
@type rate: int
"""
maxburst = None
rate = None
_refcount = 0
def __init__(self, parentBucket=None):
self.content = 0
self.parentBucket=parentBucket
self.lastDrip = time()
def add(self, amount):
"""Add tokens to me.
@param amount: A quanity of tokens to add.
@type amount: int
@returns: The number of tokens that fit.
@returntype: int
"""
self.drip()
if self.maxburst is None:
allowable = amount
else:
allowable = min(amount, self.maxburst - self.content)
if self.parentBucket is not None:
allowable = self.parentBucket.add(allowable)
self.content += allowable
return allowable
def drip(self):
"""Let some of the bucket drain.
How much of the bucket drains depends on how long it has been
since I was last called.
@returns: True if I am now empty.
@returntype: bool
"""
if self.parentBucket is not None:
self.parentBucket.drip()
if self.rate is None:
self.content = 0
return True
else:
now = time()
deltaT = now - self.lastDrip
self.content = long(max(0, self.content - deltaT * self.rate))
self.lastDrip = now
return False
class IBucketFilter(Interface):
def getBucketFor(self, *somethings, **some_kw):
"""I'll give you a bucket for something.
@returntype: L{Bucket}
"""
class HierarchicalBucketFilter:
"""I filter things into buckets, and I am nestable.
@cvar bucketFactory: Class of buckets to make.
@type bucketFactory: L{Bucket} class
@cvar sweepInterval: Seconds between sweeping out the bucket cache.
@type sweepInterval: int
"""
implements(IBucketFilter)
bucketFactory = Bucket
sweepInterval = None
def __init__(self, parentFilter=None):
self.buckets = {}
self.parentFilter = parentFilter
self.lastSweep = time()
def getBucketFor(self, *a, **kw):
"""You want a bucket for that? I'll give you a bucket.
Any parameters are passed on to L{getBucketKey}, from them it
decides which bucket you get.
@returntype: L{Bucket}
"""
if ((self.sweepInterval is not None)
and ((time() - self.lastSweep) > self.sweepInterval)):
self.sweep()
if self.parentFilter:
parentBucket = self.parentFilter.getBucketFor(self, *a, **kw)
else:
parentBucket = None
key = self.getBucketKey(*a, **kw)
bucket = self.buckets.get(key)
if bucket is None:
bucket = self.bucketFactory(parentBucket)
self.buckets[key] = bucket
return bucket
def getBucketKey(self, *a, **kw):
"""I determine who gets which bucket.
Unless I'm overridden, everything gets the same bucket.
@returns: something to be used as a key in the bucket cache.
"""
return None
def sweep(self):
"""I throw away references to empty buckets."""
for key, bucket in self.buckets.items():
if (bucket._refcount == 0) and bucket.drip():
del self.buckets[key]
self.lastSweep = time()
backwardsCompatImplements(HierarchicalBucketFilter)
class FilterByHost(HierarchicalBucketFilter):
"""A bucket filter with a bucket for each host.
"""
sweepInterval = 60 * 20
def getBucketKey(self, transport):
return transport.getPeer()[1]
class FilterByServer(HierarchicalBucketFilter):
"""A bucket filter with a bucket for each service.
"""
sweepInterval = None
def getBucketKey(self, transport):
return transport.getHost()[2]
class ShapedConsumer(pcp.ProducerConsumerProxy):
"""I wrap a Consumer and shape the rate at which it receives data.
"""
# Providing a Pull interface means I don't have to try to schedule
# traffic with callLaters.
iAmStreaming = False
def __init__(self, consumer, bucket):
pcp.ProducerConsumerProxy.__init__(self, consumer)
self.bucket = bucket
self.bucket._refcount += 1
def _writeSomeData(self, data):
# In practice, this actually results in obscene amounts of
# overhead, as a result of generating lots and lots of packets
# with twelve-byte payloads. We may need to do a version of
# this with scheduled writes after all.
amount = self.bucket.add(len(data))
return pcp.ProducerConsumerProxy._writeSomeData(self, data[:amount])
def stopProducing(self):
pcp.ProducerConsumerProxy.stopProducing(self)
self.bucket._refcount -= 1
class ShapedTransport(ShapedConsumer):
"""I wrap a Transport and shape the rate at which it receives data.
I am a L{ShapedConsumer} with a little bit of magic to provide for
the case where the consumer I wrap is also a Transport and people
will be attempting to access attributes I do not proxy as a
Consumer (e.g. loseConnection).
"""
# Ugh. We only wanted to filter IConsumer, not ITransport.
iAmStreaming = False
def __getattr__(self, name):
# Because people will be doing things like .getPeer and
# .loseConnection on me.
return getattr(self.consumer, name)
class ShapedProtocolFactory:
"""I dispense Protocols with traffic shaping on their transports.
Usage::
myserver = SomeFactory()
myserver.protocol = ShapedProtocolFactory(myserver.protocol,
bucketFilter)
Where SomeServerFactory is a L{twisted.internet.protocol.Factory}, and
bucketFilter is an instance of L{HierarchicalBucketFilter}.
"""
def __init__(self, protoClass, bucketFilter):
"""Tell me what to wrap and where to get buckets.
@param protoClass: The class of Protocol I will generate
wrapped instances of.
@type protoClass: L{Protocol}
class
@param bucketFilter: The filter which will determine how
traffic is shaped.
@type bucketFilter: L{HierarchicalBucketFilter}.
"""
# More precisely, protoClass can be any callable that will return
# instances of something that implements IProtocol.
self.protocol = protoClass
self.bucketFilter = bucketFilter
def __call__(self, *a, **kw):
"""Make a Protocol instance with a shaped transport.
Any parameters will be passed on to the protocol's initializer.
@returns: a Protocol instance with a L{ShapedTransport}.
"""
proto = self.protocol(*a, **kw)
origMakeConnection = proto.makeConnection
def makeConnection(transport):
bucket = self.bucketFilter.getBucketFor(transport)
shapedTransport = ShapedTransport(transport, bucket)
return origMakeConnection(shapedTransport)
proto.makeConnection = makeConnection
return proto